Source code for gpilib2.source

from typing import Union, List, Optional
from gpilib2.generic_gpi_component import generic_gpi_component
from gpilib2.generic_gpi_assembly import generic_gpi_assembly
from numbers import Number
import numpy as np
from gpilib2.rpc import rpc
import numpy.typing as npt


[docs]class source(generic_gpi_assembly): """All GPI sources (everything in source assembly other than OMSS shutter) Args: rpc (:py:class:`~gpilib2.rpc`): rpc object. sim status and verbosity will be set based on its settings. Attributes: rpc (:py:class:`~gpilib2.rpc`): rpc object for communications asu (:py:class:`~gpilib2.source.asu`): asu object cal_sphere (:py:class:`~gpilib2.source.cal_sphere`): sphere object components (list): List of ASU and CAL_SPHERE (in **that** order) """ def __init__(self, rpc: rpc) -> None: """Define all components Args: rpc (gpilib2.rpc): rpc object. sim status and verbosity will be set based on its settings. """ generic_gpi_assembly.__init__(self, rpc) self.asu = asu(self.rpc) self.cal_sphere = cal_sphere(self.rpc) self.components = [self.asu, self.cal_sphere]
[docs] def move( self, asu_pos: Union[str, List[float], None] = None, sc_pow: Union[float, None] = None, sc_att: Union[float, None] = None, rel: bool = False, ir_on: Union[bool, None] = None, vis_on: Union[bool, None] = None, sphere_pos: Union[str, float, None] = None, sphere_att: Union[float, None] = None, queue: bool = False, ) -> None: """Source MOVE command Args: asu_pos (str or list or None): 'in', 'out', 'cal_bore', 'omss_bore' or 2-element list of [Y,X] motor positions. Current position retained if None. If string, case insensitive sc_pow (number or None): Supercontinuum source power (0 to 100). Retain current value if None. sc_att (number or None): Supercontinuum source attenutation (2.2 to 60). Retain current value if None. rel (bool): Treat pos inputs as relative to current position (ignored if pos is str or None). Default False. ir_on (bool or None): Turn on ir laser is True, turn off if False, and retain current state if None. vis_on (bool or None): Turn on ir laser is True, turn off if False, and retain current state sphere_pos (str or number or None): 'in', 'out' or override position value (number). Current position retained if None. If string, case insensitive sphere_att (number or None): Sphere attenutation (2.2 to 60). Retain current value if None. if None. queue (bool): If True, queue commands rather than executing. Returns: None .. warning:: The ``rel`` input will be applied to both ASU and CAL SPHERE moves. If you wish to have more granular control, use the move methods on the two sub-classes individualls. """ doing_something = False if ( (asu_pos is not None) or (sc_pow is not None) or (sc_att is not None) or (ir_on is not None) or (vis_on is not None) ): try: self.asu.move( asu_pos=asu_pos, sc_pow=sc_pow, sc_att=sc_att, rel=rel, ir_on=ir_on, vis_on=vis_on, queue=True, ) doing_something = True except (AssertionError, RuntimeError, TypeError, NameError): self.rpc.clear_queue() raise if (sphere_pos is not None) or (sphere_att is not None): try: self.cal_sphere.move( sphere_pos=sphere_pos, sphere_att=sphere_att, rel=rel, queue=True ) doing_something = True except (AssertionError, RuntimeError, TypeError, NameError): self.rpc.clear_queue() raise if not (queue) and doing_something: self.rpc.execute_queue()
[docs]class source_component(generic_gpi_component): """Base class for ASU and CAL_SPHERE Args: rpc (:py:class:`~gpilib2.rpc`): rpc object. sim status and verbosity will be set based on its settings. binary (str): binary name move_cmd (int): Command number in binary to use default_cmd_args (list): Default argument sets to use for inits and datums and sims move_gmb_field (str): GMB field prefix corresponding to the move command curr_state_gmb_field (str): GMB field of current state of component name (str): Component name motor_position_gmb_fields (list): GMB fields with current filterwheel/slide position values. Must be in the same order as these arguments are passed to the move command. server (str): Server address to send commands to. mcd_name (:py:data:`~numpy.typing.ArrayLike`, optional): Label(s) of :term:`MCD` axis for this component. If None (default), assume that device is not on an MCD. Attributes: rpc (:py:class:`~gpilib2.rpc`): rpc object for communications server (str): Server address to send commands to. binary (str): binary name move_cmd (int): Command number in binary to use default_cmd_args (list): Default argument sets to use for inits and datums and sims move_gmb_field (str): GMB field prefix corresponding to the move command curr_state_gmb_field (str): GMB field of current state of component name (str): Component name motor_position_gmb_fields (list): GMB fields with current filterwheel/slide position values. Must be in the same order as these arguments are passed to the move command. mcd_name (~numpy.ndarray(str), optional): Label(s) of :term:`MCD` axis/axes for this component. If None, assume that device is not on an MCD. mcd_inds (~numpy.ndarray(int), optional): Indices of entries in :py:attr:`~gpilib2.rpc.rpc.mcdaxisnames` corresponding to the entries of mcd_name. None if mcd_name is None. """ def __init__( self, rpc: rpc, binary: str, move_cmd: int, default_cmd_args: List[str], move_gmb_field: str, curr_state_gmb_field: str, name: str, motor_position_gmb_fields: List[str], server: str = "tlc", mcd_name: Optional[npt.ArrayLike] = None, ) -> None: """Initialize component""" self.binary = "gpSrcAssClientTester" generic_gpi_component.__init__( self, rpc, binary, move_cmd, default_cmd_args, move_gmb_field, curr_state_gmb_field, name=name, server=server, mcd_name=mcd_name, ) self.motor_position_gmb_fields = motor_position_gmb_fields
[docs]class asu(source_component): """Artificial Source Unit""" def __init__(self, rpc: rpc, server: str = "tlc") -> None: """set up the asu Args: rpc (:py:class:`~gpilib2.rpc`): rpc object. sim status and verbosity will be set based on its settings. server (str): Server address to send commands to. """ source_component.__init__( self, rpc, "gpSrcAssClientTester", # binary 1, # move command MT_ART_SOURCE [ "3", # deploy "0", # YOverridePos "0", # XOverridePos "-3", # sourceIR "-3", # sourceVIS "-3", # sourceSC "-3", # attenuation ], # default_cmd_args "tlc.srcAss.srcAssCmd_ART_SOURCE.subCmd", # move gmb field "tlc.srcAss.srcDeploy", # curr state gmb field "ASU", # name [ "tlc.srcAss.srcDeployMotorPosition", "tlc.srcAss.srcXMotorPosition", ], # motor position gmb fields server=server, mcd_name = ['ASU_X_STAGE', 'ASU_Y_STAGE'] ) def __str__(self) -> str: """Print current ASU status""" vals = self.rpc.read_gmb_values( [ "tlc.srcAss.srcAssCmd_ART_SOURCE.simulate", # 0 "tlc.srcAss.srcDeploy", # 1 "tlc.srcAss.sourceSCpower", # 2 "tlc.srcAss.attenuationSC", # 3 "tlc.srcAss.currentTemperatureSC", # 4 "tlc.srcAss.srcDeployMotorPosition", # 5 "tlc.srcAss.srcXMotorPosition", # 6 "tlc.srcAss.sourceIR", # 7 "tlc.srcAss.sourceVIS", # 8 ] ) name = self.name if int(vals[0]) == 1: name += " (sim)" name += " " + np.array(["Extracted", "Deployed", "Pos. Override"])[int(vals[1])] onoff = np.array(["Off", "On"]) return ( "{0} Motor Position Y: {4: 2.2f} X: {5: 2.2f}\nSC Power: {1: 3.1f} " " Attenuation: {2: 3.2f} Temperature: {3: 3.1f}\nLASER IR: {6: >3} " " VIS: {7: >3}" ).format( name, float(vals[2]), float(vals[3]), float(vals[4]), float(vals[5]), float(vals[6]), onoff[int(vals[7])], onoff[int(vals[8])], )
[docs] def move( self, asu_pos: Union[str, List[float], None] = None, sc_pow: Union[float, None] = None, sc_att: Union[float, None] = None, rel: bool = False, ir_on: Union[bool, None] = None, vis_on: Union[bool, None] = None, queue: bool = False, ) -> None: """ASU MOVE command Args: asu_pos (str or list or None): 'in', 'out', 'cal_bore', 'omss_bore' or 2-element list of [Y,X] motor positions. Current position retained if None. If string, case insensitive sc_pow (number or None): Supercontinuum source power (0 to 100). Retain current value if None. sc_att (number or None): Supercontinuum source attenutation (2.2 to 60). Retain current value if None. rel (bool): Treat pos inputs as relative to current position (ignored if pos is str or None). Default False. ir_on (bool or None): Turn on ir laser is True, turn off if False, and retain current state if None. vis_on (bool or None): Turn on ir laser is True, turn off if False, and retain current state if None. queue (bool): If True, queue command rather than executing. Returns: None .. warning:: Careful! The override position order is Y, X. Y in this case is the long (deploy/extract) axis, while X is the orthogonal direction. """ assert ( (asu_pos is not None) or (sc_pow is not None) or (sc_att is not None) or (ir_on is not None) or (vis_on is not None) ), "At least one of: [asu_pos, sc_pow, sc_att, ir_on, vis_on] must be set." if asu_pos is not None: assert isinstance(asu_pos, str) or ( isinstance(asu_pos, list) and len(list) == 2 # type: ignore ), "asu_pos input must be a string or two-element list." self.check_datum() # default overrides YOverridePos = 0.0 XOverridePos = 0.0 # define pos string: [deploy, YOver, XOver] posdict = { "IN": [1, 0.0, 0.0], "OUT": [0, 0.0, 0.0], "CAL_BORE": [2, -0.96, 0.67], "OMSS_BORE": [2, -1.16, 0.64], } if isinstance(asu_pos, str): asu_pos = asu_pos.upper() assert ( asu_pos in posdict.keys() ), "String asu_pos input must be one of {}".format(list(posdict.keys())) deploy = posdict[asu_pos][0] YOverridePos, XOverridePos = posdict[asu_pos][1:] elif isinstance(asu_pos, list): asu_pos = np.array(asu_pos).astype(float) # type: ignore if rel: asu_pos += self.rpc.read_gmb_values( self.motor_position_gmb_fields ).astype(float) YOverridePos, XOverridePos = asu_pos deploy = 2 elif asu_pos is None: deploy = 3 if ir_on is None: sourceIR = -3 else: sourceIR = int(ir_on) if vis_on is None: sourceVIS = -3 else: sourceVIS = int(vis_on) if sc_pow is None: sourceSC = -3 else: sourceSC = float(sc_pow) # type: ignore if sc_att is None: attenuation = -3 else: attenuation = float(sc_att) # type: ignore cmdlist = [ self.rpc.binaries[self.binary], # Binary self.server, "{}".format(self.move_cmd), # command number "{}".format(self.rpc.activityId), # activity id "{}".format(self.rpc.activity["START"]), # activity directive "{}".format(self.rpc.ass_mode["MOVE"]), # mode "{}".format(self.rpc.move_level), # level "{}".format(deploy), "{}".format(YOverridePos), "{}".format(XOverridePos), "{}".format(sourceIR), "{}".format(sourceVIS), "{}".format(sourceSC), "{}".format(attenuation), ] # Execute or queue self.rpc.execute(cmdlist, self.move_gmb_field, queue=queue)
[docs] def deploy(self, sc_att: Union[float, None] = None, queue: bool = False) -> None: """Utility wrapper around MOVE command Args: sc_att (number or None): Supercontinuum source attenutation (2.2 to 60). Retain current value if None. If not None, set power to 100. queue (bool): If True, queue command rather than executing. .. warning:: Setting an attenuation automatically sets sc power to 100%. If you want finer control over what is happening, use the ``move`` method instead. """ if sc_att: sc_pow = 100 else: sc_pow = None self.move(asu_pos="in", sc_pow=sc_pow, sc_att=sc_att, queue=queue)
[docs] def extract(self, queue: bool = False) -> None: """Utility wrapper around MOVE command Args: queue (bool): If True, queue command rather than executing. .. warning:: This will extract the asu and set the supercontinuum power to 0 and attenuation to 60 dB. If you want finer control over what is happening, use the ``move`` method instead. """ self.move(asu_pos="out", sc_pow=0, sc_att=60, queue=queue)
[docs]class cal_sphere(source_component): """CAL Sphere""" def __init__(self, rpc: rpc, server: str = "tlc") -> None: """set up the sphere Args: rpc (:py:class:`~gpilib2.rpc`): rpc object. sim status and verbosity will be set based on its settings. server (str): Server address to send commands to. """ source_component.__init__( self, rpc, "gpSrcAssClientTester", # binary 2, # move command MT_CAL_SPHERE [ "0", # deploy "0", # deployOverridePos "-3", # attenuation ], # default_cmd_args "tlc.srcAss.srcAssCmd_SPHERE.subCmd", # move gmb field "tlc.srcAss.sphereDeploy", # curr state gmb field "CAL SPHERE", # name ["tlc.srcAss.sphereMotorPosition"], # motor position gmb fields server=server, mcd_name = 'CAL_SPHERE' ) def __str__(self) -> str: """Print current sphere status""" vals = self.rpc.read_gmb_values( [ "tlc.srcAss.srcAssCmd_SPHERE.simulate", # 0 "tlc.srcAss.sphereDeploy", # 1 "tlc.srcAss.attenuationSphere", # 2 "tlc.srcAss.sphereMotorPosition", # 3 ] ) name = self.name if int(vals[0]) == 1: name += " (sim)" name += " " + np.array(["Extracted", "Deployed", "Pos. Override"])[int(vals[1])] return ("{0} Motor Position: {1: 2.2f} Attenuation: {2: 3.2f}").format( name, float(vals[3]), float(vals[2]), )
[docs] def move( self, sphere_pos: Union[str, float, None] = None, sphere_att: Union[float, None] = None, rel: bool = False, queue: bool = False, ) -> None: """CAL Spere MOVE command Args: sphere_pos (str or number or None): 'in', 'out' or override position value (number). Current position retained if None. If string, case insensitive sphere_att (number or None): Sphere attenutation (2.2 to 60). Retain current value if None. rel (bool): Treat pos input as relative to current position (ignored if pos is str or None). Default False. queue (bool): If True, queue command rather than executing. Returns: None """ assert (sphere_pos is not None) or ( sphere_att is not None ), "At least one of: [sphere_ppos, sphere_att] must be set." if sphere_pos is not None: assert isinstance( sphere_pos, (str, Number) ), "sphere_pos input must be a string or number." self.check_datum() # default overrides deployOverridePos = 0.0 # define pos string: [deploy, YOver, XOver] posdict = { "IN": 1, "OUT": 0, } if isinstance(sphere_pos, str): sphere_pos = sphere_pos.upper() assert ( sphere_pos in posdict.keys() ), "String sphere_pos input must be one of {}".format(list(posdict.keys())) deploy = posdict[sphere_pos] elif isinstance(sphere_pos, Number): sphere_pos = float(sphere_pos) if rel: sphere_pos += self.rpc.read_gmb_values( self.motor_position_gmb_fields ).astype(float)[0] deploy = 2 elif sphere_pos is None: deploy = -3 if sphere_att is None: attenuation = -3 else: attenuation = float(sphere_att) # type: ignore cmdlist = [ self.rpc.binaries[self.binary], # Binary self.server, "{}".format(self.move_cmd), # command number "{}".format(self.rpc.activityId), # activity id "{}".format(self.rpc.activity["START"]), # activity directive "{}".format(self.rpc.ass_mode["MOVE"]), # mode "{}".format(self.rpc.move_level), # level "{}".format(deploy), "{}".format(deployOverridePos), "{}".format(attenuation), ] # Execute or queue self.rpc.execute(cmdlist, self.move_gmb_field, queue=queue)
[docs] def deploy( self, sphere_att: Union[float, None] = None, queue: bool = False ) -> None: """Utility wrapper around MOVE command Args: sphere_att (number or None): CAL sphere attenutation (2.2 to 60). Retain current value if None. queue (bool): If True, queue command rather than executing. """ self.move(sphere_pos="in", sphere_att=sphere_att, queue=queue)
[docs] def extract(self, queue: bool = False) -> None: """Utility wrapper around MOVE command Args: queue (bool): If True, queue command rather than executing. .. warning:: This will extract the cal sphere and set the attenuation to 60 dB. If you want finer control over what is happening, use the ``move`` method instead. """ self.move(sphere_pos="out", sphere_att=60, queue=queue)