Source code for user_apps.analysis.analyse_burst

#!/usr/bin/env python3
'''
Created on 20 Jul 2023

@author: pgm
'''

import numpy as np
import matplotlib.pyplot as plt
import argparse

import hashlib

from prettytable import PrettyTable

ES_MAGIC0   = 0xaa55f154
ES_MAGIC1   = 0xaa55f15f
ES_SAMPLE   = 4
ES_CLK      = 5
ES_SAMPLE2  = 6
ES_CLK2     = 7

ES_MAGIC0_FIELDS = ( 0,  1,  2,  3,  8,  9, 10, 11)
ES_MAGIC1_FIELDS = (16, 17, 18, 19, 24, 25, 26, 27, 
                    32, 33, 34, 35, 40, 41, 42, 43,
                    48, 49, 50, 51, 56, 57, 58, 59)




[docs]class ES_STATS: the_stats = [] the_raw_ix = [] the_sample_counts = [] the_clk_counts = []
[docs] def __init__(self, _es_fields): self.iraw = _es_fields[0] self.sample = _es_fields[1] self.clk = self.clk = _es_fields[2] self.my_id = len(ES_STATS.the_stats) if self.my_id == 0: self.d_iraw = 0 self.d_sample = 0 self.d_clk = 0 else: es1 = ES_STATS.the_stats[-1] self.d_iraw = self.iraw - es1.iraw self.d_sample = self.sample - es1.sample self.d_clk = self.clk - es1.clk ES_STATS.the_stats.append(self)
[docs] def print(self): print(f'ES_STATS {self.my_id} {self.iraw} {self.sample} {self.clk}')
[docs] def print_all(): t = PrettyTable(['ii', 'iraw', 'sample', 'clk', 'delta_iraw', 'delta_sample', 'delta_clk'], border=False) for ii, ess in enumerate(ES_STATS.the_stats): t.add_row((ii, ess.iraw, ess.sample, ess.clk, ess.d_iraw, ess.d_sample, ess.d_clk)) print(t)
[docs] def is_valid(): model_d_sample = ES_STATS.the_stats[1].d_sample model_d_clk = ES_STATS.the_stats[1].d_clk sample_fail_count = 0 clk_fail_count = 0 for ii, es in enumerate(ES_STATS.the_stats[2:]): if es.d_sample != model_d_sample: print(f'ERROR: ES fail at {ii} NSAM: expected{model_d_sample} got {es.d_sample}') sample_fail_count += 1 if es.d_clk > model_d_clk and (es.d_clk - model_d_clk) > 1: print(f'ERROR: ES fail > at {ii} diff: {es.d_clk - model_d_clk} CLK: expected{model_d_clk} got {es.d_clk}') clk_fail_count += 1 if es.d_clk < model_d_clk and (model_d_clk - es.d_clk) > 1: print(f'ERROR: ES fail < at {ii} diff: {model_d_clk - es.d_clk} CLK: expected{model_d_clk} got {es.d_clk}') clk_fail_count += 1 return (sample_fail_count == 0 and clk_fail_count == 0, ii, sample_fail_count, clk_fail_count)
# ES_STATS.the_stats[:-1] final burst may not be complete, ignore it.
[docs] def get_raw_ix(): if len(ES_STATS.the_raw_ix) < len(ES_STATS.the_stats): ES_STATS.the_raw_ix = [ es.iraw for es in ES_STATS.the_stats[:-1]] return ES_STATS.the_raw_ix
[docs] def get_sample_counts(): if len(ES_STATS.the_sample_counts) < len(ES_STATS.the_stats): ES_STATS.the_sample_counts = [ es.sample for es in ES_STATS.the_stats[:-1]] return ES_STATS.the_sample_counts
[docs] def get_clk_counts(): if len(ES_STATS.the_clk_counts) < len(ES_STATS.the_stats): ES_STATS.the_clk_counts = [ es.clk for es in ES_STATS.the_stats[:-1]] return ES_STATS.the_clk_counts
[docs] def get_blen(): return ES_STATS.the_stats[1].d_sample
[docs]def is_valid_es(iraw, es): # return if this is an ES, load it and return True FAIL fast! # we pre-filtered es[0] == ES_MAGIC0, so we expect all PASS for ii in ES_MAGIC0_FIELDS[1:]: if es[ii] != ES_MAGIC0: print(f'ES match fail at {iraw},{ii} expect:{ES_MAGIC0:08x} got:{es[ii]:08x}') return False for ii in ES_MAGIC1_FIELDS: if es[ii] != ES_MAGIC1: print(f'ES match fail at {iraw},{ii} expect:{ES_MAGIC1:08x} got:{es[ii]:08x}') return False if es[ES_SAMPLE] != es[ES_SAMPLE2]: print(f'ES match fail at {iraw},{ii} SAMPLE changed {es[ES_SAMPLE]} {es[ES_SAMPLE2]}') return False if es[ES_CLK] != es[ES_CLK2]: print(f'ES match fail at {iraw},{ii} CLK changed {es[ES_CLK]} {es[ES_CLK2]}') return False ES_STATS((iraw, es[ES_SAMPLE], es[ES_CLK])) return True
DATA='nodata'
[docs]def analyse_es(args, raw_es): print(f'raw_es.shape:{raw_es.shape}') esx = np.nonzero(raw_es[:,0] == ES_MAGIC0) print(f'type esx {type(esx)}') valid_es = [] for ix in esx[0]: # print(f'ix={ix}') es = raw_es[ix,:] # print(f'es.shape:{es.shape}') if args.verbose > 1: with np.printoptions(formatter={'int':hex}): print(es) if not is_valid_es(ix, es): print(f"Warning: invalid es at {ix}") if args.verbose: ES_STATS.print_all() es_valid = ES_STATS.is_valid() if es_valid[0]: print(f'{DATA} ES Analysis: {es_valid[1]} PASS') else: print(f'{DATA} ES Analysis: {es_valid[1]} PASS {es_valid[2]} sample FAIL {es_valid[3]} clk FAIL')
[docs]def sample_count_plot(ax): ax.set_title(f'Plot of burst first sample count ES[{ES_SAMPLE}]') ax.plot(ES_STATS.get_sample_counts())
[docs]def timing_plot(ax): #ax1.figure() #ax1.title(f'Plot of burst start time in sample clocks\n{DATA}') #ax1.ylabel('clocks') #ax1.xlabel('burst number') ax.set_title(f'Plot of burst start time in sample clocks ES[{ES_CLK}]') ax.plot(ES_STATS.get_clk_counts())
[docs]def stack_plot(raw_adc, ch, ax, label='', delta=False, stackoff=0): print(f'stack_plot {ax}') blen = ES_STATS.get_blen() raw_ix = ES_STATS.get_raw_ix() nburst = len(raw_ix) print(f'PLOT nburst {len(raw_ix)} burst_len {blen} ch {ch}') x = range(1, blen) #plt.figure() #plt.title(f'{label} Stack plot of {nburst} bursts\n{DATA}') ax.set_title(f'{label} Stack plot of {nburst} bursts') #plt.ylabel('ADC codes') #plt.xlabel('samples in burst') for ii, brst in enumerate(raw_ix): try: y = raw_adc[brst+1:brst+blen,ch]+ stackoff*ii if delta: if ii == 0: y0 = y y = y - y0 if len(x) == len(y): ax.plot(x, y, label=f'{ii}') except ValueError: pass
[docs]def correlate(raw_adc, ch0, _atol, _rtol): ref = ch0[0] matches = {} ref = {} blen = ES_STATS.get_blen() raw_ix = ES_STATS.get_raw_ix() for ic in ch0: matches[ic] = [] for ib, brst in enumerate(raw_ix): for icn, ic in enumerate(ch0): # print(f'{ib},{icn} ch:{ic+1} brst:{brst}') try: y = raw_adc[brst+1:brst+blen, ic] if ib==0: ref[ic] = y _match = np.allclose(y, ref[ic], atol=_atol, rtol=_rtol) if not _match: print(f' {ib},{icn} MATCH FAIL {ic}\n{_match}') # print(f'[{ib}],[{ic}] : {match}') matches[ic].append(_match) except ValueError: pass all_good = all([all(matches[ic]) for ic in ch0]) print('CORRELATE {}'.format('PASS: All Channels Match' if all_good else 'FAIL: Not all channels match')) t = PrettyTable([f'{ch+1:02d}' for ch in ch0], border=False) t.add_row(['T' if all(matches[ic]) else 'F' for ic in ch0]) print(t)
MAX_FALLING_FIDUCIALS=2 # our FG falling edge is really quite .. slow
[docs]def analyse_fiducial(args, raw_adc): f0 = args.fiducial_plot-1 blen = ES_STATS.get_blen() raw_ix = ES_STATS.get_raw_ix() means = [] falling_value_count = 0 for ib, brst in enumerate(raw_ix): # print(f'{ib},{brst} {f0}') means.append(np.mean(raw_adc[brst+1:brst+blen, f0])) if ib > 0 and means[ib] < means[ib-1]: print(f'WARNING: fiducial {args.fiducial_plot} mean goes down at burst {ib}') falling_value_count += 1 if args.verbose > 0: t = PrettyTable(['burst', f'mean for fiducial CH{args.fiducial_plot:0d}'], border=False) for ib, mean in enumerate(means): t.add_row([ib, f'{mean:.2f}']) print(t) if falling_value_count > MAX_FALLING_FIDUCIALS: print(f'WARNING: fiducial recorded a falling value in {falling_value_count} instances. acceptable: 0 .. {MAX_FALLING_FIDUCIALS}') return falling_value_count
[docs]def plot_timeseries(raw_adc, ch, ax, label): #plt.figure() #plt.title(f'{label} Time-series plot of CH{ch}\n{DATA}') #plt.ylabel('ADC codes') #plt.xlabel('sample') yraw = raw_adc[:,ch] y_no_es = np.delete(yraw, ES_STATS.get_raw_ix()) x = range(0, len(y_no_es)) ax.set_title(f'{label} Time-series plot of CH{ch}') ax.plot(x, y_no_es, label=f'CH{ch}')
[docs]def analyse(args): global STACKOFF STACKOFF = args.stack_off fname = args.data[0] raw_adc = np.fromfile(fname, dtype=args.np_data_type) ll = len(raw_adc)//args.nchan raw_adc = raw_adc[0:ll*args.nchan] raw_es = raw_adc.view(np.uint32) raw_adc = np.reshape(raw_adc, (ll, args.nchan)) raw_es = np.reshape(raw_es, (ll, args.ess)) print(f"raw_adc {raw_adc.shape}") print(f"raw_es {raw_es.shape}") if args.print_hash: m = hashlib.sha1() m.update(raw_adc) print(f'fname {fname} sha1:{m.hexdigest()}') analyse_es(args, raw_es) if args.fiducial_plot: ok = analyse_fiducial(args, raw_adc) if not ok: print("WARNING: fiducial fail") c1, c2, _atol, _rtol = args.check_range correlate(raw_adc, [ch-1 for ch in range(c1, c2+1) ], _atol, _rtol) if args.stack_plot > 0: fig, axx = plt.subplots(3, 2, figsize=(12,10)) fig.suptitle(f'Burst Mode Test: {DATA}') sample_count_plot(axx[0][0]) timing_plot(axx[1][0]) stack_plot(raw_adc, args.stack_plot-1, axx[0][1], f'signal CH{args.stack_plot}', stackoff=args.stack_off) stack_plot(raw_adc, args.stack_plot-1, axx[1][1], f'diff signal CH{args.stack_plot}', delta=True, stackoff=args.stack_off) if args.fiducial_plot: plot_timeseries(raw_adc, args.fiducial_plot-1, axx[2][0], f'fiducial CH{args.fiducial_plot}') stack_plot(raw_adc, args.fiducial_plot-1, axx[2][1], f'fiducial CH{args.fiducial_plot}') plt.show() return (raw_adc, raw_es)
[docs]def get_parser(): parser = argparse.ArgumentParser(description='rgm plot demo') parser.add_argument('--nchan', type=int, default=32) parser.add_argument('--data_type', type=int, default=None, help='Use int16 or int32 for data demux.') parser.add_argument('--verbose', type=int, default=0, help='increase verbosity') parser.add_argument('--stack_plot', type=int, default=0, help='if non zero, make a stack plot of selected channel') parser.add_argument('--stack_off', type=int, default=0, help='offset each element in stack to make a waterfall chart') parser.add_argument('--check_range', type=str, default='1,1', help='c0,c1,[atol,rtol] : range of channels to check, atol, rtol: see numpy.rclose') parser.add_argument('--print_hash', type=int, default=0, help='print sha1 of the file (protect against possibility of duplicate data') parser.add_argument('--fiducial_plot', type=int, default=0, help='if non zero, make a stack plot of selected channel') parser.add_argument('--uut', help='uut for title') parser.add_argument('data', nargs=1, help="data ") return parser
[docs]def fix_args(args): global DATA if args.data_type == 16: args.np_data_type = np.int16 args.WSIZE = 2 args.ess = args.nchan//2 elif args.data_type == 8: args.np_data_type = np.int8 args.WSIZE = 1 rgs.ess = args.nchan//4 else: args.np_data_type = np.int32 args.WSIZE = 4 args.ess = args.nchan args.ssb = args.nchan * args.WSIZE DATA = args.data[0] _check_range = [ int(ii) for ii in args.check_range.split(',')] args.check_range = [ 1, 1, 500, 10 ] for ix, val in enumerate(_check_range): args.check_range[ix] = val print(f'processing {DATA}') return args
[docs]def run_main(): analyse(fix_args(get_parser().parse_args()))
# execution starts here if __name__ == '__main__': run_main()