#!/usr/bin/env python3
""" host_demux.py Demux Data on HOST Computer
- data is stored locally, either from mgtdram/ftp or fiber-optic AFHBA404
- channelize the data
- optionally store file-per-channel
- optionally plot in pykst if available
- @@todo store to MDSplus as segments.
example usage::
./host_demux.py --save=DATA --nchan=32 --nblks=-1 --pchan=none acq2106_067
# load all blocks, save per channel to subdirectory DATA/data_CC.dat
./host_demux.py --nchan=32 --nblks=4 --pchan=1:8 acq2106_067
# plot channels 1:8, 4 blocks
./host_demux.py --nchan=32 --nblks=-1 --pchan=1,2 acq2106_067
# plot channels 1,2, ALL blocks
# works for 8GB data, best to LIMIT the number of channels ..
./host_demux.py --nchan=96 --src=/data/ACQ400DATA/1 \
--egu=1 --xdt=2e-6 --cycle=1:4 --pchan=1:2 \
acq2106_061
# plot AFHBA404 data from PORT1
# plot egu (V vs s), specify interval, plot 4 cycles, plot 2 channels
# uut
use of --src
--src=/data # valid for FTP upload data
--src=/data/ACQ400DATA/1 # valid for SFP data, port 1
--src=afhba.0.log # one big raw file, eg from LLC
./host_demux.py --nchan 128 --pchan 1,33,65,97 --src=/path-to/afhba.0.log acq2106_110
# plot data from LLC, 128 channels, show one "channel" from each site.
# 97 was actually the LSB of TLATCH.
.. rst-class:: hidden
usage::
host_demux.py [-h] [--nchan NCHAN] [--nblks NBLKS] [--save SAVE]
[--src SRC] [--pchan PCHAN]
uut
host demux, host side data handling
positional arguments:
uut uut
optional arguments:
-h, --help show this help message and exit
--nchan NCHAN
--nblks NBLKS
--save SAVE save channelized data to dir
--src SRC data source root
--pchan PCHAN channels to plot
--egu EGU plot egu (V vs s)
--xdt XDT 0: use interval from UUT, else specify interval
--double_up Use for ACQ480 two lines per channel mode
if --src is a file, use it directly
dir/nnnn
if --src is a directory, first check if it has cycles:
dir/NNNNN/nnnnn*
else iterate files in dir
dir/nnnnn*
TO DEMUX ON WINDOWS TO STORE CHANNELISED DATA:
python .\host_demux.py --save=1 --src="[dir]" --pchan=none acq2106_114
Make sure that the muxed data is in D:\\[dir]\[UUT name]
Where [dir] is the location of the data.
Demuxed data will be written to D:\\demuxed\[UUT name]
To plot subsampled data on windows:
python .\host_demux.py --src=Projects --nchan=8 \
--pchan 1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16 --plot_mpl=1:1000:1 acq2106_120
"""
import numpy as np
import os
import re
import argparse
import subprocess
import acq400_hapi
import acq400_hapi.channel_handlers as CH
import time
import matplotlib
import matplotlib.pyplot as plt
import logging
# if default plot fails (eg r740), try this:
#matplotlib.use('TkAgg')
if os.getenv('HAPI_MATPLOTLIB') is not None:
matplotlib.use(os.getenv('HAPI_MATPLOTLIB'))
has_pykst = False
if os.name != "nt":
try:
import pykst
has_pykst = True
print("INFO: pykst selected as default plot")
except ImportError:
pass
logging.getLogger('matplotlib.font_manager').disabled = True
[docs]def channel_required(args, ch):
# print("channel_required {} {}".format(ch, 'in' if ch in args.pc_list else 'out', args.pc_list))
return args.save != None or args.double_up or ch in list(pc.ic for pc in args.pc_list)
[docs]def create_npdata(args, nblk, nchn):
channels = []
for counter in range(nchn):
if channel_required(args, counter):
channels.append(np.zeros((int(nblk)*args.NSAM), dtype=args.np_data_type))
else:
channels.append(np.zeros(16, dtype=args.np_data_type))
# print "length of data = ", len(total_data)
# print "npdata = ", npdata
return channels
[docs]def make_cycle_list(args):
if args.cycle == None:
cyclist = os.listdir(args.uutroot)
cyclist.sort()
return cyclist
else:
rng = args.cycle.split(':')
if len(rng) > 1:
cyclist = [ '{:06d}'.format(c) for c in range(int(rng[0]), int(rng[1])+1) ]
elif len(args.cycle) == 6:
cyclist = [ args.cycle ]
else:
cyclist = [ '{:06d}'.format(int(args.cycle)) ]
return cyclist
[docs]def get_file_names(args):
fnlist = list()
# matches BOTH 0.?? for AFHBA an 0000 for FTP
datapat = re.compile('[.0-9]{4,5}$')
datapat_dat = re.compile('[.0-9]{4,5}.dat$')
has_cycles = True
for cycle in make_cycle_list(args):
if cycle == "err.log":
continue
try:
uutroot = r'{}/{}'.format(args.uutroot, cycle)
print("debug")
ls = os.listdir(uutroot)
print("uutroot = ", uutroot)
except:
uutroot = args.uutroot
ls = os.listdir(uutroot)
has_cycles = False
ls.sort()
for n, file in enumerate(ls):
if datapat.match(file) or datapat_dat.match(file):
fnlist.append(r'{}/{}'.format(uutroot, file) )
else:
print("no match {}".format(file))
if not has_cycles:
break
return fnlist
[docs]def read_data(args, NCHAN):
# global NSAM
data_files = get_file_names(args)
for n, f in enumerate(data_files):
print(f)
if NCHAN % 3 == 0:
print("collect in groups of 3 to keep alignment")
GROUP = 3
else:
GROUP = 1
if args.NSAM == 0:
args.NSAM = GROUP*os.path.getsize(data_files[0])//args.WSIZE//NCHAN
print("NSAM set {}".format(args.NSAM))
NBLK = len(data_files)
if args.nblks > 0 and NBLK > args.nblks:
NBLK = args.nblks
data_files = [ data_files[i] for i in range(0,NBLK) ]
print("NBLK {} NBLK/GROUP {} NCHAN {}".format(NBLK, NBLK/GROUP, NCHAN))
raw_channels = create_npdata(args, NBLK/GROUP, NCHAN)
blocks = 0
i0 = 0
iblock = 0
for blknum, blkfile in enumerate(data_files):
if blocks >= NBLK:
break
if blkfile != "analysis.py" and blkfile != "root":
if blknum == 0:
args.src = blkfile
# print("iblock={} blkfile={}, blknum={}".format(iblock, blkfile, blknum))
# concatenate 3 blocks to ensure modulo 3 channel align
if iblock == 0:
data = np.fromfile(blkfile, dtype=args.np_data_type)
else:
data = np.append(data, np.fromfile(blkfile, dtype=args.np_data_type))
iblock += 1
if iblock < GROUP:
continue
i1 = i0 + args.NSAM
for ch in range(NCHAN):
if channel_required(args, ch):
raw_channels[ch][i0:i1] = (data[ch::NCHAN])
# print x
i0 = i1
blocks += 1
iblock = 0
args.src = "{}..{}".format(args.src, os.path.basename(blkfile))
print("length of data = ", len(raw_channels))
print("length of data[0] = ", len(raw_channels[0]))
print("length of data[1] = ", len(raw_channels[1]))
return raw_channels
[docs]def read_data_file(args, NCHAN):
# NCHAN = args.nchan
data = np.fromfile(args.src, dtype=args.np_data_type)
nsam = len(data)/NCHAN
raw_channels = create_npdata(args, nsam, NCHAN)
for ch in range(NCHAN):
if channel_required(args, ch):
raw_channels[ch] = data[ch::NCHAN]
return raw_channels
[docs]def save_dirfile(args, raw_channels):
uutname = args.uut
for ch0, channel in enumerate(raw_channels):
ch1 = ch0+1
if args.schan:
if ch1 not in args.schan:
continue
data_file = open("{}/{}_{:02d}.dat".format(args.saveroot, uutname, ch1), "wb+")
channel.tofile(data_file, '')
print("data saved to directory: {}".format(args.saveroot))
with open("{}/format".format(args.saveroot), 'w') as fmt:
fmt.write("# dirfile format file for {}\n".format(uutname))
for enum, channel in enumerate(raw_channels):
fmt.write("{}_{:02d}.dat RAW s 1\n".format(uutname, enum+1))
[docs]def save_numpy(args, raw_channels):
npfile = "{}/{}.npy".format(args.saveroot, args.uut)
cooked = cook_data(args, raw_channels)
print("save_numpy {} {}".format(npfile, cooked))
np.save(npfile, cooked)
[docs]def save_data(args, raw_channels):
if os.name == "nt": # if system is windows.
path = r'{}:\\demuxed\{}'.format(args.drive_letter, args.uut) # raw string literal so we can use \ in path.
if not os.path.exists(path):
os.makedirs(path)
args.saveroot = path # set args.saveroot to windows style dir.
else:
subprocess.call(["mkdir", "-p", args.saveroot])
if args.save == 'npy':
save_numpy(args, raw_channels)
else:
save_dirfile(args, raw_channels)
return raw_channels
[docs]def plot_mpl(args, raw_channels):
#real_len = len(raw_channels[0]) # this is the real length of the channel data
nplot = len(args.pc_list)
total_plots = int(nplot / args.traces_per_plot)
if nplot % args.traces_per_plot:
total_plots += 1
if total_plots > 1:
fig, plots = plt.subplots(total_plots, 1, sharex=True)
else:
fig, p1 = plt.subplots()
plots = (p1,)
fig.suptitle("{} src {}".format(args.uut, args.src))
xx = None
for pln, ch_handler in enumerate(args.pc_list):
pln = int(pln / args.traces_per_plot)
yy, meta, step = ch_handler(raw_channels, args.pses)
if xx is None:
xx = np.array([ (x+args.pses[0])*args.pses[2] for x in range(0, len(yy))])
meta = meta.split(' ')
plots[pln].set_ylabel(meta[0])
plots[pln].set_title(plots[pln].get_title() + f' {meta[-1]}')
if step:
plots[pln].step(xx, yy, linewidth=0.75)
else:
plots[pln].plot(xx, yy, linewidth=0.75)
plots[pln].grid("True", linewidth=0.2)
plots[pln].ticklabel_format(style='plain') # to prevent scientific notation
plots[pln].set_xlabel("Samples")
plt.subplots_adjust(hspace=(total_plots - 1 ) * 0.15)
plt.show()
return None
[docs]def process_cmdline_cfg(args):
''' return list of channel_handler objects built from command line args'''
print_ic = [ int(i)-1 for i in make_pc_list(args)]
pl = ()
if args.egu == 1:
pl = list(CH.ch_egu(ic, args) for ic in print_ic)
else:
pl = list(CH.ch_raw(ic) for ic in print_ic)
if args.tai_vernier:
pl = pl.extend(CH.ch_tai_vernier(args.tai_vernier-1))
return pl
[docs]def plot_data_kst(args, raw_channels):
client = pykst.Client("NumpyVector")
llen = len(raw_channels[0])
if args.egu == 1:
if args.xdt == 0:
print("WARNING ##### NO CLOCK RATE PROVIDED. TIME SCALE measured by system.")
raw_input("Please press enter if you want to continue with innacurate time base.")
time1 = float(args.the_uut.s0.SIG_CLK_S1_FREQ.split(" ")[-1])
xdata = np.linspace(0, llen/time1, num=llen)
else:
xdata = np.linspace(0, llen*args.xdt, num=llen)
xname= 'time'
yu = 'V'
xu = 's'
else:
xname = 'idx'
yu = 'code'
xu = 'sample'
xdata = np.arange(0, llen).astype(np.float64)
V1 = client.new_editable_vector(xdata, name=xname)
for ch in [ int(c) for c in args.pc_list]:
channel = raw_channels[ch]
ch1 = ch+1
yu1 = yu
if args.egu:
try:
# chan2volts ch index from 1:
channel = args.the_uut.chan2volts(ch1, channel)
except IndexError:
yu1 = 'code'
print("ERROR: no calibration for CH{:02d}".format(ch1))
# label 1.. (human)
V2 = client.new_editable_vector(channel.astype(np.float64), name="{}:CH{:02d}".format(re.sub(r"_", r"-", args.uut), ch1))
c1 = client.new_curve(V1, V2)
p1 = client.new_plot()
p1.set_left_label(yu1)
p1.set_bottom_label(xu)
p1.add(c1)
[docs]def plot_data(args, raw_channels):
if has_pykst:
plot_data_kst(args, raw_channels)
else:
plot_mpl(args, raw_channels)
return None
# double_up : data is presented as [ ch010, ch011, ch020, ch021 ] .. so zip them together..
[docs]def double_up(args, d1):
d2 = []
for ch in range(args.nchan):
ch2 = ch * 2
ll = d1[ch2]
#print ll.shape
rr = d1[ch2+1]
#print rr.shape
mm = np.column_stack((ll, rr))
#print mm.shape
mm1 = np.reshape(mm, (len(ll)*2, 1))
#print mm1.shape
d2.append(mm1)
return d2
[docs]def stack_480_shuffle(args, raw_data):
r2 = []
for i1 in args.stack_480_cmap:
r2.append(raw_data[i1])
return r2
[docs]def cook_data(args, raw_channels):
clidata = {}
for num, ch_handler in enumerate(args.pc_list):
yy, ylabel, step = ch_handler(raw_channels, args.pses)
clidata[ylabel] = yy
return clidata
[docs]def process_data(args):
NCHAN = args.nchan
if args.double_up:
NCHAN = args.nchan * 2
print("nchan = ", args.nchan)
raw_data = read_data(args, NCHAN) if not os.path.isfile(args.src) else read_data_file(args, NCHAN)
if args.double_up:
raw_data = double_up(args, raw_data)
if args.stack_480:
raw_data = stack_480_shuffle(args, raw_data)
if args.callback:
args.callback(cook_data(args, raw_data))
if args.save != None:
save_data(args, raw_data)
if args.plot and len(args.pc_list) > 0:
plot_data(args, raw_data)
[docs]def make_pc_list(args):
# ch in 1.. (human)
if args.pchan == 'none':
return list()
if args.pchan == 'all':
return list(range(1,args.nchan+1))
elif len(args.pchan.split(':')) > 1:
lr = args.pchan.split(':')
x1 = 1 if lr[0] == '' else int(lr[0])
x2 = args.nchan+1 if lr[1] == '' else int(lr[1])+1
return list(range(x1, x2))
else:
return args.pchan.split(',')
[docs]def calc_stack_480(args):
args.double_up = 0
if not args.stack_480:
return
if args.stack_480 == '2x4':
args.stack_480_cmap = ( 0, 1, 4, 5, 2, 3, 6, 7 )
args.double_up = 1
args.nchan = 8
elif args.stack_480 == '2x8':
args.nchan = 16
args.stack_480_cmap = (
0, 1, 2, 3, 8, 9, 10, 11, 4, 5, 6, 7, 12, 13, 14, 15 )
elif args.stack_480 == '4x8':
args.nchan = 32
args.stack_480_cmap = (
0, 1, 2, 3, 8, 9, 10, 11, 4, 5, 6, 7, 12, 13, 14, 15,
16, 17, 18, 19, 24, 25, 26, 27, 20, 21, 22, 23, 28, 29, 30, 31 )
elif args.stack_480 == '6x8':
args.nchan = 48
args.stack_480_cmap = (
0, 1, 2, 3, 8, 9, 10, 11, 4, 5, 6, 7, 12, 13, 14, 15,
16, 17, 18, 19, 24, 25, 26, 27, 20, 21, 22, 23, 28, 29, 30, 31,
32, 33, 34, 35, 40, 41, 42, 43, 36, 37, 38, 39, 44, 45, 46, 47 )
else:
print("bad option {}".format(args.stack_480))
quit()
print("args.stack_480_cmap: {}".format(args.stack_480_cmap))
[docs]def getRootFromAfhba404(args):
for conn in acq400_hapi.afhba404.get_connections().values():
if conn.uut == args.uut:
return f'/mnt/afhba.{conn.dev}/{args.uut}'
print(f'getRootFromAhfba404 ERROR no connection for {args.uut}')
exit(1)
[docs]def run_main(args):
args.uut = args.uuts[0]
if args.double_up == 0:
calc_stack_480(args)
args.WSIZE = 2
args.NSAM = 0
if args.data_type == None or args.nchan == None or args.egu == 1:
try:
args.the_uut = acq400_hapi.factory(args.uut)
except:
print(f'ERROR: unable to instantiate UUT {args.uut}')
print("maybe it's not there and we wouldn't need if --data_type, --nchan are defined and egu=0")
exit(1)
if args.data_type == None:
args.data_type = 32 if int(args.the_uut.s0.data32) else 16
if args.nchan == None:
args.nchan = int(args.the_uut.s0.NCHAN)
if args.data_type == 16:
args.np_data_type = np.int16
args.WSIZE = 2
elif args.data_type == 8:
args.np_data_type = np.int8
args.WSIZE = 1
else:
args.np_data_type = np.int32
args.WSIZE = 4
print("data_type {} np {}".format(args.data_type, args.np_data_type))
if args.src == "@afhba404":
args.uutroot = getRootFromAfhba404(args)
print(f'found uutroot for {args.uut} on {args.uutroot}')
elif os.name == "nt": # do this if windows system.
args.src = args.src.replace("/","\\")
if os.path.isdir(args.src):
args.uutroot = args.src
elif os.path.isfile(args.src):
args.uutroot = os.path.dirname(args.src)
else:
print(f'Error: unable to locate data, src {args.src}')
exit(1)
elif os.path.isdir(args.src):
args.uutroot = os.path.join(args.src, args.uut)
if not os.path.isdir(args.uutroot):
args.uutroot = args.src
elif os.path.isfile(args.src):
args.uutroot = os.path.dirname(args.src)
else:
print(f'Error: unable to locate data, src {args.src}')
exit(1)
if args.save != None:
path_prefixes = ('/', './', '../')
if args.save.startswith(path_prefixes):
args.saveroot = args.save
else:
if os.name != "nt":
args.saveroot = r"{}/{}".format(args.uutroot, args.save)
args.pses = [ int(x) for x in args.pses.split(':') ]
if args.pcfg:
args.pc_list = CH.process_pcfg(args)
else:
args.pc_list = process_cmdline_cfg(args)
process_data(args)
[docs]def list_of_ints(string):
return list(map(int, string.split(',')))
[docs]def get_parser(parser=None):
if not parser:
is_client = True
parser = argparse.ArgumentParser(description='Host side data demuxing and plotting')
else:
is_client = False
parser.add_argument('--nchan', type=int, default=None)
parser.add_argument('--nblks', type=int, default=-1)
parser.add_argument('--save', type=str, default=None, help='save channelized data to dir')
parser.add_argument('--src', type=str, default='/data', help='data source root')
parser.add_argument('--cycle', type=str, default=None, help='cycle from rtm-t-stream-disk')
parser.add_argument('--pchan', type=str, default=':', help='channels to plot')
parser.add_argument('--pses', type=str, default='0:-1:1', help="plot start end stride, default: 0:-1:1")
parser.add_argument('--tai_vernier', type=int, default=None, help='decode this channel as tai_vernier')
parser.add_argument('--egu', type=int, default=0, help='plot egu (V vs s)')
parser.add_argument('--xdt', type=float, default=0, help='0: use interval from UUT, else specify interval ')
parser.add_argument('--data_type', type=int, default=None, help='Use int16 or int32 for data demux.')
parser.add_argument('--double_up', type=int, default=0, help='Use for ACQ480 two lines per channel mode')
parser.add_argument('--plot_mpl', type=int, default=0, help='Use MatPlotLib to plot subrate data. (legacy option)')
parser.add_argument('--plot', type=int, default=1, help="plot data when set")
parser.add_argument('--stack_480', type=str, default=None, help='Stack : 2x4, 2x8, 4x8, 6x8')
parser.add_argument('--drive_letter', type=str, default="D", help="Which drive letter to use when on windows.")
parser.add_argument('--pcfg', default=None, type=str, help="plot configuration file, overrides pchan")
parser.add_argument('--callback', default=None, help="callback for external automation")
parser.add_argument('--traces_per_plot', default=1, type=int, help="traces_per_plot")
parser.add_argument('--schan', default=None, type=list_of_ints, help="channels to save ie 1,49,50")
if is_client:
parser.add_argument('uuts', nargs='+',help='uut - for auto configuration data_type, nchan, egu or just a label')
return parser
if __name__ == '__main__':
run_main(get_parser().parse_args())