Source code for bobocep.dist.devman

# Copyright (c) 2019-2024 r3w0p
# The following code can be redistributed and/or
# modified under the terms of the MIT License.

"""
Device manager.
"""

from threading import RLock
from typing import List, Tuple

from bobocep.cep.engine.decider.runserial import BoboRunSerial
from bobocep.dist.device import BoboDevice


[docs] class BoboDeviceManager: """ Manages information about a BoboCEP device on the network. """
[docs] def __init__(self, device: BoboDevice, flag_reset: bool): """ :param device: The device to manage. :param flag_reset: If `True`, it indicates to the BoboDistributed instance that the next message to the device should set a flag to reset its data on the sending device; 'False' indicates that no flag should be set. """ super().__init__() self._lock: RLock = RLock() self._device: BoboDevice = device self._flag_reset: bool = flag_reset self._last_comms: int = 0 self._last_attempt: int = 0 self._stash_completed: List[BoboRunSerial] = [] self._stash_halted: List[BoboRunSerial] = [] self._stash_updated: List[BoboRunSerial] = []
@property def addr(self) -> str: """ :return: Device address. """ with self._lock: return self._device.addr @addr.setter def addr(self, addr: str) -> None: """ Set device address. :param addr: The new address. """ with self._lock: self._device.addr = addr @property def port(self) -> int: """ :return: Device port. """ return self._device.port @property def urn(self) -> str: """ :return: Device URN. """ return self._device.urn @property def id_key(self) -> str: """ :return: Device ID key. """ return self._device.id_key @property def flag_reset(self) -> bool: """ :return: `True` if reset flag should be set; `False` otherwise. """ with self._lock: return self._flag_reset @flag_reset.setter def flag_reset(self, flag_reset: bool) -> None: """ :param flag_reset: `True` to set flag for resetting the device. """ with self._lock: self._flag_reset = flag_reset @property def last_comms(self) -> int: """ :return: Time of last communication with device. """ with self._lock: return self._last_comms @last_comms.setter def last_comms(self, last_comms: int) -> None: """ :param last_comms: The last communication with the device. """ with self._lock: self._last_comms = max(0, last_comms) @property def last_attempt(self) -> int: """ :return: Last attempted communication with the device. """ with self._lock: return self._last_attempt @last_attempt.setter def last_attempt(self, last_attempt: int) -> None: """ :return: Time of last communication attempt with device. """ with self._lock: self._last_attempt = max(0, last_attempt)
[docs] def clear_last(self) -> None: """ Clears both the last communication and last attempted communication time by setting them both to 0. """ with self._lock: self._last_comms = 0 self._last_attempt = 0
[docs] def stash(self) -> Tuple[ List[BoboRunSerial], List[BoboRunSerial], List[BoboRunSerial] ]: """ :return: The device's stash. """ with self._lock: return self._stash_completed, \ self._stash_halted, \ self._stash_updated
[docs] def append_stash(self, completed: List[BoboRunSerial], halted: List[BoboRunSerial], updated: List[BoboRunSerial]) -> None: """ Append runs to stash. :param completed: Completed runs to append. :param halted: Halted runs to append. :param updated: Updated runs to append. """ with self._lock: self._stash_completed.extend(completed) self._stash_halted.extend(halted) self._stash_updated.extend(updated)
[docs] def size_stash(self) -> int: """ :return: The stash size, equal to all completed, halted, and updated runs in the stash. """ with self._lock: return len(self._stash_completed) + \ len(self._stash_halted) + \ len(self._stash_updated)
[docs] def clear_stash(self) -> None: """ Removes all items in the stash. """ with self._lock: self._stash_completed = [] self._stash_halted = [] self._stash_updated = []