#!/usr/bin/env python
"""quest upload test
where UUT1 is the ip-address or host name of first uut
example test client runs captures in a loop on one or more uuts
pre-requisite: UUT's are configured and ready to make a transient
capture
eg clk is running. soft trg enabled
eg transient length set.
runs one capture, uploads the data and plots with matplotlib
tested with 2 x 8 channels UUT's (ACQ1014)
matplot will get very congested with more channels.
this is really meant as a demonstration of capture, load to numpy,
it's not really intended as a scope UI.
example::
./quest_upload.py --POST=60000 --CLKDIV=100 --capture=1 --plot_data=0 \
--save_data=magdata_0001 --channels=1,2,3,4,32 192.168.1.210
.. rst-class:: hidden
usage: acq400_upload.py [-h] [--soft_trigger SOFT_TRIGGER]
[--trace_upload TRACE_UPLOAD] [--save_data SAVE_DATA]
[--plot_data PLOT_DATA] [--capture CAPTURE]
[--remote_trigger REMOTE_TRIGGER]
[--channels CHANNELS]
uuts [uuts ...]
acq400 upload
positional arguments:
uuts uut[s]
optional arguments:
-h, --help show this help message and exit
--soft_trigger SOFT_TRIGGER help use soft trigger on capture
--trace_upload TRACE_UPLOAD 1: verbose upload
--save_data SAVE_DATA store data to specified directory
--plot_data PLOT_DATA 1: plot data
--capture CAPTURE 1: capture data, 0: wait for someone else to capture, -1: just upload
--remote_trigger REMOTE_TRIGGER your function to fire trigger
--channels CHANNELS comma separated channel list
--CLKDIV set clock divider (10=1M)
--POST set number POST trigger samples
"""
import sys
import acq400_hapi
import numpy as np
try:
import matplotlib.pyplot as plt
plot_ok = 1
except RuntimeError as e:
print("Sorry, plotting not available {}".format(e))
plot_ok = 0
import os
import argparse
import re
import time
from subprocess import call
[docs]class ActionScript:
[docs] def __init__(self, script_and_args):
self.sas = script_and_args.split()
print("ActionScript creates {}".format(self.sas))
def __call__(self):
print("ActionScript: call()")
call(self.sas)
[docs]def upload(args):
uuts = [acq400_hapi.Acq400(u) for u in args.uuts]
# assume one uut
uut = uuts[0]
acq400_hapi.cleanup.init()
shot_controller = acq400_hapi.ShotController(uuts)
for u in uuts:
if args.POST:
u.s0.transient = "PRE=%d POST=%d SOFT_TRIGGER=%d" % (0, args.POST, 1)
if args.CLKDIV:
u.s1.CLKDIV = args.CLKDIV
# make it EXT TRIGGER every time
u.s1.TRG_DX = 'd0'
if args.remote_trigger:
trigger_action = ActionScript(args.remote_trigger)
st = None
else:
trigger_action = None
st = SOFT_TRIGGER
try:
if args.capture > 0:
shot_controller.run_shot(soft_trigger = st, remote_trigger = trigger_action)
elif args.capture == 0:
state = '99'
while state != '0':
state = uuts[0].s0.state.split()[0]
print("state:{}".format(state))
if state == '1':
if trigger_action:
trigger_action()
elif st:
uut.s0.soft_trigger = '1'
time.sleep(1)
if args.trace_upload:
for u in uuts:
u.trace = 1
channel_set = eval(args.channels)
chx, ncol, nchan, nsam = shot_controller.read_channels(channel_set)
if args.save_data:
volts = [ uut.chan2volts(channel_set[chn], chx[0][chn]) for chn in range(0,nchan) ]
# save_data is the file name magdata_NNN
with open("{}.csv".format(args.save_data), 'w') as fid:
for sample in range(0, nsam):
fid.write('%.6f,' % (sample*1e-6))
for chn in range(0, nchan):
# for chn in range(0, 2):
# fid.write({:4},'.format(volts[chn][sample]))
fid.write('%.4f%c' % ((volts[chn][sample]), ',' if chn < nchan-1 else '\n'))
# plot ex: 2 x 8 ncol=2 nchan=8
# U1 U2 FIG
# 11 21 1 2
# 12 22 3 4
# 13 23
# ...
# 18 28 15 16
if plot_ok and args.plot_data:
for col in range(ncol):
for chn in range(0,nchan):
# channel index from 1 in API
volts = uut.chan2volts(chn+1, chx[col][chn])
fignum = 1 + col + chn*ncol
plt.subplot(nchan, ncol, fignum)
plt.plot(volts)
plt.show()
except acq400_hapi.cleanup.ExitCommand:
print("ExitCommand raised and caught")
finally:
print("Finally, going down")
SOFT_TRIGGER=int(os.getenv("SOFT_TRIGGER", "1"))
TRACE_UPLOAD=int(os.getenv("TRACE_UPLOAD", "0"))
SAVEDATA=os.getenv("SAVEDATA", None)
PLOTDATA=int(os.getenv("PLOTDATA", "0"))
CAPTURE=int(os.getenv("CAPTURE", "0"))
CHANNELS=os.getenv("CHANNELS", "()")
[docs]def uniq(inp):
out = []
for x in inp:
if x not in out:
out.append(x)
return out
[docs]def get_parser():
parser = argparse.ArgumentParser(description='upload quest')
parser.add_argument('--soft_trigger', default=SOFT_TRIGGER, type=int, help="help use soft trigger on capture")
parser.add_argument('--trace_upload', default=TRACE_UPLOAD, type=int, help="1: verbose upload")
parser.add_argument('--save_data', default=SAVEDATA, type=str, help="store data to specified directory")
parser.add_argument('--plot_data', default=PLOTDATA, type=int, help="1: plot data")
parser.add_argument('--capture', default=CAPTURE, type=int, help="1: capture data, 0: wait for someone else to capture, -1: just upload")
parser.add_argument('--remote_trigger', default=None, type=str, help="your function to fire trigger")
parser.add_argument('--channels', default=CHANNELS, type=str, help="comma separated channel list")
parser.add_argument('--CLKDIV', default=10, type=int, help="sample rate = 10MHz / CLKDIV")
parser.add_argument('--POST', default=100000, type=int, help="set number of post-shot samples")
parser.add_argument('uuts', nargs = '+', help="uut[s]")
return parser
[docs]def run_main(args):
# deduplicate (yes, some non-optimal apps call with duplicated uuts, wastes time)
args.uuts = uniq(args.uuts)
# encourage single ints to become a list
if re.search(r'^\d$', args.channels) is not None:
args.channels += ','
upload(args)
# execution starts here
if __name__ == '__main__':
run_main(get_parser().parse_args())