#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
netclient.py interface to client tcp socket with
- sr() send/receive a command
Created on Sun Jan 8 12:36:38 2017
@author: pgm
"""
import socket
import re
import sys
import os
from threading import Lock
import select
if sys.version_info < (3, 0):
try:
from future import builtins
from builtins import input
except:
print("No builtins/future found. Some features might not work correctly.")
[docs]class Netclient:
"""connects and holds open a socket to defined port.
Args:
addr (str) : ip-address or dns name on network
port (int) : server port number.
"""
trace = int(os.getenv("NETCLIENT_TRACE", "0"))
connect_timeout = int(os.getenv("NETCLIENT_CONNECT_TO", "0"))
[docs] def receive_message(self, termex, maxlen=4096):
"""Read the information from the socket line at a time.
Args:
termex (str): regex defines line terminator
maxlen (int): max read size
Returns:
string representing message
"""
match = termex.search(self.buffer)
while match == None:
self.buffer += self.sock.recv(maxlen).decode("latin-1")
if Netclient.trace > 1:
print("self.buffer {}".format(self.buffer))
match = termex.search(self.buffer)
rc = self.buffer[:match.start(1)]
self.buffer = self.buffer[match.end(1):]
return rc
[docs] def send(self, message):
if Netclient.trace > 1:
print("send({})".format(message))
self.sock.send(message.encode())
[docs] def has_data(self):
socket_list = [self.sock]
rs, wr, es = select.select(socket_list, [], [], 0)
return self.sock in rs
instances = []
[docs] def __init__(self, addr, port) :
if Netclient.trace:
print("Netclient.init {} {}".format(addr, port))
self.buffer = ""
self.__addr = addr
self.__port = int(port)
try:
self.sock = socket.socket()
if Netclient.connect_timeout:
self.sock.settimeout(Netclient.connect_timeout);
if Netclient.trace > 1:
print("Netclient(%s, %d) connect" % (self.__addr, self.__port))
self.sock.connect((self.__addr, self.__port))
except socket.error as e:
print("Netclient {}.{} connect fail {}".format(addr, port, e))
raise e
Netclient.instances.append(self)
def __enter__(self):
return self
[docs] def close(self):
# print("close() {} {}".format(self.__addr, self.__port))
try:
self.sock.shutdown(socket.SHUT_RDWR)
except socket.error:
pass
self.sock.close()
Netclient.instances.remove(self)
def __exit__(self, exc_type, exc_value, traceback):
self.close()
# def __del__(self):
# print("__del__ {} {}".format(self.__addr, self.__port))
# self.close()
#@property
[docs] def addr(self):
return self.__addr
#@property
[docs] def port(self):
return self.__port
def __repr__(self):
return 'Netclient(%s, %d)' % (self.__addr, self.__port)
[docs] @staticmethod
def closeall():
for nc in Netclient.instances:
nc.close()
[docs]class Logclient(Netclient):
"""Netclient optimised for logging, line by line"""
[docs] def __init__(self, addr, port):
Netclient.__init__(self,addr, port)
self.termex = re.compile("(\r\n)")
[docs] def poll(self):
return self.receive_message(self.termex)
[docs]class Siteclient(Netclient):
[docs] def synchronized(f):
def _synchronized(self, *args, **kwargs):
self.lock.acquire()
try:
rc = f(self, *args, **kwargs)
finally:
self.lock.release()
return rc
return _synchronized
"""Netclient optimised for site service, may be multi-line response.
Autodetects all knobs and holds them as properties for simple script-like
set/get syntax.
"""
knobs = {}
prevent_autocreate = False
pat = re.compile(r"[:.]")
[docs] @synchronized
def sr(self, message):
"""send a command and receive a reply
Args:
message (str) : command (query) to send
Returns:
rx (str): response string
"""
if (self.trace):
print("%s >%s" % (repr(self), message.rstrip()))
self.sock.send((message+"\n").encode())
rx = self.receive_message(self.termex).rstrip()
if self.show_responses and len(rx) > 1:
print(rx)
if (self.trace):
print("%s <%s" % (repr(self), rx))
return rx
[docs] def build_knobs(self, knobstr):
# http://stackoverflow.com/questions/10967551/how-do-i-dynamically-create-properties-in-python
self.knobs = dict((Siteclient.pat.sub(r"_", key), key) for key in knobstr.split())
[docs] def help(self, regex = ".*"):
"""list available knobs, optionally filtered by regex.
eg
- help() : list all
- help("SIG) : list all knobs with SIG
- help("SIG*FREQ") list all knobs SIG*FREQ
"""
regex = re.compile(regex)
hr = []
for key in sorted(self.knobs):
if regex.match(key):
hr.append(key)
return hr
def __getattr__(self, name):
if self.knobs == None:
return object.__setattr__(self, name)
if self.knobs.get(name) != None:
return self.sr(self.knobs.get(name))
else:
msg = "'{0}' object has no attribute '{1}'"
raise AttributeError(msg.format(type(self).__name__, name))
def __setattr__(self, name, value):
if self.knobs == None:
return object.__setattr__(self, name, value)
if self.knobs.get(name) != None:
return self.sr("%s=%s" % (self.knobs.get(name), value))
elif not self.prevent_autocreate or self.__dict__.get(name) != None:
self.__dict__[name] = value
else:
msg = "'{0}' object has no attribute '{1}'"
raise AttributeError(msg.format(type(self).__name__, name))
[docs] def get_knob(self, name):
return self.__getattr__(name)
[docs] def set_knob(self, name, value):
return self.__setattr__(name, value)
def __repr__(self):
return 'Siteclient(%s, %d)' % (self.addr(), self.port())
trace = int(os.getenv("SITECLIENT_TRACE", "0"))
[docs] def __init__(self, addr, port):
# print("Siteclient.init")
self.knobs = {}
self.lock = Lock()
self.show_responses = False
Netclient.__init__(self, addr, port)
# no more new props once set
self.prevent_autocreate = False
self.termex = re.compile(r"\n(acq400.[0-9]+ ([0-9]+) >)")
self.trace = 1 if Siteclient.trace > 1 else 0
self.sr("prompt on")
self.build_knobs(self.sr("help"))
self.trace = Siteclient.trace
self.prevent_autocreate = True
#self.show_responses = True
[docs]def run_unit_test():
SERVER_ADDRESS = 'acq2106_066'
SERVER_PORT=4233
if len(sys.argv) > 1:
SERVER_ADDRESS = sys.argv[1]
if len(sys.argv) > 2:
SERVER_PORT = int(sys.argv[2])
print("create Netclient %s %d" %(SERVER_ADDRESS, SERVER_PORT))
svc = Siteclient(SERVER_ADDRESS, SERVER_PORT)
print("Model: %s" % (svc.MODEL))
print("SITELIST: %s" % (svc.SITELIST))
print("software_version: %s" % (svc.software_version))
svc.trace = True
print("spad1: %s" % (svc.spad1))
svc.spad1 = "0x1234"
print("spad1: %s" % (svc.spad1))
svc.spad1 = "0x5678"
print("spad1: %s" % (svc.spad1))
svc.spad2 = "0x22222222"
raise SystemExit
for key in svc.knobs:
cmd = svc.knobs[key]
if cmd.startswith("help"):
continue
print("%s %s" % (cmd, svc.sr(cmd)))
raise SystemExit
while True:
try:
data = input("Enter some data: ")
except EOFError:
print("\nOkay. Leaving. Bye")
break
print("Hello")
if not data:
print("Can't send empty string!")
print("Ctrl-D [or Ctrl-Z on Windows] to exit")
continue
print("< %s" % (data))
data += "\n"
svc.send(data)
data = svc.recv()
print("Got this string from server:")
print(data + '\n')
# unit test execution starts here
if __name__ == '__main__':
run_unit_test