from typing import Union, List, Optional
from gpilib2.generic_gpi_component import generic_gpi_component
from gpilib2.generic_gpi_assembly import generic_gpi_assembly
from numbers import Number
import numpy as np
from gpilib2.rpc import rpc
import numpy.typing as npt
[docs]class source(generic_gpi_assembly):
"""All GPI sources (everything in source assembly other than OMSS shutter)
Args:
rpc (:py:class:`~gpilib2.rpc`):
rpc object. sim status and verbosity will be set
based on its settings.
Attributes:
rpc (:py:class:`~gpilib2.rpc`):
rpc object for communications
asu (:py:class:`~gpilib2.source.asu`):
asu object
cal_sphere (:py:class:`~gpilib2.source.cal_sphere`):
sphere object
components (list):
List of ASU and CAL_SPHERE (in **that** order)
"""
def __init__(self, rpc: rpc) -> None:
"""Define all components
Args:
rpc (gpilib2.rpc):
rpc object. sim status and verbosity will be set
based on its settings.
"""
generic_gpi_assembly.__init__(self, rpc)
self.asu = asu(self.rpc)
self.cal_sphere = cal_sphere(self.rpc)
self.components = [self.asu, self.cal_sphere]
[docs] def move(
self,
asu_pos: Union[str, List[float], None] = None,
sc_pow: Union[float, None] = None,
sc_att: Union[float, None] = None,
rel: bool = False,
ir_on: Union[bool, None] = None,
vis_on: Union[bool, None] = None,
sphere_pos: Union[str, float, None] = None,
sphere_att: Union[float, None] = None,
queue: bool = False,
) -> None:
"""Source MOVE command
Args:
asu_pos (str or list or None):
'in', 'out', 'cal_bore', 'omss_bore' or 2-element list of [Y,X] motor
positions. Current position retained if None. If string, case
insensitive
sc_pow (number or None):
Supercontinuum source power (0 to 100). Retain current value if None.
sc_att (number or None):
Supercontinuum source attenutation (2.2 to 60). Retain current value if
None.
rel (bool):
Treat pos inputs as relative to current position (ignored if pos is str
or None). Default False.
ir_on (bool or None):
Turn on ir laser is True, turn off if False, and retain current state
if None.
vis_on (bool or None):
Turn on ir laser is True, turn off if False, and retain current state
sphere_pos (str or number or None):
'in', 'out' or override position value (number). Current position
retained if None. If string, case insensitive
sphere_att (number or None):
Sphere attenutation (2.2 to 60). Retain current value if None.
if None.
queue (bool):
If True, queue commands rather than executing.
Returns:
None
.. warning::
The ``rel`` input will be applied to both ASU and CAL SPHERE moves. If you
wish to have more granular control, use the move methods on the two
sub-classes individualls.
"""
doing_something = False
if (
(asu_pos is not None)
or (sc_pow is not None)
or (sc_att is not None)
or (ir_on is not None)
or (vis_on is not None)
):
try:
self.asu.move(
asu_pos=asu_pos,
sc_pow=sc_pow,
sc_att=sc_att,
rel=rel,
ir_on=ir_on,
vis_on=vis_on,
queue=True,
)
doing_something = True
except (AssertionError, RuntimeError, TypeError, NameError):
self.rpc.clear_queue()
raise
if (sphere_pos is not None) or (sphere_att is not None):
try:
self.cal_sphere.move(
sphere_pos=sphere_pos, sphere_att=sphere_att, rel=rel, queue=True
)
doing_something = True
except (AssertionError, RuntimeError, TypeError, NameError):
self.rpc.clear_queue()
raise
if not (queue) and doing_something:
self.rpc.execute_queue()
[docs]class source_component(generic_gpi_component):
"""Base class for ASU and CAL_SPHERE
Args:
rpc (:py:class:`~gpilib2.rpc`):
rpc object. sim status and verbosity will be set
based on its settings.
binary (str):
binary name
move_cmd (int):
Command number in binary to use
default_cmd_args (list):
Default argument sets to use for inits and datums and sims
move_gmb_field (str):
GMB field prefix corresponding to the move command
curr_state_gmb_field (str):
GMB field of current state of component
name (str):
Component name
motor_position_gmb_fields (list):
GMB fields with current filterwheel/slide position values. Must be in the
same order as these arguments are passed to the move command.
server (str):
Server address to send commands to.
mcd_name (:py:data:`~numpy.typing.ArrayLike`, optional):
Label(s) of :term:`MCD` axis for this component. If None (default),
assume that device is not on an MCD.
Attributes:
rpc (:py:class:`~gpilib2.rpc`):
rpc object for communications
server (str):
Server address to send commands to.
binary (str):
binary name
move_cmd (int):
Command number in binary to use
default_cmd_args (list):
Default argument sets to use for inits and datums and sims
move_gmb_field (str):
GMB field prefix corresponding to the move command
curr_state_gmb_field (str):
GMB field of current state of component
name (str):
Component name
motor_position_gmb_fields (list):
GMB fields with current filterwheel/slide position values. Must be in the
same order as these arguments are passed to the move command.
mcd_name (~numpy.ndarray(str), optional):
Label(s) of :term:`MCD` axis/axes for this component. If None, assume that
device is not on an MCD.
mcd_inds (~numpy.ndarray(int), optional):
Indices of entries in :py:attr:`~gpilib2.rpc.rpc.mcdaxisnames`
corresponding to the entries of mcd_name. None if mcd_name is None.
"""
def __init__(
self,
rpc: rpc,
binary: str,
move_cmd: int,
default_cmd_args: List[str],
move_gmb_field: str,
curr_state_gmb_field: str,
name: str,
motor_position_gmb_fields: List[str],
server: str = "tlc",
mcd_name: Optional[npt.ArrayLike] = None,
) -> None:
"""Initialize component"""
self.binary = "gpSrcAssClientTester"
generic_gpi_component.__init__(
self,
rpc,
binary,
move_cmd,
default_cmd_args,
move_gmb_field,
curr_state_gmb_field,
name=name,
server=server,
mcd_name=mcd_name,
)
self.motor_position_gmb_fields = motor_position_gmb_fields
[docs]class asu(source_component):
"""Artificial Source Unit"""
def __init__(self, rpc: rpc, server: str = "tlc") -> None:
"""set up the asu
Args:
rpc (:py:class:`~gpilib2.rpc`):
rpc object. sim status and verbosity will be set
based on its settings.
server (str):
Server address to send commands to.
"""
source_component.__init__(
self,
rpc,
"gpSrcAssClientTester", # binary
1, # move command MT_ART_SOURCE
[
"3", # deploy
"0", # YOverridePos
"0", # XOverridePos
"-3", # sourceIR
"-3", # sourceVIS
"-3", # sourceSC
"-3", # attenuation
], # default_cmd_args
"tlc.srcAss.srcAssCmd_ART_SOURCE.subCmd", # move gmb field
"tlc.srcAss.srcDeploy", # curr state gmb field
"ASU", # name
[
"tlc.srcAss.srcDeployMotorPosition",
"tlc.srcAss.srcXMotorPosition",
], # motor position gmb fields
server=server,
mcd_name = ['ASU_X_STAGE', 'ASU_Y_STAGE']
)
def __str__(self) -> str:
"""Print current ASU status"""
vals = self.rpc.read_gmb_values(
[
"tlc.srcAss.srcAssCmd_ART_SOURCE.simulate", # 0
"tlc.srcAss.srcDeploy", # 1
"tlc.srcAss.sourceSCpower", # 2
"tlc.srcAss.attenuationSC", # 3
"tlc.srcAss.currentTemperatureSC", # 4
"tlc.srcAss.srcDeployMotorPosition", # 5
"tlc.srcAss.srcXMotorPosition", # 6
"tlc.srcAss.sourceIR", # 7
"tlc.srcAss.sourceVIS", # 8
]
)
name = self.name
if int(vals[0]) == 1:
name += " (sim)"
name += " " + np.array(["Extracted", "Deployed", "Pos. Override"])[int(vals[1])]
onoff = np.array(["Off", "On"])
return (
"{0} Motor Position Y: {4: 2.2f} X: {5: 2.2f}\nSC Power: {1: 3.1f} "
" Attenuation: {2: 3.2f} Temperature: {3: 3.1f}\nLASER IR: {6: >3} "
" VIS: {7: >3}"
).format(
name,
float(vals[2]),
float(vals[3]),
float(vals[4]),
float(vals[5]),
float(vals[6]),
onoff[int(vals[7])],
onoff[int(vals[8])],
)
[docs] def move(
self,
asu_pos: Union[str, List[float], None] = None,
sc_pow: Union[float, None] = None,
sc_att: Union[float, None] = None,
rel: bool = False,
ir_on: Union[bool, None] = None,
vis_on: Union[bool, None] = None,
queue: bool = False,
) -> None:
"""ASU MOVE command
Args:
asu_pos (str or list or None):
'in', 'out', 'cal_bore', 'omss_bore' or 2-element list of [Y,X] motor
positions. Current position retained if None. If string, case
insensitive
sc_pow (number or None):
Supercontinuum source power (0 to 100). Retain current value if None.
sc_att (number or None):
Supercontinuum source attenutation (2.2 to 60). Retain current value if
None.
rel (bool):
Treat pos inputs as relative to current position (ignored if pos is str
or None). Default False.
ir_on (bool or None):
Turn on ir laser is True, turn off if False, and retain current state
if None.
vis_on (bool or None):
Turn on ir laser is True, turn off if False, and retain current state
if None.
queue (bool):
If True, queue command rather than executing.
Returns:
None
.. warning::
Careful! The override position order is Y, X. Y in this case is the long
(deploy/extract) axis, while X is the orthogonal direction.
"""
assert (
(asu_pos is not None)
or (sc_pow is not None)
or (sc_att is not None)
or (ir_on is not None)
or (vis_on is not None)
), "At least one of: [asu_pos, sc_pow, sc_att, ir_on, vis_on] must be set."
if asu_pos is not None:
assert isinstance(asu_pos, str) or (
isinstance(asu_pos, list) and len(list) == 2 # type: ignore
), "asu_pos input must be a string or two-element list."
self.check_datum()
# default overrides
YOverridePos = 0.0
XOverridePos = 0.0
# define pos string: [deploy, YOver, XOver]
posdict = {
"IN": [1, 0.0, 0.0],
"OUT": [0, 0.0, 0.0],
"CAL_BORE": [2, -0.96, 0.67],
"OMSS_BORE": [2, -1.16, 0.64],
}
if isinstance(asu_pos, str):
asu_pos = asu_pos.upper()
assert (
asu_pos in posdict.keys()
), "String asu_pos input must be one of {}".format(list(posdict.keys()))
deploy = posdict[asu_pos][0]
YOverridePos, XOverridePos = posdict[asu_pos][1:]
elif isinstance(asu_pos, list):
asu_pos = np.array(asu_pos).astype(float) # type: ignore
if rel:
asu_pos += self.rpc.read_gmb_values(
self.motor_position_gmb_fields
).astype(float)
YOverridePos, XOverridePos = asu_pos
deploy = 2
elif asu_pos is None:
deploy = 3
if ir_on is None:
sourceIR = -3
else:
sourceIR = int(ir_on)
if vis_on is None:
sourceVIS = -3
else:
sourceVIS = int(vis_on)
if sc_pow is None:
sourceSC = -3
else:
sourceSC = float(sc_pow) # type: ignore
if sc_att is None:
attenuation = -3
else:
attenuation = float(sc_att) # type: ignore
cmdlist = [
self.rpc.binaries[self.binary], # Binary
self.server,
"{}".format(self.move_cmd), # command number
"{}".format(self.rpc.activityId), # activity id
"{}".format(self.rpc.activity["START"]), # activity directive
"{}".format(self.rpc.ass_mode["MOVE"]), # mode
"{}".format(self.rpc.move_level), # level
"{}".format(deploy),
"{}".format(YOverridePos),
"{}".format(XOverridePos),
"{}".format(sourceIR),
"{}".format(sourceVIS),
"{}".format(sourceSC),
"{}".format(attenuation),
]
# Execute or queue
self.rpc.execute(cmdlist, self.move_gmb_field, queue=queue)
[docs] def deploy(self, sc_att: Union[float, None] = None, queue: bool = False) -> None:
"""Utility wrapper around MOVE command
Args:
sc_att (number or None):
Supercontinuum source attenutation (2.2 to 60). Retain current value if
None. If not None, set power to 100.
queue (bool):
If True, queue command rather than executing.
.. warning::
Setting an attenuation automatically sets sc power to 100%. If you want
finer control over what is happening, use the ``move`` method instead.
"""
if sc_att:
sc_pow = 100
else:
sc_pow = None
self.move(asu_pos="in", sc_pow=sc_pow, sc_att=sc_att, queue=queue)
[docs]class cal_sphere(source_component):
"""CAL Sphere"""
def __init__(self, rpc: rpc, server: str = "tlc") -> None:
"""set up the sphere
Args:
rpc (:py:class:`~gpilib2.rpc`):
rpc object. sim status and verbosity will be set
based on its settings.
server (str):
Server address to send commands to.
"""
source_component.__init__(
self,
rpc,
"gpSrcAssClientTester", # binary
2, # move command MT_CAL_SPHERE
[
"0", # deploy
"0", # deployOverridePos
"-3", # attenuation
], # default_cmd_args
"tlc.srcAss.srcAssCmd_SPHERE.subCmd", # move gmb field
"tlc.srcAss.sphereDeploy", # curr state gmb field
"CAL SPHERE", # name
["tlc.srcAss.sphereMotorPosition"], # motor position gmb fields
server=server,
mcd_name = 'CAL_SPHERE'
)
def __str__(self) -> str:
"""Print current sphere status"""
vals = self.rpc.read_gmb_values(
[
"tlc.srcAss.srcAssCmd_SPHERE.simulate", # 0
"tlc.srcAss.sphereDeploy", # 1
"tlc.srcAss.attenuationSphere", # 2
"tlc.srcAss.sphereMotorPosition", # 3
]
)
name = self.name
if int(vals[0]) == 1:
name += " (sim)"
name += " " + np.array(["Extracted", "Deployed", "Pos. Override"])[int(vals[1])]
return ("{0} Motor Position: {1: 2.2f} Attenuation: {2: 3.2f}").format(
name,
float(vals[3]),
float(vals[2]),
)
[docs] def move(
self,
sphere_pos: Union[str, float, None] = None,
sphere_att: Union[float, None] = None,
rel: bool = False,
queue: bool = False,
) -> None:
"""CAL Spere MOVE command
Args:
sphere_pos (str or number or None):
'in', 'out' or override position value (number). Current position
retained if None. If string, case insensitive
sphere_att (number or None):
Sphere attenutation (2.2 to 60). Retain current value if None.
rel (bool):
Treat pos input as relative to current position (ignored if pos is str
or None). Default False.
queue (bool):
If True, queue command rather than executing.
Returns:
None
"""
assert (sphere_pos is not None) or (
sphere_att is not None
), "At least one of: [sphere_ppos, sphere_att] must be set."
if sphere_pos is not None:
assert isinstance(
sphere_pos, (str, Number)
), "sphere_pos input must be a string or number."
self.check_datum()
# default overrides
deployOverridePos = 0.0
# define pos string: [deploy, YOver, XOver]
posdict = {
"IN": 1,
"OUT": 0,
}
if isinstance(sphere_pos, str):
sphere_pos = sphere_pos.upper()
assert (
sphere_pos in posdict.keys()
), "String sphere_pos input must be one of {}".format(list(posdict.keys()))
deploy = posdict[sphere_pos]
elif isinstance(sphere_pos, Number):
sphere_pos = float(sphere_pos)
if rel:
sphere_pos += self.rpc.read_gmb_values(
self.motor_position_gmb_fields
).astype(float)[0]
deploy = 2
elif sphere_pos is None:
deploy = -3
if sphere_att is None:
attenuation = -3
else:
attenuation = float(sphere_att) # type: ignore
cmdlist = [
self.rpc.binaries[self.binary], # Binary
self.server,
"{}".format(self.move_cmd), # command number
"{}".format(self.rpc.activityId), # activity id
"{}".format(self.rpc.activity["START"]), # activity directive
"{}".format(self.rpc.ass_mode["MOVE"]), # mode
"{}".format(self.rpc.move_level), # level
"{}".format(deploy),
"{}".format(deployOverridePos),
"{}".format(attenuation),
]
# Execute or queue
self.rpc.execute(cmdlist, self.move_gmb_field, queue=queue)
[docs] def deploy(
self, sphere_att: Union[float, None] = None, queue: bool = False
) -> None:
"""Utility wrapper around MOVE command
Args:
sphere_att (number or None):
CAL sphere attenutation (2.2 to 60). Retain current value if
None.
queue (bool):
If True, queue command rather than executing.
"""
self.move(sphere_pos="in", sphere_att=sphere_att, queue=queue)