import os
import re
from typing import Union, Any, List, Optional
import warnings
from gpilib2.generic_gpi_component import generic_gpi_component
from gpilib2.generic_gpi_assembly import generic_gpi_assembly
from gpilib2.rpc import rpc
from gpilib2.util import get_tlc_configdir
import numpy.typing as npt
class coronagraph(generic_gpi_assembly):
    """Assembly wrapping the three GPI coronagraph mechanisms

    One component object is created for each of the apodizer, the focal-plane
    mask and the Lyot stop; all of them share the assembly's rpc object.

    Args:
        rpc (:py:class:`~gpilib2.rpc`):
            rpc object. sim status and verbosity will be set
            based on its settings.

    Attributes:
        rpc (:py:class:`~gpilib2.rpc`):
            rpc object for communications
        fpm (:py:class:`~gpilib2.coronagraph.fpm`):
            fpm object
        apodizer (:py:class:`~gpilib2.coronagraph.apodizer`):
            apodizer object
        lyot (:py:class:`~gpilib2.coronagraph.lyot`):
            lyot object
        components (list):
            List of apodizer, fpm, lyot (in **that** order)

    """

    def __init__(self, rpc: rpc) -> None:
        """Build the three component objects

        Args:
            rpc (gpilib2.rpc):
                rpc object. sim status and verbosity will be set
                based on its settings.

        """
        generic_gpi_assembly.__init__(self, rpc)

        self.fpm = fpm(self.rpc)
        self.apodizer = apodizer(self.rpc)
        self.lyot = lyot(self.rpc)

        # The order here must match the argument order used in ``move``.
        self.components = [self.apodizer, self.fpm, self.lyot]

    def move(
        self,
        apodizer: Union[str, None] = None,
        fpm: Union[str, None] = None,
        lyot: Union[str, None] = None,
        queue: bool = False,
    ) -> None:
        """Send any subset of the coronagraph components to named masks

        Every requested move is queued first. If queueing any of them raises,
        the rpc queue is cleared and the exception is re-raised. For override
        positions and other options, use the component-specific ``move``
        method.

        Args:
            apodizer (str or None):
                Apodizer mask name
            fpm (str or None):
                FPM mask name
            lyot (str or None):
                Lyot mask name
            queue (bool):
                If True, leave the commands queued rather than executing them.

        .. note::

            **End** of mask name must match exact mask string and must be a unique
            match.

        """
        cleanup_pending = False
        try:
            # Pair each requested mask name with its component; falsy names
            # (None or empty string) mean "leave this component alone".
            requested = list(zip([apodizer, fpm, lyot], self.components))
            for mask_name, component in requested:
                if not mask_name:
                    continue
                component.move(mask_name, queue=True, noqueuewarning=True)
                cleanup_pending = cleanup_pending or component.post_move_required
        except (AssertionError, RuntimeError, TypeError, NameError):
            # Do not leave a partial set of moves behind in the queue.
            self.rpc.clear_queue()
            raise

        if queue:
            # Nothing is executed now, so cleanup cannot be run here either.
            if cleanup_pending:
                warnings.warn("You have queued a command requring post-move actions")
            return

        self.rpc.execute_queue()

        # Run the per-component cleanup for everything that was just moved.
        for mask_name, component in requested:
            if mask_name and component.post_move_required:
                component.post_move_cleanup()
class coronagraph_component(generic_gpi_component):
    """Generic GPI coronagraph component

    Args:
        rpc (gpilib2.rpc):
            rpc object. sim status and verbosity will be set
            based on its settings.
        binary (str):
            binary name
        move_cmd (int):
            Command number in binary to use
        default_cmd_args (list):
            Default argument sets to use for inits and datums and sims
        move_gmb_field (str):
            GMB field prefix corresponding to the move command
        configfile (str):
            Name of relevant assembly configuration where mask names can be found
        maskstr (str):
            How mask names are identified in the assembly config file
        curr_state_gmb_field (str):
            GMB field where current mask value is stored
        motor_position_gmb_fields (list):
            GMB fields with current filterwheel/slide position values. Must be in the
            same order as these arguments are passed to the move command.
        override_mask_name (str):
            Mask name to use when setting override motor positions. An empty
            value is replaced by the literal two-character string ``""``.
        name (str):
            Component name
        post_move_required (bool):
            Set to True if there are mandatory actions to be completed after the end of
            a successful MOVE command. Specific actions are in self.post_move_cleanup()
        server (str):
            Server address to send commands to.
        mcd_name (:py:data:`~numpy.typing.ArrayLike`, optional):
            Label(s) of :term:`MCD` axis for this component. If None (default),
            assume that device is not on an MCD.

    Attributes:
        rpc (:py:class:`~gpilib2.rpc`):
            rpc object for communications
        masks (list):
            List of (str) mask names
        configfile (str):
            Name of relevant assembly configuration where mask names can be found
        maskstr (str):
            How mask names are identified in the assembly config file
        binary (str):
            Which binary to use to move the wheel
        move_gmb_field (str):
            GMB field prefix corresponding to the move command
        curr_state_gmb_field (str):
            GMB field where current mask value is stored
        server (str):
            Server address to send commands to.
        move_cmd (int):
            Command number in binary to use
        default_cmd_args (list):
            Default argument sets to use for inits and datums
        motor_position_gmb_fields (list):
            GMB fields with current filterwheel/slide position values. Must be in the
            same order as these arguments are passed to the move command.
        post_move_required (bool):
            Set to True if there are mandatory actions to be completed after the end of
            a successful MOVE command. Specific actions are in self.post_move_cleanup()
        override_mask_name (str):
            Mask name to use when setting override motor positions.
        name (str):
            Component name
        mcd_name (~numpy.ndarray(str), optional):
            Label(s) of :term:`MCD` axis/axes for this component. If None, assume that
            device is not on an MCD.
        mcd_inds (~numpy.ndarray(int), optional):
            Indices of entries in :py:attr:`~gpilib2.rpc.rpc.mcdaxisnames`
            corresponding to the entries of mcd_name. None if mcd_name is None.

    .. warning::

        Mask names are read from the specified configuration files in the config dir.
        Component-specific strings that define which config file to use, and how to
        find the mask name are set by the ``configfile`` and ``maskstr`` attributes.

    .. note::

        The FPM and apodizer (PPM) have their own assemblies, but the Lyot masks are
        defined in the IFS assembly.

    """

    def __init__(
        self,
        rpc: rpc,
        binary: str,
        move_cmd: int,
        default_cmd_args: List[str],
        move_gmb_field: str,
        configfile: str,
        maskstr: str,
        curr_state_gmb_field: str,
        motor_position_gmb_fields: List[str],
        override_mask_name: str,
        name: str,
        post_move_required: bool = False,
        server: str = "tlc",
        mcd_name: Optional[npt.ArrayLike] = None,
    ) -> None:
        """Read the relevant assembly configuration file and extract all mask names

        Raises:
            AssertionError:
                If the configuration file cannot be found in the TLC config
                directory.

        """
        generic_gpi_component.__init__(
            self,
            rpc,
            binary,
            move_cmd,
            default_cmd_args,
            move_gmb_field,
            curr_state_gmb_field,
            name=name,
            server=server,
            mcd_name=mcd_name,
        )

        self.configfile = configfile
        self.maskstr = maskstr
        self.motor_position_gmb_fields = motor_position_gmb_fields

        # Honour the constructor arguments (previously both were hard-coded).
        self.post_move_required = post_move_required
        # An empty override name falls back to the literal '""' token, which is
        # the value this class has always placed in the MOVE command list.
        self.override_mask_name = override_mask_name or '""'

        # Grab configuration file
        config_path = os.path.join(get_tlc_configdir(), self.configfile)
        assert os.path.exists(config_path), "Cannot locate file {}".format(
            config_path
        )
        with open(config_path, "r") as f:
            config = f.readlines()

        # Drop comment lines, then pull the token that follows the mask
        # identifier on every remaining line that contains it.
        maskp = re.compile(r"{}\s+(\S+)\s+".format(self.maskstr))
        masks = []
        for ll in config:
            if ll.startswith("#"):
                continue
            found = maskp.search(ll)
            if found:
                masks.append(found.group(1))
        self.masks = masks

    def __str__(self) -> str:
        """Return a one-line summary: name, current mask and motor position(s)

        The name is suffixed with ``(sim)`` when the assembly's ``simulate``
        GMB field reads 1.
        """
        # The simulate flag sits next to the move field: swap the last
        # dotted element of the move field for "simulate".
        sim_field = ".".join(self.move_gmb_field.split(".")[:-1]) + ".simulate"
        vals = self.rpc.read_gmb_values(
            [self.curr_state_gmb_field, sim_field] + self.motor_position_gmb_fields
        )

        name = self.name
        if int(vals[1]) == 1:
            name += " (sim)"

        return "{0: <15} Mask: {1: <12} Motor: {2}".format(
            name, vals[0], vals[2:].astype(float)
        )

    def match_mask_name(self, mask: str) -> str:
        """Return unique matching mask string

        Args:
            mask (str):
                Requested mask. This must match the *ending* of the full mask string
                exactly. Case insensitive. The value is used as a regular
                expression anchored at the end of the mask name.

        Returns:
            str:
                The exact mask string.

        Raises:
            AssertionError:
                If zero or more than one known mask matches.

        Notes:
            For apodizers, all final characters are unique, so matching is guaranteed.
            For FPM, there is mild ambiguity between 50umPIN and Open, so multiple
            characters are required. Lyots are most annoying with 080m12_04, 080_04, and
            080m12_04_c. Using full mask names is safest in this case. Mask names can
            always be checked by printing self.masks.

        """
        maskstr = list(
            filter(re.compile(r"{}$".format(mask), re.IGNORECASE).search, self.masks)
        )
        assert (
            len(maskstr) == 1
        ), "Could not get unique match for {}. Matched: {}".format(mask, maskstr)

        return maskstr[0]

    def move(
        self,
        mask: Union[str, None] = None,
        rot: Union[float, None] = None,
        force: bool = False,
        rel: bool = False,
        queue: bool = False,
        noqueuewarning: bool = False,
    ) -> None:
        """Send MOVE command

        Args:
            mask (str or None):
                Mask to move to (or none for override).
            rot (number or None):
                Rotation override in degrees
            force (bool):
                Force move command even if current mask string matches requested mask
            rel (bool):
                Treat the rotation override as relative to the current value of
                the first motor position GMB field
            queue (bool):
                Queue command rather than immediately executing
            noqueuewarning (bool):
                If the specific MOVE has required cleanup actions (i.e.,
                self.post_move_required is True) then a warning will be generated when
                queue = True, unless noqueuewarning is also set to True. Default False.

        Returns:
            None

        Raises:
            AssertionError:
                If neither mask nor rot is given, or if mask does not match
                exactly one known mask name.

        .. warning::

            At least one of mask or rot must be set. If the mask string is not
            None, the rotation override will be ignored.

        """
        self.check_datum()

        assert (mask is not None) or (
            rot is not None
        ), "One of: mask or rot must be set."
        if (mask is not None) and (rot is not None):
            warnings.warn("When mask is set, rot inputs are ignored.")

        # Default overrides
        rotationOverride = 0.0

        if mask is not None:  # Named Mask
            targetMaskStr = self.match_mask_name(mask)
            if not force:
                # Skip the command entirely when already at the target.
                current_mask = self.rpc.read_gmb_values(self.curr_state_gmb_field)
                if current_mask == targetMaskStr:
                    print("Already at requested mask: {}".format(targetMaskStr))
                    return
        else:  # Override
            targetMaskStr = self.override_mask_name
            rotationOverride = float(rot)  # type: ignore
            if rel:
                current_values = self.rpc.read_gmb_values(
                    self.motor_position_gmb_fields
                ).astype(float)
                rotationOverride += current_values[0]

        cmdlist = [
            self.rpc.binaries[self.binary],
            self.server,
            "{}".format(self.move_cmd),
            "{}".format(self.rpc.activityId),  # activity id
            "{}".format(self.rpc.activity["START"]),  # activity directive
            "{}".format(self.rpc.ass_mode["MOVE"]),  # mode
            "{}".format(self.rpc.move_level),  # level
            "{}".format(rotationOverride),
            targetMaskStr,
        ]

        # Execute or queue
        self.rpc.execute(cmdlist, self.move_gmb_field, queue=queue)

        if self.post_move_required:
            if not (queue):
                self.post_move_cleanup()
            else:
                if not (noqueuewarning):
                    warnings.warn(
                        "You have queued a command requring post-move actions"
                    )

    def post_move_cleanup(self, **kwargs: Any) -> None:
        """Execute anything required post-move

        The base implementation does nothing; subclasses with mandatory
        post-MOVE actions are expected to override it.
        """
        pass
class fpm(coronagraph_component):
    """Focal-Plane Mask"""

    def __init__(self, rpc: rpc) -> None:
        """Configure the generic coronagraph component for the FPM assembly

        Args:
            rpc (gpilib2.rpc):
                rpc object passed through to the parent constructor.

        """
        coronagraph_component.__init__(
            self,
            rpc,
            binary="gpFpmAssClientTester",
            # this is the only command in the FPM assembly
            move_cmd=0,
            default_cmd_args=["0", "Open"],
            move_gmb_field="tlc.fpmAss.fpmAssCmd_FPM.subCmd",
            configfile="CONFIG.FpmAssembly",
            # how mask names are tagged in the config file
            maskstr="FPM_FILTER_POS",
            curr_state_gmb_field="tlc.fpmAss.maskStr",
            motor_position_gmb_fields=["tlc.fpmAss.maskMotorPosition"],
            override_mask_name='""',
            name="FPM",
            post_move_required=False,
            mcd_name="FPM_ROT",
        )

        # to avoid confusion, drop this entry from the selectable masks
        # when the config file lists it
        try:
            self.masks.remove("FPM_SameAsH")
        except ValueError:
            pass

    # TODO: need to add CAL obsconfig after MOVE
class apodizer(coronagraph_component):
    """Apodizer"""

    def __init__(self, rpc: rpc):
        """Configure the generic coronagraph component for the PPM assembly

        Args:
            rpc (gpilib2.rpc):
                rpc object passed through to the parent constructor.

        """
        coronagraph_component.__init__(
            self,
            rpc,
            binary="gpPpmAssClientTester",
            # this is the only command in the PPM assembly
            move_cmd=0,
            default_cmd_args=["CLEAR", "0.0", "0.0"],
            move_gmb_field="tlc.ppmAss.ppmAssCmd_PPM.subCmd",
            configfile="CONFIG.PpmAssembly",
            # how mask names are tagged in the config file
            maskstr="PPM_FILTER_POS",
            curr_state_gmb_field="tlc.ppmAss.maskStr",
            motor_position_gmb_fields=[
                "tlc.ppmAss.maskMotorPosition",
                "tlc.ppmAss.xOffsetMotorPosition",
            ],
            override_mask_name='""',
            name="Apodizer",
            post_move_required=False,
            mcd_name=["PPM_LINEAR", "PPM_WHEEL"],  # mcd axes
        )

        # to avoid confusion, drop this entry from the selectable masks
        # when the config file lists it
        try:
            self.masks.remove("CAL50umPIN")
        except ValueError:
            pass

    def move(  # type: ignore
        self,
        mask: Union[str, None] = None,
        rot: Union[float, None] = None,
        xoff: Union[float, None] = None,
        force: bool = False,
        rel: bool = False,
        queue: bool = False,
        noqueuewarning: bool = False,
    ) -> None:
        """Send MOVE command

        Args:
            mask (str or None):
                Mask to move to (or none for override).
            rot (number or None):
                Rotation override in degrees
            xoff (number or None):
                x-Offset override in mm
            force (bool):
                Send the command even when the current mask string already
                equals the requested mask
            rel (bool):
                Add the current motor position values to the rotation and
                x-offset overrides
            queue (bool):
                Queue command rather than immediately executing
            noqueuewarning (bool):
                Suppress the warning that is otherwise issued when a command
                needing post-move cleanup (self.post_move_required is True) is
                queued. Default False.

        Returns:
            None

        Notes:
            At least one of mask, rot, or xoff must be set. If the mask string is not
            None, the offsets will be ignored.

        """
        self.check_datum()

        nothing_requested = mask is None and xoff is None and rot is None
        assert not nothing_requested, "One of: mask, rot, xoff must be set."

        if mask is not None and not (xoff is None and rot is None):
            warnings.warn("When mask is set, rot and xoff inputs are ignored")

        # Overrides stay at zero unless an override move is requested
        rot_cmd = 0.0
        xoff_cmd = 0.0

        if mask is None:
            # Override move: explicit motor targets, no named mask
            target = self.override_mask_name
            if rot is not None:
                rot_cmd = float(rot)
            if xoff is not None:
                xoff_cmd = float(xoff)
            if rel:
                # Position fields are ordered (rotation, x-offset)
                present = self.rpc.read_gmb_values(
                    self.motor_position_gmb_fields
                ).astype(float)
                rot_cmd += present[0]
                xoff_cmd += present[1]
        else:
            # Named-mask move
            target = self.match_mask_name(mask)
            if not force:
                current_mask = self.rpc.read_gmb_values(self.curr_state_gmb_field)
                if current_mask == target:
                    print("Already at requested mask: {}".format(target))
                    return

        command = [
            self.rpc.binaries[self.binary],  # Binary
            self.server,
            f"{self.move_cmd}",  # command number
            f"{self.rpc.activityId}",  # activity id
            f"{self.rpc.activity['START']}",  # activity directive
            f"{self.rpc.ass_mode['MOVE']}",  # mode
            f"{self.rpc.move_level}",  # level
            target,
            f"{rot_cmd}",
            f"{xoff_cmd}",
        ]
        self.rpc.execute(command, self.move_gmb_field, queue=queue)

        if not self.post_move_required:
            return
        if not queue:
            self.post_move_cleanup()
        elif not noqueuewarning:
            warnings.warn("You have queued a command requring post-move actions")

    # TODO: if implementing internal defaults, need to update maskstring in GMB
class lyot(coronagraph_component):
    """Lyot stop"""

    def __init__(self, rpc: rpc):
        """Configure the generic coronagraph component for the Lyot mask wheel

        The Lyot masks are driven through the IFS assembly binary and read
        from the IFS assembly configuration file.

        Args:
            rpc (gpilib2.rpc):
                rpc object passed through to the parent constructor.

        """
        coronagraph_component.__init__(
            self,
            rpc,
            binary="gpIfsAssClientTester",
            move_cmd=2,  # MT_LYOT_MASK
            default_cmd_args=["0", "Blank"],
            move_gmb_field="tlc.ifsAss.ifsAssCmd_LYOT_MASK.subCmd",
            configfile="CONFIG.IfsAssembly",
            # how mask names are tagged in the config file
            maskstr="LYOT_MASK_POS",
            curr_state_gmb_field="tlc.ifsAss.lyotMaskStr",
            motor_position_gmb_fields=["tlc.ifsAss.lyotMaskMotorPosition"],
            override_mask_name="",
            name="Lyot Stop",
            post_move_required=False,
            mcd_name="LyotWheel",  # MCD axis
        )

    # TODO: Need to check and start/stop cal-ifs pnc tracking.
    # PNC offsets for new Lyot mask are only applied when tracking