Source code for gpilib2.coronagraph

import os
import re
from typing import Union, Any, List, Optional
import warnings
from gpilib2.generic_gpi_component import generic_gpi_component
from gpilib2.generic_gpi_assembly import generic_gpi_assembly
from gpilib2.rpc import rpc
from gpilib2.util import get_tlc_configdir
import numpy.typing as npt


class coronagraph(generic_gpi_assembly):
    """All GPI coronagraph components (apodizer, focal-plane mask, lyot stop)

    Args:
        rpc (:py:class:`~gpilib2.rpc`):
            rpc object. sim status and verbosity will be set based on its settings.

    Attributes:
        rpc (:py:class:`~gpilib2.rpc`):
            rpc object for communications
        fpm (:py:class:`~gpilib2.coronagraph.fpm`):
            fpm object
        apodizer (:py:class:`~gpilib2.coronagraph.apodizer`):
            apodizer object
        lyot (:py:class:`~gpilib2.coronagraph.lyot`):
            lyot object
        components (list):
            List of apodizer, fpm, lyot (in **that** order)

    """

    def __init__(self, rpc: rpc) -> None:
        """Build the three component objects on top of the shared rpc object

        Args:
            rpc (gpilib2.rpc):
                rpc object. sim status and verbosity will be set based on its
                settings.
        """

        generic_gpi_assembly.__init__(self, rpc)

        # Construction order (fpm, apodizer, lyot) is kept as-is; note that the
        # ``components`` list below uses a different order on purpose.
        self.fpm = fpm(self.rpc)
        self.apodizer = apodizer(self.rpc)
        self.lyot = lyot(self.rpc)

        self.components = [self.apodizer, self.fpm, self.lyot]

    def move(
        self,
        apodizer: Optional[str] = None,
        fpm: Optional[str] = None,
        lyot: Optional[str] = None,
        queue: bool = False,
    ) -> None:
        """Move some or all coronagraph components to specific masks

        Every requested component move is queued first; the queue is then either
        executed (followed by any per-component post-move cleanup) or left in
        place when ``queue`` is True.

        For override positions and other options, use the component-specific
        ``move`` method.

        Args:
            apodizer (str or None):
                Apodizer mask name
            fpm (str or None):
                FPM mask name
            lyot (str or None):
                Lyot mask name
            queue (bool):
                If True, queue commands rather than executing.

        Raises:
            AssertionError, RuntimeError, TypeError, NameError:
                Re-raised from a component ``move`` call after the rpc queue has
                been cleared.

        .. note::

            **End** of mask name must match exact mask string and must be a unique
            match.

        """

        # Pair each requested mask name with its component. The pairing order
        # matches ``self.components``: apodizer, fpm, lyot.
        requests = list(zip([apodizer, fpm, lyot], self.components))

        # Queue every requested move; on a known failure type, drop whatever
        # has been queued and let the error propagate.
        cleanup_pending = False
        try:
            for mask_name, component in requests:
                if not mask_name:
                    continue
                component.move(mask_name, queue=True, noqueuewarning=True)
                cleanup_pending = cleanup_pending or component.post_move_required
        except (AssertionError, RuntimeError, TypeError, NameError):
            self.rpc.clear_queue()
            raise

        if queue:
            # Commands stay queued: the caller must be told if cleanup is owed.
            if cleanup_pending:
                warnings.warn("You have queued a command requring post-move actions")
            return

        # Run, then perform cleanup for the components that need it.
        self.rpc.execute_queue()
        for mask_name, component in requests:
            if mask_name and component.post_move_required:
                component.post_move_cleanup()
class coronagraph_component(generic_gpi_component):
    """Generic GPI coronagraph component

    Args:
        rpc (gpilib2.rpc):
            rpc object. sim status and verbosity will be set based on its settings.
        binary (str):
            binary name
        move_cmd (int):
            Command number in binary to use
        default_cmd_args (list):
            Default argument sets to use for inits and datums and sims
        move_gmb_field (str):
            GMB field prefix corresponding to the move command
        configfile (str):
            Name of relevant assembly configuration where mask names can be found
        maskstr (str):
            How mask names are identified in the assembly config file
        curr_state_gmb_field (str):
            GMB field where current mask value is stored
        motor_position_gmb_fields (list):
            GMB fields with current filterwheel/slide position values. Must be in
            the same order as these arguments are passed to the move command.
        override_mask_name (str):
            Mask name to use when setting override motor positions. An empty
            string is replaced by the literal two-character placeholder ``""``.
        name (str):
            Component name
        post_move_required (bool):
            Set to True if there are mandatory actions to be completed after the
            end of a successful MOVE command.  Specific actions are in
            self.post_move_cleanup()
        server (str):
            Server address to send commands to.
        mcd_name (:py:data:`~numpy.typing.ArrayLike`, optional):
            Label(s) of :term:`MCD` axis for this component. If None (default),
            assume that device is not on an MCD.

    Attributes:
        rpc (:py:class:`~gpilib2.rpc`):
            rpc object for communications
        masks (list):
            List of (str) mask names
        configfile (str):
            Name of relevant assembly configuration where mask names can be found
        maskstr (str):
            How mask names are identified in the assembly config file
        binary (str):
            Which binary to use to move the wheel
        move_gmb_field (str):
            GMB field prefix corresponding to the move command
        curr_state_gmb_field (str):
            GMB field where current mask value is stored
        server (str):
            Server address to send commands to.
        move_cmd (int):
            Command number in binary to use
        default_cmd_args (list):
            Default argument sets to use for inits and datums
        motor_position_gmb_fields (list):
            GMB fields with current filterwheel/slide position values. Must be in
            the same order as these arguments are passed to the move command.
        post_move_required (bool):
            Set to True if there are mandatory actions to be completed after the
            end of a successful MOVE command.  Specific actions are in
            self.post_move_cleanup()
        override_mask_name (str):
            Mask name to use when setting override motor positions.
        name (str):
            Component name
        mcd_name (~numpy.ndarray(str), optional):
            Label(s) of :term:`MCD` axis/axes for this component. If None, assume
            that device is not on an MCD.
        mcd_inds (~numpy.ndarray(int), optional):
            Indices of entries in :py:attr:`~gpilib2.rpc.rpc.mcdaxisnames`
            corresponding to the entries of mcd_name. None if mcd_name is None.

    .. warning::

        Mask names are read from the specified configuration files in the config
        dir. Component-specific strings that define which config file to use, and
        how to find the mask name are set by the ``configfile`` and ``maskstr``
        attributes.

    .. note::

        The FPM and apodizer (PPM) have their own assemblies, but the Lyot masks
        are defined in the IFS assembly.

    """

    def __init__(
        self,
        rpc: rpc,
        binary: str,
        move_cmd: int,
        default_cmd_args: List[str],
        move_gmb_field: str,
        configfile: str,
        maskstr: str,
        curr_state_gmb_field: str,
        motor_position_gmb_fields: List[str],
        override_mask_name: str,
        name: str,
        post_move_required: bool = False,
        server: str = "tlc",
        mcd_name: Optional[npt.ArrayLike] = None,
    ) -> None:
        """Read the relevant assembly configuration file and extract all mask names

        Raises:
            AssertionError:
                If the assembly configuration file cannot be found in the TLC
                config directory.
        """

        generic_gpi_component.__init__(
            self,
            rpc,
            binary,
            move_cmd,
            default_cmd_args,
            move_gmb_field,
            curr_state_gmb_field,
            name=name,
            server=server,
            mcd_name=mcd_name,
        )

        self.configfile = configfile
        self.maskstr = maskstr
        self.motor_position_gmb_fields = motor_position_gmb_fields

        # Honour the constructor arguments. Previously both values were
        # hard-coded (False and '""') and the arguments were silently dropped.
        self.post_move_required = post_move_required
        # An empty override name falls back to the quoted-empty placeholder that
        # used to be hard-coded, so callers passing "" keep their old behaviour.
        self.override_mask_name = override_mask_name or '""'

        # Grab configuration file
        configpath = os.path.join(get_tlc_configdir(), self.configfile)
        # Explicit raise (rather than ``assert``) so the check survives
        # ``python -O``; the exception type is unchanged.
        if not os.path.exists(configpath):
            raise AssertionError("Cannot locate file {}".format(configpath))

        with open(configpath, "r") as f:
            config = f.readlines()

        # Remove all comment lines and then filter for the mask strings: the
        # mask name is the first whitespace-delimited token after ``maskstr``.
        maskp = re.compile(r"{}\s+(\S+)\s+".format(self.maskstr))
        found = (maskp.search(ll) for ll in config if not ll.startswith("#"))
        self.masks = [m.group(1) for m in found if m]

    def __str__(self) -> str:
        """Print current mask status

        Reads the current mask string, the ``simulate`` flag that sits next to
        the move command field, and the motor position fields from the GMB.
        """

        # "<prefix>.subCmd" -> "<prefix>.simulate": swap the last dotted element.
        simulate_field = self.move_gmb_field.rpartition(".")[0] + ".simulate"

        vals = self.rpc.read_gmb_values(
            [self.curr_state_gmb_field, simulate_field]
            + self.motor_position_gmb_fields
        )

        name = self.name
        if int(vals[1]) == 1:
            name += " (sim)"

        return "{0: <15} Mask: {1: <12} Motor: {2}".format(
            name, vals[0], vals[2:].astype(float)
        )

    def match_mask_name(self, mask: str) -> str:
        """Return unique matching mask string

        Args:
            mask (str):
                Requested mask.  This must match the *ending* of the full mask
                string exactly. Case insensitive. The text is compared
                literally (it is not interpreted as a regular expression).

        Returns:
            str:
                The exact mask string.

        Raises:
            AssertionError:
                If zero or more than one mask name ends with ``mask``.

        Notes:
            For apodizers, all final characters are unique, so matching is
            guaranteed.  For FPM, there is mild ambiguity between 50umPIN and
            Open, so multiple characters are required. Lyots are most annoying
            with 080m12_04, 080_04, and 080m12_04_c. Using full mask names is
            safest in this case. Mask names can always be checked by printing
            self.masks.

        """

        # Literal suffix comparison: regex metacharacters in the request can no
        # longer mis-match or raise re.error. ``str()`` keeps non-string
        # requests working, as string formatting did before.
        suffix = str(mask).lower()
        matches = [m for m in self.masks if m.lower().endswith(suffix)]

        # Explicit raise (rather than ``assert``) so an ambiguous request can
        # never silently resolve to the first candidate under ``python -O``.
        if len(matches) != 1:
            raise AssertionError(
                "Could not get unique match for {}. Matched: {}".format(mask, matches)
            )

        return matches[0]

    def move(
        self,
        mask: Union[str, None] = None,
        rot: Union[float, None] = None,
        force: bool = False,
        rel: bool = False,
        queue: bool = False,
        noqueuewarning: bool = False,
    ) -> None:
        """Send MOVE command

        Args:
            mask (str or None):
                Mask to move to (or none for override).
            rot (number or None):
                Rotation override in degrees
            force (bool):
                Force move command even if current mask string matches requested
                mask
            rel (bool):
                Treat rotation and x-offset as relative to current values
            queue (bool):
                Queue command rather than immediately executing
            noqueuewarning (bool):
                If the specific MOVE has required cleanup actions (i.e.,
                self.post_move_required is True) then a warning will be
                generated when queue = True, unless noqueuewarning is also set
                to True. Default False.

        Returns:
            None

        Raises:
            AssertionError:
                If neither ``mask`` nor ``rot`` is given, or if ``mask`` does
                not identify exactly one known mask.

        .. warning::

            At least one of mask or rot must be set. If the mask string is not
            None, the rotation override will be ignored.

        """

        self.check_datum()

        # Explicit raise keeps this validation active under ``python -O``.
        if mask is None and rot is None:
            raise AssertionError("One of: mask or rot must be set.")

        if (mask is not None) and (rot is not None):
            warnings.warn("When mask is set, rot inputs are ignored.")

        # Default overrides
        rotationOverride = 0.0

        if mask is not None:
            # Named Mask
            targetMaskStr = self.match_mask_name(mask)
            if not force:
                current_mask = self.rpc.read_gmb_values(self.curr_state_gmb_field)
                if current_mask == targetMaskStr:
                    print("Already at requested mask: {}".format(targetMaskStr))
                    return
        else:
            # Override
            targetMaskStr = self.override_mask_name
            rotationOverride = float(rot)  # type: ignore

            if rel:
                # Relative move: offset from the first motor position field.
                current_values = self.rpc.read_gmb_values(
                    self.motor_position_gmb_fields
                ).astype(float)
                rotationOverride += current_values[0]

        cmdlist = [
            self.rpc.binaries[self.binary],
            self.server,
            "{}".format(self.move_cmd),
            "{}".format(self.rpc.activityId),  # activity id
            "{}".format(self.rpc.activity["START"]),  # activity directive
            "{}".format(self.rpc.ass_mode["MOVE"]),  # mode
            "{}".format(self.rpc.move_level),  # level
            "{}".format(rotationOverride),
            targetMaskStr,
        ]

        # Execute or queue
        self.rpc.execute(cmdlist, self.move_gmb_field, queue=queue)

        if self.post_move_required:
            if not (queue):
                self.post_move_cleanup()
            else:
                if not (noqueuewarning):
                    warnings.warn(
                        "You have queued a command requring post-move actions"
                    )

    def post_move_cleanup(self, **kwargs: Any) -> None:
        """Execute anything required post-move

        The base implementation does nothing; it is the hook called by ``move``
        when ``post_move_required`` is True.
        """

        pass
class fpm(coronagraph_component):
    """Focal-Plane Mask

    :py:class:`coronagraph_component` configured with the FPM assembly's binary,
    GMB fields and configuration-file identifiers.
    """

    def __init__(self, rpc: rpc) -> None:
        """Initialize with the FPM-specific values and hide ``FPM_SameAsH``

        Args:
            rpc (gpilib2.rpc):
                rpc object.
        """

        # FPM-specific values:
        coronagraph_component.__init__(
            self,
            rpc,
            binary="gpFpmAssClientTester",
            move_cmd=0,  # this is the only command in the FPM assembly
            default_cmd_args=["0", "Open"],
            move_gmb_field="tlc.fpmAss.fpmAssCmd_FPM.subCmd",
            configfile="CONFIG.FpmAssembly",
            maskstr="FPM_FILTER_POS",  # mask string in config file
            curr_state_gmb_field="tlc.fpmAss.maskStr",  # mask gmb field
            motor_position_gmb_fields=["tlc.fpmAss.maskMotorPosition"],
            override_mask_name='""',  # msk override string
            name="FPM",
            post_move_required=False,
            mcd_name="FPM_ROT",
        )

        # to avoid confusion: drop this entry from the selectable masks when
        # the configuration file lists it
        try:
            self.masks.remove("FPM_SameAsH")
        except ValueError:
            pass
# TODO: need to add CAL obsconfig after MOVE
class apodizer(coronagraph_component):
    """Apodizer

    :py:class:`coronagraph_component` configured for the PPM assembly. Unlike
    the generic component, its MOVE command takes two override values
    (rotation and x-offset), so ``move`` is re-implemented here.
    """

    def __init__(self, rpc: rpc) -> None:
        """Initialize with the apodizer-specific values and hide ``CAL50umPIN``

        Args:
            rpc (gpilib2.rpc):
                rpc object.
        """

        # Apodizer-specific values:
        coronagraph_component.__init__(
            self,
            rpc,
            "gpPpmAssClientTester",  # binary
            0,  # move cmd - this is the only command in the PPM assembly
            ["CLEAR", "0.0", "0.0"],  # default cmd args
            "tlc.ppmAss.ppmAssCmd_PPM.subCmd",  # move gmb field
            "CONFIG.PpmAssembly",  # config file
            "PPM_FILTER_POS",  # mask string in config file
            "tlc.ppmAss.maskStr",  # mask gmb field
            [
                "tlc.ppmAss.maskMotorPosition",
                "tlc.ppmAss.xOffsetMotorPosition",
            ],  # motor position gmb fields
            '""',  # msk override string
            "Apodizer",  # name
            post_move_required=False,
            mcd_name=["PPM_LINEAR", "PPM_WHEEL"],  # mcd axes
        )

        # to avoid confusion:
        if "CAL50umPIN" in self.masks:
            self.masks.remove("CAL50umPIN")

    def move(  # type: ignore
        self,
        mask: Union[str, None] = None,
        rot: Union[float, None] = None,
        xoff: Union[float, None] = None,
        force: bool = False,
        rel: bool = False,
        queue: bool = False,
        noqueuewarning: bool = False,
    ) -> None:
        """Send MOVE command

        Args:
            mask (str or None):
                Mask to move to (or none for override).
            rot (number or None):
                Rotation override in degrees
            xoff (number or None):
                x-Offset override in mm
            force (bool):
                Force move command even if current mask string matches requested
                mask
            rel (bool):
                Treat rotation and x-offset as relative to current values
            queue (bool):
                Queue command rather than immediately executing
            noqueuewarning (bool):
                If the specific MOVE has required cleanup actions (i.e.,
                self.post_move_required is True) then a warning will be
                generated when queue = True, unless noqueuewarning is also set
                to True. Default False.

        Returns:
            None

        Raises:
            AssertionError:
                If none of ``mask``, ``rot`` and ``xoff`` is given.

        Notes:
            At least one of mask, rot, or xoff must be set. If the mask string
            is not None, the offsets will be ignored.

        """

        self.check_datum()

        # Explicit raise instead of ``assert``: with assertions stripped
        # (``python -O``) a call with no arguments would otherwise fall through
        # to the override branch and command a move to rotation 0 / x-offset 0.
        if mask is None and rot is None and xoff is None:
            raise AssertionError("One of: mask, rot, xoff must be set.")

        if (mask is not None) and ((xoff is not None) or (rot is not None)):
            warnings.warn("When mask is set, rot and xoff inputs are ignored")

        # Default overrides
        rotationOverride = 0.0
        xOffsetOverride = 0.0

        if mask is not None:
            # Named Mask
            targetMaskStr = self.match_mask_name(mask)
            if not force:
                current_mask = self.rpc.read_gmb_values(self.curr_state_gmb_field)
                if current_mask == targetMaskStr:
                    print("Already at requested mask: {}".format(targetMaskStr))
                    return
        else:
            # Override
            targetMaskStr = self.override_mask_name
            if rot is not None:
                rotationOverride = float(rot)
            if xoff is not None:
                xOffsetOverride = float(xoff)

            if rel:
                # Relative move: the motor position fields are ordered
                # (rotation, x-offset), matching motor_position_gmb_fields.
                current_values = self.rpc.read_gmb_values(
                    self.motor_position_gmb_fields
                ).astype(float)
                rotationOverride += current_values[0]
                xOffsetOverride += current_values[1]

        cmdlist = [
            self.rpc.binaries[self.binary],  # Binary
            self.server,
            "{}".format(self.move_cmd),  # command number
            "{}".format(self.rpc.activityId),  # activity id
            "{}".format(self.rpc.activity["START"]),  # activity directive
            "{}".format(self.rpc.ass_mode["MOVE"]),  # mode
            "{}".format(self.rpc.move_level),  # level
            targetMaskStr,
            "{}".format(rotationOverride),
            "{}".format(xOffsetOverride),
        ]

        self.rpc.execute(cmdlist, self.move_gmb_field, queue=queue)

        if self.post_move_required:
            if not (queue):
                self.post_move_cleanup()
            else:
                if not (noqueuewarning):
                    warnings.warn(
                        "You have queued a command requring post-move actions"
                    )
# TODO: if implementing internal defaults, need to update maskstring in GMB
class lyot(coronagraph_component):
    """Lyot stop

    :py:class:`coronagraph_component` configured with the Lyot mask command,
    GMB fields and configuration identifiers of the IFS assembly.
    """

    def __init__(self, rpc: rpc):
        """Initialize with the Lyot-specific values

        Args:
            rpc (gpilib2.rpc):
                rpc object.
        """

        # Lyot-specific values:
        coronagraph_component.__init__(
            self,
            rpc,
            binary="gpIfsAssClientTester",
            move_cmd=2,  # MT_LYOT_MASK
            default_cmd_args=["0", "Blank"],
            move_gmb_field="tlc.ifsAss.ifsAssCmd_LYOT_MASK.subCmd",
            configfile="CONFIG.IfsAssembly",
            maskstr="LYOT_MASK_POS",  # mask string in config file
            curr_state_gmb_field="tlc.ifsAss.lyotMaskStr",  # mask gmb field
            motor_position_gmb_fields=["tlc.ifsAss.lyotMaskMotorPosition"],
            # NOTE(review): fpm and apodizer pass '""' here while this passes an
            # empty string - confirm which value is intended for the Lyot.
            override_mask_name="",  # msk override string
            name="Lyot Stop",
            post_move_required=False,
            mcd_name="LyotWheel",  # MCD axis
        )
# TODO: Need to check and start/stop cal-ifs pnc tracking.
# PNC offsets for new Lyot mask are only applied when tracking