#!/usr/bin/env python
""" host_demux.py Demux Data on HOST Computer
- data is stored locally, either from mgtdram/ftp or fiber-optic AFHBA404
- channelize the data
- optionally store file-per-channel
- optionally plot in pykst
- @@todo store to MDSplus as segments.
example usage::
./host_demux.py --save=DATA --nchan=32 --nblks=-1 --pchan=none acq2106_067
# load all blocks, save per channel to subdirectory DATA/data_CC.dat
./host_demux.py --nchan=32 --nblks=4 --pchan=1:8 acq2106_067
# plot channels 1:8, 4 blocks
./host_demux.py --nchan=32 --nblks=-1 --pchan=1,2 acq2106_067
# plot channels 1,2, ALL blocks
# works for 8GB data, best to LIMIT the number of channels ..
./host_demux.py --nchan=96 --src=/data/ACQ400DATA/1 \
--egu=1 --xdt=2e-6 --cycle=1:4 --pchan=1:2 \
acq2106_061
# plot AFHBA404 data from PORT1
# plot egu (V vs s), specify interval, plot 4 cycles, plot 2 channels
# uut
use of --src
--src=/data # valid for FTP upload data
--src=/data/ACQ400DATA/1 # valid for SFP data, port 1
--src=afhba.0.log # one big raw file, eg from LLC
./host_demux.py --nchan 128 --pchan 1,33,65,97 --src=/path-to/afhba.0.log acq2106_110
# plot data from LLC, 128 channels, show one "channel" from each site.
# 97 was actually the LSB of TLATCH.
.. rst-class:: hidden
usage::
host_demux.py [-h] [--nchan NCHAN] [--nblks NBLKS] [--save SAVE]
[--src SRC] [--pchan PCHAN]
uut
host demux, host side data handling
positional arguments:
uut uut
optional arguments:
-h, --help show this help message and exit
--nchan NCHAN
--nblks NBLKS
--save SAVE save channelized data to dir
--src SRC data source root
--pchan PCHAN channels to plot
--egu EGU plot egu (V vs s)
--xdt XDT 0: use interval from UUT, else specify interval
--double_up Use for ACQ480 two lines per channel mode
if --src is a file, use it directly
dir/nnnn
if --src is a directory, first check if it has cycles:
dir/NNNNN/nnnnn*
else iterate files in dir
dir/nnnnn*
TO DEMUX ON WINDOWS TO STORE CHANNELISED DATA:
python .\host_demux.py --save=1 --src="[dir]" --pchan=none acq2106_114
Make sure that the muxed data is in D:\\[dir]\[UUT name]
Where [dir] is the location of the data.
Demuxed data will be written to D:\\demuxed\[UUT name]
To plot subsampled data on windows:
python .\host_demux.py --src=Projects --nchan=8 \
--pchan 1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16 --plot_mpl=1:1000:1 acq2106_120
"""
import numpy as np
import os
import re
import argparse
import subprocess
import acq400_hapi
import time
import matplotlib.pyplot as plt
has_pykst = False
if os.name != "nt":
try:
import pykst
has_pykst = True
except ImportError:
print("WARNING: failed to import pykst, no kst plots")
[docs]def channel_required(args, ch):
# print("channel_required {} {}".format(ch, 'in' if ch in args.pc_list else 'out', args.pc_list))
return args.save != None or args.double_up or ch in args.pc_list
[docs]def create_npdata(args, nblk, nchn):
channels = []
for counter in range(nchn):
if channel_required(args, counter):
channels.append(np.zeros((int(nblk)*args.NSAM), dtype=args.np_data_type))
else:
channels.append(np.zeros(16, dtype=args.np_data_type))
# print "length of data = ", len(total_data)
# print "npdata = ", npdata
return channels
[docs]def make_cycle_list(args):
if args.cycle == None:
cyclist = os.listdir(args.uutroot)
cyclist.sort()
return cyclist
else:
rng = args.cycle.split(':')
if len(rng) > 1:
cyclist = [ '{:06d}'.format(c) for c in range(int(rng[0]), int(rng[1])+1) ]
elif len(args.cycle) == 6:
cyclist = [ args.cycle ]
else:
cyclist = [ '{:06d}'.format(int(args.cycle)) ]
return cyclist
[docs]def get_file_names(args):
fnlist = list()
# matches BOTH 0.?? for AFHBA an 0000 for FTP
datapat = re.compile('[.0-9]{4}$')
has_cycles = True
for cycle in make_cycle_list(args):
if cycle == "err.log":
continue
try:
uutroot = r'{}/{}'.format(args.uutroot, cycle)
print("debug")
ls = os.listdir(uutroot)
print("uutroot = ", uutroot)
except:
uutroot = args.uutroot
ls = os.listdir(uutroot)
has_cycles = False
ls.sort()
for n, file in enumerate(ls):
if datapat.match(file):
fnlist.append(r'{}/{}'.format(uutroot, file) )
else:
print("no match {}".format(file))
if not has_cycles:
break
return fnlist
[docs]def read_data(args, NCHAN):
# global NSAM
data_files = get_file_names(args)
for n, f in enumerate(data_files):
print(f)
if NCHAN % 3 == 0:
print("collect in groups of 3 to keep alignment")
GROUP = 3
else:
GROUP = 1
if args.NSAM == 0:
args.NSAM = GROUP*os.path.getsize(data_files[0])/args.WSIZE/NCHAN
print("NSAM set {}".format(args.NSAM))
NBLK = len(data_files)
if args.nblks > 0 and NBLK > args.nblks:
NBLK = args.nblks
data_files = [ data_files[i] for i in range(0,NBLK) ]
print("NBLK {} NBLK/GROUP {} NCHAN {}".format(NBLK, NBLK/GROUP, NCHAN))
raw_channels = create_npdata(args, NBLK/GROUP, NCHAN)
blocks = 0
i0 = 0
iblock = 0
for blknum, blkfile in enumerate(data_files):
if blocks >= NBLK:
break
if blkfile != "analysis.py" and blkfile != "root":
print(blkfile, blknum)
# concatenate 3 blocks to ensure modulo 3 channel align
if iblock == 0:
data = np.fromfile(blkfile, dtype=args.np_data_type)
else:
data = np.append(data, np.fromfile(blkfile, dtype=args.np_data_type))
iblock += 1
if iblock < GROUP:
continue
i1 = i0 + args.NSAM
for ch in range(NCHAN):
if channel_required(args, ch):
raw_channels[ch][i0:i1] = (data[ch::NCHAN])
# print x
i0 = i1
blocks += 1
iblock = 0
print("length of data = ", len(raw_channels))
print("length of data[0] = ", len(raw_channels[0]))
print("length of data[1] = ", len(raw_channels[1]))
return raw_channels
[docs]def read_data_file(args, NCHAN):
# NCHAN = args.nchan
data = np.fromfile(args.src, dtype=args.np_data_type)
nsam = len(data)/NCHAN
raw_channels = create_npdata(args, nsam, NCHAN)
for ch in range(NCHAN):
if channel_required(args, ch):
raw_channels[ch] = data[ch::NCHAN]
return raw_channels
[docs]def save_data(args, raw_channels):
if os.name == "nt": # if system is windows.
path = r'{}:\\demuxed\{}'.format(args.drive_letter, args.uut[0]) # raw string literal so we can use \ in path.
if not os.path.exists(path):
os.makedirs(path)
args.saveroot = path # set args.saveroot to windows style dir.
else:
subprocess.call(["mkdir", "-p", args.saveroot])
uutname = args.uut[0]
for enum, channel in enumerate(raw_channels):
data_file = open("{}/{}_{:02d}.dat".format(args.saveroot, uutname, enum+1), "wb+")
channel.tofile(data_file, '')
print("data saved to directory: {}".format(args.saveroot))
with open("{}/format".format(args.saveroot), 'w') as fmt:
fmt.write("# dirfile format file for {}\n".format(uutname))
for enum, channel in enumerate(raw_channels):
fmt.write("{}_{:02d}.dat RAW s 1\n".format(uutname, enum+1))
return raw_channels
[docs]def decode_dac_data(x):
return np.left_shift(x, 12)
[docs]def decode_pass(x):
return x
[docs]class ColumnHandler:
[docs] def __init__(self, fmt, col):
self.fmt = fmt
self.col = col
[docs] def decode(self, x):
return x
[docs] def title(self, x):
return self.fmt.format(self.col)
[docs] def tostr(self):
return self.fmt.format(self.col)
[docs]class PacketID_ColumnHandler(ColumnHandler):
[docs] def __init__(self, col):
super().__init__("Packet ID {}", col)
[docs]class Payload_ColumnHandler(ColumnHandler):
[docs] def __init__(self, col, ch):
super().__init__("Packet Payload {} ch"+"{:02d}".format(ch)+" ID {}", col)
[docs] def decode(self, x):
return decode_dac_data(x)
[docs] def title(self, x):
return self.fmt.format(self.col,np.right_shift(x, 20))
[docs] def tostr(self):
return self.fmt.format(self.col, 'id')
columns = {}
[docs]def add_titlebox(ax, text):
ax.text(.85, .6, text,
horizontalalignment='center',
transform=ax.transAxes,
bbox=dict(facecolor='white', alpha=0.6),
fontsize=12.5)
return ax
[docs]def make_columns():
global columns
for ch in range(0, 8):
columns[ch] = ColumnHandler("ADC I {}", ch+1)
for ch in range(8,16):
columns[ch] = ColumnHandler("ADC V {}", ch%8+1)
for ch in range(16,24):
columns[ch] = ColumnHandler("ADC T1 {}", ch%8+1)
for ch in range(24,32):
columns[ch] = ColumnHandler("ADC T2 {}", ch%8+1)
for ch in range(0,8):
columns[ch+32] = ColumnHandler("SUM {}", ch%8+1)
columns[40] = PacketID_ColumnHandler(1)
for ch in range(0,10):
columns[41+ch] = Payload_ColumnHandler(1, ch+1)
columns[51] = PacketID_ColumnHandler(2)
for ch in range(0,10):
columns[52+ch] = Payload_ColumnHandler(2, ch+1)
columns[62] = ColumnHandler("FLAGS {}", 1)
columns[63] = ColumnHandler("FLAGS {}", 2)
for col in range(64,80):
columns[col] = ColumnHandler("SPAD{} {}".format(col-64, "COUNT" if col-64 == 0 else ""), col)
[docs]def show_columns():
print("{:10} {:10} {}".format("ID1", "Offset", "Description"))
for k, v in sorted(columns.items()):
print("{:8}{:02d} {:8}{:02d} {}".format(' ', int(k)+1, ' ', int(k), v.tostr()))
[docs]def plot_mpl(args, raw_channels):
global columns
print("Plotting with MatPlotLib. Subrate = {}".format(args.step))
#real_len = len(raw_channels[0]) # this is the real length of the channel data
num_of_ch = len(args.pc_list)
f, plots = plt.subplots(num_of_ch, 1)
for num, sp in enumerate(args.pc_list):
plots[num].plot(columns[sp].decode(raw_channels[sp][args.start:args.stop:args.step]))
add_titlebox(plots[num], "col {:2d} {}".format(sp, columns[sp].title(raw_channels[sp][args.start])))
plt.show()
return None
[docs]def plot_data_kst(args, raw_channels):
client = pykst.Client("NumpyVector")
llen = len(raw_channels[0])
if args.egu == 1:
if args.xdt == 0:
print("WARNING ##### NO CLOCK RATE PROVIDED. TIME SCALE measured by system.")
raw_input("Please press enter if you want to continue with innacurate time base.")
time1 = float(args.the_uut.s0.SIG_CLK_S1_FREQ.split(" ")[-1])
xdata = np.linspace(0, llen/time1, num=llen)
else:
xdata = np.linspace(0, llen*args.xdt, num=llen)
xname= 'time'
yu = 'V'
xu = 's'
else:
xname = 'idx'
yu = 'code'
xu = 'sample'
xdata = np.arange(0, llen).astype(np.float64)
V1 = client.new_editable_vector(xdata, name=xname)
for ch in [ int(c) for c in args.pc_list]:
channel = raw_channels[ch]
ch1 = ch+1
yu1 = yu
if args.egu:
try:
# chan2volts ch index from 1:
channel = args.the_uut.chan2volts(ch1, channel)
except IndexError:
yu1 = 'code'
print("ERROR: no calibration for CH{:02d}".format(ch1))
# label 1.. (human)
V2 = client.new_editable_vector(channel.astype(np.float64), name="{}:CH{:02d}".format(re.sub(r"_", r"-", args.uut[0]), ch1))
c1 = client.new_curve(V1, V2)
p1 = client.new_plot()
p1.set_left_label(yu1)
p1.set_bottom_label(xu)
p1.add(c1)
[docs]def plot_data(args, raw_channels):
if args.plot_mpl:
# if arg set then plot with matplotlib instead.
args.start = 1
args.stop = None
args.step = 1
try:
args.start, args.stop, args.step = [None if len(x)==0 else int(x) for x in args.plot_mpl.split(":")]
except ValueError:
pass
plot_mpl(args, raw_channels)
return None
if has_pykst:
plot_data_kst(args, raw_channels)
else:
print("SORRY, kst automation via pykst not available. Please install pykst, or use kst DirFile importer")
[docs]def process_data(args):
NCHAN = args.nchan
if args.double_up:
NCHAN = args.nchan * 2
print("nchan = ", args.nchan)
raw_data = read_data(args, NCHAN) if not os.path.isfile(args.src) else read_data_file(args, NCHAN)
if args.double_up:
raw_data = double_up(args, raw_data)
if args.save != None:
save_data(args, raw_data)
if len(args.pc_list) > 0:
plot_data(args, raw_data)
[docs]def make_pc_list(args):
# ch in 1.. (human)
if args.pchan == 'none':
return list()
if args.pchan == 'all':
return list(range(0,args.nchan))
elif len(args.pchan.split(':')) > 1:
lr = args.pchan.split(':')
x1 = 1 if lr[0] == '' else int(lr[0])
x2 = args.nchan+1 if lr[1] == '' else int(lr[1])+1
return list(range(x1, x2))
else:
return args.pchan.split(',')
[docs]def run_main(args):
args.WSIZE = 2
args.NSAM = 0
if args.data_type == 16:
args.np_data_type = np.int16
args.WSIZE = 2
else:
args.np_data_type = np.int32
args.WSIZE = 4
if os.name == "nt": # do this if windows system.
args.uutroot = r"{}:\{}\{}".format(args.drive_letter, args.src, args.uut[0])
elif os.path.isdir(args.src):
args.uutroot = r"{}/{}".format(args.src, args.uut[0])
print("uutroot {}".format(args.uutroot))
elif os.path.isfile(args.src):
args.uutroot = "{}".format(os.path.dirname(args.src))
if args.save != None:
if args.save.startswith("/"):
args.saveroot = args.save
else:
if os.name != "nt":
args.saveroot = r"{}/{}".format(args.uutroot, args.save)
# ch 0.. (comp)
args.pc_list = [ int(i)-1 for i in make_pc_list(args)]
print("args.pc_list {}".format(args.pc_list))
if args.egu:
print("get egu from UUT {}".format(args.egu))
args.the_uut = acq400_hapi.Acq2106(args.egu)
make_columns()
if args.show_columns:
show_columns()
else:
process_data(args)
[docs]def get_parser():
parser = argparse.ArgumentParser(description='CPSC2 host demux and plot')
parser.add_argument('--nchan', type=int, default=80)
parser.add_argument('--nblks', type=int, default=-1)
parser.add_argument('--save', type=str, default=None, help='save channelized data to dir')
parser.add_argument('--src', type=str, default='/data', help='data source root')
parser.add_argument('--cycle', type=str, default=None, help='cycle from rtm-t-stream-disk')
parser.add_argument('--pchan', type=str, default=':', help='channels to plot')
parser.add_argument('--egu', type=str, default=0, help='plot egu (V vs s) .. --egu UUT')
parser.add_argument('--xdt', type=float, default=0, help='0: use interval from UUT, else specify interval ')
parser.add_argument('--data_type', type=int, default=16, help='Use int16 or int32 for data demux.')
parser.add_argument('--double_up', type=int, default=0, help='Use for ACQ480 two lines per channel mode')
parser.add_argument('--plot_mpl', type=str, default="1:1000:1", help='Use MatPlotLib to plot subrate data args: start:stop[:step]')
parser.add_argument('--drive_letter', type=str, default="D", help="Which drive letter to use when on windows.")
parser.add_argument('--show_columns', type=int, default=0)
return parser
if __name__ == '__main__':
run_main(get_parser().parse_args())