Source code for gpilib2.shutters

from gpilib2.generic_gpi_component import generic_gpi_component
from gpilib2.generic_gpi_assembly import generic_gpi_assembly
from typing import List, Optional, Union
from gpilib2.rpc import rpc
import numpy as np


class shutters(generic_gpi_assembly):
    """All GPI shutters

    Args:
        rpc (:py:class:`~gpilib2.rpc`):
            rpc object. sim status and verbosity will be set based on its settings.

    Attributes:
        rpc (:py:class:`~gpilib2.rpc`):
            rpc object for communications
        omss (:py:class:`~gpilib2.shutters.gpi_shutter`):
            OMSS entrance shutter
        entrance (:py:class:`~gpilib2.shutters.gpi_shutter`):
            cal entrance shutter
        exit (:py:class:`~gpilib2.shutters.gpi_shutter`):
            cal exit shutter
        ref (:py:class:`~gpilib2.shutters.gpi_shutter`):
            cal reference leg shutter
        sci (:py:class:`~gpilib2.shutters.gpi_shutter`):
            cal science leg shutter
        components (list):
            List of shutter objects: ``[omss, entrance, ref]``. Because the CAL
            Entrance/Exit and Ref/Sci shutters are paired for all inits, sims and
            datums, only one of each pair is included.

    """

    def __init__(self, rpc: rpc) -> None:
        """Define all components

        Args:
            rpc (:py:class:`~gpilib2.rpc`):
                rpc object. sim status and verbosity will be set based on its settings.
        """

        generic_gpi_assembly.__init__(self, rpc)

        self.omss = gpi_shutter(
            rpc,
            "gpSrcAssClientTester",  # binary
            0,  # cmd
            ["-1", "-1"],  # default cmd
            0,  # cmd args index
            "tlc.srcAss.srcAssCmd_SHUTTERS.subCmd",  # cmd gmb field
            "tlc.srcAss.omssEntranceShutter",  # curr state gmb field
            "OMSS Entrance Shutter",  # what I'm called
        )

        # Entrance and exit share one command (cmd 0); they differ only in
        # which slot of the argument pair they occupy.
        self.entrance = gpi_shutter(
            rpc,
            "gpCalAssClientTester",  # binary
            0,  # cmd
            ["-1", "-1"],  # default cmd
            0,  # cmd args index
            "tlc.calAss.calAssCmd_CAL_EE_SHUTTERS.subCmd",  # cmd gmb field
            "tlc.calAss.entranceShutter",  # curr state gmb field
            "CAL Entrance Shutter",  # what I'm called
        )

        self.exit = gpi_shutter(
            rpc,
            "gpCalAssClientTester",  # binary
            0,  # cmd
            ["-1", "-1"],  # default cmd
            1,  # cmd args index
            "tlc.calAss.calAssCmd_CAL_EE_SHUTTERS.subCmd",  # cmd gmb field
            "tlc.calAss.exitShutter",  # curr state gmb field
            "CAL Exit Shutter",  # what I'm called
        )

        # Reference and science likewise share one command (cmd 1).
        self.ref = gpi_shutter(
            rpc,
            "gpCalAssClientTester",  # binary
            1,  # cmd
            ["-1", "-1"],  # default cmd
            0,  # cmd args index
            "tlc.calAss.calAssCmd_CAL_REF_SCI_SHUTTERS.subCmd",  # cmd gmb field
            "tlc.calAss.refShutter",  # curr state gmb field
            "CAL Reference Shutter",  # what I'm called
        )

        self.sci = gpi_shutter(
            rpc,
            "gpCalAssClientTester",  # binary
            1,  # cmd
            ["-1", "-1"],  # default cmd
            1,  # cmd args index
            "tlc.calAss.calAssCmd_CAL_REF_SCI_SHUTTERS.subCmd",  # cmd gmb field
            "tlc.calAss.sciShutter",  # curr state gmb field
            "CAL Science Shutter",  # what I'm called
        )

        # EE and RS are paired, so only need one of each
        self.components = [self.omss, self.entrance, self.ref]

    def __str__(self) -> str:
        """Assemble the status strings of all five shutters, one per line"""

        return "\n".join(
            str(c) for c in [self.omss, self.entrance, self.exit, self.ref, self.sci]
        )

    def move(
        self,
        omss: Optional[str] = None,
        entrance: Optional[str] = None,
        exit: Optional[str] = None,
        ref: Optional[str] = None,
        sci: Optional[str] = None,
        queue: bool = False,
    ) -> None:
        """MOVE command for all shutters

        Args:
            omss (str or None):
                OMSS setting ('open' or 'close') or None to leave as is.
            entrance (str or None):
                CAL entrance setting ('open' or 'close') or None to leave as is.
            exit (str or None):
                CAL exit setting ('open' or 'close') or None to leave as is.
            ref (str or None):
                CAL reference setting ('open' or 'close') or None to leave as is.
            sci (str or None):
                CAL science setting ('open' or 'close') or None to leave as is.
            queue (bool):
                If True, queue command rather than executing.

        Raises:
            AssertionError:
                If any non-empty input is not one of the allowed settings. All
                inputs are validated before any command is queued.

        .. note::

            All string inputs are case insensitive.

        """

        opts = {"CLOSE": 0, "OPEN": 1}

        # Translate every input to its command value; -1 marks "leave as is".
        states = []
        for s in [omss, entrance, exit, ref, sci]:
            if not s:
                states.append(-1)
                continue
            key = s.upper()
            if key not in opts:
                # Raised explicitly (rather than via ``assert``) so validation
                # is not stripped under ``python -O``; the exception type is
                # kept for backward compatibility.
                raise AssertionError(
                    "Allowed inputs are: {}".format(", ".join(opts.keys()))
                )
            states.append(opts[key])

        # OMSS is by itself
        if states[0] != -1:
            self.omss.move(states[0], queue=True)

        # E/E and Ref/Sci handled in pairs: one command carries both values
        ee_states = states[1:3]
        if any(v != -1 for v in ee_states):
            self.entrance.move(ee_states, queue=True)

        rs_states = states[3:]
        if any(v != -1 for v in rs_states):
            self.ref.move(rs_states, queue=True)

        # Everything above was only queued; flush unless the caller wants to
        # keep queueing.
        if not queue:
            self.rpc.execute_queue()
class gpi_shutter(generic_gpi_component):
    """GPI shutter

    Args:
        rpc (:py:class:`~gpilib2.rpc`):
            rpc object. sim status and verbosity will be set based on its settings.
        binary (str):
            binary name
        move_cmd (int):
            Command number in binary to use
        default_cmd_args (list):
            Default argument sets to use for inits and datums and sims
        cmd_args_index (int):
            Index in default command list where actual shutter command goes
        move_gmb_field (str):
            GMB field prefix corresponding to the move command
        curr_state_gmb_field (str):
            GMB field of current shutter state
        name (str):
            Name of this shutter
        server (str):
            Server address to send commands to.

    Attributes:
        rpc (:py:class:`~gpilib2.rpc`):
            rpc object for communications
        server (str):
            Server address to send commands to.
        binary (str):
            binary name
        move_cmd (int):
            Command number in binary to use
        default_cmd_args (list):
            Default argument sets to use for inits and datums and sims
        cmd_args_index (int):
            Index in default command list where actual shutter command goes
        move_gmb_field (str):
            GMB field prefix corresponding to the move command
        curr_state_gmb_field (str):
            GMB field of current shutter state
        name (str):
            Name of this shutter

    """

    def __init__(
        self,
        rpc: rpc,
        binary: str,
        move_cmd: int,
        default_cmd_args: List[str],
        cmd_args_index: int,
        move_gmb_field: str,
        curr_state_gmb_field: str,
        name: str,
        server: str = "tlc",
    ) -> None:
        """Set up the shutter"""

        generic_gpi_component.__init__(
            self,
            rpc,
            binary,
            move_cmd,
            default_cmd_args,
            move_gmb_field,
            curr_state_gmb_field,
            name=name,
            server=server,
        )

        self.cmd_args_index = cmd_args_index

    def __str__(self) -> str:
        """Returns shutter status string"""

        # The simulate flag sits next to the command field: replace the last
        # dotted component of the move field with "simulate".
        sim_field = ".".join(self.move_gmb_field.split(".")[:-1]) + ".simulate"
        vals = self.rpc.read_gmb_values(
            [self.curr_state_gmb_field, sim_field]
        ).astype(int)

        name = self.name
        if int(vals[1]) == 1:
            name += " (sim)"

        return "{0: >30}: {1}".format(name, np.array(["Closed", "Open"])[vals[0]])

    def move(self, state: Union[List[int], int], queue: bool = False) -> None:
        """Shutter move command

        Args:
            state (int or [int, int]):
                0 for close, 1 for open. The list input is for toggling both
                E/E or ref/sci shutters at the same time and is sent as the
                complete command argument list. Any 1-D sequence (list, tuple
                or array) is accepted for the paired form.
            queue (bool):
                If True, queue command rather than executing.

        """

        if np.ndim(state) > 0:
            # Paired command: the caller supplies the full argument set, so no
            # current-state check is performed.
            cmdlist = np.array(state).astype(str).tolist()
        else:
            # check where we are
            curr_state = self.rpc.read_gmb_values(self.curr_state_gmb_field).astype(
                int
            )[0]
            if curr_state == state:
                print(
                    "{} is already {}.".format(
                        self.name, np.array(["closed", "open"])[state]
                    )
                )
                return

            # Start from the defaults and overwrite only this shutter's slot.
            cmdlist = self.default_cmd_args.copy()
            cmdlist[self.cmd_args_index] = "{}".format(state)

        cmdlist = [
            self.rpc.binaries[self.binary],
            self.server,
            "{}".format(self.move_cmd),
            "{}".format(self.rpc.activityId),
            "{}".format(self.rpc.activity["START"]),
            "{}".format(self.rpc.ass_mode["MOVE"]),
            "{}".format(self.rpc.move_level),
        ] + cmdlist

        # Execute or queue
        self.rpc.execute(cmdlist, self.move_gmb_field, queue=queue)

    def open(self, queue: bool = False) -> None:
        """Open a shutter

        Args:
            queue (bool):
                If True, queue command rather than executing.

        """

        self.move(1, queue=queue)

    def close(self, queue: bool = False) -> None:
        """Close a shutter

        Args:
            queue (bool):
                If True, queue command rather than executing.

        """

        self.move(0, queue=queue)