# Source code for gpilib2.rpc

import asyncio
import os
import re
import shlex
import subprocess
import time
from typing import Union, Any, Tuple, Optional, List

import numpy as np
import numpy.typing as npt

from gpilib2.util import get_tlc_bindir


async def run(
    cmd: str, env: Optional[os._Environ[str]] = None, cwd: Optional[str] = None
) -> Tuple[str, Optional[int], bytes, bytes]:
    """Execute a shell command asynchronously and capture its output

    Args:
        cmd (str):
            Full command string to execute (handed to the shell as-is)
        env (os._Environ)
            Environment dictionary. Defaults to None.
        cwd (str)
            Working directory. Defaults to None

    Returns:
        tuple:
            cmd (str):
                Original command string
            returncode (int):
                Return code of the finished subprocess
            stdout (bytes):
                stdout bytestream
            stderr (bytes):
                stderr bytestream

    """

    # Both output streams are captured rather than inherited from the parent
    pipe = asyncio.subprocess.PIPE
    child = await asyncio.create_subprocess_shell(
        cmd, stdout=pipe, stderr=pipe, env=env, cwd=cwd
    )

    # communicate() drains both pipes and waits for the process to exit
    captured_out, captured_err = await child.communicate()

    return cmd, child.returncode, captured_out, captured_err
class rpc:
    """Class for io methods via rpc utilities

    Args:
        sim (bool):
            Run in sim mode (default False). Setting to true automatically toggles
            verbose mode
        verb (bool):
            Toggle verbose mode (default False)
        sim_fill_val (str):
            Default value to fill for simulated GMB queries. Defaults to '0'.

    Attributes:
        sim (bool):
            Simulated mode toggle (can be manually set at any time)
        sim_fill_val (str):
            Value returned by every simulated GMB query
        verb (bool):
            Verbosity toggle (can be manually set at any time). If object is
            created with ``sim = True`` then verbosity is automatically toggled on.
        tlcbindir (str):
            Directory holding the rpc binaries (empty string in sim mode)
        binaries (dict):
            Dictionary of binary names (keys) and full paths (values)
        command_queue (list):
            Command queue.  List of tuples of the form ``(cmdlist, cmd_gmb_field)``
            where ``cmdlist`` is a list of command arguments and ``cmd_gmb_field``
            is a string of the base of the command GMB field.
        ass_mode (dict):
            Dictionary of assembly modes. Must match enum in gpUtGeneral.h
        activity (dict):
            Dictionary of activity directives. Must match enum in gpUtGeneral.h
        activityId (int):
            Default activity id (set to -1 to match GUI activities)
        move_level (int):
            Default level value for MOVE commands
        nmcd (int):
            Number of :term:`MCD`
        nax (int):
            Number of axes per :term:`MCD`
        mcdaxes (~numpy.ndarray(str)):
            Base of GMB fields for all MCD axes
        mcdaxisnames (~numpy.ndarray(str)):
            Descriptions of all MCD axes
        mcdaxisinited (~numpy.ndarray(str)):
            GMB fields for MCD Axis init status
        mcdaxisdatumed (~numpy.ndarray(str)):
            GMB fields for MCD Axis datum status
        mcdaxissimed (~numpy.ndarray(str)):
            GMB fields for MCD Axis sim status

    """

    def __init__(
        self, sim: bool = False, verb: bool = False, sim_fill_val: str = "0"
    ) -> None:
        """Locate all required rpc binaries and populate standard values.

        Args:
            sim (bool):
                Run in sim mode (default False). Setting to true automatically
                toggles verbose mode
            verb (bool):
                Toggle verbose mode (default False)
            sim_fill_val (str):
                Default value to fill for simulated GMB queries. Defaults to '0'.

        Raises:
            AssertionError:
                If (outside of sim mode) any required binary is missing or is
                not executable.

        """

        # global flags
        self.sim = sim
        self.sim_fill_val = sim_fill_val
        # sim mode always forces verbosity on
        self.verb = True if self.sim else verb

        # These are all of the binaries we expect to have
        binaries = [
            "gpAoAssClientTester",
            "gpAocRpcClientTester",
            "gpCalAssClientTester",
            "gpCalRpcClientTester",
            "gpFpmAssClientTester",
            "gpIfsAssClientTester",
            "gpIfsRpcClientTester",
            "gpIsClientTester",
            "gpPowBarAssClientTester",
            "gpPpmAssClientTester",
            "gpSrcAssClientTester",
            "gpTrkAssClientTester",
            "gpUtReadWriteClient",
            "gpUtReadGmbValues",
            "gpUtGmbReadVal",
            "gpUtReadGmbArrs",
        ]

        # The binary directory is only looked up when we will really call the tools
        self.tlcbindir = "" if self.sim else get_tlc_bindir()

        # dictionary of binary names and full paths
        self.binaries = {c: os.path.join(self.tlcbindir, c) for c in binaries}

        # verify that everything is as it should be.  Raised explicitly (rather
        # than via ``assert``) so the check still runs under ``python -O``.
        if not self.sim:
            for name, path in self.binaries.items():
                if not os.access(path, os.X_OK):
                    raise AssertionError(
                        "{} not found or is not executable.".format(name)
                    )

        # command queue is list of tuples (cmdlist, cmd_gmb_field)
        self.command_queue = []  # type: List[Tuple[List[str], str]]

        # define assembly modes
        self.ass_mode = {
            "INIT": 0,
            "MOVE": 1,
            "TRACK": 2,
            "DATUM": 3,
            "PARK": 4,
            "TEST": 5,
            "SIM": 6,
            "DEBUG": 7,
            "REBOOT": 8,
        }

        # define activity directives
        self.activity = {"PRESET": 0, "START": 1, "PRESET_START": 2, "CANCEL": 3}

        self.activityId = -1  # this is always the default command id
        self.move_level = 0  # this is the level to use for any move

        # built MCD list
        self.nmcd = 4  # number of MCDs
        self.nax = 8  # number of axes per MCD
        self.mcdaxes = np.array(
            [
                "mcd{}.axisList[{}].axConfig".format(j, k)
                for j in range(1, self.nmcd + 1)
                for k in range(1, self.nax + 1)
            ]
        )
        self.mcdaxisnames = self.read_gmb_arrays(
            ["{}.desc".format(ax) for ax in self.mcdaxes], 20
        )
        self.mcdaxisinited = np.array(
            ["{}.initialized".format(ax) for ax in self.mcdaxes]
        )
        self.mcdaxisdatumed = np.array(["{}.datumed".format(ax) for ax in self.mcdaxes])
        self.mcdaxissimed = np.array(["{}.sim".format(ax) for ax in self.mcdaxes])

        if self.verb:
            print("Completed init of rpc object.")

    def __str__(self) -> str:
        """Print attributes"""

        return "SIMULATED: {}, VERBOSE: {}".format(self.sim, self.verb)

    def _call_binary(self, cmdlist: List[str], sim_output: str, label: str) -> str:
        """Run one of the rpc binaries (or fake it in sim mode) and return its output

        Args:
            cmdlist (list):
                Command and arguments
            sim_output (str):
                Text to return instead of calling the binary when in sim mode
            label (str):
                Tool name used in the verbose printout

        Returns:
            str:
                Decoded combined stdout/stderr of the call (or ``sim_output``)

        """

        if self.verb:
            print("Executing {}".format(" ".join(cmdlist)))

        if self.sim:
            text = sim_output
        else:
            text = subprocess.check_output(cmdlist, stderr=subprocess.STDOUT).decode()

        if self.verb:
            print("{} call returned:\n{}".format(label, text))

        return text

    def _read_gmb_lines(
        self, binary: str, args: List[str], fields: npt.ArrayLike
    ) -> npt.NDArray[Any]:
        """Shared implementation of the one-line-per-field GMB readers

        Args:
            binary (str):
                Key into ``self.binaries`` of the tool to call
            args (list):
                Arguments placed between the binary and the field names
            fields (:py:data:`~numpy.typing.ArrayLike`):
                Fields to read.

        Returns:
            ~numpy.ndarray:
                One output line per requested field.

        Raises:
            AssertionError:
                If the number of output lines does not match the number of fields.

        """

        fieldsarr = np.array(fields, ndmin=1)
        cmdlist = [self.binaries[binary]] + list(args)
        cmdlist.extend(str(f) for f in fieldsarr.ravel())

        # One fill value per field plus one extra entry, which stands in for the
        # final line that is discarded below
        sim_output = "\n".join([self.sim_fill_val] * (fieldsarr.size + 1))
        text = self._call_binary(cmdlist, sim_output, binary)

        res = np.array(text.split("\n"))[:-1]
        # Raised explicitly (not ``assert``) so the check survives ``python -O``.
        # The raw output is included because a connection failure produces a
        # single error line, which is what usually triggers this.
        if res.size != fieldsarr.size:
            raise AssertionError(
                "Output length doesn't match requested fields length."
                " Raw output:\n{}".format(text)
            )

        return res

    async def assemble_pending_commands(
        self, cmdstrs: List[str]
    ) -> List[Tuple[str, Optional[int], bytes, bytes]]:
        """Dump set of pending commands into a gather

        Args:
            cmdstrs (list):
                List of full command strings

        Returns:
            list:
                list of tuples as returned by run

        """

        statements = [run(s) for s in cmdstrs]
        return await asyncio.gather(*statements)

    def clear_queue(self) -> None:
        """Remove all pending commands from queue without executing"""

        self.command_queue = []

    def execute_queue(self, nowait: bool = False) -> None:
        """Execute all pending commands in queue

        Args:
            nowait (bool):
                If True, return immediately upon confirming an ack.  Default
                False - wait for command state to register complete.

        Returns:
            None

        Raises:
            RuntimeError:
                If any command exits with a non-zero return code, is not
                acknowledged (ack value of 3), or finishes in an error state.

        Notes:
            In sim mode the queue is emptied and (if verbose) printed, but
            nothing is sent.

        """

        if not self.command_queue:
            print("Comand queue is empty.")
            return

        # Empty out the queue up front so a failure cannot leave stale commands
        pending, self.command_queue = self.command_queue, []

        # The strings are handed to a shell, so quote every argument: GMB fields
        # contain characters such as '[' that the shell would otherwise glob.
        cmdstrs = [shlex.join(cmdlist) for cmdlist, _ in pending]
        fields = [field for _, field in pending]

        if self.verb:
            print("Executing:\n {}".format("\n".join(cmdstrs)))

        # Verbosity only prints; sim mode is what suppresses the actual send
        if self.sim:
            return

        results = asyncio.run(self.assemble_pending_commands(cmdstrs))

        # assemble ack checks: one (ack, ackErrMsg) row per command
        ackfields = [
            f + suffix for f in fields for suffix in (".ack", ".ackErrMsg")
        ]
        acks = self.read_gmb_values(ackfields).reshape((len(fields), 2))

        # In case of error try to get as much info as possible:
        failures = []
        for (cmd, returncode, stdout, stderr), (ack, ackmsg) in zip(results, acks):
            if returncode == 0 and int(ack) != 3:
                continue
            errtxt = ""
            if stdout:
                errtxt += stdout.decode(errors="replace") + "\n"
            if stderr:
                errtxt += stderr.decode(errors="replace") + "\n"
            errtxt += "Ack Error Message:\n {}\n".format(ackmsg)
            failures.append(
                "Error encountered sending command:\n {}\n{}".format(cmd, errtxt)
            )
        if failures:
            raise RuntimeError("\n".join(failures))

        # Wait for completion
        if not nowait:
            msgs = self.wait_for_command_completion(fields)
            if msgs is not None:
                # Entries belonging to commands that did not fail hold the "0"
                # placeholder; report only the real failures when we can tell.
                failed = [
                    "{}: {}".format(f, m) for f, m in zip(fields, msgs) if m != "0"
                ]
                if not failed:
                    failed = ["{}: {}".format(f, m) for f, m in zip(fields, msgs)]
                raise RuntimeError("Command State Error:\n{}".format("\n".join(failed)))

    def wait_for_command_completion(
        self, fields: npt.ArrayLike, polling_interval: float = 0.5
    ) -> Optional[npt.NDArray[Any]]:
        """Wait for completion of one or more commands

        Args:
            fields (:py:data:`~numpy.typing.ArrayLike`):
                Base of GMB fields for commands
            polling_interval (float):
                Time (seconds) to wait between state checks (defaults to 0.5)

        Returns:
            None or ~numpy.ndarray:
                If all commands were successful, return None, otherwise an array
                of state error messages equivalent in size to the input list of
                GMB fields.  Entries for commands that did not fail are '0'.

        Notes:
            A state value of 1 is treated as "still running" and 2 as "error".
            There is no timeout: this blocks for as long as any state reads 1.

        """

        polling_interval = float(polling_interval)

        statefields = np.array([f + ".state" for f in np.array(fields, ndmin=1)])
        states = self.read_gmb_values(statefields).astype(int)

        while np.any(states == 1):
            # Sleep *before* re-polling so we return as soon as completion is seen
            time.sleep(polling_interval)
            busy = states == 1
            states[busy] = self.read_gmb_values(statefields[busy]).astype(int)

        failed = states == 2
        if not np.any(failed):
            return None

        errorfields = [f + "ErrMsg" for f in statefields[failed]]
        out = np.zeros(states.shape, dtype=object)
        out[failed] = self.read_gmb_values(errorfields)

        return out.astype(str)

    def execute(
        self,
        cmdlist: List[str],
        cmd_gmb_field: str,
        queue: bool = False,
        nowait: bool = False,
    ) -> None:
        """Queue or execute a command, and optionally wait for completion

        Args:
            cmdlist (list):
                Command and arguments
            cmd_gmb_field (str):
                Base of GMB field to check for ack and status
            queue (bool):
                If True, queue the command for later execution. Defaults False.
            nowait (bool):
                If True, return immediately upon confirming an ack.  Default
                False - wait for command state to register complete.

        Returns:
            None

        Raises:
            RuntimeError:
                If the command exits with a non-zero return code, is not
                acknowledged (ack value of 3), or finishes in an error state.

        """

        if queue:
            self.command_queue.append((cmdlist, cmd_gmb_field))
            return

        # Issue command and check result
        if self.verb:
            print("Executing {}".format(" ".join(cmdlist)))

        if not self.sim:
            output = subprocess.run(cmdlist, capture_output=True)
        else:
            # Stand-in for a successful call; stdout/stderr are left as None
            output = subprocess.CompletedProcess(cmdlist, 0)

        # check ack state
        ack = self.read_gmb_values(
            [cmd_gmb_field + ".ack", cmd_gmb_field + ".ackErrMsg"]
        )

        # In case of error try to get as much info as possible:
        if (output.returncode != 0) or (int(ack[0]) == 3):
            errtxt = ""
            if output.stderr:
                errtxt += output.stderr.decode() + "\n"
            if output.stdout:
                errtxt += output.stdout.decode() + "\n"
            errtxt += "Ack Error Message:\n {}\n".format(ack[1])
            raise RuntimeError(
                "Error encountered sending command:\n {}\n{}".format(
                    " ".join(cmdlist), errtxt
                )
            )

        if not nowait:
            res = self.wait_for_command_completion(cmd_gmb_field)
            # Compare against None explicitly: truth-testing the array would
            # hide an empty error message.
            if res is not None:
                raise RuntimeError("Command State Error:\n{}".format(res[0]))

    def parse_ReadWriteClient_output(self, input: str) -> Union[str, None]:
        """Parse output of gpUtReadWriteClient command

        Args:
            input (str):
                gpUtReadWriteClient query output

        Returns:
            str or None:
                Contents of query output.  None returned if 'error' in input
                (case-insensitive) or regular expression matching fails.

        Notes:
            gpUtReadWriteClient returns multiple lines of information, with queried
            value typically at end of the last line as = <#####>.

            Numeric output will have format of: [+/-]X or [+/-]X.X or
            [+/-]X.X[e/E][+/-]X

        """

        if "ERROR" in input.upper():
            return None

        res = re.search(r"= <([^>]+)>.", input)
        return res.group(1) if res else None

    def read_gmb_value(self, field: str, server: str = "tlc") -> Union[str, None]:
        """Wrapper of gpUtReadWriteClient read command

        Args:
            field (str):
                Field to read.
            server (str):
                Server name or address (defaults to 'tlc')

        Returns:
            str or None:
                Contents of query output.  None returned if querying or pattern
                matching fails.

        Notes:
            gpUtReadWriteClient returns multiple lines of information, with queried
            value typically at end of the last line as = <#####>.

            Numeric output will have format of: [+/-]X or [+/-]X.X or
            [+/-]X.X[e/E][+/-]X

        """

        cmdlist = [self.binaries["gpUtReadWriteClient"], server, "0", field]
        text = self._call_binary(
            cmdlist, "= <{}>.".format(self.sim_fill_val), "gpUtReadWriteClient"
        )

        return self.parse_ReadWriteClient_output(text)

    def write_gmb_value(self, field: str, value: Any, server: str = "tlc") -> None:
        """Wrapper of gpUtReadWriteClient write command

        Args:
            field (str):
                Field to write.
            value (Any):
                Value to write. Passed to the tool as ``"{}".format(value)``.
            server (str):
                Server name or address (defaults to 'tlc')

        Returns:
            None

        """

        cmdlist = [
            self.binaries["gpUtReadWriteClient"],
            server,
            "1",
            field,
            "{}".format(value),
        ]

        sim_output = (
            "gpUtReadWriteClient.c, 533: <1> main: connecting to host <{}>\n"
            "gpUtReadWriteClient.c, 602: <0> main: <{}>[0][0][0] = <{}>.\n"
        ).format(server, field, value)

        self._call_binary(cmdlist, sim_output, "gpUtReadWriteClient")

    def read_gmb_values(
        self, fields: npt.ArrayLike, server: str = "tlc"
    ) -> npt.NDArray[Any]:
        """Wrapper of gpUtReadGmbValues

        Args:
            fields (:py:data:`~numpy.typing.ArrayLike`):
                Fields to read.
            server (str):
                Server name or address (defaults to 'tlc')

        Returns:
            ~numpy.ndarray:
                Contents of fields.

        Raises:
            AssertionError:
                If the number of output lines does not match the number of fields.

        Notes:
            gpUtReadGmbValues returns one line of info per query, followed by a
            blank newline. The data lines will either be the value of the field, or
            an error message (including the word 'ERROR').  In cases where an
            initial connection cannot be established (server name is wrong or
            server cannot be reached), only a single line of error message is
            returned.

            Numeric outputs will have format of: [+/-]X or [+/-]X.X or
            [+/-]X.X[e/E][+/-]X

        """

        return self._read_gmb_lines("gpUtReadGmbValues", [server], fields)

    def read_gmb_arrays(
        self, fields: npt.ArrayLike, maxlen: int = 1, server: str = "tlc"
    ) -> npt.NDArray[Any]:
        """Wrapper of gpUtReadGmbArrs

        Args:
            fields (:py:data:`~numpy.typing.ArrayLike`):
                Fields to read.
            maxlen (int):
                Maximum field length. Defaults to 1.
            server (str):
                Server name or address (defaults to 'tlc')

        Returns:
            ~numpy.ndarray:
                Contents of fields.

        Raises:
            AssertionError:
                If the number of output lines does not match the number of fields.

        Notes:
            gpUtReadGmbArrs returns one line of info per query, followed by a
            blank newline. The data lines will either be the value of the field, or
            an error message (including the word 'ERROR').  In cases where an
            initial connection cannot be established (server name is wrong or
            server cannot be reached), only a single line of error message is
            returned.

            Numeric outputs will have format of: [+/-]X or [+/-]X.X or
            [+/-]X.X[e/E][+/-]X

        """

        return self._read_gmb_lines(
            "gpUtReadGmbArrs", [server, "{}".format(maxlen)], fields
        )

    def read_gmb_values_legacy(
        self, fields: npt.ArrayLike, server: str = "tlc"
    ) -> npt.NDArray[Any]:
        """Wrapper of read_gmb_value for array-like inputs.
        Matches read_gmb_values syntax.

        Args:
            fields (:py:data:`~numpy.typing.ArrayLike`):
                Fields to read.
            server (str):
                Server name or address (defaults to 'tlc')

        Returns:
            ~numpy.ndarray:
                Contents of fields.  A failed read shows up as the string 'None'.

        Notes:
            See :py:meth:`read_gmb_value`

        """

        fieldsarr = np.array(fields, ndmin=1)
        output = np.zeros(fieldsarr.size, dtype=object)
        for j, f in enumerate(fieldsarr):
            output[j] = self.read_gmb_value(f, server=server)

        return output.astype(str)

    def list_gmb_fields(self, server: str = "tlc") -> List[str]:
        """Wrapper of gpUtGmbReadVal -list command

        Args:
            server (str):
                Server name or address (defaults to 'tlc')

        Returns:
            list:
                List of field names

        Notes:
            Expected return from gpUtGmbReadVal is multiple lines of debug messages
            with all requested field names, followed by a single blank newline.
            Lines that do not carry a ``<0> name`` entry are ignored.

        """

        cmdlist = [self.binaries["gpUtGmbReadVal"], "-list", server]
        text = self._call_binary(
            cmdlist, "<0> {}".format(self.sim_fill_val), "gpUtGmbReadVal"
        )

        # splitlines() keeps a final line that lacks a trailing newline, and
        # skipping non-matching lines avoids crashing on blank/other output
        p = re.compile(r"<0> (\S*)")
        matches = (p.search(line) for line in text.splitlines())

        return [m.group(1) for m in matches if m is not None]