Source code for bobocep.cep.engine.producer.producer

# Copyright (c) 2019-2023 r3w0p
# The following code can be redistributed and/or
# modified under the terms of the MIT License.

"""
Engine task that generates complex events and triggers actions.
"""

from queue import Queue
from threading import RLock
from typing import List, Dict, Tuple

from bobocep.cep.engine.decider.pubsub import BoboDeciderSubscriber
from bobocep.cep.engine.decider.runserial import BoboRunSerial
from bobocep.cep.engine.producer.pubsub import BoboProducerPublisher, \
    BoboProducerSubscriber
from bobocep.cep.engine.task import BoboEngineTaskError, BoboEngineTask
from bobocep.cep.event import BoboEventComplex
from bobocep.cep.gen.event_id import BoboGenEventID
from bobocep.cep.gen.timestamp import BoboGenTimestamp
from bobocep.cep.phenom.phenom import BoboPhenomenon

_EXC_PHENOM_NAME_DUP = "duplicate name in phenomena: {}"
_EXC_QUEUE_FULL = "queue is full (max size: {})"


[docs]class BoboProducerError(BoboEngineTaskError): """ A producer task error. """
[docs]class BoboProducer(BoboEngineTask, BoboProducerPublisher, BoboDeciderSubscriber): """ A producer task. """
[docs] def __init__(self, phenomena: List[BoboPhenomenon], gen_event_id: BoboGenEventID, gen_timestamp: BoboGenTimestamp, max_size: int = 0): """ :param phenomena: List of phenomena. :param gen_event_id: Event ID generator. :param gen_timestamp: Timestamp generator. :param max_size: Maximum queue size. Default: 0 (unbounded). """ super().__init__() self._lock: RLock = RLock() self._closed: bool = False self._subscribers: List[BoboProducerSubscriber] = [] self._phenomena: Dict[str, BoboPhenomenon] = {} for phenom in phenomena: if phenom.name not in self._phenomena: self._phenomena[phenom.name] = phenom else: raise BoboProducerError( _EXC_PHENOM_NAME_DUP.format(phenom.name)) self._gen_event_id: BoboGenEventID = gen_event_id self._gen_timestamp: BoboGenTimestamp = gen_timestamp self._max_size: int = max(0, max_size) self._queue: Queue[Tuple[BoboRunSerial, bool]] = Queue(self._max_size)
[docs] def subscribe(self, subscriber: BoboProducerSubscriber): """ :param subscriber: Subscriber to Producer data. """ with self._lock: if subscriber not in self._subscribers: self._subscribers.append(subscriber)
[docs] def update(self) -> bool: """ :return: `True` if an internal update occurred; `False` otherwise. """ with self._lock: if self._closed: return False if not self._queue.empty(): event, local = self._queue.get_nowait() self._handle_completed_run(event, local) return True return False
[docs] def close(self) -> None: """ Closes the Producer. """ with self._lock: self._closed = True
[docs] def is_closed(self) -> bool: """ :return: `True` if Producer is closed; `False` otherwise. """ with self._lock: return self._closed
def _handle_completed_run( self, runserial: BoboRunSerial, local: bool ) -> None: """ :param runserial: The run to handle. :param local: `True` if the run was completed locally; `False` if the run was completed on a remote (distributed) instance. """ if runserial.phenomenon_name not in self._phenomena: raise BoboProducerError(runserial.phenomenon_name) phenom: BoboPhenomenon = self._phenomena[runserial.phenomenon_name] event_complex = BoboEventComplex( event_id=self._gen_event_id.generate(), timestamp=self._gen_timestamp.generate(), data=phenom.datagen(phenom, runserial.history) if phenom.datagen is not None else None, phenomenon_name=runserial.phenomenon_name, pattern_name=runserial.pattern_name, history=runserial.history) for subscriber in self._subscribers: subscriber.on_producer_update(event=event_complex, local=local)
[docs] def on_decider_update( self, completed: List[BoboRunSerial], halted: List[BoboRunSerial], updated: List[BoboRunSerial], local: bool ) -> None: """ :param completed: Completed runs. :param halted: Halted runs. :param updated: Updated runs. :param local: `True` if the Decider update occurred locally; `False` if the update occurred on a remote (distributed) instance. """ with self._lock: if self._closed: return for run in completed: if not self._queue.full(): self._queue.put((run, local)) else: raise BoboProducerError( _EXC_QUEUE_FULL.format(self._max_size))
[docs] def size(self) -> int: """ :return: Queue size. """ with self._lock: return self._queue.qsize()