Source code for bobocep.cep.gen.event

# Copyright (c) 2019-2023 r3w0p
# The following code can be redistributed and/or
# modified under the terms of the MIT License.

"""
Generates BoboEvent objects.
"""

from abc import ABC, abstractmethod
from threading import RLock
from types import MethodType
from typing import Optional, Callable

from bobocep.cep.event import BoboEvent, BoboEventSimple
from bobocep.cep.gen.timestamp import BoboGenTimestamp, BoboGenTimestampEpoch


[docs]class BoboGenEvent(ABC): """ An event generator. """
[docs] @abstractmethod def maybe_generate(self, event_id: str) -> Optional[BoboEvent]: """ :param event_id: The event ID to use in the generated event. :return: Either a generated BoboEvent instance or None. """
[docs]class BoboGenEventNone(BoboGenEvent): """ An event generator that always returns None. """
[docs] def maybe_generate(self, event_id: str) -> Optional[BoboEvent]: """ :param event_id: An event ID (always ignored). :return: None. """ return None
[docs]class BoboGenEventTime(BoboGenEvent): """ An event generator that returns a BoboEventSimple time event if a given amount of time has elapsed. If the time has not elapsed, None is returned instead. """
[docs] def __init__(self, millis: int, datagen: Optional[Callable] = None, gen_timestamp: Optional[BoboGenTimestamp] = None, from_now: bool = True, tz=None): """ :param millis: Milliseconds between event generations. Once the millisecond timer has elapsed, an event can be generated. :param datagen: Datagen to use in generated event. :param gen_timestamp: Custom timestamp generator. If None, a BoboGenTimestampEpoch instance is used to generate timestamps. :param from_now: If `True`, sets the millisecond timer to the current time; otherwise, the timer is set to 0. :param tz: Timezone data. """ super().__init__() self._lock: RLock = RLock() self._millis: int = millis self._datagen: Optional[Callable] = datagen self._gen_ts_internal: BoboGenTimestampEpoch = BoboGenTimestampEpoch() self._tz = tz self._last: int = self._gen_ts_internal.generate() if from_now else 0 self._gen_ts_event: BoboGenTimestamp = gen_timestamp \ if gen_timestamp is not None else self._gen_ts_internal if self._datagen is not None: # Prevent garbage collection of object if callable is a method self._obj = datagen.__self__ \ if isinstance(datagen, MethodType) else None
[docs] def maybe_generate(self, event_id: str) -> Optional[BoboEvent]: """ :param event_id: An event ID. :return: A BoboEventSimple instance if one is due to be generated, or `None` if not. """ with self._lock: now = self._gen_ts_internal.generate() if (now - self._last) > self._millis: self._last = now return BoboEventSimple( event_id=event_id, timestamp=self._gen_ts_event.generate(), data=self._datagen() if self._datagen is not None else None) else: return None