Source code for gpilib2.cameras

from gpilib2.rpc import rpc
import numpy as np
from gpilib2.util import validate_directory
import os
import glob
from datetime import datetime
import re
import time
from astropy.io import fits  # type: ignore
from typing import Union, Tuple
import numpy.typing as npt


[docs]class cameras: """All GPI cameras Args: rpc (:py:class:`~gpilib2.rpc`): rpc object. sim status and verbosity will be set based on its settings. Attributes: rpc (:py:class:`~gpilib2.rpc`): rpc object for communications """ def __init__(self, rpc: rpc) -> None: """Define all components Args: rpc (:py:class:`~gpilib2.rpc`): rpc object. sim status and verbosity will be set based on its settings. """ self.rpc = rpc self.lowfs = cal_camera( rpc, "gpCalRpcClientTester", # binary 15, # cmd: MT_CAL_TAKE_EXP "cal.calCmd_TAKE_EXP", # cmd gmb field "cal.calRpc.exposing", # curr state gmb field "CAL LOWFS", # what I'm called 0, # camera number ) self.howfs = cal_camera( rpc, "gpCalRpcClientTester", # binary 15, # cmd: MT_CAL_TAKE_EXP "cal.calCmd_TAKE_EXP", # cmd gmb field "cal.calRpc.exposing", # curr state gmb field "CAL LOWFS", # what I'm called 1, # camera number )
[docs]class gpi_camera: """Generic camera object Args: rpc (:py:class:`~gpilib2.rpc`): rpc object. sim status and verbosity will be set based on its settings. binary (str): binary name move_cmd (int): Command number in binary to use move_gmb_field (str): GMB field prefix corresponding to the move command curr_state_gmb_field (str): GMB field of current camera state name (str): Name of this camera server (str): Server address to send commands to. Attributes: rpc (:py:class:`~gpilib2.rpc`): rpc object for communications server (str): Server address to send commands to. binary (str): binary name move_cmd (int): Command number in binary to use move_gmb_field (str): GMB field prefix corresponding to the move command curr_state_gmb_field (str): GMB field of current camera state name (str): Name of this camera """ def __init__( self, rpc: rpc, binary: str, move_cmd: int, move_gmb_field: str, curr_state_gmb_field: str, name: str, server: str = "tlc", ) -> None: """Initialize component""" self.rpc = rpc self.binary = binary self.move_cmd = move_cmd self.move_gmb_field = move_gmb_field self.curr_state_gmb_field = curr_state_gmb_field self.name = name self.server = server def __str__(self) -> str: """Returns camera status string""" val = self.rpc.read_gmb_values(self.curr_state_gmb_field).astype(int)[0] return "{0: >30}: {1}".format(self.name, np.array(["Ready", "Exposing"])[val])
[docs]class cal_camera(gpi_camera): """CAL camera object Args: rpc (:py:class:`~gpilib2.rpc`): rpc object. sim status and verbosity will be set based on its settings. binary (str): binary name move_cmd (int): Command number in binary to use move_gmb_field (str): GMB field prefix corresponding to the move command curr_state_gmb_field (str): GMB field of current camera state name (str): Name of this camera server (str): Server address to send commands to. Attributes: rpc (:py:class:`~gpilib2.rpc`): rpc object for communications server (str): Server address to send commands to. binary (str): binary name move_cmd (int): Command number in binary to use move_gmb_field (str): GMB field prefix corresponding to the move command curr_state_gmb_field (str): GMB field of current camera state name (str): Name of this camera camera_num (int): Camera number (0 = LOWFS, 1 = HOWFS) """ def __init__( self, rpc: rpc, binary: str, move_cmd: int, move_gmb_field: str, curr_state_gmb_field: str, name: str, camera_num: int, server: str = "cal", ) -> None: """Initialize component""" gpi_camera.__init__( self, rpc, binary, move_cmd, move_gmb_field, curr_state_gmb_field, name, server=server, ) self.camera_num = camera_num # if not simulated, get the data directory if not (rpc.sim): cal_root = os.environ["CAL_ROOT"] assert cal_root is not None, "Environment variable CAL_ROOT must be set" self.data_dir = os.path.join(cal_root, "data") validate_directory(self.data_dir, perms="rwx", mkdir=False) else: self.data_dir = ""
[docs] def assemble_fname(self) -> str: """Assemble full name of next exposure Args: None Returns: str: Path to next exposure file relative to the data directory """ basename = np.array(["lowfs", "howfs"])[self.camera_num] outdir = os.path.join(basename.upper(), datetime.now().strftime("%y%m%d")) fulldir = os.path.join(self.data_dir, outdir) validate_directory(fulldir, perms="rwx", mode=0o777) existing_files = glob.glob(os.path.join(fulldir, "{}*.fits".format(basename))) if len(existing_files) == 0: fnum = 1 else: p = re.compile(os.path.join(fulldir, r"{}(\d{{4}}).fits".format(basename))) fnum = ( np.array([p.match(f).group(1) for f in existing_files]) # type: ignore .astype(int) .max() + 1 ) fname = "{}{:04d}.fits".format(basename, fnum) outpath = os.path.join(outdir, fname) assert not ( os.path.exists(os.path.join(self.data_dir, outpath)) ), "{} already exists. Something went wrong.".format(outpath) return outpath
[docs] def take_exp( self, return_im: bool = False ) -> Union[str, Tuple[npt.NDArray[np.float_], str]]: """Take Exposure Args: return_im (bool): If True, return the image data along with the filename. Returns: str or tuple: str: Full path to file on disk numpy.ndarray: Image data (only if return_im is True) ..warning:: The NFS cache may cause file access to fail. To avoid this, the cal data volume must be mounted with option ``noac``. """ outpath = self.assemble_fname() cmdlist = [ self.rpc.binaries[self.binary], self.server, "{}".format(self.move_cmd), "{}".format(self.rpc.activityId), "{}".format(self.camera_num), # camera number "{}".format(1), # numImages "{}".format(outpath), ] self.rpc.execute(cmdlist, self.move_gmb_field) state = self.rpc.read_gmb_values(self.curr_state_gmb_field).astype(int)[0] while state == 1: state == self.rpc.read_gmb_values(self.curr_state_gmb_field).astype(int)[0] time.sleep(0.5) fullpath = os.path.join(self.data_dir, outpath) if not (return_im): return fullpath assert os.path.exists(fullpath), "{} not found.".format(fullpath) with fits.open(fullpath) as hdu: data = hdu[0].data return data, fullpath