from gpilib2.rpc import rpc
import numpy as np
from gpilib2.util import validate_directory
import os
import glob
from datetime import datetime
import re
import time
from astropy.io import fits # type: ignore
from typing import Union, Tuple
import numpy.typing as npt
class cameras:
    """All GPI cameras

    Args:
        rpc (:py:class:`~gpilib2.rpc`):
            rpc object. sim status and verbosity will be set
            based on its settings.

    Attributes:
        rpc (:py:class:`~gpilib2.rpc`):
            rpc object for communications
        lowfs (cal_camera):
            CAL LOWFS camera (camera number 0)
        howfs (cal_camera):
            CAL HOWFS camera (camera number 1)
    """

    def __init__(self, rpc: rpc) -> None:
        """Define all components

        Args:
            rpc (:py:class:`~gpilib2.rpc`):
                rpc object. sim status and verbosity will be set
                based on its settings.
        """
        self.rpc = rpc

        # Both CAL cameras share the same binary, command and GMB fields;
        # they differ only in display name and camera number.
        self.lowfs = cal_camera(
            rpc,
            "gpCalRpcClientTester",  # binary
            15,  # cmd: MT_CAL_TAKE_EXP
            "cal.calCmd_TAKE_EXP",  # cmd gmb field
            "cal.calRpc.exposing",  # curr state gmb field
            "CAL LOWFS",  # what I'm called
            0,  # camera number
        )

        self.howfs = cal_camera(
            rpc,
            "gpCalRpcClientTester",  # binary
            15,  # cmd: MT_CAL_TAKE_EXP
            "cal.calCmd_TAKE_EXP",  # cmd gmb field
            "cal.calRpc.exposing",  # curr state gmb field
            "CAL HOWFS",  # what I'm called
            1,  # camera number
        )
class gpi_camera:
    """Generic camera object

    Args:
        rpc (:py:class:`~gpilib2.rpc`):
            rpc object. sim status and verbosity will be set
            based on its settings.
        binary (str):
            binary name
        move_cmd (int):
            Command number in binary to use
        move_gmb_field (str):
            GMB field prefix corresponding to the move command
        curr_state_gmb_field (str):
            GMB field of current camera state
        name (str):
            Name of this camera
        server (str):
            Server address to send commands to. Defaults to "tlc".

    Attributes:
        rpc (:py:class:`~gpilib2.rpc`):
            rpc object for communications
        server (str):
            Server address to send commands to.
        binary (str):
            binary name
        move_cmd (int):
            Command number in binary to use
        move_gmb_field (str):
            GMB field prefix corresponding to the move command
        curr_state_gmb_field (str):
            GMB field of current camera state
        name (str):
            Name of this camera
    """

    def __init__(
        self,
        rpc: rpc,
        binary: str,
        move_cmd: int,
        move_gmb_field: str,
        curr_state_gmb_field: str,
        name: str,
        server: str = "tlc",
    ) -> None:
        """Initialize component"""
        self.rpc = rpc
        self.binary = binary
        self.move_cmd = move_cmd
        self.move_gmb_field = move_gmb_field
        self.curr_state_gmb_field = curr_state_gmb_field
        self.name = name
        self.server = server

    def __str__(self) -> str:
        """Returns camera status string

        The state is read from ``curr_state_gmb_field``; 0 is reported as
        "Ready" and 1 as "Exposing". Any other value is reported as
        "Unknown (<value>)" rather than raising or wrapping around.
        """
        states = ("Ready", "Exposing")
        val = self.rpc.read_gmb_values(self.curr_state_gmb_field).astype(int)[0]
        # Guard the lookup: a negative value would otherwise index from the
        # end, and a value past the end would raise IndexError.
        if 0 <= val < len(states):
            label = states[val]
        else:
            label = "Unknown ({})".format(val)
        return "{0: >30}: {1}".format(self.name, label)
class cal_camera(gpi_camera):
    """CAL camera object

    Args:
        rpc (:py:class:`~gpilib2.rpc`):
            rpc object. sim status and verbosity will be set
            based on its settings.
        binary (str):
            binary name
        move_cmd (int):
            Command number in binary to use
        move_gmb_field (str):
            GMB field prefix corresponding to the move command
        curr_state_gmb_field (str):
            GMB field of current camera state
        name (str):
            Name of this camera
        camera_num (int):
            Camera number (0 = LOWFS, 1 = HOWFS)
        server (str):
            Server address to send commands to. Defaults to "cal".

    Attributes:
        rpc (:py:class:`~gpilib2.rpc`):
            rpc object for communications
        server (str):
            Server address to send commands to.
        binary (str):
            binary name
        move_cmd (int):
            Command number in binary to use
        move_gmb_field (str):
            GMB field prefix corresponding to the move command
        curr_state_gmb_field (str):
            GMB field of current camera state
        name (str):
            Name of this camera
        camera_num (int):
            Camera number (0 = LOWFS, 1 = HOWFS)
        data_dir (str):
            ``$CAL_ROOT/data`` when not simulated, otherwise an empty string.
    """

    def __init__(
        self,
        rpc: rpc,
        binary: str,
        move_cmd: int,
        move_gmb_field: str,
        curr_state_gmb_field: str,
        name: str,
        camera_num: int,
        server: str = "cal",
    ) -> None:
        """Initialize component"""
        super().__init__(
            rpc,
            binary,
            move_cmd,
            move_gmb_field,
            curr_state_gmb_field,
            name,
            server=server,
        )
        self.camera_num = camera_num

        # if not simulated, get the data directory
        if not rpc.sim:
            # Use .get() so a missing variable reaches the assertion below
            # (with its explanatory message) instead of raising a bare KeyError.
            cal_root = os.environ.get("CAL_ROOT")
            assert cal_root is not None, "Environment variable CAL_ROOT must be set"
            self.data_dir = os.path.join(cal_root, "data")
            validate_directory(self.data_dir, perms="rwx", mkdir=False)
        else:
            self.data_dir = ""

    def assemble_fname(self) -> str:
        """Assemble full name of next exposure

        The file is named ``<basename>NNNN.fits`` inside
        ``<BASENAME>/<yymmdd>``, where ``NNNN`` is one more than the highest
        number already present in that directory (starting from 1).

        Args:
            None

        Returns:
            str:
                Path to next exposure file relative to the data directory
        """
        basename = ("lowfs", "howfs")[self.camera_num]
        outdir = os.path.join(basename.upper(), datetime.now().strftime("%y%m%d"))
        fulldir = os.path.join(self.data_dir, outdir)
        # NOTE(review): presumably creates today's directory if it is missing;
        # confirm against gpilib2.util.validate_directory.
        validate_directory(fulldir, perms="rwx", mode=0o777)

        # Match on the file's basename only, so characters in the directory
        # path can never be interpreted as regex syntax. Files that match the
        # glob but not the numbering scheme (e.g. ``lowfs_test.fits``) are
        # skipped rather than crashing on a failed match.
        numbered = re.compile(r"{}(\d{{4,}})\.fits".format(re.escape(basename)))
        candidates = glob.glob(os.path.join(fulldir, "{}*.fits".format(basename)))
        matches = (numbered.fullmatch(os.path.basename(f)) for f in candidates)
        used = [int(m.group(1)) for m in matches if m is not None]
        fnum = max(used, default=0) + 1

        fname = "{}{:04d}.fits".format(basename, fnum)
        outpath = os.path.join(outdir, fname)

        assert not (
            os.path.exists(os.path.join(self.data_dir, outpath))
        ), "{} already exists. Something went wrong.".format(outpath)

        return outpath

    def take_exp(
        self, return_im: bool = False
    ) -> Union[str, Tuple[npt.NDArray[np.float64], str]]:
        """Take Exposure

        Sends the exposure command, then polls the camera state every 0.5 s
        until it no longer reports exposing (state 1).

        Args:
            return_im (bool):
                If True, return the image data along with the filename.

        Returns:
            str or tuple:
                If ``return_im`` is False, the full path to the file on disk
                (str). If ``return_im`` is True, a tuple of
                (image data as numpy.ndarray, full path as str).

        .. warning::

            The NFS cache may cause file access to fail. To avoid this, the cal data
            volume must be mounted with option ``noac``.
        """
        outpath = self.assemble_fname()

        cmdlist = [
            self.rpc.binaries[self.binary],
            self.server,
            "{}".format(self.move_cmd),
            "{}".format(self.rpc.activityId),
            "{}".format(self.camera_num),  # camera number
            "{}".format(1),  # numImages
            "{}".format(outpath),
        ]

        self.rpc.execute(cmdlist, self.move_gmb_field)

        # Wait for the exposure to finish. The state must be re-assigned on
        # every pass; otherwise the loop can never observe the change.
        state = self.rpc.read_gmb_values(self.curr_state_gmb_field).astype(int)[0]
        while state == 1:
            time.sleep(0.5)
            state = self.rpc.read_gmb_values(self.curr_state_gmb_field).astype(int)[0]

        fullpath = os.path.join(self.data_dir, outpath)

        if not return_im:
            return fullpath

        assert os.path.exists(fullpath), "{} not found.".format(fullpath)
        with fits.open(fullpath) as hdu:
            data = hdu[0].data

        return data, fullpath