bobocep.cep.engine.engine.BoboEngine

class bobocep.cep.engine.engine.BoboEngine(receiver: BoboReceiver, decider: BoboDecider, producer: BoboProducer, forwarder: BoboForwarder, times_receiver: int = 0, times_decider: int = 0, times_producer: int = 0, times_forwarder: int = 0, early_stop: bool = True)[source]

Bases: object

The engine for complex event processing.

__init__(receiver: BoboReceiver, decider: BoboDecider, producer: BoboProducer, forwarder: BoboForwarder, times_receiver: int = 0, times_decider: int = 0, times_producer: int = 0, times_forwarder: int = 0, early_stop: bool = True)[source]
Parameters:
  • receiver – The receiver task.

  • decider – The decider task.

  • producer – The producer task.

  • forwarder – The forwarder task.

  • times_receiver – The number of times to run the receiver until moving to the decider task. A value of 0 runs the receiver indefinitely until it no longer performs an update of its internal state.

  • times_decider – The number of times to run the decider until moving to the producer task. A value of 0 runs the decider indefinitely until it no longer performs an update of its internal state.

  • times_producer – The number of times to run the producer until moving to the forwarder task. A value of 0 runs the producer indefinitely until it no longer performs an update of its internal state.

  • times_forwarder – The number of times to run the forwarder until moving to the receiver task. A value of 0 runs the forwarder indefinitely until it no longer performs an update of its internal state.

  • early_stop – If times_* is greater than 0, it will always run the task for the set number of times, even if the task does not update. Setting early_stop to True stops early if no task update occurs.

close() None[source]

Closes the engine.

property decider: BoboDecider

Get decider task.

property forwarder: BoboForwarder

Get forwarder task.

is_closed() bool[source]
Returns:

True if engine is set to close; False otherwise.

property producer: BoboProducer

Get producer task.

property receiver: BoboReceiver

Get receiver task.

run() None[source]

Runs the engine. This is a blocking operation.

update() bool[source]

Updates the receiver, then the decider, then the producer, and finally to the forwarder. It updates each task n times, depending on how many times were chosen during engine instantiation.

Returns:

True if engine is not set to close; False otherwise.