# Copyright (c) 2019-2023 r3w0p
# The following code can be redistributed and/or
# modified under the terms of the MIT License.
"""
Engine task that forwards actions to external sources and generates
action events.
"""
from queue import Queue
from threading import RLock
from typing import Dict, List, Optional
from bobocep.cep.action.handler import BoboActionHandler, BoboHandlerResponse
from bobocep.cep.engine.forwarder.pubsub import BoboForwarderPublisher, \
BoboForwarderSubscriber
from bobocep.cep.engine.producer.pubsub import BoboProducerSubscriber
from bobocep.cep.engine.task import BoboEngineTaskError, BoboEngineTask
from bobocep.cep.event import BoboEventAction, BoboEventComplex
from bobocep.cep.gen import BoboGenTimestamp
from bobocep.cep.gen.event_id import BoboGenEventID
from bobocep.cep.phenom.phenom import BoboPhenomenon
_EXC_PHENOM_NAME_DUP = "duplicate name in phenomena: {}"
_EXC_QUEUE_FULL = "queue is full (max size: {})"
[docs]class BoboForwarderError(BoboEngineTaskError):
"""
A forwarder task error.
"""
[docs]class BoboForwarder(BoboEngineTask,
BoboForwarderPublisher,
BoboProducerSubscriber):
"""
A forwarder task.
"""
[docs] def __init__(self,
phenomena: List[BoboPhenomenon],
handler: BoboActionHandler,
gen_event_id: BoboGenEventID,
gen_timestamp: BoboGenTimestamp,
local_only: bool = True,
max_size: int = 0):
"""
:param phenomena: List of phenomena.
:param handler: Action handler.
:param gen_event_id: Event ID generator.
:param gen_timestamp: Timestamp generator.
:param local_only: If `True`, forwarder only executes actions for
locally-completed complex events i.e. not complex events that
were completed on a distributed instance. **Note**: if `False`,
the action may be executed more than once: once by the instance
that generated the remote event, and another by this instance.
:param max_size: Maximum queue size.
Default: 0 (unbounded).
"""
super().__init__()
self._lock: RLock = RLock()
self._closed: bool = False
self._subscribers: List[BoboForwarderSubscriber] = []
self._phenomena: Dict[str, BoboPhenomenon] = {}
for phenom in phenomena:
if phenom.name not in self._phenomena:
self._phenomena[phenom.name] = phenom
else:
raise BoboForwarderError(
_EXC_PHENOM_NAME_DUP.format(phenom.name))
self._handler: BoboActionHandler = handler
self._gen_event_id: BoboGenEventID = gen_event_id
self._gen_timestamp: BoboGenTimestamp = gen_timestamp
self._max_size: int = max(0, max_size)
self._local_only: bool = local_only
self._queue: Queue[BoboEventComplex] = Queue(self._max_size)
[docs] def subscribe(self, subscriber: BoboForwarderSubscriber):
"""
:param subscriber: Subscriber to Forwarder data.
"""
with self._lock:
if subscriber not in self._subscribers:
self._subscribers.append(subscriber)
[docs] def update(self) -> bool:
"""
:return: `True` if an internal update occurred; `False` otherwise.
"""
with self._lock:
if self._closed:
return False
handle = self._update_handler()
response = self._update_responses()
return handle or response
[docs] def close(self) -> None:
"""
Closes the Forwarder.
"""
with self._lock:
self._closed = True
[docs] def is_closed(self) -> bool:
"""
:return: `True` if Forwarder is closed; `False` otherwise.
"""
with self._lock:
return self._closed
def _update_handler(self) -> bool:
"""
:return: `True` if update occurred; `False` otherwise.
"""
if not self._queue.empty():
event: BoboEventComplex = self._queue.get_nowait()
if event.phenomenon_name in self._phenomena:
phenom: BoboPhenomenon = \
self._phenomena[event.phenomenon_name]
if phenom.action is not None:
self._handler.handle(
action=phenom.action,
event=event)
return True
return False
def _update_responses(self) -> bool:
"""
:return: `True` if subscribers were notified of an action event
from the action handler; `False` otherwise.
"""
hres: Optional[BoboHandlerResponse] = \
self._handler.get_handler_response()
if hres is not None:
# Generate action event from handler response
event = BoboEventAction(
event_id=self._gen_event_id.generate(),
timestamp=self._gen_timestamp.generate(),
data=hres.data,
phenomenon_name=hres.complex_event.phenomenon_name,
pattern_name=hres.complex_event.pattern_name,
action_name=hres.action_name,
success=hres.success)
for subscriber in self._subscribers:
subscriber.on_forwarder_update(event)
return True
return False
[docs] def on_producer_update(
self,
event: BoboEventComplex,
local: bool
) -> None:
"""
:param event: Complex event generated by Producer.
:param local: `True` if the complex event was generated using
a locally-completed run; `False` otherwise.
"""
with self._lock:
if self._closed:
return
# Forwarder set to only execute actions for locally-generated
# complex events.
if (not local) and self._local_only:
return
if not self._queue.full():
self._queue.put(event)
else:
raise BoboForwarderError(
_EXC_QUEUE_FULL.format(self._max_size))
[docs] def size(self) -> int:
"""
:return: Queue size.
"""
with self._lock:
return self._queue.qsize()