Source code for gpilib2.generic_gpi_component

from typing import List, Optional
from gpilib2.rpc import rpc
import numpy.typing as npt
import numpy as np
import warnings


class generic_gpi_component:
    """Base class for various components that take generic commands

    Args:
        rpc (:py:class:`~gpilib2.rpc`):
            rpc object. sim status and verbosity will be set based on its settings.
        binary (str):
            binary name
        move_cmd (int):
            Command number in binary to use
        default_cmd_args (list):
            Default argument sets to use for inits and datums and sims
        move_gmb_field (str):
            GMB field prefix corresponding to the move command
        curr_state_gmb_field (str):
            GMB field of current state of component
        name (str):
            Component name
        server (str):
            Server address to send commands to.
        mcd_name (:py:data:`~numpy.typing.ArrayLike`, optional):
            Label(s) of :term:`MCD` axis for this component. If None (default),
            assume that device is not on an MCD.

    Attributes:
        rpc (:py:class:`~gpilib2.rpc`):
            rpc object for communications
        server (str):
            Server address to send commands to.
        binary (str):
            binary name
        move_cmd (int):
            Command number in binary to use
        default_cmd_args (list):
            Default argument sets to use for inits and datums and sims
        move_gmb_field (str):
            GMB field prefix corresponding to the move command
        curr_state_gmb_field (str):
            GMB field of current state of component
        name (str):
            Component name
        mcd_name (~numpy.ndarray(str), optional):
            Label(s) of :term:`MCD` axis/axes for this component. If None, assume
            that device is not on an MCD.
        mcd_inds (~numpy.ndarray(int), optional):
            Indices of entries in :py:attr:`~gpilib2.rpc.rpc.mcdaxisnames`
            corresponding to the entries of mcd_name. None if mcd_name is None.

    Raises:
        AssertionError:
            If any entry of ``mcd_name`` is not present in ``rpc.mcdaxisnames``.
    """

    def __init__(
        self,
        rpc: rpc,
        binary: str,
        move_cmd: int,
        default_cmd_args: List[str],
        move_gmb_field: str,
        curr_state_gmb_field: str,
        name: str,
        server: str = "tlc",
        mcd_name: Optional[npt.ArrayLike] = None,
    ) -> None:
        """Initialize component"""

        self.rpc = rpc
        self.binary = binary
        self.move_cmd = move_cmd
        self.default_cmd_args = default_cmd_args
        self.move_gmb_field = move_gmb_field
        self.curr_state_gmb_field = curr_state_gmb_field
        self.name = name
        self.server = server

        self.mcd_name: Optional[npt.NDArray[np.str_]] = None
        self.mcd_inds: Optional[npt.NDArray[np.int_]] = None
        if mcd_name is None:
            # not on an MCD: nothing to look up and nothing to check
            return

        axis_labels = np.array(mcd_name, ndmin=1)
        # coerce so that the elementwise comparison below also works if the
        # axis names happen to be supplied as a plain sequence
        known_axes = np.asarray(self.rpc.mcdaxisnames)

        # make sure that MCD axes actually exist
        found = []
        for ax in axis_labels:
            hits = np.where(known_axes == ax)[0]
            if hits.size == 0:
                # explicit raise (rather than an assert statement) so that the
                # validation is not stripped when running under ``python -O``
                raise AssertionError("{} is not a known MCD axis".format(ax))
            found.append(hits)

        # keep an integer dtype even when no labels were given, so that the
        # result is always usable as an index array
        self.mcd_inds = (
            np.concatenate(found) if found else np.empty(0, dtype=np.int_)
        )
        self.mcd_name = axis_labels

        # this thing is on an MCD, so let's make sure it's inited and datumed
        self.check_init(warn=True)
        self.check_datum(warn=True)

    def check_mcd_status(
        self, statarr: npt.NDArray[np.str_], msg_txt: str, warn: bool = False
    ) -> None:
        """Check component status at the MCD level

        Args:
            statarr (~numpy.ndarray(str)):
                Status array to check.  Should be one of:

                * :py:attr:`~gpilib2.rpc.rpc.mcdaxisdatumed`
                * :py:attr:`~gpilib2.rpc.rpc.mcdaxisinited`
                * :py:attr:`~gpilib2.rpc.rpc.mcdaxissimed`
            msg_txt (str):
                Text to use in warning/assertion message. Should be one of 'init',
                'datum', or 'sim', corresponding to choice of statarr.
            warn (bool):
                Issue warning instead of throwing AssertionError. Defaults False.

        Raises:
            AssertionError:
                If any of this component's MCD axes fails the check and ``warn``
                is False.

        .. note::

            If ``self.mcd_name`` is None, do nothing.  Otherwise, by default, throw
            an AssertionError if check fails.
        """

        if self.mcd_name is None:
            return

        # statarr is indexed by this component's MCD axis indices, and the
        # selected entries are handed to rpc.read_gmb_values for reading
        stat = self.rpc.read_gmb_values(statarr[self.mcd_inds]).astype(bool)
        if np.all(stat):
            return

        msg = (
            "\nIn mechanism {0}, MCD Axis {1} not {2}ed.\n"
            "Correct by running the relevant component {2}() command."
        ).format(self.name, self.mcd_name[~stat], msg_txt)

        if warn:
            warnings.warn(msg, stacklevel=2)
        else:
            # explicit raise so the check still fires under ``python -O``
            raise AssertionError(msg)

    def check_datum(self, warn: bool = False) -> None:
        """Check that component is datumed at the MCD level

        Args:
            warn (bool):
                Issue warning instead of throwing AssertionError. Defaults False.
        """

        self.check_mcd_status(self.rpc.mcdaxisdatumed, "datum", warn=warn)

    def check_init(self, warn: bool = False) -> None:
        """Check that component is inited at the MCD level

        Args:
            warn (bool):
                Issue warning instead of throwing AssertionError. Defaults False.
        """

        self.check_mcd_status(self.rpc.mcdaxisinited, "init", warn=warn)

    def datum(self, queue: bool = False) -> None:
        """Send DATUM command

        Args:
            queue (bool):
                Queue rather than execute immediately. Defaults False.

        Returns:
            None
        """

        self.generic_command("DATUM", queue=queue)

    def init(self, queue: bool = False) -> None:
        """Send INIT command

        Args:
            queue (bool):
                Queue rather than execute immediately. Defaults False.

        Returns:
            None
        """

        self.generic_command("INIT", queue=queue)

    def set_sim(self, queue: bool = False) -> None:
        """Send SIM LEVEL=1 command

        Args:
            queue (bool):
                Queue rather than execute immediately. Defaults False.

        Returns:
            None
        """

        self.generic_command("SIM", level=1, queue=queue)

    def set_unsim(self, queue: bool = False) -> None:
        """Send SIM LEVEL=0 command

        Args:
            queue (bool):
                Queue rather than execute immediately. Defaults False.

        Returns:
            None
        """

        self.generic_command("SIM", level=0, queue=queue)

    def track(self, queue: bool = False) -> None:
        """Send TRACK command

        Args:
            queue (bool):
                Queue rather than execute immediately. Defaults False.

        Returns:
            None
        """

        self.generic_command("TRACK", queue=queue)

    def stop(self, queue: bool = False) -> None:
        """Stop the component by sending a MOVE command with the CANCEL activity

        Args:
            queue (bool):
                Queue rather than execute immediately. Defaults False.

        Returns:
            None
        """

        self.generic_command("MOVE", activity="CANCEL", queue=queue)

    def generic_command(
        self,
        mode: str,
        level: Optional[int] = None,
        activity: str = "START",
        queue: bool = False,
    ) -> None:
        """Send a non-move command

        Args:
            mode (str):
                Mode string (typically INIT, DATUM or SIM). Must be in rpc.ass_mode
            level (int or None):
                If None, use rpc.move_level. Otherwise use this level. Only has an
                effect for SIM and DEBUG.
            activity (str):
                Activity directive. Must be in rpc.activity. Defaults to "START"
            queue (bool):
                Queue rather than execute immediately. Defaults False.

        Returns:
            None

        Raises:
            KeyError:
                If ``mode`` is not a key of rpc.ass_mode, ``activity`` is not a key
                of rpc.activity, or the binary name is not a key of rpc.binaries.

        Notes:
            All such commands look the same, but each individual binary command
            will require a different set of default values to tack on to the
            command string.
        """

        if level is None:
            level = self.rpc.move_level

        cmdlist = [
            self.rpc.binaries[self.binary],  # binary
            self.server,  # server
            f"{self.move_cmd}",  # move command
            f"{self.rpc.activityId}",  # activity id
            f"{self.rpc.activity[activity]}",  # activity directive
            f"{self.rpc.ass_mode[mode]}",  # mode
            f"{level}",  # level
            # default arguments; unpacked so that any sequence type (not just a
            # list) can be used, and so the stored defaults are never aliased
            *self.default_cmd_args,
        ]

        # Execute or Queue:
        self.rpc.execute(cmdlist, self.move_gmb_field, queue=queue)