# Source code for acq400_hapi.channel_handlers.channel_handlers

# channel_handlers.py

''' channel_handlers will take care of a specific column in a 2D data set, with user-specified formatting
'''


import numpy as np


def decode_tai_vernier(args, y):
    ''' unpack TAI-vernier words into a single "seconds + fraction" series.

    Each element of y is split with two masks: bits 30..28 (0x70000000)
    become the whole part, bits 27..0 (0x0fffffff) are a tick count that is
    scaled by the tick period.  Isolated single-sample glitches are patched
    in each field before the two are summed.

    args : object with an ``xdt`` attribute, the tick period; 0 selects
           the built-in default of 25e-9.
    y    : array of packed integer words; it is not modified.

    returns the element-wise sum of the cleaned whole part and the scaled
    tick part.  Prints a line for every whole-part glitch it repairs, and a
    trailer line on every call.
    '''
    tick_period = args.xdt if args.xdt != 0 else 25e-9

    whole = np.right_shift(np.bitwise_and(y, 0x70000000), 28)
    # the whole-part field has a spike on rollover, suppress it:
    # a sample that differs from two agreeing neighbours is overwritten.
    for idx in range(1, len(whole) - 1):
        left = whole[idx - 1]
        if left == whole[idx + 1] and whole[idx] != left:
            print("ted fix at {}".format(idx))
            whole[idx] = left

    frac = np.bitwise_and(y, 0x0fffffff) * tick_period
    # catch spikes in the tick field only: where the neighbours are rising
    # but the middle sample lies outside them, use the neighbours' mean.
    # Runs left to right, so a repaired sample is the "left" of the next test.
    for idx in range(1, len(frac) - 1):
        left, mid, right = frac[idx - 1], frac[idx], frac[idx + 1]
        if not (right > left):
            continue
        if mid < left or mid > right:
            frac[idx] = (right + left) / 2

    combined = whole + frac
    print("decode_tai_vernier @@todoi stubbed spikes")
    return combined
# Format-file grammar and a sample configuration.  This is a bare module-level
# string: it documents the file format for readers and is not attached to any
# definition.
# NOTE(review): the grammar lines below separate CHDEF from the handler name
# with ",", whereas channel_handler.defsplit splits each line on the first "="
# (as the sample lines do) -- confirm which spelling is intended.
'''
formatfile
CHDEF ::
    N : channel number from 1
    LIST : list of channels eg 1,2,3
    RANGE: range of channels 1:4, : means ALL

CHDEF,ch_raw[,fmt=F0[;F1,...]]
CHDEF,ch_egu
CHDEF,ch_taiv
CHDEF,ch_bf,MASK[,fmt=l1;l2;l3]

[pgm@hoy5 acq400_hapi]$ cat PCFG/user.pcfg
# all channels raw:
1,2,8=ch_raw,fmt=AqB1;CNT1;SC
10=ch_bf,0x70000000,fmt=TAIs
10=ch_taiv
# print entire BF
7=ch_bf,0x0000003f,fmt=DI6
# break BF in to bits, lable each bit ..
7=ch_bf,0x0000003f,fmt=d4;d3;d2;d2;d1;d0
'''

# Flag values returned as the third element of every handler's __call__ result:
# ch_bitfield and ch_tai_vernier return STEP; ch_raw, ch_egu and ch_db return SMOO.
# Presumably selects stepped vs smoothed drawing -- the consumer is not in this
# file, TODO confirm.
STEP = True
SMOO = False
class channel_handler:
    ''' base class for per-channel handlers, plus the registry that builds them.

    A handler is bound to one channel and, when called, returns that
    channel's data together with a label.  Concrete subclasses override
    __call__ and provide a static ``build(nchan, defstr, client_args)`` that
    recognises one kind of config line and appends handler instances to
    ``channel_handler.handlers``.
    '''

    # Handler instances appended by the builders.  Class-level, so the list is
    # shared by all users and keeps growing across successive config files.
    handlers = []
    # Classes offering build(); each subclass registers itself after its
    # definition.  decode_config tries them in registration order.
    builders = []

    def __init__(self, ic, fmt):
        ''' ic  : zero-based channel index.
            fmt : label template, formatted with the one-based channel number.
        '''
        self.ic = ic
        self.ch = ic + 1
        self.fmt = fmt

    def __call__(self, raw_channels, pses):
        ''' placeholder: prints an error and returns None; subclasses override. '''
        print("ERROR : abstract base class does nothing")

    def make_label(self):
        ''' return the label template filled in with the one-based channel number. '''
        return self.fmt.format(self.ch)

    @staticmethod
    def defsplit(nchan, defstr, fmt):
        ''' split one config line "CHDEF=handler[,arg...]" into its parts.

        nchan  : total number of channels; bounds the open-ended forms.
        defstr : the config line.
        fmt    : default label template, used when the line has no "fmt=" arg.

        CHDEF is one of: "all" or ":" (channels 1..nchan), "A:B" with either
        end optional (inclusive range), or a comma-separated list.

        returns (channels, args, fmts):
            channels : list of one-based channel numbers (int)
            args     : the comma-separated fields after "=", args[0] being
                       the handler name
            fmts     : list of label templates, one per channel.  With
                       "fmt=a;b;..." the labels are assigned in order and
                       channels beyond the supplied labels reuse the first.

        A line without "=" yields ([], [""], []), so no builder matches it.
        Raises ValueError if a channel number is not an integer.
        '''
        lstr, sep, rstr = defstr.partition("=")
        if not sep:
            # Not a definition at all: hand back something every builder
            # rejects, so decode_config reports it instead of crashing.
            return [], [""], []

        if lstr in ('all', ':'):
            cdef = list(range(1, nchan + 1))
        elif ':' in lstr:
            lr = lstr.split(':')
            first = int(lr[0]) if lr[0] != '' else 1
            last = int(lr[1]) if lr[1] != '' else nchan
            cdef = list(range(first, last + 1))
        else:
            cdef = lstr.split(',')

        rlist = rstr.split(',')

        labels = None
        for arg in rlist:
            if arg.startswith("fmt="):
                labels = arg[4:].split(';')    # a later fmt= overrides an earlier one

        if labels is None:
            # One default label per channel, so callers can index fmts[cn]
            # for every channel of a multi-channel definition.
            fmts = [fmt] * len(cdef)
        else:
            fmts = [labels[cn] if cn < len(labels) else labels[0]
                    for cn in range(len(cdef))]

        return [int(xx) for xx in cdef], rlist, fmts

    @staticmethod
    def decode_config(nchan, lno, defstr, client_args=None):
        ''' offer one config line to each registered builder in turn.

        Lines shorter than 2 characters and lines starting with "#" are
        ignored.  Stops at the first builder whose build() returns true;
        prints an error naming lno and the line if none accepts it.
        '''
        if len(defstr) < 2 or defstr.startswith("#"):
            return
        for builder in channel_handler.builders:
            if builder.build(nchan, defstr, client_args):
                return
        print("ERROR: {} no handler found for \"{}\"".format(lno, defstr))
class ch_raw(channel_handler):
    ''' handler that returns a channel's samples unmodified.

    Config line: CHDEF=ch_raw[,fmt=F0[;F1...]]
    '''
    def_fmt = "CH{} bits"

    def __init__(self, ic, fmt=None):
        ''' ic  : zero-based channel index.
            fmt : label template; a false value selects ch_raw.def_fmt.
        '''
        super().__init__(ic, fmt if fmt else ch_raw.def_fmt)

    def __call__(self, raw_channels, pses):
        ''' return (samples, label, SMOO).

        raw_channels : indexable by zero-based channel index.
        pses         : indexable; elements 0, 1, 2 are used as the
                       start, stop and step of the slice taken.
        '''
        start, stop, stride = pses[0], pses[1], pses[2]
        return raw_channels[self.ic][start:stop:stride], self.fmt.format(self.ch), SMOO

    @staticmethod
    def build(nchan, defstr, client_args):
        ''' if defstr names 'ch_raw', append one ch_raw per listed channel.

        returns True when the line was handled, False when it is for
        another handler.  client_args is unused.
        '''
        channels, args, fmts = channel_handler.defsplit(nchan, defstr, ch_raw.def_fmt)
        if args[0] != 'ch_raw':
            return False
        for cn, ch in enumerate(channels):
            # fmts may hold fewer entries than channels (a single default
            # label); fall back to the first entry rather than over-index.
            label = fmts[cn if cn < len(fmts) else 0]
            channel_handler.handlers.append(ch_raw(ch - 1, fmt=label))
        return True


channel_handler.builders.append(ch_raw)
class ch_egu(ch_raw):
    ''' handler that converts a channel's samples with the UUT's chan2volts.

    Config line: CHDEF=ch_egu[,fmt=...]
    '''
    def_fmt = "CH{} V"

    def __init__(self, ic, args, fmt=None):
        ''' ic   : zero-based channel index.
            args : client args object; __call__ reads args.WSIZE and
                   args.the_uut.
            fmt  : label template for converted data; a false value selects
                   ch_egu.def_fmt.
        '''
        super().__init__(ic)    # raw label stays at the ch_raw default
        self.args = args
        self.egu_fmt = fmt if fmt else ch_egu.def_fmt

    def __call__(self, raw_channels, pses):
        ''' return (converted samples, label, SMOO).

        If the conversion fails, falls back to the (possibly rescaled) raw
        samples with the raw label.
        '''
        yy, raw_label, _ = super().__call__(raw_channels, pses)
        if self.args.WSIZE == 4:
            # presumably 32-bit words carry the sample in the upper 24 bits
            # -- TODO confirm
            yy = yy/256
        try:
            return self.args.the_uut.chan2volts(self.ch, yy), self.egu_fmt.format(self.ch), SMOO
        except Exception:
            # Deliberate best effort: no usable calibration, show raw data.
            # Not a bare except, so Ctrl-C and SystemExit still propagate.
            # raw_label is already formatted; it must not be formatted again.
            return yy, raw_label, SMOO

    @staticmethod
    def build(nchan, defstr, client_args):
        ''' if defstr names 'ch_egu', append one ch_egu per listed channel.

        client_args is handed to each handler as its args.
        returns True when the line was handled, False otherwise.
        '''
        channels, args, fmts = channel_handler.defsplit(nchan, defstr, ch_egu.def_fmt)
        if args[0] != 'ch_egu':
            return False
        for cn, ch in enumerate(channels):
            label = fmts[cn if cn < len(fmts) else 0]
            channel_handler.handlers.append(ch_egu(ch - 1, client_args, fmt=label))
        return True


channel_handler.builders.append(ch_egu)
class ch_db(ch_raw):
    ''' handler that expresses a channel's samples in dB relative to a
    fixed reference level.

    Config line: CHDEF=ch_db[,fmt=...]
    '''
    def_fmt = "CH{} dB"

    def __init__(self, ic, args, fmt=None):
        ''' ic   : zero-based channel index.
            args : client args object; __call__ reads args.WSIZE.
            fmt  : label template; a false value selects ch_db.def_fmt.
        '''
        super().__init__(ic)
        self.args = args
        # Default to this class's own dB label, not the volts label of ch_egu.
        self.egu_fmt = fmt if fmt else ch_db.def_fmt
        self.vmax = 0    # not used anywhere in this class

    def __call__(self, raw_channels, pses):
        ''' return (db0 - 10*log10(sample^2), label, SMOO).

        db0 is 10*log10(7.0368e13) when args.WSIZE == 4 and
        10*log10(32768*32768) otherwise.  A zero sample gives +inf.
        Prints the shape of raw_channels on every call.
        '''
        print(np.shape(raw_channels))
        yy, _raw_label, _ = super().__call__(raw_channels, pses)
        # Square in floating point: squaring fixed-width integer samples
        # wraps around silently and corrupts the result.
        dbsq = 10 * np.log10(np.square(np.asarray(yy, dtype=np.float64)))
        if self.args.WSIZE == 4:
            db0 = 10 * np.log10(7.0368e13)      # 2^23 * 2^23
        else:
            db0 = 10 * np.log10(32768*32768)
        return np.subtract(db0, dbsq), self.egu_fmt.format(self.ch), SMOO

    @staticmethod
    def build(nchan, defstr, client_args):
        ''' if defstr names 'ch_db', append one ch_db per listed channel.

        client_args is handed to each handler as its args.
        returns True when the line was handled, False otherwise.
        '''
        channels, args, fmts = channel_handler.defsplit(nchan, defstr, ch_db.def_fmt)
        if args[0] != 'ch_db':
            return False
        for cn, ch in enumerate(channels):
            label = fmts[cn if cn < len(fmts) else 0]
            channel_handler.handlers.append(ch_db(ch - 1, client_args, fmt=label))
        return True


channel_handler.builders.append(ch_db)
class ch_tai_vernier(ch_raw):
    ''' handler that passes a channel's samples through decode_tai_vernier.

    Config line: CHDEF=ch_taiv[,fmt=...]
    '''
    def_fmt = "CH{} TAIv"

    def __init__(self, ic, args, fmt=None):
        ''' ic   : zero-based channel index.
            args : client args object, passed on to decode_tai_vernier.
            fmt  : label template; a false value selects
                   ch_tai_vernier.def_fmt.
        '''
        # Fall back to this class's own label rather than the ch_raw one.
        super().__init__(ic, fmt if fmt else ch_tai_vernier.def_fmt)
        self.args = args

    def __call__(self, raw_channels, pses):
        ''' return (decoded samples, label, STEP). '''
        yy, label, _ = super().__call__(raw_channels, pses)
        return decode_tai_vernier(self.args, yy), label, STEP

    @staticmethod
    def build(nchan, defstr, client_args):
        ''' if defstr names 'ch_taiv', append one handler per listed channel.

        client_args is handed to each handler as its args.
        returns True when the line was handled, False otherwise.
        '''
        channels, args, fmts = channel_handler.defsplit(nchan, defstr, ch_tai_vernier.def_fmt)
        if args[0] != 'ch_taiv':
            return False
        for cn, ch in enumerate(channels):
            # fmts may hold fewer entries than channels (a single default
            # label); fall back to the first entry rather than over-index.
            label = fmts[cn if cn < len(fmts) else 0]
            channel_handler.handlers.append(ch_tai_vernier(ch - 1, client_args, fmt=label))
        return True


channel_handler.builders.append(ch_tai_vernier)
class ch_bitfield(channel_handler):
    ''' handler that extracts a masked bit field from a channel.

    Config line: CHDEF=ch_bf,MASK[,fmt=l1;l2;...]   (MASK in hex)
    '''
    def_fmt = "BITS{}"

    def __init__(self, ic, mask, fmt=None):
        ''' ic   : zero-based channel index.
            mask : integer bit mask selecting the field.
            fmt  : label template; a false value selects
                   ch_bitfield.def_fmt.
        '''
        # Pass the resolved template on, so the label is never None.
        super().__init__(ic, fmt if fmt else ch_bitfield.def_fmt)
        self.mask = mask
        self.shr = ch_bitfield.calc_shr(mask)

    def __call__(self, raw_channels, pses):
        ''' return (field values, label, STEP).

        The field is (sample & mask) shifted right so that the mask's
        lowest set bit lands at bit 0.  pses elements 0, 1, 2 are the
        start, stop and step of the slice taken from the channel.
        '''
        yy = raw_channels[self.ic][pses[0]:pses[1]:pses[2]]
        bf = np.right_shift(np.bitwise_and(yy, self.mask), self.shr)
        return bf, self.fmt.format(self.ch), STEP

    @staticmethod
    def count_bits(mask):
        ''' return the number of 1 bits in mask. '''
        return bin(mask).count("1")

    @staticmethod
    def calc_shr(mask):
        ''' return the index of the lowest set bit of mask (0 for mask == 0). '''
        if mask == 0:
            return 0        # no set bit to find; the search below would never end
        shr = 0
        while mask & 0x1 == 0:
            mask = mask >> 1
            shr += 1
        return shr

    @staticmethod
    def build(nchan, defstr, client_args):
        ''' if defstr names 'ch_bf', append the bit-field handlers it describes.

        When the line names exactly one channel, the mask has more than one
        bit set and "fmt=" supplies exactly one label per set bit, the field
        is broken into one single-bit handler per set bit, most significant
        first, each with its own label.  Otherwise one handler per listed
        channel extracts the whole field; "fmt=" is optional in that case.

        client_args is unused.
        returns True when the line was handled, False otherwise.
        Raises IndexError if MASK is missing, ValueError if it is not hex.
        '''
        channels, args, fmts = channel_handler.defsplit(nchan, defstr, ch_bitfield.def_fmt)
        if args[0] != 'ch_bf':
            return False

        mask = int(args[1], 16)

        labels = []
        for arg in args:
            if arg.startswith("fmt="):
                labels = arg[4:].split(';')

        # The bits actually set in the mask, most significant first, so a
        # mask with gaps is split into the right bits too.
        set_bits = [1 << pos
                    for pos in range(mask.bit_length() - 1, -1, -1)
                    if mask & (1 << pos)]

        if len(channels) == 1 and len(set_bits) > 1 and len(labels) == len(set_bits):
            for bit, label in zip(set_bits, labels):
                channel_handler.handlers.append(ch_bitfield(channels[0] - 1, bit, fmt=label))
        else:
            for cn, ch in enumerate(channels):
                label = fmts[cn if cn < len(fmts) else 0]
                channel_handler.handlers.append(ch_bitfield(ch - 1, mask, fmt=label))
        return True


channel_handler.builders.append(ch_bitfield)
def process_pcfg(args):
    ''' build channel handlers from the config file named by args.pcfg.

    Each line, stripped of surrounding whitespace, is passed to
    channel_handler.decode_config together with args.nchan and its
    zero-based line number; args itself is forwarded as client_args.

    returns channel_handler.handlers, the class-level list, which therefore
    also contains any handlers registered before this call.
    '''
    cfg_path = args.pcfg
    print("process_pcfg {}".format(cfg_path))

    with open(cfg_path) as cfg:
        for lno, line in enumerate(cfg):
            channel_handler.decode_config(
                args.nchan,
                lno,
                line.strip(),
                client_args=args,
            )

    return channel_handler.handlers