from gpilib2.generic_gpi_component import generic_gpi_component
from gpilib2.generic_gpi_assembly import generic_gpi_assembly
from gpilib2.rpc import rpc
from typing import List, Tuple, Optional
import numpy as np
import numpy.typing as npt
import warnings
class pnc(generic_gpi_assembly):
    """All GPI pointing and centering components (inputfold, ao pncs, cal-ifs pncs)

    Args:
        rpc (:py:class:`~gpilib2.rpc`):
            rpc object. sim status and verbosity will be set based on its settings.
            It is forwarded to the assembly base class and to every component.

    Attributes:
        rpc (:py:class:`~gpilib2.rpc`):
            rpc object for communications
        ao_pnc (:py:class:`~gpilib2.pnc.ao_pnc`):
            AO PnC component
        cal_pnc (:py:class:`~gpilib2.pnc.cal_pnc`):
            CAL PnC component
        inputfold:
            Inputfold component (built from the ``inputfold`` class)
        components (list):
            List of inputfold, ao_pnc, cal_pnc (in **that** order)

    """

    def __init__(self, rpc: rpc) -> None:
        """Define all components"""

        super().__init__(rpc)

        # All three mechanisms are commanded through the same client binary and are
        # constructed with the same command GMB field.
        tester_binary = "gpTrkAssClientTester"
        cmd_gmb_field = "tlc.trkAss.trkAssCmd_WFS_PnC_PAIR.subCmd"

        # AO WFS pointing and centering pair
        self.ao_pnc = ao_pnc(
            rpc,
            tester_binary,
            0,  # cmd MT_AO_WFS_PnC_PAIR
            ["0"] * 6,  # default cmd
            cmd_gmb_field,
            "tlc.trkAss.aoWfs.tracking",  # curr state gmb field
            "AO PnC",  # what I'm called
            has_pointing=True,
            has_focus=True,
        )

        # CAL-IFS pointing and centering pair
        self.cal_pnc = cal_pnc(
            rpc,
            tester_binary,
            2,  # cmd MT_CAL_IFS_PnC_PAIR
            ["0"] * 6,  # default cmd
            cmd_gmb_field,
            "tlc.trkAss.calIfs.tracking",  # curr state gmb field
            "CAL PnC",  # what I'm called
            has_pointing=True,
            has_focus=True,
        )

        # Input fold mirror (neither pointing nor focus)
        self.inputfold = inputfold(
            rpc,
            tester_binary,
            1,  # cmd MT_FOLD_TT_MIRROR
            ["0"] * 3,  # default cmd
            cmd_gmb_field,
            "tlc.trkAss.fold.tracking",  # curr state gmb field
            "Inputfold",  # what I'm called
            has_pointing=False,
            has_focus=False,
        )

        self.components = [self.inputfold, self.ao_pnc, self.cal_pnc]
class generic_pnc(generic_gpi_component):
    """GPI pointing and/or centering component

    Args:
        rpc (:py:class:`~gpilib2.rpc`):
            rpc object. sim status and verbosity will be set
            based on its settings.
        binary (str):
            binary name
        move_cmd (int):
            Command number in binary to use
        default_cmd_args (list):
            Default argument sets to use for inits and datums and sims
        move_gmb_field (str):
            GMB field prefix corresponding to the move command
        curr_state_gmb_field (str):
            GMB field of current tracking state
        name (str):
            Name of this mechanism
        server (str):
            Server address to send commands to.
        has_pointing (bool):
            Mechanism has pointing capability (default True)
        has_focus (bool):
            Mechanism has focus capability (default True)
        tol (float):
            Absolute tolerance to accept successful move commands (default 1e-5)

    Attributes:
        rpc (:py:class:`~gpilib2.rpc`):
            rpc object for communications
        server (str):
            Server address to send commands to.
        binary (str):
            binary name
        move_cmd (int):
            Command number in binary to use
        default_cmd_args (list):
            Default argument sets to use for inits and datums and sims
        move_gmb_field (str):
            GMB field prefix corresponding to the move command
        curr_state_gmb_field (str):
            GMB field of current tracking state
        name (str):
            Name of this mechanism
        has_pointing (bool):
            Mechanism has pointing capability
        has_focus (bool):
            Mechanism has focus capability
        n (int):
            Number of total settable values (2 for pointing + 2 for centering + 1 for
            focus)
        tol (float):
            Absolute tolerance to accept successful move commands

    """

    def __init__(
        self,
        rpc: rpc,
        binary: str,
        move_cmd: int,
        default_cmd_args: List[str],
        move_gmb_field: str,
        curr_state_gmb_field: str,
        name: str,
        server: str = "tlc",
        has_pointing: bool = True,
        has_focus: bool = True,
        tol: float = 1e-5,
    ) -> None:
        """Set up the mirror(s)"""

        generic_gpi_component.__init__(
            self,
            rpc,
            binary,
            move_cmd,
            default_cmd_args,
            move_gmb_field,
            curr_state_gmb_field,
            name=name,
            server=server,
        )

        self.has_pointing = has_pointing
        self.has_focus = has_focus

        # Centering (x, y) is always counted; pointing contributes two more values
        # (tip, tilt) and focus contributes a single scalar.
        self.n = 2
        if self.has_pointing:
            self.n += 2
        if self.has_focus:
            self.n += 1

        self.tol = tol

    def __str__(self) -> str:
        """Returns pnc status string"""

        # The simulate flag lives next to the move command field: replace the last
        # dotted element of the move field with "simulate".
        sim_field = ".".join(self.move_gmb_field.split(".")[:-1]) + ".simulate"
        vals = self.rpc.read_gmb_values(
            [self.curr_state_gmb_field, sim_field]
        ).astype(int)

        name = self.name
        if int(vals[1]) == 1:
            name += " (sim)"

        return "{0: >30}: {1}".format(name, np.array(["Stopped", "Tracking"])[vals[0]])

    def get_curr_values(self) -> Tuple[npt.NDArray[np.float64], ...]:
        """Query current net targets and offsets

        Placeholder in the generic class (does nothing and returns None): concrete
        mechanisms must override it. :py:meth:`move` unpacks the result as
        ``(current net target, currently applied offsets, offsets for next MOVE)``.
        """
        pass

    def assemble_move_cmd(self, pos: npt.NDArray[np.float64]) -> List[str]:
        """Utility method to package PnC move commands

        Args:
            pos (numpy.ndarray):
                Floating point array of values to command. For the AO and CAL PnCs
                this is 5 elements: [tip, tilt, x cent, ycent, focus]

        Returns:
            list:
                Command list to execute

        """

        cmdlist = [
            self.rpc.binaries[self.binary],
            self.server,
            "{}".format(self.move_cmd),
            "{}".format(self.rpc.activityId),  # activity id
            "{}".format(self.rpc.activity["START"]),  # activity directive
            "{}".format(self.rpc.ass_mode["MOVE"]),  # mode
            "{}".format(self.rpc.move_level),  # level
            "1",  # override
        ]
        # One fixed-precision argument per commanded value
        cmdlist.extend("{:.6f}".format(p) for p in pos)

        return cmdlist

    def toggle_offset(self, field: str, val: int) -> None:
        """Toggle offset field

        Writes the value and reads it back to confirm that it took.

        Args:
            field (str):
                GMB field of offset
            val (int):
                0 for off, 1 for on

        Returns:
            None

        """

        assert val in [0, 1], "val can only be 0 or 1."

        self.rpc.write_gmb_value(field, val)
        curr_val = self.rpc.read_gmb_values(field).astype(int)[0]
        assert curr_val == val, "Setting {} failed.".format(field)

    def move(
        self,
        targ: Optional[List[float]] = None,
        point: Optional[List[float]] = None,
        cent: Optional[List[float]] = None,
        focus: Optional[float] = None,
        rel: bool = False,
    ) -> None:
        """PnC MOVE command

        Args:
            targ (list or None):
                Either 4 or 5 element list of [tip, tilt, x cent, ycent, focus]
                values. If 4 elements, focus is set to zero. If None, point and/or cent
                and/or focus must be set. The input list is never modified.
            point (list or None):
                2-element list of pointing values [tip, tilt] in mas. Ignored if targ
                is set.
            cent (list or None):
                2-element list of centering values [x,y] in mm. Ignored if targ is set
            focus (float or None):
                Scalar focus value in mm. Ignored if targ is set. A value of 0 is a
                valid request and is distinct from None.
            rel (bool):
                Apply inputs as offsets from current values (taking into account of
                all other offsets).

        Returns:
            None

        .. note::

            Either targ or at least one of point/cent/focus must be set. If targ is set
            the other three inputs are ignored.

        .. warning::

            The default behavior of this method is inherently different from the
            equivalent functionality of the original gpilib. When rel is False (i.e.,
            absolute inputs) the offsets to be applied (based on current settings) will
            be subtracted from the inputs, such that the resulting mirror settings
            should match the inputs exactly (to within a very small margin of error).

        """

        curr_net, _, next_off = self.get_curr_values()

        if targ is not None:
            assert len(targ) in [4, 5], "targ input must have 4 or 5 elements."
            if (point is not None) or (cent is not None) or (focus is not None):
                warnings.warn(
                    "point, cent, and focus inputs are ignored when targ is set"
                )
            # Copy into a fresh array so the caller's list is left untouched; a
            # 4-element target leaves the focus entry at zero.
            pos = np.zeros(5)
            pos[: len(targ)] = np.array(targ).astype(float)
        else:
            assert (
                (point is not None) or (cent is not None) or (focus is not None)
            ), "At least one of: point, cent, or focus must be set when targ is None."

            # Unspecified quantities are zero offsets for relative moves and hold
            # the current net value for absolute moves. Inputs are tested against
            # None (not truthiness) so that explicit zeros are honoured.
            pos = np.zeros(5)

            if point is not None:
                assert len(point) == 2, "point input must have two elements."
                pos[0:2] = point
            elif not rel:
                pos[0:2] = curr_net[0:2]

            if cent is not None:
                assert len(cent) == 2, "cent input must have two elements."
                pos[2:4] = cent
            elif not rel:
                pos[2:4] = curr_net[2:4]

            if focus is not None:
                pos[4] = focus
            elif not rel:
                pos[4] = curr_net[4]

        # If this is a relative move, add the current net
        if rel:
            pos += curr_net

        # Execute initial move (without waiting) and check where we went
        self.rpc.execute(
            self.assemble_move_cmd(pos - next_off), self.move_gmb_field, nowait=True
        )
        curr_net, _, next_off = self.get_curr_values()

        if np.any(np.abs(curr_net - pos) > self.tol):
            # Net target is off (the offsets changed with the move): re-issue using
            # the updated next-move offsets, this time waiting for completion.
            self.rpc.execute(
                self.assemble_move_cmd(pos - next_off), self.move_gmb_field
            )
        else:
            self.rpc.wait_for_command_completion(self.move_gmb_field)

    def olm_on(self) -> None:
        """Toggle OLM to ON"""
        self.olm(1)

    def olm_off(self) -> None:
        """Toggle OLM to OFF"""
        self.olm(0)

    def olm(self, val: int) -> None:
        """Toggle OLM

        Placeholder in the generic class (does nothing); overridden by mechanisms
        that have an OLM toggle.

        Args:
            val (int):
                0 for off, 1 for on

        Returns:
            None

        """
        pass
class ao_pnc(generic_pnc):
    """AO PnCs"""

    def get_curr_values(self) -> Tuple[npt.NDArray[np.float64], ...]:
        """Query all of the GMB variables required to determine current positions and
        offsets

        Args:
            None

        Returns:
            tuple:
                :py:class:`numpy.ndarray`:
                    Array of current net target value (5 elements: tip, tilt,
                    x centering, y centering, focus)
                :py:class:`numpy.ndarray`:
                    Array of currently applied offsets (5 elements)
                :py:class:`numpy.ndarray`:
                    Array of offsets that will be applied to next MOVE (5 elements)

        Raises:
            AssertionError:
                If base + all offsets does not reproduce the net target to within
                ``self.tol``.

        .. note::

            Depending on the current instrument state, there may be residual values in
            some offset fields even though the offset itself is currently toggled off.
            For example :term:`OLM` offsets will be non-zero in the event that the OLM
            has just been toggled off while the PnCs are stopped.  Upon the next move
            (or track start) the OLM fields will all go to zero. Because of this, the
            current net target is validated based on all of the offset fields, whereas
            the offset for the next move is determined based on the current state of
            the offset toggles.

        """

        base_names = [
            "PointingTip",
            "PointingTilt",
            "CentringX",
            "CentringY",
            "PCFocus",
        ]

        # Layout of the single GMB read (index ranges into vals):
        #   0-4   net target
        #   5-9   nominal offsets
        #   10-14 base values
        #   15-19 OLM offsets
        #   20-22 DAR offsets (tip, tilt, focus)
        #   23-24 FPM offsets (tip, tilt)
        #   25-27 offset toggles (OLM, DAR, FPM)

        # Net Target fields
        fields = ["tlc.trkAss.aoWfs.target{}".format(b) for b in base_names]

        # Nominal values:
        fields += ["tlc.trkAss.aoWfs.nominal{}Offset".format(b) for b in base_names]

        # Base values
        fields += ["tlc.trkAss.aoWfs.base{}".format(b) for b in base_names[:-1]]
        # TODO: this name needs to be fixed in GMB
        fields.append("tlc.trkAss.aoWfs.baseFoucs")

        # OLM values:
        fields += ["tlc.trkAss.aoWfs.olm{}".format(b) for b in base_names[:-1]]
        fields.append("tlc.trkAss.aoWfs.olmFocus")

        # DAR offsets (pointing and focus):
        fields += ["tlc.trkAss.aoWfs.dar{}".format(b) for b in base_names[:2]]
        fields.append("tlc.trkAss.aoWfs.darFocus")

        # FPM offsets (pointing only):
        fields += ["tlc.fpmAss.mask{}".format(b) for b in base_names[:2]]

        # Offset toggles (always the last three entries)
        fields += [
            "tlc.trkAss.aoWfs.applyOpenLoopModel",
            "tlc.trkAss.aoWfs.applyDarResiduals",
            "tlc.trkAss.aoWfs.applyFpmOffsets",
        ]

        vals = self.rpc.read_gmb_values(fields)

        offsets = vals[-3:].astype(int)
        net = vals[:5].astype(float)
        nominal = vals[5:10].astype(float)
        base = vals[10:15].astype(float)
        olm = vals[15:20].astype(float)

        # DAR supplies tip, tilt and focus; the centering entries stay zero
        dar = np.zeros(5)
        dar[[0, 1, 4]] = vals[20:23].astype(float)

        # FPM supplies tip and tilt only; these follow the DAR focus entry
        fpm = np.zeros(5)
        fpm[:2] = vals[23:25].astype(float)

        # Validate against every offset field, regardless of toggle state
        prev_offset = nominal + olm + dar + fpm
        assert np.all(
            np.abs(base + prev_offset - net) < self.tol
        ), "Cannot reconstruct current AO PnC Target Values."

        # Only currently toggled-on offsets will be applied on the next move
        offset = nominal + olm * offsets[0] + dar * offsets[1] + fpm * offsets[2]

        return net, prev_offset, offset

    def cal_correct(self, val: int) -> None:
        """Toggle CAL correct for AO PnCs

        Args:
            val (int):
                0 for off, 1 for on

        Returns:
            None

        """

        self.toggle_offset("tlc.trkAss.aoWfs.applyCalCorrection", val)

    def cal_correct_on(self) -> None:
        """Toggle CAL correct to ON"""
        self.cal_correct(1)

    def cal_correct_off(self) -> None:
        """Toggle CAL correct to OFF"""
        self.cal_correct(0)

    def dar(self, val: int) -> None:
        """Toggle DAR for AO PnCs

        Args:
            val (int):
                0 for off, 1 for on

        Returns:
            None

        """

        self.toggle_offset("tlc.trkAss.aoWfs.applyDarResiduals", val)

    def dar_on(self) -> None:
        """Toggle DAR to ON"""
        self.dar(1)

    def dar_off(self) -> None:
        """Toggle DAR to OFF"""
        self.dar(0)

    def olm(self, val: int) -> None:
        """Toggle OLM for AO PnCs

        Args:
            val (int):
                0 for off, 1 for on

        Returns:
            None

        """

        self.toggle_offset("tlc.trkAss.aoWfs.applyOpenLoopModel", val)

    def fpm_offsets(self, val: int) -> None:
        """Toggle FPM offsets for AO PnCs

        Args:
            val (int):
                0 for off, 1 for on

        Returns:
            None

        """

        self.toggle_offset("tlc.trkAss.aoWfs.applyFpmOffsets", val)

    def fpm_offsets_on(self) -> None:
        """Toggle FPM offsets to ON"""
        self.fpm_offsets(1)

    def fpm_offsets_off(self) -> None:
        """Toggle FPM offsets to OFF"""
        self.fpm_offsets(0)
class cal_pnc(generic_pnc):
    """CAL PnCs"""

    def get_curr_values(self) -> Tuple[npt.NDArray[np.float64], ...]:
        """Query all of the GMB variables required to determine current positions and
        offsets

        Args:
            None

        Returns:
            tuple:
                :py:class:`numpy.ndarray`:
                    Array of current net target value (5 elements: tip, tilt,
                    x centering, y centering, focus)
                :py:class:`numpy.ndarray`:
                    Array of currently applied offsets (5 elements)
                :py:class:`numpy.ndarray`:
                    Array of offsets that will be applied to next MOVE (5 elements)

        Raises:
            AssertionError:
                If base + all offsets does not reproduce the net target to within
                ``self.tol``.

        .. note::

            Depending on the current instrument state, there may be residual values in
            some offset fields even though the offset itself is currently toggled off.
            For example :term:`OLM` offsets will be non-zero in the event that the OLM
            has just been toggled off while the PnCs are stopped.  Upon the next move
            (or track start) the OLM fields will all go to zero. Because of this, the
            current net target is validated based on all of the offset fields, whereas
            the offset for the next move is determined based on the current state of
            the offset toggles.

        """

        base_names = [
            "PointingTip",
            "PointingTilt",
            "CentringX",
            "CentringY",
            "PCFocus",
        ]

        # Layout of the single GMB read (index ranges into vals):
        #   0-4   net target
        #   5-9   nominal offsets
        #   10-14 base values
        #   15-19 OLM offsets
        #   20-21 FOV offsets (tip, tilt)
        #   22-23 Lyot offsets (x, y centering)
        #   24-26 offset toggles (OLM, FOV, Lyot)

        # Net Target fields
        fields = ["tlc.trkAss.calIfs.target{}".format(b) for b in base_names]

        # Nominal values:
        fields += ["tlc.trkAss.calIfs.nominal{}Offset".format(b) for b in base_names]

        # Base values
        fields += ["tlc.trkAss.calIfs.base{}".format(b) for b in base_names[:-1]]
        # TODO: this name needs to be fixed in GMB
        fields.append("tlc.trkAss.calIfs.baseFoucs")

        # OLM values:
        fields += ["tlc.trkAss.calIfs.olm{}".format(b) for b in base_names[:-1]]
        fields.append("tlc.trkAss.calIfs.olmFocus")

        # FOV offsets (pointing only):
        fields += ["tlc.isRpc.{}FovIfsOffset".format(b) for b in ["x", "y"]]

        # Lyot offsets (centering only); must directly follow the FOV pair so that
        # the slices below line up.
        fields += ["tlc.ifsAss.lyotMaskCentring{}".format(b) for b in ["X", "Y"]]

        # Offset toggles (always the last three entries)
        fields += [
            "tlc.trkAss.calIfs.applyOpenLoopModel",
            "tlc.trkAss.calIfs.applyFovOffsets",
            "tlc.trkAss.calIfs.applyLyotOffsets",
        ]

        vals = self.rpc.read_gmb_values(fields)

        offsets = vals[-3:].astype(int)
        net = vals[:5].astype(float)
        nominal = vals[5:10].astype(float)
        base = vals[10:15].astype(float)
        olm = vals[15:20].astype(float)

        # FOV offsets act on tip/tilt only
        fov = np.zeros(5)
        fov[:2] = vals[20:22].astype(float)

        # Lyot offsets act on x/y centering only
        lyot = np.zeros(5)
        lyot[2:4] = vals[22:24].astype(float)

        # Validate against every offset field, regardless of toggle state
        prev_offset = nominal + olm + fov + lyot
        assert np.all(
            np.abs(base + prev_offset - net) < self.tol
        ), "Cannot reconstruct current CAL PnC Target Values."

        # Only currently toggled-on offsets will be applied on the next move
        offset = nominal + olm * offsets[0] + fov * offsets[1] + lyot * offsets[2]

        return net, prev_offset, offset

    def olm(self, val: int) -> None:
        """Toggle OLM for CAL PnCs

        Args:
            val (int):
                0 for off, 1 for on

        Returns:
            None

        """

        self.toggle_offset("tlc.trkAss.calIfs.applyOpenLoopModel", val)

    def fov_offsets(self, val: int) -> None:
        """Toggle FOV offsets for CAL PnCs

        Args:
            val (int):
                0 for off, 1 for on

        Returns:
            None

        """

        self.toggle_offset("tlc.trkAss.calIfs.applyFovOffsets", val)

    def fov_offsets_on(self) -> None:
        """Toggle FOV offsets to ON"""
        self.fov_offsets(1)

    def fov_offsets_off(self) -> None:
        """Toggle FOV offsets to OFF"""
        self.fov_offsets(0)

    def lyot_offsets(self, val: int) -> None:
        """Toggle Lyot offsets for CAL PnCs

        Args:
            val (int):
                0 for off, 1 for on

        Returns:
            None

        """

        self.toggle_offset("tlc.trkAss.calIfs.applyLyotOffsets", val)

    def lyot_offsets_on(self) -> None:
        """Toggle Lyot offsets to ON"""
        self.lyot_offsets(1)

    def lyot_offsets_off(self) -> None:
        """Toggle Lyot offsets to OFF"""
        self.lyot_offsets(0)