from gpilib2.generic_gpi_component import generic_gpi_component
from gpilib2.generic_gpi_assembly import generic_gpi_assembly
from typing import List, Optional, Union
from gpilib2.rpc import rpc
import numpy as np
class shutters(generic_gpi_assembly):
    """All GPI shutters

    Args:
        rpc (:py:class:`~gpilib2.rpc`):
            rpc object. sim status and verbosity will be set
            based on its settings.

    Attributes:
        rpc (:py:class:`~gpilib2.rpc`):
            rpc object for communications
        omss (:py:class:`~gpilib2.shutters.gpi_shutter`):
            OMSS entrance shutter
        entrance (:py:class:`~gpilib2.shutters.gpi_shutter`):
            CAL entrance shutter
        exit (:py:class:`~gpilib2.shutters.gpi_shutter`):
            CAL exit shutter
        ref (:py:class:`~gpilib2.shutters.gpi_shutter`):
            CAL reference leg shutter
        sci (:py:class:`~gpilib2.shutters.gpi_shutter`):
            CAL science leg shutter
        components (list):
            List of all shutter objects. Because the CAL Entrance/Exit and Ref/Sci
            shutters are paired for all inits, sims and datums, only one of each is
            included.

    """

    def __init__(self, rpc: rpc) -> None:
        """Define all components

        Args:
            rpc (:py:class:`~gpilib2.rpc`):
                rpc object. sim status and verbosity will be set
                based on its settings.

        """
        generic_gpi_assembly.__init__(self, rpc)

        self.omss = gpi_shutter(
            rpc,
            binary="gpSrcAssClientTester",
            move_cmd=0,
            default_cmd_args=["-1", "-1"],
            cmd_args_index=0,
            move_gmb_field="tlc.srcAss.srcAssCmd_SHUTTERS.subCmd",
            curr_state_gmb_field="tlc.srcAss.omssEntranceShutter",
            name="OMSS Entrance Shutter",
        )

        # Entrance and exit share one command; they differ only in which
        # argument slot carries their state and which GMB field reports it.
        self.entrance = gpi_shutter(
            rpc,
            binary="gpCalAssClientTester",
            move_cmd=0,
            default_cmd_args=["-1", "-1"],
            cmd_args_index=0,
            move_gmb_field="tlc.calAss.calAssCmd_CAL_EE_SHUTTERS.subCmd",
            curr_state_gmb_field="tlc.calAss.entranceShutter",
            name="CAL Entrance Shutter",
        )
        self.exit = gpi_shutter(
            rpc,
            binary="gpCalAssClientTester",
            move_cmd=0,
            default_cmd_args=["-1", "-1"],
            cmd_args_index=1,
            move_gmb_field="tlc.calAss.calAssCmd_CAL_EE_SHUTTERS.subCmd",
            curr_state_gmb_field="tlc.calAss.exitShutter",
            name="CAL Exit Shutter",
        )

        # Likewise, reference and science legs share one command.
        self.ref = gpi_shutter(
            rpc,
            binary="gpCalAssClientTester",
            move_cmd=1,
            default_cmd_args=["-1", "-1"],
            cmd_args_index=0,
            move_gmb_field="tlc.calAss.calAssCmd_CAL_REF_SCI_SHUTTERS.subCmd",
            curr_state_gmb_field="tlc.calAss.refShutter",
            name="CAL Reference Shutter",
        )
        self.sci = gpi_shutter(
            rpc,
            binary="gpCalAssClientTester",
            move_cmd=1,
            default_cmd_args=["-1", "-1"],
            cmd_args_index=1,
            move_gmb_field="tlc.calAss.calAssCmd_CAL_REF_SCI_SHUTTERS.subCmd",
            curr_state_gmb_field="tlc.calAss.sciShutter",
            name="CAL Science Shutter",
        )

        # EE and RS are paired, so only need one of each
        self.components = [self.omss, self.entrance, self.ref]

    def __str__(self) -> str:
        """Assemble the status strings of all five shutters, one per line"""
        return "\n".join(
            str(c) for c in (self.omss, self.entrance, self.exit, self.ref, self.sci)
        )

    def move(
        self,
        omss: Optional[str] = None,
        entrance: Optional[str] = None,
        exit: Optional[str] = None,
        ref: Optional[str] = None,
        sci: Optional[str] = None,
        queue: bool = False,
    ) -> None:
        """MOVE command for all shutters

        Each requested move is queued individually; the queue is then executed
        at the end unless ``queue`` is True.

        Args:
            omss (str or None):
                OMSS setting ('open' or 'close') or None to leave as is.
            entrance (str or None):
                CAL entrance setting ('open' or 'close') or None to leave as is.
            exit (str or None):
                CAL exit setting ('open' or 'close') or None to leave as is.
            ref (str or None):
                CAL reference setting ('open' or 'close') or None to leave as is.
            sci (str or None):
                CAL science setting ('open' or 'close') or None to leave as is.
            queue (bool):
                If True, queue command rather than executing.

        Raises:
            ValueError:
                If any setting is a non-empty string other than 'open' or
                'close'.

        .. note::
            All string inputs are case insensitive.

        """
        opts = {"CLOSE": 0, "OPEN": 1}

        # Translate every request to its numeric command value; -1 marks a
        # shutter that should be left as is.
        states = []
        for s in (omss, entrance, exit, ref, sci):
            if not s:
                states.append(-1)
                continue
            key = s.upper()
            if key not in opts:
                # Explicit raise rather than assert so that validation
                # survives running under ``python -O``.
                raise ValueError("Allowed inputs are: {}".format(", ".join(opts)))
            states.append(opts[key])

        omss_state = states[0]
        ee_states = states[1:3]
        rs_states = states[3:]

        # OMSS is by itself
        if omss_state != -1:
            self.omss.move(omss_state, queue=True)

        # E/E and Ref/Sci handled in pairs: a single command carries both
        # states, so it is sent through one shutter of each pair.
        if any(s != -1 for s in ee_states):
            self.entrance.move(ee_states, queue=True)
        if any(s != -1 for s in rs_states):
            self.ref.move(rs_states, queue=True)

        if not queue:
            self.rpc.execute_queue()
class gpi_shutter(generic_gpi_component):
    """GPI shutter

    Args:
        rpc (:py:class:`~gpilib2.rpc`):
            rpc object. sim status and verbosity will be set
            based on its settings.
        binary (str):
            binary name
        move_cmd (int):
            Command number in binary to use
        default_cmd_args (list):
            Default argument sets to use for inits and datums and sims
        cmd_args_index (int):
            Index in default command list where actual shutter command goes
        move_gmb_field (str):
            GMB field prefix corresponding to the move command
        curr_state_gmb_field (str):
            GMB field of current shutter state
        name (str):
            Name of this shutter
        server (str):
            Server address to send commands to.

    Attributes:
        rpc (:py:class:`~gpilib2.rpc`):
            rpc object for communications
        server (str):
            Server address to send commands to.
        binary (str):
            binary name
        move_cmd (int):
            Command number in binary to use
        default_cmd_args (list):
            Default argument sets to use for inits and datums and sims
        cmd_args_index (int):
            Index in default command list where actual shutter command goes
        move_gmb_field (str):
            GMB field prefix corresponding to the move command
        curr_state_gmb_field (str):
            GMB field of current shutter state
        name (str):
            Name of this shutter

    """

    def __init__(
        self,
        rpc: rpc,
        binary: str,
        move_cmd: int,
        default_cmd_args: List[str],
        cmd_args_index: int,
        move_gmb_field: str,
        curr_state_gmb_field: str,
        name: str,
        server: str = "tlc",
    ) -> None:
        """Set up the shutter"""
        generic_gpi_component.__init__(
            self,
            rpc,
            binary,
            move_cmd,
            default_cmd_args,
            move_gmb_field,
            curr_state_gmb_field,
            name=name,
            server=server,
        )
        # The only shutter-specific setting: which slot of the command
        # argument list carries this shutter's state.
        self.cmd_args_index = cmd_args_index

    def __str__(self) -> str:
        """Returns shutter status string

        Reads the current state field and the ``simulate`` field that sits
        alongside the move command field, and reports the shutter as
        "Closed" or "Open", tagging the name with "(sim)" when the simulate
        flag is 1.
        """
        # Swap the last component of the move field for "simulate".
        sim_field = ".".join(self.move_gmb_field.split(".")[:-1]) + ".simulate"
        vals = self.rpc.read_gmb_values(
            [self.curr_state_gmb_field, sim_field]
        ).astype(int)

        label = self.name
        if int(vals[1]) == 1:
            label += " (sim)"

        return "{0: >30}: {1}".format(label, ("Closed", "Open")[int(vals[0])])

    def move(self, state: Union[List[int], int], queue: bool = False) -> None:
        """Shutter move command

        For a scalar ``state`` the current shutter state is read first and, if
        the shutter is already there, a message is printed and no command is
        sent. For a list ``state`` the values are sent as given, without
        checking the current state.

        Args:
            state (int or [int, int]):
                0 for close, 1 for open. The list input is for toggling both E/E or
                ref/sci shutters at the same time.
            queue (bool):
                If True, queue command rather than executing.

        Raises:
            ValueError:
                If a scalar ``state`` is anything other than 0 or 1.

        """
        if isinstance(state, list):
            cmd_args = ["{}".format(s) for s in state]
        else:
            # Reject bad values before touching the hardware: the state is
            # both forwarded in the command and used to index the message
            # labels below, where a negative value would silently wrap.
            if state not in (0, 1):
                raise ValueError(
                    "state must be 0 (close) or 1 (open); got {!r}".format(state)
                )
            state = int(state)

            # check where we are
            curr_state = self.rpc.read_gmb_values(self.curr_state_gmb_field).astype(
                int
            )[0]
            if curr_state == state:
                print(
                    "{} is already {}.".format(self.name, ("closed", "open")[state])
                )
                return

            # Start from the defaults and overwrite only this shutter's slot.
            cmd_args = self.default_cmd_args.copy()
            cmd_args[self.cmd_args_index] = "{}".format(state)

        cmdlist = [
            self.rpc.binaries[self.binary],
            self.server,
            "{}".format(self.move_cmd),
            "{}".format(self.rpc.activityId),
            "{}".format(self.rpc.activity["START"]),
            "{}".format(self.rpc.ass_mode["MOVE"]),
            "{}".format(self.rpc.move_level),
        ] + cmd_args

        # Execute or queue
        self.rpc.execute(cmdlist, self.move_gmb_field, queue=queue)

    def open(self, queue: bool = False) -> None:
        """Open a shutter

        Args:
            queue (bool):
                If True, queue command rather than executing.

        """
        self.move(1, queue=queue)

    def close(self, queue: bool = False) -> None:
        """Close a shutter

        Args:
            queue (bool):
                If True, queue command rather than executing.

        """
        self.move(0, queue=queue)