import numpy as np
import re
import numpy.typing as npt
import time
import subprocess
from typing import List, Tuple
from gpilib2.sensors import sensors
class powerbars:
    """All GPI powerbars and outlets

    Args:
        sensors (:py:class:`gpilib2.sensors`):
            sensors object. sim status and verbosity will be set
            based on the sensors.rpc settings.

    Attributes:
        sensors (:py:class:`gpilib2.sensors`):
            Sensors object used for safety checks.
        rpc (:py:class:`gpilib2.rpc`):
            Pointer to ``self.sensors.rpc``. rpc object for all communications.
        npowbar (int):
            Number of powerbars (4).
        noutlet (int):
            Number of outlets per powerbar (8).
        pbarre (:py:class:`re.Pattern`):
            regex that can extract numerical values from powerbar GMB fieldnames
        outletfields (~numpy.ndarray):
            32-element array of all outlet GMB field names
        outletnums (~numpy.ndarray):
            32x2 array of all powerbar/outlet command numbers
        outletnames (~numpy.ndarray):
            Current outlet names as returned by ``rpc.read_gmb_values``, in the
            same order as ``outletfields``.

    .. note::

        There are 4 powerbars with 8 outlets each.  The GMB numbers these with
        0-based indexing (so that Powerbar 3 Outlet 6 is
        'tlc.powBarAss.bar[2].outlet[5]' but the commands and GUIs use 1-based
        indexing (so you would command 3,6 for the same outlet). Annoying.
        Because of the danger of using stale powerbar name info, current outlet
        names are queried at every instantiation of this class.

    """

    def __init__(self, sensors: "sensors") -> None:
        """Define all powerbars and outlets

        Args:
            sensors (gpilib2.sensors):
                sensors object. sim status and verbosity will be set
                based on the sensors.rpc settings.

        """

        self.sensors = sensors
        self.rpc = self.sensors.rpc

        self.npowbar = 4
        self.noutlet = 8

        # every dot is escaped so that only literal GMB field names match
        self.pbarre = re.compile(r"tlc\.powBarAss\.bar\[(\d)\]\.outlet\[(\d)\]")

        # synthesize outlet numbers and GMB fields and query their current names
        outletfields = []
        outletnums = []
        for ii in range(self.npowbar):
            for jj in range(self.noutlet):
                # GMB field names are 0-based; command numbers are 1-based
                outletfields.append("tlc.powBarAss.bar[{}].outlet[{}]".format(ii, jj))
                outletnums.append((ii + 1, jj + 1))
        self.outletfields = np.array(outletfields)
        self.outletnums = np.array(outletnums)

        self.update_outlet_names()

    @staticmethod
    def _require(condition: bool, message: str) -> None:
        """Raise AssertionError with ``message`` unless ``condition`` is truthy

        Used in place of bare ``assert`` statements so that the safety checks
        in this class are still enforced when Python runs with ``-O`` (which
        strips ``assert``), while callers keep seeing the same exception type.
        """
        if not condition:
            raise AssertionError(message)

    def update_outlet_names(self) -> None:
        """Query and save the names of all outlets"""

        # synthesize queries
        namefields = [n + ".name" for n in self.outletfields]
        self.outletnames = self.rpc.read_gmb_values(namefields)

    def __str__(self) -> str:
        """Generate pretty string representation of powerbars and outlets

        Queries the current state of every outlet and lays the result out as a
        table with one column per powerbar and one row per outlet.
        """

        nstates = self.get_outlet_state(self.outletfields).reshape(
            self.npowbar, self.noutlet
        )
        names = self.outletnames.reshape(self.npowbar, self.noutlet)

        # any state other than 0/1 is left as an empty string
        states = np.zeros(nstates.shape, dtype="<U3")
        states[nstates == 0] = "ON"
        states[nstates == 1] = "OFF"

        # find longest name in each powerbar
        mxlens = [len(max(n, key=len)) + 1 for n in names]

        hline = "-" * (sum(mxlens) + 7 + 16) + "\n"
        out = hline + "| "
        for j, l in enumerate(mxlens):
            out += "{0: >{1}}|".format(j + 1, l + 4)
        out += "\n" + hline

        # transpose so that each printed row is one outlet number across all bars
        for j, (o, ss) in enumerate(zip(names.transpose(), states.transpose())):
            out += "|{}|".format(j + 1)
            for l, n, s in zip(mxlens, o, ss):
                out += "{0: >{1}}:{2: >3}|".format(n, l, s)
            out += "\n"
        out += hline

        return out

    def identify_outlet_by_name(
        self, names: npt.ArrayLike
    ) -> Tuple[List[str], List[str], npt.NDArray[np.int_]]:
        """Identify outlet/powerbar locations by name

        Args:
            names (:py:data:`~numpy.typing.ArrayLike`):
                Name strings (see note)

        Returns:
            tuple:
                list:
                    Exact names of outlets requested.
                list:
                    Sub-array of self.outletfields corresponding to queried names
                ~numpy.ndarray:
                    Sub-array of self.outletnums corresponding to queried names

        Raises:
            AssertionError:
                If a requested name matches no outlet, or more than one.

        .. note::

            Name matching must be unique to prevent accidentally toggling
            unintended outlets.  Names can be partial, but must uniquely
            identify the outlet (i.e., 'CAL' is not acceptable as it matches 5
            different devices). Each name is used as a case-insensitive regular
            expression searched against the stored outlet names.

        """

        names = np.array(names, ndmin=1)

        outnames = []
        fields = []
        nums = []
        for n in names:
            matcher = re.compile(r"{}".format(n), re.IGNORECASE)
            matches = [name for name in self.outletnames if matcher.search(name)]

            # zero matches is a different failure from an ambiguous match
            self._require(
                len(matches) > 0, "{} does not match any known outlet name.".format(n)
            )
            self._require(
                len(matches) == 1,
                "{} is vague, matching devices:{}".format(n, ", ".join(matches)),
            )

            located = self.outletnames == matches[0]
            outnames.append(matches[0])
            fields.append(self.outletfields[located][0])
            nums.append(self.outletnums[located][0])

        return outnames, fields, np.array(nums)

    def get_outlet_state(self, fields: npt.ArrayLike) -> npt.NDArray[np.int_]:
        """Query the current outlet state

        Args:
            fields (:py:data:`~numpy.typing.ArrayLike`):
                Outlet fields to query

        Returns:
            ~numpy.ndarray:
                Current states of outlets.  1 = OFF, 0 = ON, 2 = REBOOT

        """

        statefields = [f + ".state" for f in np.array(fields, ndmin=1)]

        return self.rpc.read_gmb_values(statefields).astype(int)

    def send_powerbar_command(self, nums: npt.NDArray[np.int_], state: int) -> None:
        """Wrapper to gpPowBarAssClientTester and helper method for on/off/reboot

        Args:
            nums (~numpy.ndarray):
                n x 2 array of powerbar/outlet numbers
            state (int):
                0: On
                1: Off
                2: Reboot

        Returns:
            None

        Notes:
            Remember, the commands and GUIs use 1-based, so the numbers here
            must be 1-based. So powerbar 3, outlet 6 is [3,6], etc.  See also
            notes in __init__.

            Outlets are commanded one at a time; each command is followed by
            polling of the GMB sub-command state until it is no longer 1.

        """

        # now we can assemble the commands
        for n in nums:
            # cmd=1, id=-1, dir=1, mode=1, level=0, powbar, outlet, state
            cmdlist = [
                self.rpc.binaries["gpPowBarAssClientTester"],
                "tlc",
                "1",
                "-1",
                "1",
                "1",
                "0",
                "{}".format(n[0]),
                "{}".format(n[1]),
                "{}".format(state),
            ]

            subprocess.check_output(cmdlist, stderr=subprocess.STDOUT)

            # now wait until completion.  The polled value is kept in its own
            # variable: reusing ``state`` here would clobber the requested
            # outlet state and send the wrong command to every later outlet.
            substate = 1
            while substate == 1:
                substate = int(
                    self.rpc.read_gmb_value(
                        "tlc.powBarAss.powBarAssCmd_POWER_OUTLET.subCmd.state"  # type: ignore
                    )
                )
                if self.rpc.verb:
                    print(
                        "powBarAssCmd_POWER_OUTLET.subCmd.state = {}".format(substate)
                    )
                time.sleep(0.5)

    def on(self, names: npt.ArrayLike) -> None:
        """Toggle on outlets by name

        Args:
            names (:py:data:`~numpy.typing.ArrayLike`):
                Name strings (see notes in identify_outlet_by_name)

        Returns:
            None

        Raises:
            AssertionError:
                If the heat exchanger input air temperature is not below 35,
                if the MEMS is requested and its relative humidity is not
                below 15, if a name cannot be uniquely identified, or if not
                all requested outlets report ON after commanding.

        """

        # Before doing anything else, make sure the input air is below 35 degrees
        heatexchintemp = list(
            self.sensors.get_values(
                "Max. heat exchanger input air temperature"
            ).values()
        )[0]
        self._require(
            heatexchintemp < 35,
            "Heat Exchanger Input Temperature is >35 degrees. Aborting.",
        )

        names, fields, nums = self.identify_outlet_by_name(names)

        # If we're touching the MEMS, let's check the humidity first.
        if "MEMS" in names:
            memshum = list(
                self.sensors.get_values("Max. MEMS relative humidity").values()
            )[0]
            self._require(memshum < 15, "MEMS relative humidity is > 15%. Aborting.")

        states = self.get_outlet_state(fields)
        if np.any(states == 0):
            print("Already on: {}".format(", ".join(np.array(names)[states == 0])))
        if np.all(states == 0):
            return

        # only outlets currently reporting OFF are commanded
        self.send_powerbar_command(nums[states == 1], 0)

        # make sure it worked
        states = self.get_outlet_state(fields)
        self._require(np.all(states == 0), "Not all requested outlets turned on.")

    def off(self, names: npt.ArrayLike) -> None:
        """Toggle off outlets by name

        Args:
            names (:py:data:`~numpy.typing.ArrayLike`):
                Name strings (see notes in identify_outlet_by_name)

        Returns:
            None

        Raises:
            AssertionError:
                If a name cannot be uniquely identified, or if not all
                requested outlets report OFF after commanding.

        """

        # TODO: Add checks for loops closed, somehow check for signal on MEMS?

        names, fields, nums = self.identify_outlet_by_name(names)

        states = self.get_outlet_state(fields)
        if np.any(states == 1):
            print("Already off: {}".format(", ".join(np.array(names)[states == 1])))
        if np.all(states == 1):
            return

        # only outlets currently reporting ON are commanded
        self.send_powerbar_command(nums[states == 0], 1)

        # make sure it worked
        states = self.get_outlet_state(fields)
        self._require(np.all(states == 1), "Not all requested outlets turned off.")

    def reboot(self, names: npt.ArrayLike) -> None:
        """Reboot outlets by name

        Args:
            names (:py:data:`~numpy.typing.ArrayLike`):
                Name strings (see notes in identify_outlet_by_name)

        Returns:
            None

        Raises:
            AssertionError:
                If a name cannot be uniquely identified.

        """

        names, fields, nums = self.identify_outlet_by_name(names)

        states = self.get_outlet_state(fields)
        if np.any(states == 1):
            print(
                "{} off. Not rebooting.".format(", ".join(np.array(names)[states == 1]))
            )
        if np.all(states == 1):
            return

        # only outlets currently reporting ON are rebooted
        self.send_powerbar_command(nums[states == 0], 2)

        # TODO: Add checks for loops closed, somehow check for signal on MEMS?