#!/usr/bin/env python3
"""Remote control to composer webapp
Examples
Load new Template::
./user_apps/web/compose_remote.py --file=CONFIGS/cycloid_scan_templates.txt acq1001_434
Compose::
./user_apps/web/compose_remote.py --output=oneshot_rearm --segment=A "5*AA 5*BB" acq1001_434
Load new and compose::
./user_apps/web/compose_remote.py --file=CONFIGS/cycloid_scan_templates.txt --output=oneshot_rearm --segment=A "5*AA 5*BB" acq1001_434
Set next_seg segments::
./user_apps/web/compose_remote.py --next_seg=B,E,A,B,C acq1001_434
"""
import argparse
import requests
import json
import socket
import time
# TCP port that each ``--next_seg`` entry is sent to on the UUT.
_NEXT_SEG_PORT = 54210


def _detect_api_port(uut):
    """Return the API port advertised at ``http://<uut>/apps.port``.

    Raises:
        SystemExit: if the request fails, the server answers with a status
            of 400 or above, or the reply body is not an integer.
    """
    port = None
    try:
        response = requests.get(f"http://{uut}/apps.port")
        if response.status_code < 400:
            port = int(response.text)
    except (requests.RequestException, ValueError):
        # Unreachable host and non-numeric reply both mean "not detected".
        port = None
    if port is None:
        # SystemExit with a message prints it to stderr and exits with
        # status 1, exactly like the site-provided exit() helper, but does
        # not depend on the ``site`` module being loaded.
        raise SystemExit('[Error] unable to detect api port')
    return port


def _post_payload(url, payload, verbose):
    """Optionally dump *payload* as JSON, then post it to *url*."""
    if verbose:
        print('Json: ')
        print(json.dumps(payload, indent=2))
    send_to_endpoint(url, payload)


def run_main(args):
    """Carry out the actions selected on the command line.

    Depending on *args* this will, in order:

    1. Detect the API port from the UUT when ``args.port`` is not set
       (the detected value is stored back into ``args.port``).
    2. Upload the lines of ``args.file`` with a ``build_template`` action.
    3. Send an ``awg_compose`` action when ``args.output``, ``args.pattern``
       and ``args.segment`` are all set.
    4. Send each entry of ``args.next_seg`` to the UUT over TCP, pausing
       0.1 s before each one.

    Args:
        args: parsed arguments as produced by ``get_parser().parse_args()``.

    Raises:
        SystemExit: if the API port has to be detected and cannot be.
    """
    if not args.port:
        args.port = _detect_api_port(args.uut)
        print(f"Found api on port {args.port}")

    url = f"http://{args.uut}:{args.port}/endpoint"

    if args.file:
        with open(args.file) as f:
            lines = f.readlines()
        payload = {
            'action': 'build_template',
            'data': {
                'lines': lines,
            },
        }
        _post_payload(url, payload, args.verbose)

    if args.output and args.pattern and args.segment:
        payload = {
            'action': 'awg_compose',
            'data': {
                'output': args.output,
                'pattern': ' '.join(args.pattern),
                'nreps': args.nreps,
                'segment': args.segment,
            },
        }
        _post_payload(url, payload, args.verbose)

    if args.next_seg:
        for segment in args.next_seg:
            time.sleep(0.1)
            send_to_port(args.uut, _NEXT_SEG_PORT, segment)
def send_to_endpoint(url, payload):
    """POST *payload* as JSON to *url* and print the outcome.

    On success the HTTP status code and response body are printed. A failed
    request is reported on stdout and otherwise ignored, so the caller
    carries on with its remaining actions.

    Args:
        url: full URL of the endpoint to post to.
        payload: JSON-serialisable object sent as the request body.
    """
    print(f"\nSending to {url}")
    try:
        r = requests.post(url, json=payload)
    except requests.RequestException:
        # Only transport/HTTP-layer failures are treated as "unable to post";
        # anything else is a programming error and should surface normally.
        print(f"Error: Unable to post to {url}")
    else:
        print(f"[{r.status_code}] {r.text}")
def send_to_port(host, port, message):
    """Open a TCP connection to ``host:port``, send *message*, and report it.

    Args:
        host: hostname or address to connect to.
        port: TCP port number.
        message: text to send; it is encoded as UTF-8 on the wire.
    """
    address = (host, port)
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as conn:
        conn.connect(address)
        encoded = bytes(message, "utf-8")
        conn.sendall(encoded)
        print(f"Sent {message} to {host}:{port}")
def list_of_strings(value):
    """Split a comma-separated string into a list of items.

    Whitespace around each item is removed and empty items (from doubled or
    trailing commas, or an empty string) are dropped, so ``"B, E,,A"``
    yields ``["B", "E", "A"]``.

    Args:
        value: the raw comma-separated string.

    Returns:
        The non-empty, stripped items in their original order.
    """
    return [item for item in map(str.strip, value.split(',')) if item]
def get_parser():
    """Build the command-line parser for the composer remote control.

    Positional arguments are an optional list of ``pattern`` words followed
    by the required ``uut`` hostname.

    Returns:
        The configured ``argparse.ArgumentParser``.
    """
    parser = argparse.ArgumentParser(description='Remote control to composer webapp')
    # type=int rejects a malformed port at parse time and matches the type
    # of an auto-detected port.
    parser.add_argument('--port', default=None, type=int, help="Set api port")
    parser.add_argument('--file', default=None, help="Template file to upload")
    parser.add_argument(
        '--output',
        default='oneshot_rearm',
        # Implicit concatenation keeps the space after the colon; a backslash
        # continuation inside the literal depends on the next line's indent.
        help="Composer output options: "
             "oneshot_rearm, oneshot, continuous or a filename",
    )
    parser.add_argument('--nreps', default='', help="Number of pattern repetitions")
    parser.add_argument('--segment', default='', choices=['A', 'B', 'C', 'D', 'E'], help="Set segment")
    parser.add_argument('--next_seg', default=None, type=list_of_strings, help="Queue next_seg active segments")
    parser.add_argument('--verbose', default=0, type=int, help="increase verbosity")
    parser.add_argument('pattern', nargs='*', help="Pattern to compose ie 5*AA 5*BB")
    parser.add_argument('uut', help="uut hostname")
    return parser
# Script entry point: parse the command line, then act on it.
if __name__ == "__main__":
    cli_args = get_parser().parse_args()
    run_main(cli_args)