Actions

On the completion of a pattern’s run, the Producer is notified and produces a complex event in response, which represents the detection of the pattern’s phenomenon.

The Producer then notifies the Forwarder that the phenomenon’s action should be executed in response.

def execute(self, event: BoboEventComplex) -> Tuple[bool, Any]:
    ...

On action execution, the execute function is provided with a copy of the complex event, and is expected to return two things:

  1. Whether the action was successful or not: True or False, accordingly.

  2. Any additional data, or None. Note that, if using Distributed BoboCEP, the data type should be JSONable. See Distributed for more information.

Handlers

In BoboCEP, Forwarder contains an action handler which is responsible for executing actions and passing the action’s response back. The Forwarder then generates an action event which is sent to Receiver.

Note

In Distributed BoboCEP, only the instance that first completes a run will be the instance that handles the action.

The default action handlers provided by BoboCEP are as follows.

Blocking

The blocking handler blocks the thread on which BoboCEP is running on while it executes its actions. This is useful when BoboCEP is required to execute actions deterministically, one action at a time, in the order that they are sent to Forwarder.

from bobocep.cep.action import BoboActionHandlerBlocking

handler = BoboActionHandlerBlocking()

Multithreading

The multithreading handler uses n threads to execute actions concurrently.

from bobocep.cep.action import BoboActionHandlerMultithreading

handler = BoboActionHandlerMultithreading(threads=5)

Multiprocessing

The multiprocessing handler utilises multicore processing by specifying uses n processes on which to execute actions simultaneously.

from bobocep.cep.action import BoboActionHandlerMultiprocessing
from multiprocessing import cpu_count

# Processes equal to one less than the maximum system CPUs available.
handler = BoboActionHandlerMultiprocessing(processes=max(1, cpu_count() - 1))