Source code for acq400_hapi.netclient

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
netclient.py interface to client tcp socket with
- sr() send/receive a command

Created on Sun Jan  8 12:36:38 2017

@author: pgm
"""


import socket
import re
import sys
import os
from threading import Lock
import select


if sys.version_info < (3, 0):
    try:
        from future import builtins
        from builtins import input
    except:
        print("No builtins/future found. Some features might not work correctly.")


[docs]class Netclient: """connects and holds open a socket to defined port. Args: addr (str) : ip-address or dns name on network port (int) : server port number. """ trace = int(os.getenv("NETCLIENT_TRACE", "0")) connect_timeout = int(os.getenv("NETCLIENT_CONNECT_TO", "0"))
[docs] def receive_message(self, termex, maxlen=4096): """Read the information from the socket line at a time. Args: termex (str): regex defines line terminator maxlen (int): max read size Returns: string representing message """ match = termex.search(self.buffer) while match == None: self.buffer += self.sock.recv(maxlen).decode("latin-1") if Netclient.trace > 1: print("self.buffer {}".format(self.buffer)) match = termex.search(self.buffer) rc = self.buffer[:match.start(1)] self.buffer = self.buffer[match.end(1):] return rc
[docs] def send(self, message): if Netclient.trace > 1: print("send({})".format(message)) self.sock.send(message.encode())
[docs] def has_data(self): socket_list = [self.sock] rs, wr, es = select.select(socket_list, [], [], 0) return self.sock in rs
instances = []
[docs] def __init__(self, addr, port) : if Netclient.trace: print("Netclient.init {} {}".format(addr, port)) self.buffer = "" self.__addr = addr self.__port = int(port) try: self.sock = socket.socket() if Netclient.connect_timeout: self.sock.settimeout(Netclient.connect_timeout); if Netclient.trace > 1: print("Netclient(%s, %d) connect" % (self.__addr, self.__port)) self.sock.connect((self.__addr, self.__port)) except socket.error as e: print("Netclient {}.{} connect fail {}".format(addr, port, e)) raise e Netclient.instances.append(self)
def __enter__(self): return self
[docs] def close(self): # print("close() {} {}".format(self.__addr, self.__port)) try: self.sock.shutdown(socket.SHUT_RDWR) except socket.error: pass self.sock.close() Netclient.instances.remove(self)
def __exit__(self, exc_type, exc_value, traceback): self.close() # def __del__(self): # print("__del__ {} {}".format(self.__addr, self.__port)) # self.close() #@property
[docs] def addr(self): return self.__addr
#@property
[docs] def port(self): return self.__port
def __repr__(self): return 'Netclient(%s, %d)' % (self.__addr, self.__port)
[docs] @staticmethod def closeall(): for nc in Netclient.instances: nc.close()
[docs]class Logclient(Netclient): """Netclient optimised for logging, line by line"""
[docs] def __init__(self, addr, port): Netclient.__init__(self,addr, port) self.termex = re.compile("(\r\n)")
[docs] def poll(self): return self.receive_message(self.termex)
[docs]class Siteclient(Netclient):
[docs] def synchronized(f): def _synchronized(self, *args, **kwargs): self.lock.acquire() try: rc = f(self, *args, **kwargs) finally: self.lock.release() return rc return _synchronized
"""Netclient optimised for site service, may be multi-line response. Autodetects all knobs and holds them as properties for simple script-like set/get syntax. """ knobs = {} prevent_autocreate = False pat = re.compile(r"[:.]")
[docs] @synchronized def sr(self, message): """send a command and receive a reply Args: message (str) : command (query) to send Returns: rx (str): response string """ if (self.trace): print("%s >%s" % (repr(self), message.rstrip())) self.sock.send((message+"\n").encode()) rx = self.receive_message(self.termex).rstrip() if self.show_responses and len(rx) > 1: print(rx) if (self.trace): print("%s <%s" % (repr(self), rx)) return rx
[docs] def build_knobs(self, knobstr): # http://stackoverflow.com/questions/10967551/how-do-i-dynamically-create-properties-in-python self.knobs = dict((Siteclient.pat.sub(r"_", key), key) for key in knobstr.split())
[docs] def help(self, regex = ".*"): """list available knobs, optionally filtered by regex. eg - help() : list all - help("SIG) : list all knobs with SIG - help("SIG*FREQ") list all knobs SIG*FREQ """ regex = re.compile(regex) hr = [] for key in sorted(self.knobs): if regex.match(key): hr.append(key) return hr
def __getattr__(self, name): if self.knobs == None: return object.__setattr__(self, name) if self.knobs.get(name) != None: return self.sr(self.knobs.get(name)) else: msg = "'{0}' object has no attribute '{1}'" raise AttributeError(msg.format(type(self).__name__, name)) def __setattr__(self, name, value): if self.knobs == None: return object.__setattr__(self, name, value) if self.knobs.get(name) != None: return self.sr("%s=%s" % (self.knobs.get(name), value)) elif not self.prevent_autocreate or self.__dict__.get(name) != None: self.__dict__[name] = value else: msg = "'{0}' object has no attribute '{1}'" raise AttributeError(msg.format(type(self).__name__, name))
[docs] def get_knob(self, name): return self.__getattr__(name)
[docs] def set_knob(self, name, value): return self.__setattr__(name, value)
def __repr__(self): return 'Siteclient(%s, %d)' % (self.addr(), self.port()) trace = int(os.getenv("SITECLIENT_TRACE", "0"))
[docs] def __init__(self, addr, port): # print("Siteclient.init") self.knobs = {} self.lock = Lock() self.show_responses = False Netclient.__init__(self, addr, port) # no more new props once set self.prevent_autocreate = False self.termex = re.compile(r"\n(acq400.[0-9]+ ([0-9]+) >)") self.trace = 1 if Siteclient.trace > 1 else 0 self.sr("prompt on") self.build_knobs(self.sr("help")) self.trace = Siteclient.trace self.prevent_autocreate = True
#self.show_responses = True
[docs]def run_unit_test(): SERVER_ADDRESS = 'acq2106_066' SERVER_PORT=4233 if len(sys.argv) > 1: SERVER_ADDRESS = sys.argv[1] if len(sys.argv) > 2: SERVER_PORT = int(sys.argv[2]) print("create Netclient %s %d" %(SERVER_ADDRESS, SERVER_PORT)) svc = Siteclient(SERVER_ADDRESS, SERVER_PORT) print("Model: %s" % (svc.MODEL)) print("SITELIST: %s" % (svc.SITELIST)) print("software_version: %s" % (svc.software_version)) svc.trace = True print("spad1: %s" % (svc.spad1)) svc.spad1 = "0x1234" print("spad1: %s" % (svc.spad1)) svc.spad1 = "0x5678" print("spad1: %s" % (svc.spad1)) svc.spad2 = "0x22222222" raise SystemExit for key in svc.knobs: cmd = svc.knobs[key] if cmd.startswith("help"): continue print("%s %s" % (cmd, svc.sr(cmd))) raise SystemExit while True: try: data = input("Enter some data: ") except EOFError: print("\nOkay. Leaving. Bye") break print("Hello") if not data: print("Can't send empty string!") print("Ctrl-D [or Ctrl-Z on Windows] to exit") continue print("< %s" % (data)) data += "\n" svc.send(data) data = svc.recv() print("Got this string from server:") print(data + '\n')
# unit test execution starts here if __name__ == '__main__': run_unit_test