Source code for user_apps.acq400.acq400_stream_multi

#!/usr/bin/env python3

"""
This script connects to one or more UUTs and streams data from port 4210 on each of them.

The streamed data is not demuxed, so it must be demuxed before it can be used.
Something like:

    >>> data = numpy.fromfile("0000", dtype="<datatype>")
    >>> plt.plot(data[::<number of channels>])
    >>> plt.show()

    
Some usage examples are included below:

1: Acquire files of size 1024kb up to a total of 4096kb:


    >>> python acq400_stream.py --verbose=1 --filesize=1M --totaldata=4M <module ip or name>

2: Acquire a single file of size 4096kb:


    >>> python acq400_stream.py --verbose=1 --filesize=4M --totaldata=4M <module ip or name>

3: Acquire files of size 1024kb for 10 seconds:


    >>> python acq400_stream.py --verbose=1 --filesize=1M --runtime=10 <module ip or name>

4: Acquire data for 5 seconds and write the data all to a single file:


    >>> python acq400_stream.py --verbose=1 --filesize=9999M --runtime=5 <module ip or name>

.. rst-class:: hidden

    usage::
        acq400_stream.py [-h] [--filesize FILESIZE] [--totaldata TOTALDATA]
                            [--root ROOT] [--runtime RUNTIME] [--verbose VERBOSE]
                            uuts [uuts ...]

    acq400 stream

    positional arguments:
    uuts                  uuts

    optional arguments:
    -h, --help            show this help message and exit
    --filesize FILESIZE   Size of each file in bytes (suffixes such as 1M are
        accepted). If filesize > totaldata it is reduced to totaldata.
    --totaldata TOTALDATA  Total amount of data to store in bytes
    --root ROOT           Location to save files
    --runtime RUNTIME     How long to stream data for
    --verbose VERBOSE     Prints status messages as the stream is running

"""

import acq400_hapi
import numpy as np
import os
import time
import argparse
import sys
import signal
import shutil
from acq400_hapi.acq400_print import DISPLAY

import multiprocessing as MP
import threading

def make_data_dir(directory, verbose):
    """Create ``directory`` (including parents) on a best-effort basis.

    Args:
        directory: Path of the directory to create.
        verbose: Verbosity level; progress messages are printed only when
            it is greater than 2.

    Returns:
        None. Failures are reported (at verbose > 2) but never raised, so a
        pre-existing directory is not an error.
    """
    if verbose > 2:
        print("make_data_dir {}".format(directory))
    try:
        os.makedirs(directory)
    except FileExistsError:
        if verbose > 2:
            print("Directory already exists")
    except OSError as err:
        # Still best-effort, but do not mislabel e.g. a permission problem
        # as "already exists".
        if verbose > 2:
            print("make_data_dir {} failed: {}".format(directory, err))
def remove_stale_data(args):
    """Delete data directories left over from a previous run.

    For every name in ``args.uuts`` the path ``<args.root>/<name>`` is
    checked. An existing path is removed with ``shutil.rmtree``; unless
    ``args.force_delete`` is set the user is asked first and anything other
    than ``y`` keeps the path.

    Args:
        args: Parsed arguments providing ``uuts``, ``root``,
            ``force_delete`` and ``verbose``.
    """
    for uut_name in args.uuts:
        stale_path = os.path.join(args.root, uut_name)

        if not os.path.exists(stale_path):
            continue

        if not args.force_delete:
            reply = input(f"Stale data detected. Delete all contents in {stale_path}? y/n ")
            if reply != 'y':
                continue

        if args.verbose:
            print("removing {}".format(stale_path))
        shutil.rmtree(stale_path)
def self_burst_trigger_callback(uut, job):
    """Build the per-file callback used for interactive burst-on-demand.

    Args:
        uut: UUT object; its ``s0`` knobs ``set_abort`` and ``soft_trigger``
            are written and ``close()`` is called on quit.
        job: Optional command; when truthy it is run through ``os.system``
            as ``"<job> <fn>"`` for every file.

    Returns:
        A function ``cb(fn)``. It reads at most one line from stdin: a line
        starting with ``q`` aborts the UUT and exits the process, anything
        else (or end of input) issues a soft trigger. ``cb`` returns None.
    """
    def cb(fn):
        if job:
            os.system(f'{job} {fn}')

        # Only the first available line is consulted, if there is one.
        first_line = next(iter(sys.stdin), None)
        if first_line is not None and first_line.startswith('q'):
            uut.s0.set_abort = 1
            uut.close()
            sys.exit(0)

        uut.s0.soft_trigger = 1

    return cb
def self_start_trigger_callback(uut):
    """Build a thread target that issues the first soft trigger.

    Args:
        uut: UUT object exposing ``s0.state`` (a space separated string) and
            the ``s0.soft_trigger`` knob.

    Returns:
        A no-argument function that polls ``uut.s0.state`` every 0.5 s until
        its first field is ``'1'`` and then sets ``uut.s0.soft_trigger = 1``.
    """
    def cb():
        print("self_start_trigger_callback")
        while True:
            state_code = uut.s0.state.split(' ')[0]
            if state_code == '1':
                break
            time.sleep(0.5)
        uut.s0.soft_trigger = 1

    return cb
class StreamsOne:
    """Stream data from one UUT and publish its progress over a pipe.

    An instance is created and run inside a worker process (see
    ``run_stream_run``). Status is pushed to the parent once per second by
    a daemon thread; the shared ``halt`` event tells the worker when it may
    exit.
    """

    class pipe_conn:
        """Status dictionary bundled with the pipe end used to send it."""

        def __init__(self, pipe):
            """Store ``pipe`` and start with a not-stopped, unknown state."""
            self.pipe = pipe
            self.status = {
                'state': None,
                'stopped': False,
            }

        def send(self):
            """Send the whole status dictionary down the pipe."""
            self.pipe.send(self.status)

        def set(self, key, value):
            """Set one status field; nothing is sent until ``send()``."""
            self.status[key] = value

    def __init__(self, args, uut_name, halt, pipe, delay):
        """Record the configuration and create an empty timing log.

        Args:
            args: Parsed command line arguments (see ``get_parser``).
            uut_name: Name or address of the UUT to stream from.
            halt: Event shared with the parent; set when workers may exit.
            pipe: Pipe end used to send status dictionaries to the parent.
            delay: Seconds to sleep in ``run`` before streaming starts.
        """
        self.args = args
        self.uut_name = uut_name
        self.halt = halt
        self.delay = delay
        self.status = self.pipe_conn(pipe)
        self.previous = None
        self.log_file = os.path.join(args.root, f"{uut_name}_times.log")
        # Create the log, or truncate one left over from a previous run.
        with open(self.log_file, 'w'):
            pass

    def logtime(self, t0, t1):
        """Append one timing record to the log file and return ``t1``.

        The record holds two integers in milliseconds: time since ``t0`` and
        time since the previous call (0 on the first call).
        """
        if not self.previous:
            self.previous = t1
        with open(self.log_file, 'a') as f:
            f.write(f"{int((t1 - t0) * 1000)} {int((t1 - self.previous) * 1000 )}\n")
        self.previous = t1
        return t1

    def update_status_forever(self):
        """Refresh the 'state' field and send the status once per second.

        Never returns; ``run`` starts it in a daemon thread.
        """
        while True:
            self.status.set('state', acq400_hapi.pv(self.uut.s0.CONTINUOUS_STATE))
            self.status.send()
            time.sleep(1)

    def stop_proccess(self, reason):
        """Mark this worker stopped, close the stream and exit the process.

        Blocks on ``halt`` before exiting, so the status thread keeps
        reporting until the event is set.

        Raises:
            SystemExit: always, carrying ``reason``.
        """
        self.status.set('stopped', True)
        self.uut.stream_close()
        self.halt.wait()
        # sys.exit rather than the site-provided exit() helper, which is
        # intended for interactive use and is not guaranteed to exist.
        sys.exit(reason)

    def run(self, callback=None):
        """Stream from the UUT until a stop condition is met, then exit.

        Each buffer yielded by ``uut.stream()`` is written to
        ``<root>/<uut_name>/<cycle>/`` unless ``args.nowrite`` is set: one
        file per buffer, or one combined file per cycle when
        ``args.combine`` is set. Streaming ends when ``callback(fn)`` is
        truthy, ``args.runtime`` seconds have elapsed, ``args.totaldata``
        bytes have been exceeded, ``halt`` is set, or a zero-length buffer
        arrives.

        Args:
            callback: Optional ``callback(fn)`` called after every buffer; a
                truthy return stops streaming. Replaced by the interactive
                burst trigger callback when ``--burst_on_demand`` and
                ``--trigger_from_here`` are both in use.

        Raises:
            SystemExit: always, via ``stop_proccess``.
        """
        self.uut = acq400_hapi.factory(self.uut_name)
        threading.Thread(target=self.update_status_forever, daemon=True).start()
        time.sleep(self.delay)

        cycle = -1
        fnum = 999       # force initial directory create
        data_bytes = 0
        files = 0

        # SIGINT is ignored here; shutdown is signalled through self.halt.
        signal.signal(signal.SIGINT, signal.SIG_IGN)

        if callback is None:
            def callback(_clidata):
                return False

        if self.args.burst_on_demand:
            self.uut.s1.rgm = '3,1,1'
            bod_def = self.args.burst_on_demand.split(',')
            bod_len = int(bod_def[0])
            bod_job = bod_def[1] if len(bod_def) == 2 else None
            self.uut.s1.RTM_TRANSLEN = bod_len
            self.args.filesamples = bod_len
            if self.args.trigger_from_here != 0:
                callback = self_burst_trigger_callback(self.uut, bod_job)
                self.thread = threading.Thread(
                    target=self_start_trigger_callback(self.uut), daemon=True)
                self.thread.start()

        # Bytes per sample: 4 when the data32 knob is non-zero, else 2.
        try:
            data_size = 4 if int(self.uut.s0.data32) else 2
        except AttributeError:
            print("Attribute error detected. No data32 attribute - defaulting to 16 bit")
            data_size = 2

        netssb = int(self.uut.s0.ssb)
        if self.args.subset:
            # "first,count": only the count affects the per-sample size here.
            _first, clen = [int(x) for x in self.args.subset.split(',')]
            netssb = clen * data_size

        if self.args.filesamples:
            self.args.filesize = self.args.filesamples*netssb

        blen = self.args.filesize//data_size

        if self.args.burst_on_demand and self.args.verbose:
            print(f'burst_on_demand RTM_TRANSLEN={self.args.burst_on_demand} netssb={netssb} filesize={self.args.filesize} blen={blen}')

        t_run = 0
        fn = "no-file"
        data_file = None    # only held open across iterations in combine mode

        try:
            for buf in self.uut.stream(recvlen=blen, data_size=data_size):
                if self.halt.is_set():
                    self.stop_proccess(f"{self.uut_name} Stopped")

                if data_bytes == 0:
                    t0 = time.time()
                else:
                    t_run = self.logtime(t0, time.time()) - t0

                data_bytes += len(buf) * data_size

                if len(buf) == 0:
                    print("Zero length buffer, quit")
                    # Leave via stop_proccess() below so the parent sees this
                    # worker as stopped instead of waiting on it forever.
                    break

                self.status.set('runtime', f"{t_run:.0f}s")
                self.status.set('total bytes', f"{data_bytes}")
                self.status.set('rate', f"{data_bytes / t_run / 0x100000 if t_run else 0:.2f}MB/s")
                self.status.set('files', f"{files}")

                if not self.args.nowrite:
                    if fnum >= self.args.files_per_cycle:
                        # Start a new cycle directory.
                        fnum = 0
                        cycle += 1
                        if data_file:
                            data_file.close()
                            data_file = None
                        root = os.path.join(self.args.root, self.uut_name, "{:06d}".format(cycle))
                        make_data_dir(root, self.args.verbose)

                    if not self.args.combine:
                        fn = os.path.join(root, f"{fnum:04d}.dat")
                        with open(fn, 'wb') as single_file:
                            buf.tofile(single_file)
                        files += 1
                    else:
                        if not data_file:
                            fn = os.path.join(root, f"{0:04d}-{self.args.files_per_cycle:04d}.dat")
                            data_file = open(fn, "wb")
                            files += 1
                        buf.tofile(data_file)

                if not self.args.display and self.args.verbose > 2 and t_run > 0:
                    print("{:8.3f} {} files {:4d} total bytes: {:10d} data bytes: {} rate: {:.2f} MB/s".format(
                        t_run, fn, files, int(data_bytes), int(data_bytes), data_bytes/t_run/0x100000))

                fnum += 1

                if callback(fn) or t_run >= self.args.runtime or data_bytes > self.args.totaldata:
                    break
        finally:
            # Flush and close the combined file on every exit path.
            if data_file:
                data_file.close()

        self.stop_proccess(f"{self.uut_name} Finished")
def status_cb():
    """Print a fixed marker line; takes no arguments and returns None."""
    marker = "Another one"
    print(marker)
def _stream_worker(args, uut, halt, pipe, delay):
    """Process entry point: stream a single UUT until it stops.

    Kept at module level so it can be pickled when the multiprocessing
    start method is not ``fork``.
    """
    StreamsOne(args, uut, halt, pipe, delay).run()


def run_stream_run(args):
    """Start one streaming process per UUT and show their status until done.

    One ``StreamsOne`` worker process is started for every name in
    ``args.uuts``; the first is given a start delay of 2 seconds and the
    rest 0. The parent then loops once per second: it drains the status
    dictionaries sent by the workers, adds them to the display (rendered
    only when ``args.display`` is set) and, once every worker reports
    ``stopped``, sets the shared ``halt`` event and returns. Ctrl-C also
    sets ``halt``.

    Args:
        args: Parsed arguments (see ``get_parser``); must provide ``uuts``
            and ``display`` plus everything ``StreamsOne`` reads.
    """
    recvs = {}
    pss = {}
    delay = 2
    halt = MP.Event()
    for uut in args.uuts:
        recv, pipe = MP.Pipe()
        recvs[uut] = recv
        pss[uut] = MP.Process(target=_stream_worker,
                              args=(args, uut, halt, pipe, delay,),
                              daemon=False)
        pss[uut].start()
        delay = 0

    D = DISPLAY()
    uut_status = {}
    start_time = time.time()
    try:
        while True:
            stopped = 0

            # Drain everything queued on each pipe; the newest status wins.
            for uut_name, conn in recvs.items():
                while conn.poll():
                    try:
                        uut_status[uut_name] = conn.recv()
                    except EOFError:
                        last_status = uut_status.get(uut_name)
                        if last_status is not None:
                            last_status['state'] = 'DEAD'
                        # A closed pipe keeps polling as readable; leave the
                        # drain loop instead of spinning on it.
                        break

            D.add_line("")
            D.add_line(f"{{BOLD}}Stream Multi {{RESET}}Runtime: {round(time.time() - start_time)}s")
            for uut_name, status in uut_status.items():
                if status['stopped']:
                    stopped += 1
                D.add(f"{{REVERSE}}{uut_name}{{RESET}} ")
                for key, value in status.items():
                    D.add(f"{{BOLD}}{key}{{RESET}}[{value}] ")
                D.end()

            if stopped == len(pss):
                halt.set()
                D.render(False)
                break

            if args.display:
                D.render()
            # Clear on every pass so the buffer cannot grow when rendering
            # is disabled.
            D.buffer = ''
            time.sleep(1)

    except KeyboardInterrupt:
        D.render_interrupted()
        halt.set()
        print('Keyboard Interrupt')

    print('Done')
def run_stream_prep(args):
    """Normalise the arguments and prepare the output location.

    Caps ``args.filesize`` at ``args.totaldata``, removes stale per-UUT data
    (see ``remove_stale_data``) and creates ``args.root`` when it is a
    non-empty path that does not exist yet.

    Args:
        args: Parsed arguments; modified in place.

    Returns:
        The same ``args`` object.
    """
    args.filesize = min(args.filesize, args.totaldata)

    remove_stale_data(args)

    root_missing = bool(args.root) and not os.path.exists(args.root)
    if root_missing:
        os.makedirs(args.root)

    return args
def get_parser(parser=None):
    """Build, or extend, the command line parser for multi-UUT streaming.

    Args:
        parser: Optional existing ``argparse.ArgumentParser`` to add the
            streaming options to. When omitted a new parser is created and
            additionally gets ``--callback`` and the positional ``uuts``.

    Returns:
        The parser with all streaming options added.
    """
    if not parser:
        is_client = True
        parser = argparse.ArgumentParser(description='Stream data from multiple UUTs')
        parser.add_argument('--callback', default=None,
                            help='not for users, client programs can install a callback here')
    else:
        is_client = False

    parser.add_argument('--burst_on_demand', default=None, type=str,
                        help="Burst Size in Samples[,./plotjob]")
    parser.add_argument('--trigger_from_here', default=0, type=int,
                        help="action soft trigger from this application")
    parser.add_argument('--subset', default=None,
                        help='subset command if present eg 1,5 :: strips first 5 channels')
    parser.add_argument('--filesize', default=0x100000,
                        action=acq400_hapi.intSIAction, decimal=False,
                        help="file size in bytes")
    parser.add_argument('--filesamples', default=None,
                        action=acq400_hapi.intSIAction, decimal=False,
                        help="file size in samples (overrides filesize)")
    parser.add_argument('--files_per_cycle', default=100, type=int,
                        help="files per cycle (directory)")
    parser.add_argument('--force_delete', default=0, type=int,
                        help="silently delete any existing data files")
    # type=int so that "--nowrite=0" is falsy; without a type the value was
    # the non-empty string "0", which disabled writing.
    parser.add_argument('--nowrite', default=0, type=int,
                        help="do not write file")
    parser.add_argument('--totaldata', default=10000000000,
                        action=acq400_hapi.intSIAction, decimal=False,
                        help="total amount of data to stream in bytes")
    parser.add_argument('--root', default="", type=str,
                        help="Location to save files. Default dir is UUT name.")
    parser.add_argument('--runtime', default=1000000, type=int,
                        help="How long to stream data for")
    parser.add_argument('--verbose', default=0, type=int,
                        help='Prints status messages as the stream is running')
    parser.add_argument('--display', default=1, type=int,
                        help='Render display')
    parser.add_argument('--combine', default=0, type=int,
                        help='Combine all cycle files into one')

    if is_client:
        parser.add_argument('uuts', nargs='+', help="uuts")
    return parser
def run_stream(args):
    """Prepare the output area, then stream from every UUT in ``args.uuts``.

    Args:
        args: Parsed arguments as produced by ``get_parser()``.
    """
    # run_stream_prep returns the same args object it was given.
    prepared = run_stream_prep(args)
    run_stream_run(prepared)
# Script entry point: parse the command line and start streaming.
if __name__ == '__main__':
    cli_args = get_parser().parse_args()
    run_stream(cli_args)