Source code for gpilib2.sensors

import numpy as np
import re
import csv
import pkg_resources
from datetime import datetime, timedelta
import os
import shutil
import glob
import warnings
import json
import numpy.typing as npt
from gpilib2.rpc import rpc
from typing import Dict, Any, Optional
from gpilib2.util import gpilib2_data_dir, validate_directory, get_tlc_logdir
import pandas  # type: ignore


[docs]class sensors: """All GPI sensors (onewire + IFS) Args: rpc (:py:class:`gpilib2.rpc`): rpc object. sim status and verbosity will be set based on its settings. Attributes: rpc (:py:class:`gpilib2.rpc`): rpc object for communications sensorfile (str): Full path to sensors list on disk sensornames (numpy.ndarray): Array of all sensor names sensorfields (numpy.ndarray): Array of GMB fields corresponding to sensornames onewire_data (pandas.DataFrame): oneWire log data (only available after :py:meth:`~gpilib2.sensors.sensors.get_onewire_logs` is run) onewire_keys (dict): Dictionary or sensor names/log ids. (only available after :py:meth:`~gpilib2.sensors.sensors.get_onewire_logs` is run) """ def __init__(self, rpc: rpc) -> None: """Define all sensors Args: rpc (:py:class:`gpilib2.rpc`): rpc object. sim status and verbosity will be set based on its settings. """ self.rpc = rpc self.sensorfile = pkg_resources.resource_filename( "gpilib2.gpilib2", "sensors.csv" ) if not os.path.exists(self.sensorfile): warnings.warn( ( "File {} not found. You must run update_sensor_list" "with write permissions to the gpilib2 library path.".format( self.sensorfile ) ) ) return sensorfields = [] sensornames = [] with open(self.sensorfile, "r") as f: reader = csv.reader(f) for j in range(2): next(reader, None) # expect 2 header rows for row in reader: sensorfields.append(row[0]) sensornames.append(row[1]) self.sensorfields = np.array(sensorfields) self.sensornames = np.array(sensornames)
[docs] def update_sensor_list(self) -> None: """Query TLC for all oneWire fields with values and descriptions and update sensor list on disk. .. warning:: Write permissions required to the gpilib2 install directory. This is intended for use only in developer installations and only during integration. """ # grab all tlc fields and filter out the oneWire tlcfields = self.rpc.list_gmb_fields(server="tlc") # looking for all oneWire fields with values p = re.compile(r"(tlc\.oneWireAss\.\S+)\.val") onewirefields = [] for t in tlcfields: tmp = p.match(t) if tmp: onewirefields.append(tmp.group(1)) # now let's get their descriptions querynames = [o + ".desc" for o in onewirefields] sensornames = self.rpc.read_gmb_values(querynames) # we only want those with descriptions sensorfields = np.array(onewirefields)[sensornames != ""] sensornames = sensornames[sensornames != ""] # sensorserials = self.rpc.read_gmb_values([f + ".serialNumStr" for f in sensorfields]) sensorfields = np.array([f + ".val" for f in sensorfields]) # TODO: revisit IFS sensors # now the ifs ones there are 10 temperature sensors and the 2 cryo tips ifstempnames = [ "ifs.temperature.sensorLocation{0:02}".format(j) for j in range(1, 11) ] ifstempfields = [ "ifs.temperature.temperatureInKelvinSensor{0:02}".format(j) for j in range(1, 11) ] cryonames = ["Cryo Tip {}".format(j) for j in range(1, 3)] cryofields = [ "ifs.cryoStat.cryo{}TemperatureVal".format(j) for j in range(1, 3) ] sensornames = np.hstack( (sensornames, self.rpc.read_gmb_values(ifstempnames), cryonames) ) sensorfields = np.hstack( (sensorfields, np.array(ifstempfields), np.array(cryofields)) ) if os.path.exists(self.sensorfile): bckfile = pkg_resources.resource_filename( "gpilib2.gpilib2", "sensors.bck.{}".format(datetime.now().strftime("%Y%m%d_%H%M%S")), ) print("Backing up current sensors file to: {}".format(bckfile)) shutil.copyfile(self.sensorfile, bckfile) print("Updating sensors file: {}".format(self.sensorfile)) with open(self.sensorfile, "w") as f: f.write( "#DO NOT EDIT. This file is automatically generated by gpilib2.sensors.update_sensor_list().\n" ) f.write("#GMB Field, Description\n") writer = csv.writer(f) for field, name in zip(sensorfields, sensornames): writer.writerow([field, name]) self.sensorfields = sensorfields self.sensornames = sensornames
def __str__(self) -> str: """Generate pretty string representation of all sensors""" res = self.get_values(self.sensornames, exact=True) keys = list(res.keys()) vals = list(res.values()) mxkeylen = len(max(keys, key=len)) out = "" for k, v in zip(keys, vals): out += "{0: >{1}}: {2}\n".format(k, mxkeylen, v) return out
[docs] def get_values( self, sensors: npt.ArrayLike, fieldname: bool = False, exact: bool = False ) -> Dict[str, float]: """Get sensor values Args: sensors (:py:data:`~numpy.typing.ArrayLike`): Sensors to query fieldname (bool): Treat the inputs as exact GMB field names rather than descriptions. Defaults False. If True, matching is exact on the full field name. exact (bool): Only match exact descriptions rather than searching for all matches. Defaults False Returns: dict: Dictionary of field description:sensor value key:value pairs. All values will be floats. Notes: """ # First figure out exactly what we're querying if fieldname: fields = np.array(sensors, ndmin=1) names = [] for f in fields: assert f in self.sensorfields, "{} not in sensor GMB field list." names.append(self.sensornames[self.sensorfields == f][0]) names = np.array(names) # type: ignore else: if exact: names = np.array(sensors, ndmin=1) # type: ignore fields = [] # type: ignore for n in names: assert n in self.sensornames, "{} not in sensor descriptions." fields.append(self.sensorfields[self.sensornames == n][0]) # type: ignore fields = np.array(fields) else: tmp = np.array(sensors, ndmin=1) names = [] fields = [] # type: ignore for t in tmp: tmp2 = list( filter( re.compile(r"{}".format(t), re.IGNORECASE).search, self.sensornames, ) ) if len(tmp2) == 0: warnings.warn("Query '{}' produced zero matches.".format(t)) else: names += tmp2 for t2 in tmp2: fields.append(self.sensorfields[self.sensornames == t2][0]) # type: ignore names = np.array(names) # type: ignore fields = np.array(fields) # now actually do the query vals = self.rpc.read_gmb_values(fields) out = {} for n, v in zip(names, vals.astype(float)): out[n] = v return out
[docs] def get_onewire_logs(self) -> None: """Reads oneWire logs from TLC cross-mount and updates internal dataframe""" logs = glob.glob(os.path.join(get_tlc_logdir(), "oneWire*.log")) onewiredir = os.path.join(gpilib2_data_dir(), "oneWire") validate_directory(onewiredir, perms="rwx") colnames = ["time", "id", "value", "alarm", "alarm_type", "err_msg"] # grab anything existing on disk or create new dataframe pkl_file = os.path.join(onewiredir, "oneWireLogData.pkl") if os.path.exists(pkl_file): data = pandas.read_pickle(pkl_file) else: data = pandas.DataFrame(columns=colnames) # need to check last modification of each file logdictfile = os.path.join(onewiredir, "oneWireLogs.json") if os.path.exists(logdictfile): with open(logdictfile, "r") as f: logdict = json.load(f) else: logdict = {} for log in logs: # only bother reading if modified later than last recorded mtime mtime = os.path.getmtime(log) if (log not in logdict) or (logdict[log] < mtime): tmp = pandas.read_csv(log, names=colnames, comment="#") logdict[log] = mtime # drop any rows with null ids tmp = tmp[tmp["id"].notnull()].reset_index(drop=True) # strip spaces from id col tmp["id"] = tmp["id"].str.strip() data = pandas.concat([data, tmp]) # remove duplictes, strip spaces, and updated files on disk data.sort_values("time", inplace=True) data.drop_duplicates(inplace=True) data.reset_index(drop=True, inplace=True) data.to_pickle(pkl_file) with open(logdictfile, "w") as f: f.write(json.dumps(logdict)) # convert time to true time data["time"] = pandas.to_datetime(data["time"]) self.onewire_data = data keydictfile = os.path.join(onewiredir, "oneWireKeys.json") if os.path.exists(keydictfile): with open(keydictfile, "r") as f: keydict = json.load(f) else: keyp = re.compile(r"#\s*(\S*) - (.*)") keydict = {} with open(logs[0], "r") as f: txt = f.readlines() for t in txt: tmp = keyp.match(t) if tmp: keydict[tmp.group(2)] = tmp.group(1) with open(keydictfile, "w") as f: f.write(json.dumps(keydict)) self.onewire_keys = keydict
[docs] def query_onewire_logs( self, names: npt.ArrayLike, before: Optional[str] = None, after: Optional[str] = None, last_hours: Optional[float] = None, force_update: bool = False, ) -> Dict[str, npt.NDArray[np.float_]]: """Retrieve data from oneWire logs Args: names (str or list or numpy.array): Names or search patterns of sensors to retrieve. For full list of names see ``sensors.onewire_keys.keys()`` before (str or None): Return data before this date/time. Can be: "YYYY-MM-DD" or "YYYY-MM-DD hh:mm:ss" after (str or None): Return data after this date time. Can be: "YYYY-MM-DD" or "YYYY-MM-DD hh:mm:ss" last_hours (float or None): Query data from the last last_hours. If set, before and after are ignored. force_update (bool): If True, re-read log data, even if it has already been loaded. Defaults False .. note:: The names input can be an exact sensor name as found in ``sensors.onewire_keys.keys()`` or a case insensitive part of the name, or any valid regular expression. For example, input ``ccr`` will match sensors ``CCR Body temperature, IFS CCR Glycol Output Temp, IFS CCR Glycol Input Temp`` while input ``ccr.*body`` will match only ``CCR Body temperature``. """ if not (hasattr(self, "onewire_data")) or force_update: self.get_onewire_logs() names = np.array(names, ndmin=1).astype(str) sensors = [] for n in names: sensors += list( filter( re.compile(r"{}".format(n), re.IGNORECASE).search, self.onewire_keys.keys(), ) ) assert len(sensors) > 0, "Could not match any sensors to inputs." # filter by time inds = self.onewire_data["time"].notnull() if (last_hours is not None) and ((before is not None) or (after is not None)): warnings.warn( "When last_hours is set, before and after inputs are ignored." ) if last_hours is not None: inds = inds & ( self.onewire_data["time"] > (datetime.now() - timedelta(hours=last_hours)) ) else: if before is not None: inds = inds & (self.onewire_data["time"] < before) if after is not None: inds = inds & (self.onewire_data["time"] > after) data2 = self.onewire_data.loc[inds] out = {} for s in sensors: out[s] = data2.loc[ data2["id"] == self.onewire_keys[s], ["time", "value"] ].values return out