#!/usr/bin/env python
""" acq2106_hts High Throughput Streaming
- data on local SFP/AFHBA
- control on Ethernet
- replaces AFHBA404/scripts/hts-test-harness-*
example usage::
# act on acq2106_061, run for 3600s
./acq2106_hts.py --trg=notouch --secs=3600 acq2106_061
# capture 2000 buffers, to one cycle of files on ramdisk. (data is overwritten, but it's unlikely your computer will run out of memory)
# "RECYCLE mode"
./user_apps/acq2106/acq2106_hts.py --nbuffers=2000 --datahandler='./scripts/run-stream-ramdisk {} {}' acq2106_364
# capture 2000 buffers, to NBUFFERS cycles of files, storing all 2000 buffers on ramdisk.
# WARNING: we assume that your HOSTPC has enough memory to do this! 2000 buffers of 1MB => 2GB, that's probably OK.
# But, 20k buffers would need a computer with serious memory (32GB) and 200kbuffers is probably infeasible, instead streaming to a fast disk system is recommended.
./user_apps/acq2106/acq2106_hts.py --nbuffers=2000 --datahandler='./scripts/run-stream-ramdisk {} {} 0' acq2106_364
acq2106_hts.py will quit on the first of either elapsed_seconds > secs or buffers == nbuffers
Recommendation: --secs is really a timeout, use --nbuffers for exact data length
.. rst-class:: hidden
usage::
acq2106_hts.py [-h] [--pre PRE] [--clk CLK] [--trg TRG] [--sim SIM]
[--trace TRACE] [--nowait NOWAIT] [--secs SECS]
[--spad SPAD] [--commsA COMMSA] [--commsB COMMSB]
[--lport LPORT] [--hexdump HEXDUMP]
[--decimate DECIMATE] [--datahandler DATAHANDLER]
[--nbuffers NBUFFERS]
uut [uut ...]
configure acq2106 High Throughput Stream
positional arguments:
uut uut
optional arguments:
-h, --help show this help message and exit
--pre PRE pre-trigger samples
--clk CLK int|ext|zclk|xclk,fpclk,SR,[FIN]
--trg TRG int|ext,rising|falling
--sim SIM s1[,s2,s3..] list of sites to run in simulate mode
--trace TRACE 1 : enable command tracing
--nowait NOWAIT start the shot but do not wait for completion
--secs SECS capture seconds [default:0 inifinity]
--spad SPAD scratchpad, eg 1,16,0
--commsA COMMSA custom list of sites for commsA
--commsB COMMSB custom list of sites for commsB
--lport LPORT local port on ahfba
--hexdump HEXDUMP generate hexdump format string
--decimate DECIMATE decimate arm data path
--datahandler DATAHANDLER program to stream the data
--nbuffers NBUFFERS set capture length in buffers
--secs : maximum run time for acq2106_hts, only starts counting once data is flowing
--nbuffers : data handler will stream max this number of buffers (1MB or 4MB)
"""
import sys
import acq400_hapi
import argparse
import time
import os
from acq400_hapi import intSI as intSI
[docs]def read_knob(knob):
with open(knob, 'r') as f:
return f.read()
[docs]def config_shot(uut, args):
acq400_hapi.Acq400UI.exec_args(uut, args)
uut.s0.run0 = "{} {}".format(uut.s0.sites, args.spad)
if args.decimate != None:
uut.s0.decimate = args.decimate
[docs]def hexdump_string(uut, chan, sites, spad):
nspad = 0 if spad == None else int(spad.split(',')[1])
print("hexdump_string {} {} {}".format(chan, sites, nspad))
dumpstr = ("hexdump -ve '\"%10_ad,\" ")
for svc in ( uut.svc['s{}'.format(s)] for s in sites.split(',')):
d32 = svc.data32 == '1'
fmt = '" " {}/{} "%0{}x," '.format(svc.NCHAN, 4 if d32 else 2, 8 if d32 else 4)
dumpstr += fmt
if nspad:
fmt = '{}/4 "%08x," '.format(nspad)
dumpstr += fmt
dumpstr += '"\\n"\''
print(dumpstr)
with open("hexdump{}".format(chan), "w") as fp:
fp.write("{} $*\n".format(dumpstr))
os.chmod("hexdump{}".format(chan), 0o777)
[docs]def init_comms(uut, args):
if args.spad != None:
uut.s0.spad = args.spad
# use spare spad elements as data markers
for sp in ('1', '2', '3', '4' , '5', '6', '7'):
uut.s0.sr("spad{}={}".format(sp, sp*8))
if args.commsA != "none":
uut.cA.spad = 0 if args.spad == None else 1
csites = uut.s0.sites if args.commsA == 'all' else args.commsA
uut.cA.aggregator = "sites={} on".format(csites)
if args.hexdump:
hexdump_string(uut, "A", csites, args.spad)
if args.commsB != "none":
uut.cB.spad = 0 if args.spad == None else 1
csites = uut.s0.sites if args.commsB == 'all' else args.commsB
uut.cB.aggregator = "sites={} on".format(csites)
if args.hexdump:
hexdump_string(uut, "B", csites, args.spad)
[docs]def init_work(uut, args):
print("init_work")
[docs]def get_data_pid(args):
return int(read_knob("/dev/rtm-t.{}.ctrl/streamer_pid".format(args.lport)))
[docs]def start_shot(uut, args):
uut.s0.streamtonowhered = "start"
[docs]def stop_shot(uut, args):
print("stop_shot")
uut.s0.streamtonowhered = "stop"
time.sleep(1)
pid = get_data_pid(args)
if pid != 0:
os.system('sudo kill -9 {}'.format(pid))
[docs]def get_state(args):
job = read_knob("/proc/driver/afhba/afhba.{}/Job".format(args.lport))
env = {}
for pp in job.split():
k, v = pp.split('=')
env[k] = v
args.job_state = env
[docs]def wait_completion(uut, args):
ts = 0
STATFMT = "Rate %d NBUFS %d Time ... %8d / %8d %s"
try:
while ts < int(args.secs):
get_state(args)
buf_rate = int(args.job_state['rx_rate'])
rx = int(args.job_state['rx'])
if args.datahandler != None:
pid = get_data_pid(args)
if pid == 0:
print("\ndatahandler has dropped out at NBUFS {}/{} {}".format(
rx, args.nbuffers, "COMPLETE" if rx>=args.nbuffers else "ERROR" ))
break
sys.stdout.write( (STATFMT+"\r") % (buf_rate, rx, ts, int(args.secs), acq400_hapi.Propellor.spin()))
sys.stdout.flush()
time.sleep(1)
if buf_rate > 0:
ts += 1
else:
if ts > 0:
sys.stdout.write(("\n" + STATFMT + "\n") % (buf_rate, rx, ts, int(args.secs), "STOPPED?"))
break
except KeyboardInterrupt:
pass
stop_shot(uut, args)
[docs]def run_shot(args):
uut = acq400_hapi.factory(args.uut[0])
if args.datahandler != None:
cmd = args.datahandler.format(args.lport, args.nbuffers)
print("datahandler command {}".format(cmd))
os.system(cmd)
pollcat = 0
pid = get_data_pid(args)
while pid == 0:
time.sleep(1)
pollcat += 1
if pollcat > 2:
print("polling for datahandler active")
pid = get_data_pid(args)
print("datahandler pid {}".format(pid))
config_shot(uut, args)
init_comms(uut, args)
init_work(uut, args)
start_shot(uut, args)
if args.nowait == 0:
wait_completion(uut, args)
[docs]def get_parser():
parser = argparse.ArgumentParser(description='High Throughput Stream using AFHBA')
acq400_hapi.Acq400UI.add_args(parser, transient=False)
parser.add_argument('--nowait', default=0, help='start the shot but do not wait for completion')
parser.add_argument('--secs', default=999999, help="capture seconds [default:0 inifinity]")
parser.add_argument('--spad', default=None, help="scratchpad, eg 1,16,0")
parser.add_argument('--commsA', default="all", help='custom list of sites for commsA')
parser.add_argument('--commsB', default="none", help='custom list of sites for commsB')
parser.add_argument('--lport', default=0, help='local port on ahfba')
parser.add_argument('--hexdump', default=0, help="generate hexdump format string")
parser.add_argument('--decimate', default=None, help='decimate arm data path')
parser.add_argument('--datahandler', default=None, help='program to stream the data')
parser.add_argument('--nbuffers', type=int, default=9999999999, help='set capture length in buffers')
parser.add_argument('uut', nargs='+', help="uut ")
return parser
# execution starts here
if __name__ == '__main__':
run_shot(get_parser().parse_args())