#!/usr/bin/env python
"""
acq400.py interface to one acq400 appliance instance
- enumerates all site services, available as uut.sX.knob
- monitors transient status on uut, provides blocking events
- read_channels() reads all data from channel data service.
- simple property interface allows natural "script-like" usage
- eg::
uut1.s0.set_arm = 1
- equivalent to running this on a logged in shell session on the UUT::
set.site1 set_arm=1
"""
import threading
import re
import os
import errno
import signal
import sys
if __name__ == '__main__':
import netclient
else:
from . import netclient
import numpy as np
import socket
import timeit
import time
[docs]class DataNotAvailableError(Exception):
pass
[docs]class AcqPorts:
"""uut server port constants"""
TSTAT = 2235
STREAM = 4210
SITE0 = 4220
SEGSW = 4250
SEGSR = 4251
DPGSTL = 4521
GPGSTL= 4541
GPGDUMP = 4543
WRPG = 4606
DIO482_PG_STL = 45001
DIO482_PG_DUMP = DIO482_PG_STL+2
BOLO8_CAL = 45072
DATA0 = 53000
DATAT = 53333
MULTI_EVENT_TMP = 53555
MULTI_EVENT_DISK = 53556
DATA_SPY = 53667
LIVETOP = 53998
ONESHOT = 53999
AWG_ONCE = 54201
AWG_AUTOREARM = 54202
AWG_CONTINUOUS = 54205
AWG_STREAM = 54207
MGTDRAM = 53993
MGTDRAM_PULL_DATA = 53991
SLOWMON = 53666
[docs]class AcqSites:
"""uut site constants"""
SITE0 = 0
SITE1 = 1
SITE2 = 2
SITE3 = 3
SITE4 = 4
SITE5 = 5
SITE6 = 6
SITE_CA = 13
SITE_CB = 12
SITE_CC = 11
SITE_DSP = 14
SITE_HUDP = 10
[docs]class SF:
"""uut system state constants"""
STATE = 0
PRE = 1
POST = 2
ELAPSED = 3
DEMUX = 5
[docs]class STATE:
"""transient state constants"""
IDLE = 0
ARM = 1
RUNPRE = 2
RUNPOST = 3
POPROCESS = 4
CLEANUP = 5
[docs] @staticmethod
def str(st):
if st==STATE.IDLE:
return "IDLE"
if st==STATE.ARM:
return "ARM"
if st==STATE.RUNPRE:
return "RUNPRE"
if st==STATE.RUNPOST:
return "RUNPOST"
if st==STATE.POPROCESS:
return "POPROCESS"
if st==STATE.CLEANUP:
return "CLEANUP"
return "UNDEF"
[docs]class Signals:
EXT_TRG_DX = 'd0'
INT_TRG_DX = 'd1'
MB_CLK_DX = 'd1'
[docs]class StreamClient(netclient.Netclient):
"""handles live streaming data"""
[docs] def __init__(self, addr):
print("worktodo")
[docs]class RawClient(netclient.Netclient):
""" handles raw data from any service port"""
[docs] def __init__(self, addr, port):
"""init RawClient
Args:
addr (str) : ip or hostname
port (int): service port see AcqPorts
"""
netclient.Netclient.__init__(self, addr, port)
[docs] def read(self, nelems, data_size=2):
"""read data from channel data server
Args:
nelems (int): data elements
data_size (int, optional): data size in bytes 2|4 short or int. Defaults to 2.
Returns:
ndarray: channel data
"""
_dtype = np.dtype('i4' if data_size == 4 else 'i2') # hmm, what if unsigned?
buf = bytearray(nelems*data_size)
try:
view = memoryview(buf).cast('B')
except:
# python 2.7 hack
view = memoryview(buf)
pos = 0
while len(view):
nrx = self.sock.recv_into(view)
if nrx == 0:
break # end of file
view = view[nrx:]
pos += nrx
if pos > 0 and pos < nelems:
print("WARNING: early termination at {}/{}".format(pos, nelems))
return np.frombuffer(buf[:pos], _dtype)
[docs] def get_blocks(self, nelems, data_size=2):
block = np.array([1])
while len(block) > 0:
block = self.read(nelems, data_size=data_size)
if len(block) > 0:
yield block
[docs]class MgtDramPullClient(RawClient):
[docs] def __init__(self, addr):
RawClient.__init__(self, addr, AcqPorts.MGTDRAM_PULL_DATA)
[docs]class ChannelClient(RawClient):
"""handles post shot data for one channel.
Args:
addr (str) : ip address or hostname
ch (int) : channel number 1..N
"""
[docs] def __init__(self, addr, ch):
RawClient.__init__(self, addr, AcqPorts.DATA0+ch)
[docs]class ExitCommand(Exception):
pass
[docs]def signal_handler(signal, frame):
raise ExitCommand()
[docs]class Statusmonitor:
""" monitors the status channel
Efficient event-driven monitoring in a separate thread
"""
st_re = re.compile(r"([0-9]) ([0-9]+) ([0-9]+) ([0-9]+) ([0-9])+" )
st_shot_re = re.compile(r"SHOT=([0-9]),([0-9]+),([0-9]+),([0-9]+)")
st_failed_to_find_event_re = re.compile(r"ERROR EVENT NOT FOUND")
st_timer_re = re.compile(r"Timer::report\(([0-9]+)\) ([A-Z]{3}) ([0-9]+) msec")
def __repr__(self):
return repr(self.logclient)
[docs] def st_monitor(self):
self.data_valid = "UNKNOWN"
while self.quit_requested == False:
try:
st = self.logclient.poll()
except OSError as err:
if self.quit_requested:
return
else:
raise err
if self.trace > 1:
print("%s <%s>" % (repr(self), st))
match = self.st_failed_to_find_event_re.search(st)
if match:
self.data_valid = "ERROR EVENT NOT FOUND"
match = self.st_timer_re.search(st)
if match:
print("TIMER: {} {} {} ms".format(match.group(2), match.group(1), match.group(3)))
if match.group(2) == "ROI":
self.search_roi_count += 1
elif match.group(2) == "ALL":
self.search_all_count += 1
else:
print("ERROR bad match {}".format(match.group(1)))
match = self.st_shot_re.search(st)
if match:
status1 = [int(x) for x in match.groups()]
if status1[0] == 1:
self.data_valid = "ARM"
elif status1[0] == 0:
if self.data_valid == "ARM" and status1[1] > 0 and status1[1] == status1[3]:
self.data_valid = "DATA_VALID"
continue
match = self.st_re.search(st)
# status is a match. need to look at group(0). It's NOT a LIST!
if match:
statuss = match.groups()
status1 = [int(x) for x in statuss]
if self.trace > 1:
print("%s <%s" % (repr(self), status1))
if self.status != None:
if self.status[SF.STATE] != status1[SF.STATE]:
self.state_changed.set()
# print("Status check %s %s" % (self.status0[0], status[0]))
if self.status[SF.STATE] != 0 and status1[SF.STATE] == 0:
if self.trace:
print("%s STOPPED!" % (self.uut))
self.stopped.set()
self.armed.clear()
# print("status[0] is %d" % (status[0]))
if status1[SF.STATE] == 1:
if self.trace:
print("%s ARMED!" % (self.uut))
self.armed.set()
self.stopped.clear()
if self.status[SF.STATE] == 0 and status1[SF.STATE] > 1:
if self.trace:
print("ERROR: %s skipped ARM %d -> %d" % (self.uut, self.status[0], status1[0]))
self.quit_requested = True
os.kill(self.main_pid, signal.SIGINT)
sys.exit(1)
self.status = status1
[docs] def get_state(self):
return self.status[SF.STATE]
[docs] def get_pre(self):
return self.status[SF.PRE]
[docs] def get_post(self):
return self.status[SF.POST]
[docs] def get_total(self):
return self.get_pre() + self.get_post()
[docs] def get_elapsed(self):
return self.status[SF.ELAPSED]
[docs] def wait_event(self, ev, descr=""):
# print("wait_%s 02 %d" % (descr, ev.is_set()))
while ev.wait(0.1) == False and not self.break_requested:
if self.quit_requested:
print("QUIT REQUEST call exit %s" % (descr))
sys.exit(1)
# print("wait_%s 88 %d" % (descr, ev.is_set()))
ev.clear()
# print("wait_%s 99 %d" % (descr, ev.is_set()))
return self.get_state()
[docs] def wait_armed(self):
"""blocks until uut is ARMED"""
self.wait_event(self.armed, "armed")
[docs] def wait_stopped(self):
"""blocks until uut is STOPPED"""
self.wait_event(self.stopped, "stopped")
[docs] def wait_state_changed(self):
"""blocks until state has changed"""
self.wait_event(self.state_changed, "state_change")
trace = int(os.getenv("STATUSMONITOR_TRACE", "0"))
[docs] def __init__(self, _uut, _status):
self.break_requested = False
self.quit_requested = False
self.trace = Statusmonitor.trace
self.uut = _uut
self.main_pid = os.getpid()
self.status = _status
self.stopped = threading.Event()
self.armed = threading.Event()
self.state_changed = threading.Event()
self.logclient = netclient.Logclient(_uut, AcqPorts.TSTAT)
self.st_thread = threading.Thread(target=self.st_monitor)
self.st_thread.setDaemon(True)
self.st_thread.start()
self.data_valid = "UNKNOWN"
self.search_roi_count = 0
self.search_all_count = 0
[docs]class NullFilter:
def __call__ (self, st):
print(st)
null_filter = NullFilter()
[docs]class ProcessMonitor:
st_rex = ( re.compile(r"^END" ), re.compile(r"^real"))
[docs] def st_monitor(self):
while self.quit_requested == False:
st = self.logclient.poll()
self.output_filter(st)
for re in self.st_rex:
if re.search(st):
self.quit_requested = True
break
[docs] def __init__(self, _uut, _monport, _filter, set_arm):
"""init ProcessMonitor
Args:
_uut (acq400): uut instance
_monport (int): monitor port
_filter (func): filtering function
set_arm (bool): set arm
"""
self.quit_requested = False
self.output_filter = _filter
self.logclient = netclient.Logclient(_uut.uut, _monport)
self.logclient.termex = re.compile("(\n)")
self.st_thread = threading.Thread(target=self.st_monitor)
self.st_thread.setDaemon(True)
self.st_thread.start()
if set_arm:
_uut.s0.BLT_SET_ARM = '1'
[docs]class Acq400:
"""Host-side proxy for Acq400 uut.
acq400 discovers and maintains all site servers, maintains a monitor and \
handles multiple channel post shot upload
"""
[docs] def init_site_client(self, site):
svc = netclient.Siteclient(self.uut, AcqPorts.SITE0+site)
self.svc["s%d" % site] = svc
self.modules[site] = svc
if self.awg_site == 0 and svc.module_name.startswith("ao"):
self.awg_site = site
self.mod_count += 1
[docs] @classmethod
def create_uuts(cls, uut_names):
""" create_uuts(): factory .. create them in parallel
*** Experimental Do Not Use ***
"""
uuts = []
uut_threads = {}
for uname in uut_names:
uut_threads[uname] = \
threading.Thread(\
target=lambda u, l: l.append(cls(u)), \
args=(uname, uuts))
for uname in uut_names:
uut_threads[uname].start()
for t in uut_threads:
uut_threads[t].join(10.0)
return uuts
uuts_methods = {} # for cloning by new
uuts = {} # for re-use by factory
[docs] def __init__(self, _uut, monitor=True, s0_client=None):
"""init acq400
Args:
_uut (srt): uut hostname or ip-address
monitor (bool, optional): start statusmonitor. Defaults to True.
s0_client (netclient.Siteclient, optional): existing siteclient. Defaults to None.
"""
try:
self.__dict__ = Acq400.uuts_methods[_uut]
return
except KeyError:
pass
self.verbose = int(os.getenv("ACQ400_VERBOSE", "0"))
self.NL = re.compile(r"(\n)")
self.uut = _uut
self.trace = 0
self.save_data = None
self.svc = {}
self.modules = {}
self.mod_count = 0
# channel index from 1,..
self.cal_eslo = [0, ]
self.cal_eoff = [0, ]
self.mb_clk_min = 4000000
s0 = self.svc["s0"] = s0_client if s0_client else netclient.Siteclient(self.uut, AcqPorts.SITE0)
sl = s0.SITELIST.split(",")
sl.pop(0)
self.awg_site = 0
site_enumerators = {}
for sm in sl:
site_enumerators[sm] = \
threading.Thread(target=self.init_site_client,\
args=(int(sm.split("=").pop(0)),)\
)
for sm in sl:
site_enumerators[sm].start()
for sm in sl:
# print("join {}".format(site_enumerators[sm]))
site_enumerators[sm].join(10.0)
self.sites = [int(s.split('=')[0]) for s in sl]
if monitor:
# init _status so that values are valid even if this Acq400 doesn't run a shot ..
try: _status = [int(x) for x in s0.state.replace('STX ', '').split(" ")]
except: _status = [0,0,0,0,0]
self.statmon = Statusmonitor(self.uut, _status)
Acq400.uuts_methods[_uut] = self.__dict__ # store the dict for reuse by __init__
Acq400.uuts[_uut] = self # store the object for reuse by factory()
[docs] def close(self):
"""Closes uut connection gracefully"""
self.statmon.quit_reqested = True
try:
self.statmon.logclient.close()
except Exception as e:
print(f"error closing statmon log client {e}")
try:
for k, s in self.svc.items():
s.close()
except Exception as e:
print(f"error closing svc item {k}:{s} {e}")
try:
del Acq400.uuts[self.uut]
del Acq400.uuts_methods[self.uut]
except KeyError:
print("ERROR {} instance not in cache".format(self.uut))
# def __del__(self):
# print("__del__ {}".format(self.uut))
# try:
# self.close()
# except Exception as e:
# print(f"Error closing Acq400 object ... {e}")
def __getattr__(self, name):
if self.svc.get(name) != None:
return self.svc.get(name)
else:
msg = "'{0}' object has no attribute '{1}'"
raise AttributeError(msg.format(type(self).__name__, name))
[docs] def state(self):
return self.statmon.status[SF.STATE]
[docs] def post_samples(self):
return self.statmon.status[SF.POST]
[docs] def pre_samples(self):
return self.statmon.status[SF.PRE]
[docs] def elapsed_samples(self):
return self.statmon.status[SF.ELAPSED]
[docs] def demux_status(self):
return self.statmon.status[SF.DEMUX]
[docs] def samples(self):
return self.pre_samples() + self.post_samples()
[docs] def get_aggregator_sites(self):
return self.s0.aggregator.split(' ')[1].split('=')[1].split(',')
[docs] def fetch_all_calibration(self):
"""Gets uut calibration and stores in instance"""
# print("Fetching calibration data")
self.cal_eslo = [0.]
self.cal_eoff = [0.]
for m in (self.modules[int(c)] for c in self.get_aggregator_sites()):
self.cal_eslo.extend(m.AI_CAL_ESLO.split(' ')[3:])
self.cal_eoff.extend(m.AI_CAL_EOFF.split(' ')[3:])
[docs] def scale_raw(self, raw, volts=False):
for (sx, m) in list(self.modules.items()):
if m.MODEL.startswith("ACQ43"):
rshift = 8
elif m.data32 == '1':
# volts calibration is normalised to 24b
if m.adc_18b == '1':
rshift = 14 - (8 if volts else 0)
else:
rshift = 16 - (8 if volts else 0)
else:
rshift = 0
break
return np.right_shift(raw, rshift)
[docs] def chan2volts(self, chan, raw):
"""returns calibrated volts for channel
Args:
chan (int): channel index
raw (ndarray): uncalibrated data array
Returns:
ndarray: calibrated data array
"""
if len(self.cal_eslo) == 1:
self.fetch_all_calibration()
eslo = float(self.cal_eslo[chan])
eoff = float(self.cal_eoff[chan])
if self.verbose > 1 or (self.verbose and chan < 4):
print("chan {} v = {}*{} + {}".format(chan, raw[0], eslo, eoff))
return np.add(np.multiply(raw, eslo), eoff)
[docs] def read_chan(self, chan, nsam = 0, data_size = None):
"""Reads a channels data
Args:
chan (int): channel number
nsam (int, optional): Number of samples. Defaults to 0.
data_size (int, optional): data size in bytes. Defaults to None.
Returns:
ndarray
"""
if data_size == None:
data_size = 4 if self.s0.data32 == '1' else 2
if chan == 0:
nsam = int(self.s0.raw_data_size) // data_size
if chan != 0 and nsam == 0:
nsam = self.pre_samples()+self.post_samples()
cc = ChannelClient(self.uut, chan)
ccraw = cc.read(nsam, data_size=data_size)
cc.close()
if self.save_data:
try:
os.makedirs(self.save_data)
except OSError as exception:
if exception.errno != errno.EEXIST:
raise
with open("%s/%s_CH%02d" % (self.save_data, self.uut, chan), 'wb') as fid:
ccraw.tofile(fid, '')
return ccraw
[docs] def read_decims(self, nsam = 0):
if nsam == 0:
nsam = self.pre_samples()+self.post_samples()
cc = ChannelClient(self.uut, AcqPorts.DATAT-AcqPorts.DATA0)
ccraw = cc.read(nsam, data_size=1)
if self.save_data:
try:
os.makedirs(self.save_data)
except OSError as exception:
if exception.errno != errno.EEXIST:
raise
with open("%s/%s_DEC" % (self.save_data, self.uut, chan), 'wb') as fid:
ccraw.tofile(fid, '')
return ccraw
[docs] def nchan(self):
"""gets total number of channels
can include scratchpad
Returns:
int: number of channels
"""
try:
return self._nchan
except:
self._nchan = int(self.s0.NCHAN)
return self._nchan
[docs] def data_size(self):
"""gets data size
Returns:
int: data size in bytes
"""
return 4 if self.s0.data32 == '1' else 2
[docs] def uut_demux_enabled(self):
"""returns demux status"""
ts = self.s0.transient
rc = bool(int(self.s0.transient.split("DEMUX=",1)[1][0]))
#print ("uut_demux_enabled(): transient {} decoded DEMUX={} rc={}".format(ts, ts.split("DEMUX=",1)[1][0], rc))
return rc
[docs] def read_channels(self, channels=(), nsam=0, localdemux=False):
"""read selected channels post shot data.
uut_demux_enabled() == False: UUT did NOT demux the data, data order [sample][channel]
uut_demux_enabled() == True: UUT did demux the data, data order [channel][sample]
DEMUX is a synonym for uut_demux_enabled()..
channels=(0) : return all (bulk) data, likely in raw[sample][channel] format.
channels=(0) localdemux=True: return all (bulk) in demux[channel][sample] format.
channels=N or channels=(N,N1,N2 ..) # N >= 1 && N <= NCHAN
There are 4 cases:
channels=(1,2...nchan) and (remote) DEMUX=1: SIMPLE, pull each channel at a time from the UUT.
channels=(0) and DEMUX=0 : SIMPLE: return the raw data.
channels=(1,2..) and DEMUX=1 : demux locally.
channels=(0) and DEMUX=0 and localdemux : demux locally, return bulk data [channels][samnples]
Args:
channels (tuple, optional): tuple contining channels to read . Defaults to all.
nsam (int, optional): Number of samples. Defaults to 0.
localdemux (bool, optional): demux data. Defaults to False.
Returns:
ndarray: channel data
"""
if channels == ():
channels = list(range(1, self.nchan()+1))
elif type(channels) == int:
channels = (channels,)
bulk_channels = channels == (0,)
chx = []
data_size = 4 if self.s0.data32 == '1' else 2
if self.uut_demux_enabled():
try:
nbytes = int(self.s1.ch_data_size)
if nbytes == 0:
raise DataNotAvailableError
except AttributeError:
print("using older firmware, unable to determine ch_data_size, keep going")
localdemux = False
else:
try:
nbytes = int(self.s0.raw_data_size)
if nbytes == 0:
raise DataNotAvailableError
except AttributeError:
print("using older firmware, unable to determine raw_data_size, keep going")
if not bulk_channels:
localdemux = True
if not localdemux:
# print("read_channels() NOT localdemux")
# likely: either pull channelized demux data or pull a full set of raw data.
for ch in channels:
if self.trace:
print("%s CH%02d start.." % (self.uut, ch))
start = timeit.default_timer()
chx.append(self.read_chan(ch, nsam, data_size=data_size))
if self.trace:
tt = timeit.default_timer() - start
print("%s CH%02d complete.. %.3f s %.2f MB/s" %
(self.uut, ch, tt, len(chx[-1])*2/1000000/tt))
else:
# only do local demux if client wants it and the data needs it..
data = np.array(self.read_chan(0, nsam, data_size=data_size))
data = data.reshape((-1, self.nchan()))
if bulk_channels:
# print("read_channels() local demux, bulk data")
channels = np.arange(0, self.nchan())
else:
# print("read_channels() local demux, selected channels")
channels = np.array(channels)-1
data = data[:,channels].transpose()
chx = [ item for item in data ]
return chx
[docs] def read_transient_timebase(self, nsamples, pre=0):
try:
fs = freq(self.s1.ACQ480_OSR)
except:
fs = freq(self.s0.SIG_CLK_S1_FREQ)
if fs > 1e6:
isi = 1 / np.round(fs / 1e6, 2) * 1e-6 # interval in seconds
else:
isi = 1 / np.round(fs / 1e3, 2) * 1e-3 # interval in seconds
return np.linspace(-pre*isi, (nsamples-pre)*isi, nsamples)
# DEPRECATED
[docs] def load_segments(self, segs):
with netclient.Netclient(self.uut, AcqPorts.SEGSW) as nc:
for seg in segs:
nc.sock.send((seg+"\n").encode())
# DEPRECATED
[docs] def show_segments(self):
with netclient.Netclient(self.uut, AcqPorts.SEGSR) as nc:
while True:
buf = nc.sock.recv(1024)
if buf:
print(buf)
else:
break
[docs] def clear_counters(self):
for s in self.svc:
self.svc[s].sr('*RESET=1')
[docs] def set_sync_routing_master(self, clk_dx="d1", trg_dx="d0"):
self.s0.SIG_SYNC_OUT_CLK = "CLK"
self.s0.SIG_SYNC_OUT_CLK_DX = clk_dx
self.s0.SIG_SYNC_OUT_TRG = "TRG"
self.s0.SIG_SYNC_OUT_TRG_DX = trg_dx
[docs] def set_sync_routing_slave(self):
self.set_sync_routing_master()
self.s0.SIG_SRC_CLK_1 = "HDMI"
self.s0.SIG_SRC_TRG_0 = "HDMI"
[docs] def set_sync_routing(self, role):
# deprecated
# set sync mode on HDMI daisychain
# valid roles: master or slave
if role == "master":
self.set_sync_routing_master()
elif role == "slave":
self.set_sync_routing_slave()
else:
raise ValueError("undefined role {}".format(role))
[docs] def set_mb_clk(self, hz=4000000, src="zclk", fin=1000000):
hz = int(hz)
if src == "zclk":
self.s0.SIG_ZCLK_SRC = "INT33M"
self.s0.SYS_CLK_FPMUX = "ZCLK"
self.s0.SIG_CLK_MB_FIN = 33333000
elif src == "xclk":
self.s0.SYS_CLK_FPMUX = "XCLK"
self.s0.SIG_CLK_MB_FIN = 32768000
else:
self.s0.SYS_CLK_FPMUX = "FPCLK"
self.s0.SIG_CLK_MB_FIN = fin
if hz >= self.mb_clk_min:
self.s0.SIG_CLK_MB_SET = hz
self.s1.CLKDIV = '1'
else:
for clkdiv in range(1,2000):
if hz*clkdiv >= self.mb_clk_min:
self.s0.SIG_CLK_MB_SET = hz*clkdiv
self.s1.CLKDIV = clkdiv
return
raise ValueError("frequency out of range {}".format(hz))
[docs] def load_stl(self, stl, port, trace = False, wait_eof = True, wait_eol = True):
"""Send a STL file to the specified port
Args:
stl (str): stl string each line seperated by newlines
port (int): port num see AcqPorts
trace (bool, optional): print each line sent. Defaults to False.
wait_eof (bool, optional): wait for end of file. Defaults to True.
wait_eol (bool, optional): wait for end of line. Defaults to True.
"""
termex = re.compile("\n")
with netclient.Netclient(self.uut, port) as nc:
lines = stl.split("\n")
for ll in lines:
if trace:
print("> {}".format(ll))
if len(ll) < 2:
if trace:
print("skip blank")
continue
if ll.startswith('#'):
if trace:
print("skip comment")
continue
nc.sock.send((ll+"\n").encode())
if wait_eol:
rx = nc.sock.recv(4096)
if trace:
print("< {}".format(rx))
nc.sock.send("EOF\n".encode())
nc.sock.shutdown(socket.SHUT_WR)
while wait_eof:
rx = nc.sock.recv(4096)
if trace:
print("< {}".format(rx))
if (str(rx).find("EOF")) != -1:
break
wait_end = wait_eof
[docs] def load_gpg(self, stl, trace = False):
"""Send stl to GPG port
Args:
stl (str): stl string each line seperated by newlines
trace (bool, optional): print each line sent. Defaults to False.
"""
self.load_stl(stl, AcqPorts.GPGSTL, trace)
[docs] def load_dpg(self, stl, trace = False):
"""Send stl to DPG port
Args:
stl (str): stl string each line seperated by newlines
trace (bool, optional): print each line sent. Defaults to False.
"""
self.load_stl(stl, AcqPorts.DPGSTL, trace, wait_eol=False)
[docs] def load_wrpg(self, stl, trace = False):
self.load_stl(stl, AcqPorts.WRPG, trace)
[docs] def load_dio482pg(self, site, stl, trace = False):
self.load_stl(stl, AcqPorts.DIO482_PG_STL+int(site)*10, trace)
[docs] def set_DO(self, site, dox, value = 'P'):
self.svc["s{}".format(site)].set_knob("DO_{}".format(dox), value)
[docs] class AwgBusyError(Exception):
[docs] def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)
[docs] def load_awg(self, data, autorearm=False, continuous=False, repeats=1, port=None):
"""Load and config a AWG pattern
Args:
data (str): AWG pattern
autorearm (bool, optional): Rearm and wait after run. Defaults to False.
continuous (bool, optional): Run pattern continuously. Defaults to False.
repeats (int, optional): Number of pattern repetitions. Defaults to 1.
"""
if self.awg_site > 0:
if self.modules[self.awg_site].task_active == '1':
raise self.AwgBusyError("awg busy")
if port is None:
port = AcqPorts.AWG_CONTINUOUS if continuous else \
AcqPorts.AWG_AUTOREARM if autorearm else AcqPorts.AWG_ONCE
with netclient.Netclient(self.uut, port) as nc:
while repeats:
nc.sock.send(data)
repeats -= 1
nc.sock.shutdown(socket.SHUT_WR)
while True:
rx = nc.sock.recv(128)
if self.trace and rx:
print("<{}".format(rx))
if not rx or rx.startswith(b"DONE"):
break
nc.sock.close()
[docs] def run_service(self, port, eof="EOF", prompt='>'):
"""Run a service on the uut
Args:
port (int): service port see AcqPorts
eof (str, optional): end connection on character. Defaults to "EOF".
prompt (str, optional): prompt character. Defaults to '>'.
Returns:
str: service transcript
"""
txt = ""
with netclient.Netclient(self.uut, port) as nc:
while True:
rx = nc.receive_message(self.NL, 256)
txt += rx
txt += "\n"
print("{}{}".format(prompt, rx))
if rx.startswith(eof):
break
nc.sock.shutdown(socket.SHUT_RDWR)
nc.sock.close()
return txt
[docs] def run_oneshot(self):
with netclient.Netclient(self.uut, AcqPorts.ONESHOT) as nc:
while True:
rx = nc.receive_message(self.NL, 256)
print("{}> {}".format(self.s0.HN, rx))
if rx.startswith("SHOT_COMPLETE"):
break
nc.sock.shutdown(socket.SHUT_RDWR)
nc.sock.close()
[docs] def run_livetop(self):
with netclient.Netclient(self.uut, AcqPorts.LIVETOP) as nc:
while True:
rx = nc.receive_message(self.NL, 256)
print(rx)
if rx.startswith("TIMED_OUT"):
break
nc.sock.shutdown(socket.SHUT_RDWR)
nc.sock.close()
[docs] def disable_trigger(self):
#master.s0.SIG_SRC_TRG_0 = 'NONE'
#master.s0.SIG_SRC_TRG_1 = 'NONE'
self.s0.SIG_SRC_TRG_0 = 'NONE'
self.s0.SIG_SRC_TRG_1 = 'NONE'
[docs] def enable_trigger(self, trg_0='EXT', trg_1='STRIG'):
if trg_0 is not None:
self.s0.SIG_SRC_TRG_0 = trg_0
if trg_1 is not None:
self.s0.SIG_SRC_TRG_1 = trg_1
[docs] def configure_post(self, role, trigger=[1,1,1], post=100000):
""" Configure UUT for a regular transient capture
Args:
role (str): uut role ``master`` or ``slave``
trigger (list, optional): Trigger trinary. Defaults to [1,1,1].
post (int, optional): post samples. Defaults to 100k.
"""
print(trigger)
self.s0.transient = "PRE=0 POST={} SOFT_TRIGGER={}".format(post, trigger[1])
self.s1.TRG = 1
if role == "slave" or trigger[1] == 0:
self.s1.TRG_DX = 0
else:
self.s1.TRG_DX = 1
self.s1.TRG_SENSE = trigger[2]
self.s1.EVENT0 = 0
self.s1.EVENT0_DX = 0
self.s1.EVENT0_SENSE = 0
self.s1.RGM = 0
self.s1.RGM_DX = 0
self.s1.RGM_SENSE = 0
self.s1.RGM = 0 # Make sure RGM mode is turned off.
self.s0.SIG_EVENT_SRC_0 = 0
return None
[docs] def configure_pre_post(self, role, trigger=[1,1,1], event=[1,1,1], pre=50000, post=100000):
"""Configure UUT for pre/post mode.
Default setup: soft trigger starts the data flow and trigger the event on a hard external trigger.
Args:
role (str): uut role ``master`` or ``slave``
trigger (list, optional): Trigger trinary. Defaults to [1,1,1].
event (list, optional): Event trinary. Defaults to [1,1,1].
pre (int, optional): pre samples. Defaults to 50k.
post (int, optional): post samples. Defaults to 100k.
"""
if pre > post:
print("PRE samples cannot be greater than POST samples. Config not set.")
return None
trg = 1 if trigger[1] == 1 else 0
self.s0.transient = "PRE={} POST={} SOFT_TRIGGER={}".format(pre, post, trg)
self.s1.TRG = trigger[0]
if role == "slave" or trigger[1] == 0:
self.s1.TRG_DX = 0
else:
self.s1.TRG_DX = 1
self.s1.TRG_SENSE = trigger[2]
self.s1.EVENT0 = event[0]
self.s1.EVENT0_DX = event[1]
self.s1.EVENT0_SENSE = event[2]
self.s1.RGM = 0
self.s1.RGM_DX = 0
self.s1.RGM_SENSE = 0
self.s1.RGM = 0 # Make sure RGM mode is turned off.
self.s0.SIG_EVENT_SRC_0 = 0
return None
[docs] def get_demux_state(self):
"""Returns the current state of demux
Beware: if demux is set after the shot then this function will return the new state. There is no way to
determine what the state was during the previous shot.
"""
transient = self.s0.transient
demux_state = transient.split("DEMUX=",1)[1][0]
return int(demux_state)
[docs] def pull_plot(self, channels=(), demux=-1):
"""This function returns an array of the specified channels and plots the data.
Pulls data from 53000 or 5300X and will return the corresponding data from each 5300X port \
(if demux is on) or will return the corresponding data filtered from 53000 if demux is off.
Args:
channels (tuple, optional): tuple containing channel indexes. Defaults to ().
demux (int, optional): 1 demux on 0 demux off -1 autodetect. Defaults to -1.
"""
data = []
if demux == -1:
demux = self.get_demux_state()
if demux == 1:
data = self.read_channels(channels)
elif demux == 0:
mux_data = self.read_muxed_data()
print("mux data = ", mux_data)
nchan = self.nchan()
if channels == ():
channels = list(range(1,nchan+1))
for ch in channels:
print("Channel - ", ch)
data.append(mux_data[ch-1::nchan])
import matplotlib.pyplot as plt
for channel in data:
plt.plot(channel)
plt.grid(True)
plt.show()
return data
[docs] def read_muxed_data(self):
"""returns data from port 53000.
Only use if demux is turned off.
"""
data = self.read_channels((0), -1)
return data[0]
[docs] def pull_data(self):
"""Gets data from all AI channels
Returns:
ndarray: channel data
"""
demux_state = self.get_demux_state()
channels = list(range(1, self.get_ai_channels()+1))
nchan = channels[-1]
if demux_state == 1:
data = self.read_channels(channels, -1)
elif demux_state == 0:
data = []
mux_data = self.read_muxed_data()
for ch in channels:
data.append(mux_data[ch-1::nchan])
return data
[docs] def get_ai_channels(self):
"""Gets total number of AI channels
nchan can sometimes include scratchpad
Returns:
int: total AI channels
"""
ai_channels = 0
site_types = self.get_site_types()
for ai_site in site_types["AISITES"]:
ai_site = "s{}".format(ai_site)
ai_channels += int(getattr(getattr(self, ai_site), "NCHAN"))
return ai_channels
[docs] def get_site_types(self):
"""gets all sites grouped by site type
Returns:
dict: AISITES, AOSITES, and DIOSITES
"""
AISITES = []
AOSITES = []
DIOSITES = []
for site in [1,2,3,4,5,6]:
try:
module_name = eval('self.s{}.module_name'.format(site))
if module_name.startswith('acq'):
AISITES.append(site)
elif module_name.startswith('ao'):
AOSITES.append(site)
elif module_name.startswith('dio'):
DIOSITES.append(site)
except Exception:
continue
site_types = { "AISITES": AISITES, "AOSITES": AOSITES, "DIOSITES": DIOSITES }
return site_types
[docs] def get_es_indices(self, file_path="default", nchan="default", human_readable=0, return_hex_string=0):
"""Returns the location of event samples.
Args:
file_path (str, optional): data source. Can load from file. Defaults to from uut.
nchan (int, optional): total chans use when loading from file. Defaults to from uut.
human_readable (int, optional): returns hex interpretations of the event sample data. Defaults to 0.
return_hex_string (int, optional): if 1 and human_readable 1 returns single string containing all of \
the event samples. Defaults to 0.
Returns:
list: [ [Event sample indices], [Event sample data] ]
"""
indices = []
event_samples = []
nchan = self.nchan() if nchan == "default" else nchan
if file_path == "default":
data = self.read_muxed_data()
data = np.array(data)
if data.dtype == np.int16:
# convert shorts back to raw bytes and then to longs.
data = np.frombuffer(data.tobytes(), dtype=np.uint32)
else:
data = np.fromfile(file_path, dtype=np.uint32)
if int(self.s0.data32) == 0:
nchan = nchan / 2 # "effective" nchan has halved if data is shorts.
nchan = int(nchan)
for index, sample in enumerate(data[0::nchan]):
# if sample == np.int32(0xaa55f154): # aa55
if sample == np.uint32(0xaa55f154): # aa55
indices.append(index)
event_samples.append(data[index*nchan:index*nchan + nchan])
if human_readable == 1:
# Change decimal to hex.
ii = 0
while ii < len(event_samples):
if type(event_samples[ii]) == np.ndarray:
event_samples[ii] = event_samples[ii].tolist()
for indice, channel in enumerate(event_samples[ii]):
event_samples[ii][indice] = '0x{0:08X}'.format(channel)
ll = int(len(event_samples[ii])/int(len(self.get_aggregator_sites())))
# print(event_samples[ii])
event_samples[ii] = [event_samples[ii][i:i + ll] for i in range(0, len(event_samples[ii]), ll)]
ii += 1
if return_hex_string == 1:
# Make a single string containing the hex values.
es_string = ""
for num, sample in enumerate(event_samples):
for i in range(len(sample[0])):
for x in event_samples[num]:
es_string = es_string + str(x[i]) + " "
es_string = es_string + "\n"
es_string = es_string + "\n"
event_samples = es_string
return [indices, event_samples]
[docs] def stream(self, recvlen=4096*32, port=AcqPorts.STREAM, data_size=2):
"""Runs stream and yields data buffers
Args:
recvlen (_type_, optional): buffer size. Defaults to 4096*32.
port (_type_, optional): uut port. Defaults to AcqPorts.STREAM value.
data_size (int, optional): data size in bytes. Defaults to 2.
Yields:
ndarray: data buffer
"""
dtype = np.dtype('i4' if data_size == 4 else 'i2') # hmm, what if unsigned?
self.stream_nc = netclient.Netclient(self.uut, port)
buf = bytearray(recvlen*data_size)
while self.stream_nc:
view = memoryview(buf).cast('B')
pos = 0
while len(view) and self.stream_nc:
nrx = self.stream_nc.sock.recv_into(view)
if nrx == 0:
yield np.frombuffer(buf[:pos], dtype)
pos = 0
else:
view = view[nrx:]
pos += nrx
yield np.frombuffer(buf[:pos], dtype)
[docs] def stream_close(self):
if self.stream_nc:
self.stream_nc.close()
self.stream_nc = None
else:
print("stream_close(), sorry not possible to close it down ..")
# if spad is set, it's a synthetic spad, not part of ssb
[docs] def stream_slowmon(self, nspad=None):
ssb = int(self.s0.ssb)
data_sz = 4 if int(self.s0.data32) else 2
if not nspad:
nspad = int(self.s0.spad.split(',')[1])
spad_sz = nspad*4
nchan = (ssb - spad_sz)//data_sz
else:
main_nspad = int(self.s0.spad.split(',')[1])
nchan = (ssb - main_nspad*4)//data_sz
ssb = nchan*data_sz + nspad*4
ch_dtype = np.dtype('i4' if data_sz == 4 else 'i2')
sp_dtype = np.dtype('u4')
spad_off = nchan*data_sz//4
self.slowmon_nc = netclient.Netclient(self.uut, AcqPorts.SLOWMON)
buf = bytearray(ssb)
view = memoryview(buf).cast('B')
while self.slowmon_nc:
view = memoryview(buf).cast('B')
ib = 0
while ib < ssb and self.slowmon_nc:
nrx = self.slowmon_nc.sock.recv_into(view)
#print("stream_slowmon: nrx:{} ib:{} len(view) {}".format(nrx, ib, len(view)))
if nrx == 0:
print("stream_slowmon: nrx:{} ib:{} len(view) {}".format(nrx, ib, len(view)))
#sys.exit(-1)
else:
view = view[nrx:]
ib += nrx
chx = np.frombuffer(buf, ch_dtype)[:nchan]
spx = np.frombuffer(buf, sp_dtype)[spad_off:]
yield(chx, spx)
[docs] def slowmon_close(self):
if self.slowmon_nc:
self.slowmon_nc.close()
self.slowmon_nc = None
else:
print("slowmon_close(), sorry not possible to close it down ..")
def pv(_pv):
return _pv.split(" ")[1]
def freq(sig):
#deprecated
return float(pv(sig))
[docs]def freqpv(sig):
return float(pv(sig))
[docs]def floatpv(_pv):
return float(pv(_pv))
def intpv(_pv):
return int(pv(_pv))
[docs]def activepv(_pv):
return int(float(pv(_pv))) > 0
[docs]class Acq2106(Acq400):
"""Acq2106 specialization of Acq400
Defines features specific to ACQ2106
"""
[docs] def __init__(self, _uut, monitor=True, s0_client=None, has_dsp=False, has_comms=True, has_wr=False, has_hudp=False):
"""init acq2106
Args:
_uut (srt): uut hostname or ip-address
monitor (bool, optional): start statusmonitor. Defaults to True.
s0_client (netclient.Siteclient, optional): existing siteclient. Defaults to None.
has_dsp (bool, optional): if uut has dsp. Defaults to False.
has_comms (bool, optional): if uut has comms. Defaults to True.
has_wr (bool, optional): if uut has white rabbit. Defaults to False.
has_hudp (bool, optional): if uut has hudp. Defaults to False.
"""
Acq400.__init__(self, _uut, monitor=monitor, s0_client=s0_client)
self.mb_clk_min = 100000
sn_map = ()
if has_comms:
for mgt in self.svc['s0'].has_mgt.split(' '):
sn_map += ((self.get_mgt_site(mgt)),)
if has_wr:
sn_map += (('cC', AcqSites.SITE_CC), )
if has_dsp:
sn_map += (('s14', AcqSites.SITE_DSP),)
if has_hudp:
sn_map += (('s10', AcqSites.SITE_HUDP), )
for ( service_name, site ) in sn_map:
try:
self.svc[service_name] = netclient.Siteclient(self.uut, AcqPorts.SITE0+site)
except socket.error:
print("uut {} site {} not populated".format(_uut, site))
self.mod_count += 1
[docs] def get_mgt_site(self, mgt):
"""gets mgt site str from num
Args:
mgt (int): mgt site num
Returns:
tuple: (site string, site num)
"""
comms_config = {
11 : ('cC', AcqSites.SITE_CC),
12 : ('cB', AcqSites.SITE_CB),
13 : ('cA', AcqSites.SITE_CA),
}
return comms_config[int(mgt)]
[docs] def set_mb_clk(self, hz=4000000, src="zclk", fin=1000000):
print("set_mb_clk {} {} {}".format(hz, src, fin))
Acq400.set_mb_clk(self, hz, src, fin)
try:
self.s0.SYS_CLK_DIST_CLK_SRC = 'Si5326'
except AttributeError:
print("SYS_CLK_DIST_CLK_SRC, deprecated")
self.s0.SYS_CLK_OE_CLK1_ZYNQ = '1'
[docs] def set_sync_routing_slave(self):
Acq400.set_sync_routing_slave(self)
self.s0.SYS_CLK_OE_CLK1_ZYNQ = '1'
[docs] def set_master_trg(self, trg, edge = "rising", enabled=True):
if trg == "fp":
self.s0.SIG_SRC_TRG_0 = "EXT" if enabled else "HOSTB"
elif trg == "int":
self.s0.SIG_SRC_TRG_1 = "STRIG"
[docs] def set_MR(self, enable, evsel0=4, evsel1=5, MR10DEC=8):
if enable:
self.s1.ACQ480_MR_EVSEL_0 = 'd{}'.format(evsel0)
self.s1.ACQ480_MR_EVSEL_1 = 'd{}'.format(evsel1)
self.s1.ACQ480_MR_10DEC = 'dec{}'.format(MR10DEC)
self.s1.ACQ480_MR_EN = '1'
else:
self.s1.ACQ480_MR_EN = '0'
[docs] def wr_PPS_active(self):
if self.cC.WR_PPS_ACTIVE.split(' ')[1] == '1.0':
return True
else:
pps0 = self.cC.WR_PPS_COUNT.split(' ')[1]
time.sleep(2)
pps1 = self.cC.WR_PPS_COUNT.split(' ')[1]
return pps0 != pps1
[docs] def get_sys_info(self):
"""Gets uut system information
Returns:
str: system info
"""
line = "{: <6}{: <16}{: <35}{: <16}\n"
from datetime import datetime
info = ""
info += "="*72 + "\n"
info += datetime.now().strftime("%Y-%m-%d %H:%M:%S") + "\n\n"
info += "FPGA: {}\n".format(self.s0.fpga_version)
info += "FW: {}\n\n".format(self.s0.software_version)
info += "MB Info:\n"
info += "{: <57}{: <16}\n".format("MODEL", "SERIAL")
info += "{: <57}{: <16}\n\n".format(self.s0.MODEL, self.s0.SERIAL)
info += "-" * 72 + "\n\n"
info += "Site Info:\n"
info += line.format("SITE", "MODEL", "PART_NUM", "SERIAL")
for site in self.sites:
info += line.format(site,
self.svc["s{}".format(site)].MODEL,
self.svc["s{}".format(site)].PART_NUM,
self.svc["s{}".format(site)].SERIAL)
info += "="*72
return info
[docs]class Acq2106_Mgtdram8(Acq2106):
"""Mgtdram8 specialization of Acq2106
Defines features specific to Acq2106_Mgtdram8
"""
MGT_BLOCK_BYTES = 0x400000
MGT_BLOCK_MULTIPLE = 16
[docs] def __init__(self, uut, monitor=True, s0_client=None):
"""init Acq2106_Mgtdram8
Args:
_uut (srt): uut hostname or ip-address
monitor (bool, optional): start statusmonitor. Defaults to True.
s0_client (netclient.Siteclient, optional): existing siteclient. Defaults to None.
"""
Acq2106.__init__(self, uut, monitor=monitor, s0_client=s0_client, has_dsp=True)
[docs] def run_mgt(self, filter = null_filter, set_arm=True):
pm = ProcessMonitor(self, AcqPorts.MGTDRAM, filter, set_arm)
while pm.quit_requested != True:
time.sleep(1)
[docs] def create_mgtdram_pull_client(self):
return MgtDramPullClient(self.uut)
[docs]class Acq2106_TIGA(Acq2106):
"""TIGA specialization of Acq2106
Defines features specific to Acq2106_TIGA
"""
[docs] def __init__(self, uut, monitor=True, s0_client=None):
"""init Acq2106_TIGA
Args:
_uut (srt): uut hostname or ip-address
monitor (bool, optional): start statusmonitor. Defaults to True.
s0_client (netclient.Siteclient, optional): existing siteclient. Defaults to None.
"""
Acq2106.__init__(self, uut, monitor=monitor, s0_client=s0_client, has_wr=True)
self.pg_sites = [ sx for sx in range(1,6+1) if sx in self.sites and self.svc["s{}".format(sx)].MTYPE == '7B' ]
[docs] def load_dio482pg(self, site, stl, trace = False):
self.load_stl(stl, AcqPorts.DIO482_PG_STL+site*10, trace)
[docs] def set_DO(self, site, dox, value = 'P'):
self.svc["s{}".format(site)].set_knob("DO_{}".format(dox), value)
[docs]def run_unit_test():
SERVER_ADDRESS = '10.12.132.22'
if len(sys.argv) > 1:
SERVER_ADDRESS = sys.argv[1]
print("create Acq400 %s" %(SERVER_ADDRESS))
uut = Acq400(SERVER_ADDRESS)
print("MODEL %s" %(uut.s0.MODEL))
print("SITELIST %s" %(uut.s0.SITELIST))
print("MODEL %s" %(uut.s1.MODEL))
print("Module count %d" % (uut.mod_count))
print("POST SAMPLES %d" % uut.post_samples())
for sx in sorted(uut.svc):
print("SITE:%s MODEL:%s" % (sx, uut.svc[sx].sr("MODEL")))
[docs]def sigsel(enable=1, dx=1, site=None, edge=1):
if not site is None:
return "{},{},{}".format(enable, site+1, edge)
else:
return "{},{},{}".format(enable, dx, edge)
[docs]def factory(_uut):
"""deduce what sort of uut this is and invoke the appropriate class
Preferred to init hapi instance
Args:
_uut (str): uut hostname or ip-address
Returns:
acq400: uut hapi instance
"""
try:
cached = Acq400.uuts[_uut]
return cached
except KeyError:
pass
s0 = netclient.Siteclient(_uut, AcqPorts.SITE0)
acq2106_models = ('acq2106', 'acq2206', 'z7io')
if not s0.MODEL.startswith(acq2106_models):
return Acq400(_uut, s0_client=s0)
# here with acq2106
try:
if hasattr(s0, 'is_tiga') and s0.is_tiga != "none":
return Acq2106_TIGA(_uut, s0_client=s0)
if hasattr(s0, 'has_mgt') and hasattr(s0, 'has_mgtdram'):
if s0.has_mgt != "none" and s0.has_mgtdram != "none":
return Acq2106_Mgtdram8(_uut, s0_client=s0)
has_dsp = False
if hasattr(s0, 'has_dsp'):
has_dsp = s0.has_dsp != "none"
has_wr = False
if hasattr(s0, 'has_wr'):
has_wr = s0.has_wr != "none"
has_sfp = False
if hasattr(s0, 'has_mgt'):
has_sfp = s0.has_mgt != "none"
has_hudp = False
if hasattr(s0, 'has_hudp'):
has_hudp = s0.has_hudp != "none"
return Acq2106(_uut, s0_client=s0, has_dsp=has_dsp, has_comms=has_sfp, has_wr=has_wr, has_hudp=has_hudp)
except:
''' nothing special, make it a default class with existing s0
'''
print("Factory method encountered a problem. Most likely target system"
"\nFW is older than necessary. Using default acq2106 settings.\n")
return Acq2106(_uut, s0_client=s0)
[docs]def get_hapi():
''' find instance of hapi '''
return os.path.dirname(os.path.dirname(__file__))
[docs]def freq(sig):
return float(sig.split(" ")[1])
[docs]def intpv(pv):
return int(pv.split(" ")[1])
[docs]def pv(pv):
return pv.split(" ")[1]
if __name__ == '__main__':
run_unit_test()