from typing import List, Optional
from gpilib2.rpc import rpc
import numpy.typing as npt
import numpy as np
import warnings
class generic_gpi_component:
    """Base class for various components that take generic commands

    Args:
        rpc (:py:class:`~gpilib2.rpc`):
            rpc object. sim status and verbosity will be set
            based on its settings.
        binary (str):
            binary name
        move_cmd (int):
            Command number in binary to use
        default_cmd_args (list):
            Default argument sets to use for inits and datums and sims
        move_gmb_field (str):
            GMB field prefix corresponding to the move command
        curr_state_gmb_field (str):
            GMB field of current state of component
        name (str):
            Component name
        server (str):
            Server address to send commands to.
        mcd_name (:py:data:`~numpy.typing.ArrayLike`, optional):
            Label(s) of :term:`MCD` axis/axes for this component. If None
            (default), assume that device is not on an MCD.

    Attributes:
        rpc (:py:class:`~gpilib2.rpc`):
            rpc object for communications
        server (str):
            Server address to send commands to.
        binary (str):
            binary name
        move_cmd (int):
            Command number in binary to use
        default_cmd_args (list):
            Default argument sets to use for inits and datums and sims
        move_gmb_field (str):
            GMB field prefix corresponding to the move command
        curr_state_gmb_field (str):
            GMB field of current state of component
        name (str):
            Component name
        mcd_name (~numpy.ndarray(str), optional):
            Label(s) of :term:`MCD` axis/axes for this component. If None, assume
            that device is not on an MCD.
        mcd_inds (~numpy.ndarray(int), optional):
            Indices of entries in :py:attr:`~gpilib2.rpc.rpc.mcdaxisnames`
            corresponding to the entries of mcd_name. None if mcd_name is None.

    Raises:
        AssertionError:
            If any entry of ``mcd_name`` is not found in ``rpc.mcdaxisnames``.

    """

    def __init__(
        self,
        rpc: "rpc",
        binary: str,
        move_cmd: int,
        default_cmd_args: List[str],
        move_gmb_field: str,
        curr_state_gmb_field: str,
        name: str,
        server: str = "tlc",
        mcd_name: Optional[npt.ArrayLike] = None,
    ) -> None:
        """Initialize component"""

        self.rpc = rpc
        self.binary = binary
        self.move_cmd = move_cmd
        self.default_cmd_args = default_cmd_args
        self.move_gmb_field = move_gmb_field
        self.curr_state_gmb_field = curr_state_gmb_field
        self.name = name
        self.server = server

        if mcd_name is not None:
            # Accept a scalar label or any sequence of labels.
            mcd_name = np.array(mcd_name, ndmin=1)
            mcd_inds = []
            # make sure that MCD axes actually exist
            for ax in mcd_name:
                # Raised explicitly (rather than via an `assert` statement) so
                # the validation is not stripped when running under `python -O`.
                if ax not in self.rpc.mcdaxisnames:
                    raise AssertionError(f"{ax} is not a known MCD axis")
                mcd_inds.append(np.where(self.rpc.mcdaxisnames == ax)[0])
            self.mcd_inds: Optional[npt.NDArray[np.int_]] = np.array(
                mcd_inds
            ).flatten()
        else:
            self.mcd_inds = None
        self.mcd_name = mcd_name

        # if this thing is on an MCD, let's make sure it's inited and datumed
        # (only warn here so that construction itself never fails on status)
        if self.mcd_name is not None:
            self.check_init(warn=True)
            self.check_datum(warn=True)

    def check_mcd_status(
        self, statarr: npt.NDArray[np.str_], msg_txt: str, warn: bool = False
    ) -> None:
        """Check component status at the MCD level

        Args:
            statarr (~numpy.ndarray(str)):
                Status array to check. Should be one of:

                * :py:attr:`~gpilib2.rpc.rpc.mcdaxisdatumed`
                * :py:attr:`~gpilib2.rpc.rpc.mcdaxisinited`
                * :py:attr:`~gpilib2.rpc.rpc.mcdaxissimed`
            msg_txt (str):
                Text to use in warning/assertion message. Should be one of 'init',
                'datum', or 'sim', corresponding to choice of statarr.
            warn (bool):
                Issue warning instead of throwing AssertionError. Defaults False.

        Raises:
            AssertionError:
                If ``warn`` is False and any of this component's MCD axes fails
                the check.

        .. note::

            If ``self.mcd_name`` is None, do nothing. Otherwise, by default, throw
            an AssertionError if check fails.

        """

        if self.mcd_name is None:
            return

        # Look up the status fields for this component's axes only and read
        # their current values, interpreted as booleans.
        stat = self.rpc.read_gmb_values(statarr[self.mcd_inds]).astype(bool)
        if np.all(stat):
            return

        msg = (
            f"\nIn mechanism {self.name}, MCD Axis {self.mcd_name[~stat]} "
            f"not {msg_txt}ed.\n"
            f"Correct by running the relevant component {msg_txt}() command."
        )
        if warn:
            warnings.warn(msg, stacklevel=2)
        else:
            # Explicit raise keeps the documented AssertionError while still
            # firing when assertions are disabled (`python -O`).
            raise AssertionError(msg)

    def check_datum(self, warn: bool = False) -> None:
        """Check that component is datumed at the MCD level

        Args:
            warn (bool):
                Issue warning instead of throwing AssertionError. Defaults False.

        """

        self.check_mcd_status(self.rpc.mcdaxisdatumed, "datum", warn=warn)

    def check_init(self, warn: bool = False) -> None:
        """Check that component is inited at the MCD level

        Args:
            warn (bool):
                Issue warning instead of throwing AssertionError. Defaults False.

        """

        self.check_mcd_status(self.rpc.mcdaxisinited, "init", warn=warn)

    def datum(self, queue: bool = False) -> None:
        """Send DATUM command

        Args:
            queue (bool):
                Queue rather than execute immediately. Defaults False.

        Returns:
            None

        """

        self.generic_command("DATUM", queue=queue)

    def init(self, queue: bool = False) -> None:
        """Send INIT command

        Args:
            queue (bool):
                Queue rather than execute immediately. Defaults False.

        Returns:
            None

        """

        self.generic_command("INIT", queue=queue)

    def set_sim(self, queue: bool = False) -> None:
        """Send SIM LEVEL=1 command

        Args:
            queue (bool):
                Queue rather than execute immediately. Defaults False.

        Returns:
            None

        """

        self.generic_command("SIM", level=1, queue=queue)

    def set_unsim(self, queue: bool = False) -> None:
        """Send SIM LEVEL=0 command

        Args:
            queue (bool):
                Queue rather than execute immediately. Defaults False.

        Returns:
            None

        """

        self.generic_command("SIM", level=0, queue=queue)

    def track(self, queue: bool = False) -> None:
        """Send TRACK command

        Args:
            queue (bool):
                Queue rather than execute immediately. Defaults False.

        Returns:
            None

        """

        self.generic_command("TRACK", queue=queue)

    def stop(self, queue: bool = False) -> None:
        """Send a MOVE command with the CANCEL activity directive

        Args:
            queue (bool):
                Queue rather than execute immediately. Defaults False.

        Returns:
            None

        """

        self.generic_command("MOVE", activity="CANCEL", queue=queue)

    def generic_command(
        self,
        mode: str,
        level: Optional[int] = None,
        activity: str = "START",
        queue: bool = False,
    ) -> None:
        """Send a non-move command

        Args:
            mode (str):
                Mode string (typically INIT, DATUM or SIM). Must be in rpc.ass_mode
            level (int or None):
                If None, use rpc.move_level. Otherwise use this level. Only has an
                effect for SIM and DEBUG.
            activity (str):
                Activity directive. Must be in rpc.activity. Defaults to "START"
            queue (bool):
                Queue rather than execute immediately. Defaults False.

        Returns:
            None

        Notes:
            All such commands look the same, but each individual binary command will
            require a different set of default values to tack on to the command
            string.

        """

        if level is None:
            level = self.rpc.move_level

        # Fixed command prefix followed by this component's default arguments.
        # List concatenation builds a new list, so default_cmd_args is untouched.
        cmdlist = [
            self.rpc.binaries[self.binary],  # binary
            self.server,  # server
            f"{self.move_cmd}",  # move command
            f"{self.rpc.activityId}",  # activity id
            f"{self.rpc.activity[activity]}",  # activity directive
            f"{self.rpc.ass_mode[mode]}",  # mode
            f"{level}",  # level
        ] + self.default_cmd_args  # default arguments

        # Execute or Queue:
        self.rpc.execute(cmdlist, self.move_gmb_field, queue=queue)