import numpy as np
import re
import csv
import pkg_resources
from datetime import datetime, timedelta
import os
import shutil
import glob
import warnings
import json
import numpy.typing as npt
from gpilib2.rpc import rpc
from typing import Dict, Any, Optional
from gpilib2.util import gpilib2_data_dir, validate_directory, get_tlc_logdir
import pandas # type: ignore
class sensors:
"""All GPI sensors (onewire + IFS)
Args:
rpc (:py:class:`gpilib2.rpc`):
rpc object. sim status and verbosity will be set
based on its settings.
Attributes:
rpc (:py:class:`gpilib2.rpc`):
rpc object for communications
sensorfile (str):
Full path to sensors list on disk
sensornames (numpy.ndarray):
Array of all sensor names
sensorfields (numpy.ndarray):
Array of GMB fields corresponding to sensornames
onewire_data (pandas.DataFrame):
oneWire log data (only available after
:py:meth:`~gpilib2.sensors.sensors.get_onewire_logs` is run)
onewire_keys (dict):
            Dictionary mapping sensor names to log ids (only available after
:py:meth:`~gpilib2.sensors.sensors.get_onewire_logs` is run)
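    Examples:
        A minimal usage sketch (assumes a working TLC connection; any
        arguments to the ``rpc`` constructor are site-specific)::

            >>> from gpilib2.rpc import rpc
            >>> from gpilib2.sensors import sensors
            >>> s = sensors(rpc())
            >>> print(s)  # pretty-print all sensors and their current values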
"""
def __init__(self, rpc: rpc) -> None:
"""Define all sensors
Args:
rpc (:py:class:`gpilib2.rpc`):
rpc object. sim status and verbosity will be set
based on its settings.
"""
self.rpc = rpc
self.sensorfile = pkg_resources.resource_filename(
"gpilib2.gpilib2", "sensors.csv"
)
if not os.path.exists(self.sensorfile):
warnings.warn(
(
"File {} not found. You must run update_sensor_list"
"with write permissions to the gpilib2 library path.".format(
self.sensorfile
)
)
)
return
sensorfields = []
sensornames = []
with open(self.sensorfile, "r") as f:
reader = csv.reader(f)
            for _ in range(2):
                next(reader, None)  # skip the two header rows
for row in reader:
sensorfields.append(row[0])
sensornames.append(row[1])
self.sensorfields = np.array(sensorfields)
self.sensornames = np.array(sensornames)
    def update_sensor_list(self) -> None:
"""Query TLC for all oneWire fields with values and descriptions and update
sensor list on disk.
.. warning::
Write permissions required to the gpilib2 install directory. This is
intended for use only in developer installations and only during
integration.
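        Examples:
            A sketch of the intended developer workflow (requires write
            permission to the gpilib2 install directory)::

                >>> s.update_sensor_list()  # backs up, then rewrites sensors.csv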
"""
# grab all tlc fields and filter out the oneWire
tlcfields = self.rpc.list_gmb_fields(server="tlc")
# looking for all oneWire fields with values
p = re.compile(r"(tlc\.oneWireAss\.\S+)\.val")
onewirefields = []
for t in tlcfields:
tmp = p.match(t)
if tmp:
onewirefields.append(tmp.group(1))
# now let's get their descriptions
querynames = [o + ".desc" for o in onewirefields]
sensornames = self.rpc.read_gmb_values(querynames)
# we only want those with descriptions
sensorfields = np.array(onewirefields)[sensornames != ""]
sensornames = sensornames[sensornames != ""]
# sensorserials = self.rpc.read_gmb_values([f + ".serialNumStr" for f in sensorfields])
sensorfields = np.array([f + ".val" for f in sensorfields])
# TODO: revisit IFS sensors
# now the ifs ones there are 10 temperature sensors and the 2 cryo tips
ifstempnames = [
"ifs.temperature.sensorLocation{0:02}".format(j) for j in range(1, 11)
]
ifstempfields = [
"ifs.temperature.temperatureInKelvinSensor{0:02}".format(j)
for j in range(1, 11)
]
cryonames = ["Cryo Tip {}".format(j) for j in range(1, 3)]
cryofields = [
"ifs.cryoStat.cryo{}TemperatureVal".format(j) for j in range(1, 3)
]
sensornames = np.hstack(
(sensornames, self.rpc.read_gmb_values(ifstempnames), cryonames)
)
sensorfields = np.hstack(
(sensorfields, np.array(ifstempfields), np.array(cryofields))
)
if os.path.exists(self.sensorfile):
bckfile = pkg_resources.resource_filename(
"gpilib2.gpilib2",
"sensors.bck.{}".format(datetime.now().strftime("%Y%m%d_%H%M%S")),
)
print("Backing up current sensors file to: {}".format(bckfile))
shutil.copyfile(self.sensorfile, bckfile)
print("Updating sensors file: {}".format(self.sensorfile))
with open(self.sensorfile, "w") as f:
f.write(
"#DO NOT EDIT. This file is automatically generated by gpilib2.sensors.update_sensor_list().\n"
)
f.write("#GMB Field, Description\n")
writer = csv.writer(f)
for field, name in zip(sensorfields, sensornames):
writer.writerow([field, name])
self.sensorfields = sensorfields
self.sensornames = sensornames
def __str__(self) -> str:
"""Generate pretty string representation of all sensors"""
res = self.get_values(self.sensornames, exact=True)
keys = list(res.keys())
vals = list(res.values())
mxkeylen = len(max(keys, key=len))
out = ""
for k, v in zip(keys, vals):
out += "{0: >{1}}: {2}\n".format(k, mxkeylen, v)
return out
    def get_values(
self, sensors: npt.ArrayLike, fieldname: bool = False, exact: bool = False
) -> Dict[str, float]:
"""Get sensor values
Args:
sensors (:py:data:`~numpy.typing.ArrayLike`):
Sensors to query
fieldname (bool):
Treat the inputs as exact GMB field names rather than descriptions.
Defaults False. If True, matching is exact on the full field name.
exact (bool):
Only match exact descriptions rather than searching for all matches.
Defaults False
Returns:
dict:
Dictionary of field description:sensor value key:value pairs. All
values will be floats.
        Notes:
            When ``fieldname`` and ``exact`` are both False, each entry in
            ``sensors`` is treated as a case-insensitive regular expression and
            matched against all sensor descriptions.
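        Examples:
            A minimal sketch (the query string is illustrative; matches and
            values depend on the live system)::

                >>> vals = s.get_values("glycol")
                >>> sorted(vals)  # descriptions of all matching sensors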
"""
# First figure out exactly what we're querying
if fieldname:
fields = np.array(sensors, ndmin=1)
names = []
for f in fields:
                assert f in self.sensorfields, "{} not in sensor GMB field list.".format(f)
names.append(self.sensornames[self.sensorfields == f][0])
names = np.array(names) # type: ignore
else:
if exact:
names = np.array(sensors, ndmin=1) # type: ignore
fields = [] # type: ignore
for n in names:
                    assert n in self.sensornames, "{} not in sensor descriptions.".format(n)
fields.append(self.sensorfields[self.sensornames == n][0]) # type: ignore
fields = np.array(fields)
else:
tmp = np.array(sensors, ndmin=1)
names = []
fields = [] # type: ignore
for t in tmp:
tmp2 = list(
filter(
re.compile(r"{}".format(t), re.IGNORECASE).search,
self.sensornames,
)
)
if len(tmp2) == 0:
warnings.warn("Query '{}' produced zero matches.".format(t))
else:
names += tmp2
for t2 in tmp2:
fields.append(self.sensorfields[self.sensornames == t2][0]) # type: ignore
names = np.array(names) # type: ignore
fields = np.array(fields)
# now actually do the query
vals = self.rpc.read_gmb_values(fields)
        # assemble output dictionary of description: value pairs
        return {n: v for n, v in zip(names, vals.astype(float))}
    def get_onewire_logs(self) -> None:
        """Read oneWire logs from the TLC cross-mount and update the internal dataframe
logs = glob.glob(os.path.join(get_tlc_logdir(), "oneWire*.log"))
onewiredir = os.path.join(gpilib2_data_dir(), "oneWire")
validate_directory(onewiredir, perms="rwx")
colnames = ["time", "id", "value", "alarm", "alarm_type", "err_msg"]
# grab anything existing on disk or create new dataframe
pkl_file = os.path.join(onewiredir, "oneWireLogData.pkl")
if os.path.exists(pkl_file):
data = pandas.read_pickle(pkl_file)
else:
data = pandas.DataFrame(columns=colnames)
# need to check last modification of each file
logdictfile = os.path.join(onewiredir, "oneWireLogs.json")
if os.path.exists(logdictfile):
with open(logdictfile, "r") as f:
logdict = json.load(f)
else:
logdict = {}
for log in logs:
# only bother reading if modified later than last recorded mtime
mtime = os.path.getmtime(log)
if (log not in logdict) or (logdict[log] < mtime):
tmp = pandas.read_csv(log, names=colnames, comment="#")
logdict[log] = mtime
# drop any rows with null ids
tmp = tmp[tmp["id"].notnull()].reset_index(drop=True)
# strip spaces from id col
tmp["id"] = tmp["id"].str.strip()
data = pandas.concat([data, tmp])
        # sort by time, drop duplicates, and update files on disk
data.sort_values("time", inplace=True)
data.drop_duplicates(inplace=True)
data.reset_index(drop=True, inplace=True)
data.to_pickle(pkl_file)
with open(logdictfile, "w") as f:
f.write(json.dumps(logdict))
        # convert time column to datetime objects
data["time"] = pandas.to_datetime(data["time"])
self.onewire_data = data
keydictfile = os.path.join(onewiredir, "oneWireKeys.json")
if os.path.exists(keydictfile):
with open(keydictfile, "r") as f:
keydict = json.load(f)
else:
            keyp = re.compile(r"#\s*(\S*) - (.*)")
            keydict = {}
            if logs:  # guard against an empty log directory
                with open(logs[0], "r") as f:
                    txt = f.readlines()
                for t in txt:
                    tmp = keyp.match(t)
                    if tmp:
                        keydict[tmp.group(2)] = tmp.group(1)
with open(keydictfile, "w") as f:
f.write(json.dumps(keydict))
self.onewire_keys = keydict
    def query_onewire_logs(
self,
names: npt.ArrayLike,
before: Optional[str] = None,
after: Optional[str] = None,
last_hours: Optional[float] = None,
force_update: bool = False,
    ) -> Dict[str, npt.NDArray[Any]]:
"""Retrieve data from oneWire logs
Args:
names (str or list or numpy.array):
Names or search patterns of sensors to retrieve. For full list of names
see ``sensors.onewire_keys.keys()``
before (str or None):
Return data before this date/time. Can be: "YYYY-MM-DD" or
"YYYY-MM-DD hh:mm:ss"
after (str or None):
                Return data after this date/time. Can be: "YYYY-MM-DD" or
"YYYY-MM-DD hh:mm:ss"
last_hours (float or None):
                Query data from the most recent ``last_hours`` hours. If set, before and after are
ignored.
force_update (bool):
If True, re-read log data, even if it has already been loaded.
                Defaults False
        Returns:
            dict:
                Dictionary mapping each matched sensor name to an Nx2 array of
                (time, value) pairs.
.. note::
The names input can be an exact sensor name as found in
``sensors.onewire_keys.keys()`` or a case insensitive part of the name, or
any valid regular expression. For example, input ``ccr`` will match sensors
``CCR Body temperature, IFS CCR Glycol Output Temp, IFS CCR Glycol Input Temp``
while input ``ccr.*body`` will match only ``CCR Body temperature``.
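        Examples:
            A minimal sketch (the pattern and time window are illustrative)::

                >>> res = s.query_onewire_logs("ccr.*body", last_hours=24)
                >>> times, vals = res["CCR Body temperature"].T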
"""
if not (hasattr(self, "onewire_data")) or force_update:
self.get_onewire_logs()
names = np.array(names, ndmin=1).astype(str)
sensors = []
for n in names:
sensors += list(
filter(
re.compile(r"{}".format(n), re.IGNORECASE).search,
self.onewire_keys.keys(),
)
)
assert len(sensors) > 0, "Could not match any sensors to inputs."
# filter by time
inds = self.onewire_data["time"].notnull()
if (last_hours is not None) and ((before is not None) or (after is not None)):
warnings.warn(
"When last_hours is set, before and after inputs are ignored."
)
if last_hours is not None:
inds = inds & (
self.onewire_data["time"]
> (datetime.now() - timedelta(hours=last_hours))
)
else:
if before is not None:
inds = inds & (self.onewire_data["time"] < before)
if after is not None:
inds = inds & (self.onewire_data["time"] > after)
data2 = self.onewire_data.loc[inds]
out = {}
for s in sensors:
out[s] = data2.loc[
data2["id"] == self.onewire_keys[s], ["time", "value"]
].values
return out