import asyncio
import os
import re
import shlex
import subprocess
import time
from typing import Union, Any, Tuple, Optional, List

import numpy as np
import numpy.typing as npt

from gpilib2.util import get_tlc_bindir
async def run(
    cmd: str,
    env: Optional["os._Environ[str]"] = None,
    cwd: Optional[str] = None,
) -> Tuple[str, Optional[int], bytes, bytes]:
    """Wrapper for asynchronous command execution

    The command is handed to the system shell via
    :py:func:`asyncio.create_subprocess_shell`, so ``cmd`` must already be
    quoted appropriately for the shell by the caller.

    Args:
        cmd (str):
            Full command string to execute
        env (os._Environ)
            Environment dictionary. Defaults to None.
        cwd (str)
            Working directory. Defaults to None

    Returns:
        tuple:
            cmd (str):
                Original command string
            returncode (int):
                proc.returncode
            stdout (bytes):
                stdout bytestream
            stderr (bytes):
                stderr bytestream

    """
    # NOTE: the ``env`` annotation is quoted because ``os._Environ`` is only
    # subscriptable at runtime on Python >= 3.9; an unquoted annotation is
    # evaluated at definition time and breaks import on older interpreters.
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        env=env,
        cwd=cwd,
    )

    # communicate() drains both pipes and waits for the process to exit
    stdout, stderr = await proc.communicate()

    return cmd, proc.returncode, stdout, stderr
class rpc:
    """Class for io methods via rpc utilities

    Args:
        sim (bool):
            Run in sim mode (default False). Setting to true automatically toggles
            verbose mode
        verb (bool):
            Toggle verbose mode (default False)
        sim_fill_val (str):
            Default value to fill for simulated GMB queries. Defaults to '0'.

    Attributes:
        sim (bool):
            Simulated mode toggle (can be manually set at any time)
        verb (bool):
            Verbosity toggle (can be manually set at any time). If object is created
            with ``sim = True`` then verbosity is automatically toggled on.
        sim_fill_val (str):
            Value returned for every simulated GMB query.
        tlcbindir (str):
            Directory holding the binaries (empty string in sim mode).
        binaries (dict):
            Dictionary of binary names (keys) and full paths (values)
        command_queue (list):
            Command queue. List of tuples of the form ``(cmdlist, cmd_gmb_field)``
            where ``cmdlist`` is a list of command arguments and ``cmd_gmb_field`` is
            a string of the base of the command GMB field.
        ass_mode (dict):
            Dictionary of assembly modes. Must match enum in gpUtGeneral.h
        activity (dict):
            Dictionary of activity directives. Must match enum in gpUtGeneral.h
        activityId (int):
            Default activity id (set to -1 to match GUI activities)
        move_level (int):
            Default level value for MOVE commands
        nmcd (int):
            Number of :term:`MCD`
        nax (int):
            Number of axes per :term:`MCD`
        mcdaxes (~numpy.ndarray(str)):
            Base of GMB fields for all MCD axes
        mcdaxisnames (~numpy.ndarray(str)):
            Descriptions of all MCD axes
        mcdaxisinited (~numpy.ndarray(str)):
            GMB fields for MCD Axis init status
        mcdaxisdatumed (~numpy.ndarray(str)):
            GMB fields for MCD Axis datum status
        mcdaxissimed (~numpy.ndarray(str)):
            GMB fields for MCD Axis sim status

    """

    def __init__(
        self, sim: bool = False, verb: bool = False, sim_fill_val: str = "0"
    ) -> None:
        """Locate all required rpc binaries and populate standard values.

        Args:
            sim (bool):
                Run in sim mode (default False). Setting to true automatically toggles
                verbose mode
            verb (bool):
                Toggle verbose mode (default False)
            sim_fill_val (str):
                Default value to fill for simulated GMB queries. Defaults to '0'.

        Raises:
            AssertionError:
                If (outside of sim mode) any expected binary is missing or not
                executable.

        """
        # global flags
        self.sim = sim
        self.sim_fill_val = sim_fill_val
        # sim mode forces verbosity on
        self.verb = True if self.sim else verb

        # These are all of the binaries we expect to have
        binaries = [
            "gpAoAssClientTester",
            "gpAocRpcClientTester",
            "gpCalAssClientTester",
            "gpCalRpcClientTester",
            "gpFpmAssClientTester",
            "gpIfsAssClientTester",
            "gpIfsRpcClientTester",
            "gpIsClientTester",
            "gpPowBarAssClientTester",
            "gpPpmAssClientTester",
            "gpSrcAssClientTester",
            "gpTrkAssClientTester",
            "gpUtReadWriteClient",
            "gpUtReadGmbValues",
            "gpUtGmbReadVal",
            "gpUtReadGmbArrs",
        ]

        # the binary directory is only looked up when actually talking to hardware
        self.tlcbindir = "" if self.sim else get_tlc_bindir()

        # dictionary of binary names and full paths
        self.binaries = {
            name: os.path.join(self.tlcbindir, name) for name in binaries
        }

        # verify that everything is as it should be
        if not self.sim:
            for name, path in self.binaries.items():
                if not os.access(path, os.X_OK):
                    # raised explicitly (rather than via ``assert``) so the check
                    # survives ``python -O``; the exception type is unchanged
                    raise AssertionError(
                        "{} not found or is not executable.".format(name)
                    )

        # command queue is list of tuples (cmdlist, cmd_gmb_field)
        self.command_queue: List[Tuple[List[str], str]] = []

        # define assembly modes
        self.ass_mode = {
            "INIT": 0,
            "MOVE": 1,
            "TRACK": 2,
            "DATUM": 3,
            "PARK": 4,
            "TEST": 5,
            "SIM": 6,
            "DEBUG": 7,
            "REBOOT": 8,
        }

        # define activity directives
        self.activity = {"PRESET": 0, "START": 1, "PRESET_START": 2, "CANCEL": 3}
        self.activityId = -1  # this is always the default command id
        self.move_level = 0  # this is the level to use for any move

        # build MCD list
        self.nmcd = 4  # number of MCDs
        self.nax = 8  # number of axes per MCD
        self.mcdaxes = np.array(
            [
                "mcd{}.axisList[{}].axConfig".format(j, k)
                for j in range(1, self.nmcd + 1)
                for k in range(1, self.nax + 1)
            ]
        )
        self.mcdaxisnames = self.read_gmb_arrays(
            ["{}.desc".format(ax) for ax in self.mcdaxes], 20
        )
        self.mcdaxisinited = np.array(
            ["{}.initialized".format(ax) for ax in self.mcdaxes]
        )
        self.mcdaxisdatumed = np.array(["{}.datumed".format(ax) for ax in self.mcdaxes])
        self.mcdaxissimed = np.array(["{}.sim".format(ax) for ax in self.mcdaxes])

        if self.verb:
            print("Completed init of rpc object.")

    def __str__(self) -> str:
        """Print attributes"""
        return "SIMULATED: {}, VERBOSE: {}".format(self.sim, self.verb)

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------
    def _call(self, binary: str, args: List[str], sim_output: str) -> str:
        """Run one of the rpc binaries (or fake it in sim mode) and return its text.

        Args:
            binary (str):
                Key into ``self.binaries``
            args (list):
                Arguments to pass after the binary path
            sim_output (str):
                Text to use as the output when ``self.sim`` is True

        Returns:
            str:
                Decoded combined stdout/stderr of the call (or ``sim_output``)

        """
        cmdlist = [self.binaries[binary]] + [str(a) for a in args]
        if self.verb:
            print("Executing {}".format(" ".join(cmdlist)))

        if self.sim:
            output = sim_output
        else:
            output = subprocess.check_output(
                cmdlist, stderr=subprocess.STDOUT
            ).decode()

        if self.verb:
            print("{} call returned:\n{}".format(binary, output))

        return output

    def _read_fields(
        self, binary: str, leading_args: List[str], fields: npt.ArrayLike
    ) -> npt.NDArray[Any]:
        """Shared implementation of the one-line-per-field reader binaries.

        Args:
            binary (str):
                Key into ``self.binaries``
            leading_args (list):
                Arguments that precede the field names on the command line
            fields (:py:data:`~numpy.typing.ArrayLike`):
                Fields to read.

        Returns:
            ~numpy.ndarray:
                One string per requested field.

        Raises:
            AssertionError:
                If the number of returned lines does not match the number of
                requested fields.

        """
        fieldsarr = np.array(fields, ndmin=1)

        # simulated output mimics one line per field plus one trailing entry
        sim_output = "\n".join([self.sim_fill_val] * (fieldsarr.size + 1))
        output = self._call(
            binary, list(leading_args) + [str(f) for f in fieldsarr], sim_output
        )

        # the final split element is the trailer and is discarded
        res = np.array(output.split("\n"))[:-1]
        if res.size != fieldsarr.size:
            # explicit raise so the check survives ``python -O``; the raw output
            # is included because a connection failure yields a lone error line
            raise AssertionError(
                "Output length doesn't match requested fields length. "
                "Output was:\n{}".format(output)
            )

        return res

    @staticmethod
    def _send_error_text(
        cmd: str, stdout: Optional[bytes], stderr: Optional[bytes], ackmsg: Any
    ) -> str:
        """Assemble the message used when sending a command fails."""
        errtxt = ""
        # errors="replace": diagnostics must never fail on undecodable bytes
        if stderr:
            errtxt += stderr.decode(errors="replace") + "\n"
        if stdout:
            errtxt += stdout.decode(errors="replace") + "\n"
        errtxt += "Ack Error Message:\n {}\n".format(ackmsg)

        return "Error encountered sending command:\n {}\n{}".format(cmd, errtxt)

    # ------------------------------------------------------------------
    # command execution
    # ------------------------------------------------------------------
    async def assemble_pending_commands(
        self, cmdstrs: List[str]
    ) -> List[Tuple[str, Optional[int], bytes, bytes]]:
        """Dump set of pending commands into a gather

        Args:
            cmdstrs (list):
                List of full command strings

        Returns:
            list:
                list of tuples as returned by run

        """
        statements = [run(s) for s in cmdstrs]
        return await asyncio.gather(*statements)

    def clear_queue(self) -> None:
        """Remove all pending commands from queue without executing"""
        self.command_queue = []

    def execute_queue(self, nowait: bool = False) -> None:
        """Execute all pending commands in queue

        The queue is emptied before anything is sent. In sim mode the commands
        are only printed.

        Args:
            nowait (bool):
                If True, return immediately upon confirming an ack. Default False - wait
                for command state to register complete.

        Returns:
            None

        Raises:
            RuntimeError:
                If any command exits non-zero, any ack field reads 3, or (when
                waiting) any command finishes in an error state.

        """
        if not self.command_queue:
            print("Comand queue is empty.")
            return

        # Empty out the queue
        pending, self.command_queue = self.command_queue, []

        # Queued commands run through a shell (see ``run``), so every argument
        # is quoted; GMB fields such as ``axisList[1]`` are otherwise subject to
        # shell globbing/word-splitting, unlike the argv list used by ``execute``.
        cmdstrs = [
            " ".join(shlex.quote(str(arg)) for arg in cmdlist)
            for cmdlist, _ in pending
        ]
        fields = [field for _, field in pending]

        if self.verb:
            print("Executing:\n {}".format("\n".join(cmdstrs)))

        # nothing is sent (or checked) when simulating
        if self.sim:
            return

        results = asyncio.run(self.assemble_pending_commands(cmdstrs))

        # assemble ack checks: one (ack, ackErrMsg) row per command
        ackfields = []
        for f in fields:
            ackfields.append(f + ".ack")
            ackfields.append(f + ".ackErrMsg")
        ack = self.read_gmb_values(ackfields).reshape((len(fields), 2))

        # In case of error try to get as much info as possible, for every
        # command that failed rather than only the first one
        failures = [
            self._send_error_text(cmd, stdout, stderr, ackmsg)
            for (cmd, returncode, stdout, stderr), (ackval, ackmsg) in zip(
                results, ack
            )
            if returncode != 0 or int(ackval) == 3
        ]
        if failures:
            raise RuntimeError("\n".join(failures))

        # Wait for completion
        if not nowait:
            res2 = self.wait_for_command_completion(fields)
            if res2 is not None:
                # entries for commands that did not fail hold the "0" placeholder
                lines = [
                    "{}: {}".format(f, m) for f, m in zip(fields, res2) if m != "0"
                ]
                if not lines:
                    lines = ["{}: {}".format(f, m) for f, m in zip(fields, res2)]
                raise RuntimeError(
                    "Command State Error:\n{}".format("\n".join(lines))
                )

    def wait_for_command_completion(
        self, fields: npt.ArrayLike, polling_interval: float = 0.5
    ) -> Optional[npt.NDArray[Any]]:
        """Wait for completion of one or more commands

        Args:
            fields (:py:data:`~numpy.typing.ArrayLike`):
                Base of GMB fields for commands
            polling_interval (float):
                Time (seconds) to wait between state checks (defaults to 0.5)

        Returns:
            None or ~numpy.ndarray:
                If all commands were successful, return None, otherwise an array of
                state error messages equivalent in size to the input list of GMB fields.

        Notes:
            A ``.state`` value of 1 is treated as still in progress and a value
            of 2 as an error. Entries of the returned array belonging to
            commands that did not end in state 2 are the string ``'0'``. There
            is no timeout: the call blocks for as long as any state reads 1.

        """
        polling_interval = float(polling_interval)
        statefields = np.array([f + ".state" for f in np.array(fields, ndmin=1)])
        states = self.read_gmb_values(statefields).astype(int)

        while True:
            busy = states == 1
            if not np.any(busy):
                break
            # sleep *before* re-reading so no time is wasted once all are done
            time.sleep(polling_interval)
            states[busy] = self.read_gmb_values(statefields[busy]).astype(int)

        failed = states == 2
        if not np.any(failed):
            return None

        errorfields = [f + "ErrMsg" for f in statefields[failed]]
        out = np.zeros(states.shape, dtype=object)
        out[failed] = self.read_gmb_values(errorfields)

        return out.astype(str)

    def execute(
        self,
        cmdlist: List[str],
        cmd_gmb_field: str,
        queue: bool = False,
        nowait: bool = False,
    ) -> None:
        """Queue or execute a command, and optionally wait for completion

        Args:
            cmdlist (list):
                Command and arguments
            cmd_gmb_field (str):
                Base of GMB field to check for ack and status
            queue (bool):
                If True, queue the command for later execution. Defaults False.
            nowait (bool):
                If True, return immediately upon confirming an ack. Default False - wait
                for command state to register complete.

        Returns:
            None:

        Raises:
            RuntimeError:
                If the command exits non-zero, the ack field reads 3, or (when
                waiting) the command finishes in an error state.

        """
        if queue:
            self.command_queue.append((cmdlist, cmd_gmb_field))
            return

        # Issue command and check result
        if self.verb:
            print("Executing {}".format(" ".join(cmdlist)))

        if self.sim:
            # stand-in result: return code 0, no captured output
            output = subprocess.CompletedProcess(cmdlist, 0)
        else:
            output = subprocess.run(cmdlist, capture_output=True)

        # check ack state
        ack = self.read_gmb_values(
            [cmd_gmb_field + ".ack", cmd_gmb_field + ".ackErrMsg"]
        )

        # In case of error try to get as much info as possible:
        if (output.returncode != 0) or (int(ack[0]) == 3):
            raise RuntimeError(
                self._send_error_text(
                    " ".join(cmdlist), output.stdout, output.stderr, ack[1]
                )
            )

        if not nowait:
            res = self.wait_for_command_completion(cmd_gmb_field)
            # compare against None: the truth value of an array would treat an
            # empty error message as success
            if res is not None:
                raise RuntimeError("Command State Error:\n{}".format(res[0]))

    # ------------------------------------------------------------------
    # GMB reads and writes
    # ------------------------------------------------------------------
    def parse_ReadWriteClient_output(self, input: str) -> Union[str, None]:
        """Parse output of gpUtReadWriteClient command

        Args:
            input (str):
                gpUtReadWriteClient query output

        Returns:
            str or None:
                Contents of query output. None returned if 'error' in input or regular
                expression matching fails.

        Notes:
            gpUtReadWriteClient returns multiple lines of information,
            with queried value typically at end of the last line as
            = <#####>.

            Numeric output will have format of:
            [+/-]X or [+/-]X.X or [+/-]X.X[e/E][+/-]X

        """
        # any mention of an error (case-insensitive) invalidates the result
        if "ERROR" in input.upper():
            return None

        res = re.search(r"= <([^>]+)>.", input)
        if res is None:
            return None

        return res.group(1)

    def read_gmb_value(self, field: str, server: str = "tlc") -> Union[str, None]:
        """Wrapper of gpUtReadWriteClient read command

        Args:
            field (str):
                Field to read.
            server (str):
                Server name or address (defaults to 'tlc')

        Returns:
            str or None:
                Contents of query output. None returned if querying or pattern matching
                fails.

        Notes:
            gpUtReadWriteClient returns multiple lines of information,
            with queried value typically at end of the last line as
            = <#####>.

            Numeric output will have format of:
            [+/-]X or [+/-]X.X or [+/-]X.X[e/E][+/-]X

        """
        output = self._call(
            "gpUtReadWriteClient",
            [server, "0", field],
            "= <{}>.".format(self.sim_fill_val),
        )

        return self.parse_ReadWriteClient_output(output)

    def write_gmb_value(self, field: str, value: Any, server: str = "tlc") -> None:
        """Wrapper of gpUtReadWriteClient write command

        Args:
            field (str):
                Field to write.
            value (Any):
                Value to write. Passed on the command line as ``"{}".format(value)``.
            server (str):
                Server name or address (defaults to 'tlc')

        Returns:
            None

        """
        sim_output = (
            "gpUtReadWriteClient.c, 533: <1> main: connecting to host <{}>\n"
            "gpUtReadWriteClient.c, 602: <0> main: <{}>[0][0][0] = <{}>.\n"
        ).format(server, field, value)

        # the tool's output is only echoed (in verbose mode), never parsed
        self._call(
            "gpUtReadWriteClient",
            [server, "1", field, "{}".format(value)],
            sim_output,
        )

    def read_gmb_values(
        self, fields: npt.ArrayLike, server: str = "tlc"
    ) -> npt.NDArray[Any]:
        """Wrapper of gpUtReadGmbValues

        Args:
            fields (:py:data:`~numpy.typing.ArrayLike`):
                Fields to read.
            server (str):
                Server name or address (defaults to 'tlc')

        Returns:
            ~numpy.ndarray:
                Contents of fields.

        Raises:
            AssertionError:
                If the number of output lines does not match the number of fields.

        Notes:
            gpUtReadGmbValues returns one line of info per query, followed by a blank
            newline.  The data lines will either be the value of the field, or an error
            message (including the word 'ERROR').

            In cases where an initial connection cannot be established (server name is
            wrong or server cannot be reached), only a single line of error message is
            returned.

            Numeric outputs will have format of:
            [+/-]X or [+/-]X.X or [+/-]X.X[e/E][+/-]X

        """
        return self._read_fields("gpUtReadGmbValues", [server], fields)

    def read_gmb_arrays(
        self, fields: npt.ArrayLike, maxlen: int = 1, server: str = "tlc"
    ) -> npt.NDArray[Any]:
        """Wrapper of gpUtReadGmbArrs

        Args:
            fields (:py:data:`~numpy.typing.ArrayLike`):
                Fields to read.
            maxlen (int):
                Maximum field length. Defaults to 1.
            server (str):
                Server name or address (defaults to 'tlc')

        Returns:
            ~numpy.ndarray:
                Contents of fields.

        Raises:
            AssertionError:
                If the number of output lines does not match the number of fields.

        Notes:
            gpUtReadGmbArrs returns one line of info per query, followed by a blank
            newline.  The data lines will either be the value of the field, or an error
            message (including the word 'ERROR').

            In cases where an initial connection cannot be established (server name is
            wrong or server cannot be reached), only a single line of error message is
            returned.

            Numeric outputs will have format of:
            [+/-]X or [+/-]X.X or [+/-]X.X[e/E][+/-]X

        """
        return self._read_fields(
            "gpUtReadGmbArrs", [server, "{}".format(maxlen)], fields
        )

    def read_gmb_values_legacy(
        self, fields: npt.ArrayLike, server: str = "tlc"
    ) -> npt.NDArray[Any]:
        """Wrapper of read_gmb_value for array-like inputs. Matches read_gmb_values
        syntax.

        Args:
            fields (:py:data:`~numpy.typing.ArrayLike`):
                Fields to read.
            server (str):
                Server name or address (defaults to 'tlc')

        Returns:
            ~numpy.ndarray:
                Contents of fields. A failed read (``None`` from
                :py:meth:`read_gmb_value`) appears as the string ``'None'``.

        Notes:
            See :py:meth:`read_gmb_value`

        """
        fieldsarr = np.array(fields, ndmin=1)

        # one gpUtReadWriteClient call per field
        output = np.zeros(fieldsarr.size, dtype=object)
        for j, f in enumerate(fieldsarr):
            output[j] = self.read_gmb_value(f, server=server)

        return output.astype(str)

    def list_gmb_fields(self, server: str = "tlc") -> List[str]:
        """Wrapper of gpUtGmbReadVal -list command

        Args:
            server (str):
                Server name or address (defaults to 'tlc')

        Returns:
            list:
                List of field names

        Notes:
            Expected return from gpUtGmbReadVal is multiple lines of debug messages
            with all requested field names, followed by a single blank newline.
            Lines that do not contain a ``<0> name`` entry are skipped.

        """
        output = self._call(
            "gpUtGmbReadVal",
            ["-list", server],
            "<0> {}".format(self.sim_fill_val),
        )

        # splitlines() copes with output that has no trailing newline (as the
        # simulated output does); filtering on the match skips blank or
        # unrelated lines instead of failing on them
        pattern = re.compile(r"<0> (\S*)")
        matches = (pattern.search(line) for line in output.splitlines())

        return [m.group(1) for m in matches if m is not None]