#!/usr/bin/env python
'''make_waves ...
make a related set of waveforms, store as raw binary, one file per channel
Created on 8 Jun 2021
@author: pgm
'''
import numpy as np
import argparse
import sys
import time
import matplotlib.pyplot as plt
import acq400_hapi
def ramp_wave(args):
    '''Return one linear ramp running from -args.amp to +args.amp.

    Args:
        args: namespace providing ``amp`` (ramp end-point value) and
            ``pattern_len`` (number of samples to generate).

    Returns:
        1-D numpy array of ``pattern_len`` evenly spaced samples, both end
        points included.
    '''
    return np.linspace(-args.amp, args.amp, num=args.pattern_len)
def sine_wave(args):
    '''Return a single sine cycle sampled over 0..2*pi (end points included).

    Args:
        args: namespace providing ``pattern_len`` (number of samples).

    Returns:
        1-D numpy array of ``pattern_len`` samples with peak value 1.0.

    NOTE(review): unlike ramp_wave, the result is not scaled by args.amp and
    args.ncycles is not used here - confirm whether that is intended.
    '''
    return np.sin(np.linspace(0, 2*np.pi, num=args.pattern_len))
def sinc_wave(args):
    '''Return np.sinc sampled over -pi..pi with negative tails zeroed.

    The run of negative samples at the start and the run of negative samples
    at the end are each replaced by zero; samples between them are untouched.

    Args:
        args: namespace providing ``pattern_len`` (number of samples).

    Returns:
        1-D numpy array of ``pattern_len`` samples.
    '''
    raw = np.sinc(np.linspace(-np.pi, np.pi, num=args.pattern_len))
    count = len(raw)
    # Both scans are bounded so that very short patterns (0 or 1 sample,
    # where every sample may be negative) cannot index past the array.
    left = 0
    while left < count and raw[left] < 0:
        raw[left] = 0
        left += 1
    right = count - 1
    while right >= left and raw[right] < 0:
        raw[right] = 0
        right -= 1
    return raw
def np_type(args):
    '''Return the numpy integer type used to hold one raw sample.

    Args:
        args: namespace providing ``res`` (word size in bits).

    Returns:
        ``np.int16`` when ``args.res`` is 16, ``np.int32`` for any other value.
    '''
    return np.int32 if args.res != 16 else np.int16
def make_waves(args, iarg):
    '''Generate the per-channel waveform set and store it in ``args.chx``.

    The function named ``<args.fxn>_wave`` produces one pattern; each channel
    gets that pattern delayed by ``args.stagger * ch`` samples, padded with
    zeros to ``args.len`` samples, with a per-channel offset of
    ``(ch - nchan/2) * args.offset_per_channel`` added to every sample.

    Args:
        args: parsed options; reads ``fxn``, ``len``, ``nchan``, ``stagger``
            and ``offset_per_channel``; writes ``fn`` (the resolved generator)
            and ``chx`` (float array of shape ``[len, nchan]``).
        iarg: index of this operation in ``args.ops`` (unused).

    Raises:
        ValueError: if no ``<fxn>_wave`` function exists, or if a staggered
            pattern does not fit inside ``args.len`` samples.
    '''
    print("make_waves")
    # Look the generator up by name rather than eval()-ing a command-line
    # string, so only an existing module-level callable can be selected.
    wave_fn = globals().get("{}_wave".format(args.fxn))
    if not callable(wave_fn):
        raise ValueError("unknown waveform function '{}'".format(args.fxn))
    args.fn = wave_fn
    ch0 = args.fn(args)
    print("shape ch0 {}".format(ch0.shape))
    args.chx = np.zeros([args.len, args.nchan])
    for ch in range(0, args.nchan):
        offset = (ch - args.nchan/2)*args.offset_per_channel
        start = args.stagger*ch
        stop = start + len(ch0)
        if start < 0 or stop > args.len:
            raise ValueError(
                "ch{:02d}: pattern [{}:{}] does not fit in {} samples".
                format(ch, start, stop, args.len))
        # The offset applies to the zero padding as well as to the pattern.
        args.chx[:, ch] = offset
        args.chx[start:stop, ch] += ch0
def store_files(args, iarg):
    '''Write each column of ``args.chx`` to its own binary file.

    Files are named ``<root>/chNN_<len>_<amp>+<offset>V.dat`` and are written
    with ``ndarray.tofile``. Nothing is written when ``args.root`` is empty.

    Args:
        args: parsed options; reads ``root``, ``nchan``, ``len``, ``amp``,
            ``offset_per_channel`` and ``chx``.
        iarg: index of this operation in ``args.ops`` (unused).
    '''
    import os

    print("store_files")
    if args.root:
        # Create the output directory on first use instead of failing in
        # tofile() when it does not exist yet.
        os.makedirs(args.root, exist_ok=True)
        for ch in range(0, args.nchan):
            offset = (ch - args.nchan/2)*args.offset_per_channel
            # NOTE(review): "{:.2}" is two significant digits, not two
            # decimals (12.5 renders as 1.2e+01) - confirm before changing,
            # since it alters file names.
            args.chx[:, ch].tofile("{}/ch{:02d}_{}_{}+{:.2}V.dat".
                                   format(args.root, ch, args.len, args.amp, offset))
def scale_raw(args, iarg):
    '''Convert ``args.chx`` to integer codes and store them in ``args.raw``.

    Samples are multiplied by ``2**(res-1) / vmax`` and cast to the type
    returned by ``np_type(args)``.

    Args:
        args: parsed options; reads ``res``, ``vmax`` and ``chx``; writes
            ``raw``.
        iarg: index of this operation in ``args.ops`` (unused).
    '''
    scale = (1 << (args.res-1))/args.vmax
    dtype = np_type(args)
    limits = np.iinfo(dtype)
    # A sample of exactly +vmax scales to 2**(res-1), one past the largest
    # positive code; astype() does not saturate, so clamp to the integer
    # range before casting.
    args.raw = np.clip(args.chx * scale, limits.min, limits.max).astype(dtype)
    print("scale_raw {} {} {}".format(args.raw.shape, args.raw.size, args.raw.dtype))
def store_raw(args, iarg):
    '''Write ``args.raw`` to ``<root>/myfile.raw`` with ``ndarray.tofile``.

    Args:
        args: parsed options; reads ``root`` and ``raw``.
        iarg: index of this operation in ``args.ops`` (unused).
    '''
    import os

    print("store_raw {} {} {}".format(args.raw.shape, args.raw.size, args.raw.dtype))
    if args.root:
        # Create the output directory on first use instead of failing in
        # tofile() when it does not exist yet.
        os.makedirs(args.root, exist_ok=True)
    args.raw.tofile("{}/myfile.raw".format(args.root))
def load_files(args, iarg):
    '''Placeholder for loading per-channel files; currently only prints.

    Args:
        args: parsed options (unused).
        iarg: index of this operation in ``args.ops`` (unused).
    '''
    print("load_files @@todo")
def load_raw(args, iarg):
    '''Read ``<root>/myfile.raw`` back into ``args.raw``.

    The file is read with the type returned by ``np_type(args)`` and reshaped
    to ``args.nchan`` columns.

    Args:
        args: parsed options; reads ``root``, ``res`` (via np_type) and
            ``nchan``; writes ``raw``.
        iarg: index of this operation in ``args.ops`` (unused).
    '''
    print("load_raw")
    path = "{}/myfile.raw".format(args.root)
    args.raw = np.fromfile(path, np_type(args)).reshape((-1, args.nchan))
def plot1(args, iarg):
    '''Plot ``args.raw`` with matplotlib and display the figure.

    Args:
        args: parsed options; reads ``raw``.
        iarg: index of this operation in ``args.ops`` (unused).
    '''
    print("shape of plot data {}".format(args.raw.shape))
    plt.plot(args.raw)
    plt.show()
class ExitException(BaseException):
    '''Raised by an operation to stop processing of the remaining operations.

    Derives from BaseException, so a plain ``except Exception`` handler does
    not intercept it.
    '''
    pass
def load_uut(args, iarg):
    '''Load ``args.raw`` into the AWG of a UUT, then stop further processing.

    The operations following this one in ``args.ops`` are consumed as
    parameters: the first is the UUT name handed to ``acq400_hapi.factory``;
    any later ``continuous`` / ``autorearm`` words set the matching flag.

    Args:
        args: parsed options; reads ``ops``, ``raw``, ``res``, ``expand_to``
            and ``soft_trigger``; writes ``continuous``, ``autorearm`` and
            ``rawx`` (the data actually sent to the UUT).
        iarg: index of this operation in ``args.ops``.

    Raises:
        ExitException: always, once the load (or the error report) is done.
    '''
    uut = None
    args.continuous = False
    args.autorearm = False
    print("load_uut {}".format(" ".join(args.ops[iarg:])))
    for ix, pram in enumerate(args.ops[iarg+1:]):
        print("hello {} {}".format(ix, pram))
        if ix == 0:
            print("uut set {}".format(pram))
            uut = acq400_hapi.factory(pram)
            continue
        if pram == "continuous":
            args.continuous = True
        if pram == "autorearm":
            args.autorearm = True
    if uut:
        # NOTE(review): the nesting of the steps below is a best reading of
        # the flow - confirm that the running-AWG halt is meant to apply in
        # every mode and not only in continuous mode.
        if args.continuous:
            uut.s0.awg = 'SOFT_TRIGGER=0'
        if (acq400_hapi.intpv(uut.s1.AWG_ACTIVE)) == 1:
            print("AWG already running: call a halt")
            uut.s1.AWG_MODE_ABO = '1'
            time.sleep(1)
            # Poll until the abort flag is no longer 1.
            while acq400_hapi.intpv(uut.s1.AWG_MODE_ABO) == 1:
                time.sleep(0.2)
        args.rawx = args.raw
        if args.expand_to:
            wordsize = (4 if args.res != 16 else 2)
            sizeof_raw = args.raw.size * wordsize
            reps = args.expand_to // sizeof_raw
            print("source {} X {} => {} (buffer {})".
                  format(sizeof_raw, reps, reps*sizeof_raw, args.expand_to))
            # Integer division: "/" yields a float under Python 3, which
            # would be written to the knob as e.g. "4194304.0".
            dist_bufferlen = reps*sizeof_raw//4
            print("dist_bufferlen set {}".format(dist_bufferlen))
            uut.s0.dist_bufferlen_play = dist_bufferlen
            if reps > 1:
                # Same flattened result as appending args.raw (reps-1) times,
                # built in one allocation instead of a growing copy per pass.
                args.rawx = np.tile(args.raw.ravel(), reps)
            print("expand {} {} {} bytes {}".
                  format(args.rawx.shape, args.rawx.size, args.rawx.dtype, args.rawx.size*wordsize))
        uut.load_awg(args.rawx, autorearm=args.autorearm, continuous=args.continuous)
        if args.soft_trigger:
            uut.s0.soft_trigger = '1'
    else:
        print("ERROR: uut not specified")
    print("exit")
    raise ExitException
def is_param(args, iarg):
    '''No-op operation handler; only prints its own name.

    Args:
        args: parsed options (unused).
        iarg: index of this operation in ``args.ops`` (unused).
    '''
    print("is_param")
# Dispatch table: operation word on the command line -> handler(args, iarg).
# Entry order is kept because the help text lists the keys in this order.
OPS = {
    "generate": make_waves,
    "store_files": store_files,
    "load_files": load_files,
    "store_raw": store_raw,
    "load_raw": load_raw,
    "scale_raw": scale_raw,
    "plot1": plot1,
    "load_uut": load_uut,
    "UUT": is_param,
    "continuous": is_param,
    "autorearm": is_param,
}
def get_parser():
    '''Build the command-line parser for the waveform tool.

    Returns:
        argparse.ArgumentParser with the waveform options and the positional
        ``ops`` list of operation words.
    '''
    def any_base_int(text):
        # Base 0 accepts decimal as well as 0x.. / 0o.. / 0b.. literals.
        return int(text, 0)

    parser = argparse.ArgumentParser(description='Make set of waveforms')
    parser.add_argument('--nchan', default=16, type=int, help="number of channels in set")
    parser.add_argument('--len', default=100000, type=int, help="number of samples in set")
    parser.add_argument('--pulse', default=0, type=int, help="length of pulse inside set")
    parser.add_argument('--stagger', default=0, type=int,
                        help="start delay in samples added per channel (channel N starts at N*stagger)")
    parser.add_argument('--amp', default=1.0, type=float, help="amplitude in volts")
    parser.add_argument('--ncycles', default=8, type=int, help="number of waveform cycles in set")
    parser.add_argument('--offset_per_channel', default=0.0, type=float, help="offset in volts *ch")
    parser.add_argument('--res', default=16, type=int, help="word size in bits")
    parser.add_argument('--vmax', default=10.0, type=float,
                        help="full scale voltage (always symmetrical twos comp)")
    parser.add_argument('--fxn', default='ramp', help="waveform function: ramp, sine or sinc")
    parser.add_argument('--root', default='DATA', help="directory for stored data files")
    parser.add_argument('--merge', default=1, type=int, help="merge data into single binary")
    # Without a type a value given on the command line stays a str and the
    # byte-count arithmetic that consumes it fails.
    parser.add_argument('--expand_to', default=4*0x400000, type=any_base_int,
                        help="expand to fit binary block size in bytes, 0 to disable")
    parser.add_argument('--soft_trigger', default=1, type=int, help="auto soft trigger on load")
    parser.add_argument('ops', nargs='+',
                        help="operations: one or more of "+" ".join(OPS)+" # for UUT, substitute UUT name")
    return parser
def run_main(args):
    '''Run the operations listed in ``args.ops`` in order.

    ``args.pattern_len`` is set to ``args.pulse`` when that is non-zero,
    otherwise to ``args.len``. Each operation word is looked up in ``OPS`` and
    called as ``handler(args, iarg)``.

    Args:
        args: parsed options from ``get_parser()``.

    Raises:
        SystemExit: status 0 when an operation raises ExitException, status 1
            when an operation is unknown or fails.
    '''
    args.pattern_len = args.pulse if args.pulse else args.len
    for iarg, op in enumerate(args.ops):
        try:
            OPS[op](args, iarg)
        except ExitException:
            print("quitting time")
            sys.exit(0)
        except Exception as e:
            print("rejected {} {}".format(op, e))
            # A failed or unknown operation must not report success to the
            # calling shell.
            sys.exit(1)
# execution starts here: parse the command line and run the requested ops
if __name__ == '__main__':
    run_main(get_parser().parse_args())