Source code for gpilib2.powerbars

import numpy as np
import re
import numpy.typing as npt
import time
import subprocess
from typing import List, Tuple
from gpilib2.sensors import sensors


class powerbars:
    """All GPI powerbars and outlets

    Args:
        sensors (:py:class:`gpilib2.sensors`):
            sensors object. sim status and verbosity will be set based on the
            sensors.rpc settings.

    Attributes:
        sensors (:py:class:`gpilib2.sensors`):
            Sensors object used for safety checks.
        rpc (:py:class:`gpilib2.rpc`):
            Pointer to ``self.sensors.rpc``. rpc object for all communications.
        npowbar (int):
            Number of powerbars (4).
        noutlet (int):
            Number of outlets per powerbar (8).
        pbarre (:py:class:`re.Pattern`):
            regex that can extract numerical values from powerbar GMB fieldnames
        outletfields (~numpy.ndarray):
            32-element array of all outlet GMB field names
        outletnums (~numpy.ndarray):
            32x2 array of all powerbar/outlet command numbers
        outletnames (~numpy.ndarray):
            Current outlet names as returned by ``rpc.read_gmb_values``. Set by
            :py:meth:`update_outlet_names`.

    .. note::

        There are 4 powerbars with 8 outlets each.  The GMB numbers these with
        0-based indexing (so that Powerbar 3 Outlet 6 is
        'tlc.powBarAss.bar[2].outlet[5]') but the commands and GUIs use 1-based
        indexing (so you would command 3,6 for the same outlet). Annoying.

        Because of the danger of using stale powerbar name info, current outlet
        names are queried at every instantiation of this class.

    """

    def __init__(self, sensors: "sensors") -> None:
        """Define all powerbars and outlets

        Args:
            sensors (gpilib2.sensors):
                sensors object. sim status and verbosity will be set based on the
                sensors.rpc settings.

        """

        self.sensors = sensors
        self.rpc = self.sensors.rpc

        self.npowbar = 4
        self.noutlet = 8
        # every literal dot is escaped so only genuine field names match
        self.pbarre = re.compile(r"tlc\.powBarAss\.bar\[(\d)\]\.outlet\[(\d)\]")

        # synthesize outlet numbers and GMB fields and query their current names
        outletfields = []
        outletnums = []
        for ii in range(self.npowbar):
            for jj in range(self.noutlet):
                # GMB fields are 0-based; command numbers are 1-based
                outletfields.append("tlc.powBarAss.bar[{}].outlet[{}]".format(ii, jj))
                outletnums.append((ii + 1, jj + 1))
        self.outletfields = np.array(outletfields)
        self.outletnums = np.array(outletnums)

        self.update_outlet_names()

    def update_outlet_names(self) -> None:
        """Query and save the names of all outlets

        Reads the ``.name`` sub-field of every entry in ``self.outletfields`` and
        stores the result in ``self.outletnames``.
        """

        # synthesize queries
        namefields = [n + ".name" for n in self.outletfields]
        self.outletnames = self.rpc.read_gmb_values(namefields)

    def __str__(self) -> str:
        """Generate pretty string representation of powerbars and outlets

        One column per powerbar and one row per outlet. Each cell shows the outlet
        name and ON/OFF; any state other than 0 or 1 is left blank.
        """

        nstates = self.get_outlet_state(self.outletfields).reshape(
            self.npowbar, self.noutlet
        )
        names = self.outletnames.reshape(self.npowbar, self.noutlet)
        states = np.zeros(nstates.shape, dtype="<U3")
        states[nstates == 0] = "ON"
        states[nstates == 1] = "OFF"

        # find longest name in each powerbar
        mxlens = [len(max(n, key=len)) + 1 for n in names]

        # Each row is a 3-character row label ("|n|") followed by one cell per
        # powerbar of width (name width + ":" + 3-char state + "|").
        hline = "-" * (sum(mxlens) + 3 + 5 * len(mxlens)) + "\n"

        # header prefix is 3 characters wide so that the column separators line
        # up with the "|n|" row labels below
        out = hline + "| |"
        for j, width in enumerate(mxlens):
            out += "{0: >{1}}|".format(j + 1, width + 4)
        out += "\n" + hline

        for j, (o, ss) in enumerate(zip(names.transpose(), states.transpose())):
            out += "|{}|".format(j + 1)
            for width, n, s in zip(mxlens, o, ss):
                out += "{0: >{1}}:{2: >3}|".format(n, width, s)
            out += "\n"
        out += hline

        return out

    def identify_outlet_by_name(
        self, names: npt.ArrayLike
    ) -> Tuple[List[str], List[str], npt.NDArray[np.int_]]:
        """Identify outlet/powerbar locations by name

        Args:
            names (:py:data:`~numpy.typing.ArrayLike`):
                Name strings (see note)

        Returns:
            tuple:
                list:
                    Exact names of outlets requested.
                list:
                    Sub-array of self.outletfields corresponding to queried names
                ~numpy.ndarray:
                    Sub-array of self.outletnums corresponding to queried names

        Raises:
            AssertionError:
                If a name matches no outlet or more than one outlet.

        .. note::

            Name matching must be unique to prevent accidentally toggling unintended
            outlets.  Names can be partial, but must uniquely identify the outlet
            (i.e., 'CAL' is not acceptable as it matches 5 different devices).
            Each name is used as a case-insensitive regular expression searched
            against ``self.outletnames``.

        """

        names = np.array(names, ndmin=1)

        outnames = []
        fields = []
        nums = []
        for n in names:
            pattern = re.compile(r"{}".format(n), re.IGNORECASE)
            matches = [name for name in self.outletnames if pattern.search(name)]

            # Explicit raise (not ``assert``) so the uniqueness check is still
            # enforced when Python runs with -O.
            if not matches:
                raise AssertionError("{} does not match any outlet name.".format(n))
            if len(matches) != 1:
                raise AssertionError(
                    "{} is vague, matching devices:{}".format(n, ", ".join(matches))
                )

            selected = self.outletnames == matches[0]
            outnames.append(matches[0])
            fields.append(self.outletfields[selected][0])
            nums.append(self.outletnums[selected][0])

        return outnames, fields, np.array(nums)

    def get_outlet_state(self, fields: npt.ArrayLike) -> npt.NDArray[np.int_]:
        """Query the current outlet state

        Args:
            fields (:py:data:`~numpy.typing.ArrayLike`):
                Outlet fields to query

        Returns:
            ~numpy.ndarray:
                Current states of outlets. 1 = OFF, 0 = ON, 2 = REBOOT

        """

        statefields = [f + ".state" for f in np.array(fields, ndmin=1)]

        return self.rpc.read_gmb_values(statefields).astype(int)

    def send_powerbar_command(self, nums: npt.NDArray[np.int_], state: int) -> None:
        """Wrapper to gpPowBarAssClientTester and helper method for on/off/reboot

        Args:
            nums (~numpy.ndarray):
                n x 2 array of powerbar/outlet numbers
            state (int):
                0: On
                1: Off
                2: Reboot

        Returns:
            None

        Notes:
            Remember, the commands and GUIs use 1-based, so the numbers here must be
            1-based.  So powerbar 3, outlet 6 is [3,6], etc. See also notes in
            __init__.

            After each command is issued, the GMB field
            ``tlc.powBarAss.powBarAssCmd_POWER_OUTLET.subCmd.state`` is polled every
            0.5 s until it no longer reads 1.

        """

        # now we can assemble the commands
        for n in nums:
            # cmd=1, id=-1, dir=1, mode=1, level=0, powbar, outlet, state
            cmdlist = [
                self.rpc.binaries["gpPowBarAssClientTester"],
                "tlc",
                "1",
                "-1",
                "1",
                "1",
                "0",
                "{}".format(n[0]),
                "{}".format(n[1]),
                "{}".format(state),
            ]

            _ = subprocess.check_output(cmdlist, stderr=subprocess.STDOUT)

            # now wait until completion.  The polled value is kept in its own
            # variable so that the requested ``state`` is not overwritten before
            # the next outlet is commanded.
            cmdstate = 1
            while cmdstate == 1:
                cmdstate = int(
                    self.rpc.read_gmb_value(
                        "tlc.powBarAss.powBarAssCmd_POWER_OUTLET.subCmd.state"  # type: ignore
                    )
                )
                if self.rpc.verb:
                    print(
                        "powBarAssCmd_POWER_OUTLET.subCmd.state = {}".format(cmdstate)
                    )
                time.sleep(0.5)

    def on(self, names: npt.ArrayLike) -> None:
        """Toggle on outlets by name

        Args:
            names (:py:data:`~numpy.typing.ArrayLike`):
                Name strings (see notes in identify_outlet_by_name)

        Returns:
            None

        Raises:
            AssertionError:
                If the heat exchanger input air temperature is not below 35, if
                the outlet named exactly 'MEMS' is requested and the MEMS relative
                humidity is not below 15, if a name is not uniquely matched, or if
                any requested outlet is not on after the command.

        """

        # Before doing anything else, make sure the input air is below 35 degrees.
        # Explicit raises (not ``assert``) keep these interlocks active under -O.
        heatexchintemp = list(
            self.sensors.get_values(
                "Max. heat exchanger input air temperature"
            ).values()
        )[0]
        if not heatexchintemp < 35:
            raise AssertionError(
                "Heat Exchanger Input Temperature is >35 degrees. Aborting."
            )

        names, fields, nums = self.identify_outlet_by_name(names)

        # If we're touching the MEMS, let's check the humidity first.
        if "MEMS" in names:
            memshum = list(
                self.sensors.get_values("Max. MEMS relative humidity").values()
            )[0]
            if not memshum < 15:
                raise AssertionError("MEMS relative humidity is > 15%. Aborting.")

        states = self.get_outlet_state(fields)
        if np.any(states == 0):
            print("Already on: {}".format(", ".join(np.array(names)[states == 0])))

        if np.all(states == 0):
            return

        # only command the outlets that currently report OFF
        self.send_powerbar_command(nums[states == 1], 0)

        # make sure it worked
        states = self.get_outlet_state(fields)
        if not np.all(states == 0):
            raise AssertionError("Not all requested outlets turned on.")

    def off(self, names: npt.ArrayLike) -> None:
        """Toggle off outlets by name

        Args:
            names (:py:data:`~numpy.typing.ArrayLike`):
                Name strings (see notes in identify_outlet_by_name)

        Returns:
            None

        Raises:
            AssertionError:
                If a name is not uniquely matched, or if any requested outlet is
                not off after the command.

        """

        # TODO: Add checks for loops closed, somehow check for signal on MEMS?

        names, fields, nums = self.identify_outlet_by_name(names)

        states = self.get_outlet_state(fields)
        if np.any(states == 1):
            print("Already off: {}".format(", ".join(np.array(names)[states == 1])))

        if np.all(states == 1):
            return

        # only command the outlets that currently report ON
        self.send_powerbar_command(nums[states == 0], 1)

        # make sure it worked (explicit raise so the check survives -O)
        states = self.get_outlet_state(fields)
        if not np.all(states == 1):
            raise AssertionError("Not all requested outlets turned off.")

    def reboot(self, names: npt.ArrayLike) -> None:
        """Reboot outlets by name

        Outlets that currently report OFF are skipped (and listed on stdout); only
        outlets that report ON are sent the reboot command.

        Args:
            names (:py:data:`~numpy.typing.ArrayLike`):
                Name strings (see notes in identify_outlet_by_name)

        Returns:
            None

        Raises:
            AssertionError:
                If a name is not uniquely matched.

        """

        names, fields, nums = self.identify_outlet_by_name(names)

        states = self.get_outlet_state(fields)
        if np.any(states == 1):
            print(
                "{} off. Not rebooting.".format(", ".join(np.array(names)[states == 1]))
            )

        if np.all(states == 1):
            return

        self.send_powerbar_command(nums[states == 0], 2)
# TODO: Add checks for loops closed, somehow check for signal on MEMS?