bobocep.cep.engine.engine.BoboEngine

class bobocep.cep.engine.engine.BoboEngine(receiver: bobocep.cep.engine.receiver.receiver.BoboReceiver, decider: bobocep.cep.engine.decider.decider.BoboDecider, producer: bobocep.cep.engine.producer.producer.BoboProducer, forwarder: bobocep.cep.engine.forwarder.forwarder.BoboForwarder, times_receiver: int = 0, times_decider: int = 0, times_producer: int = 0, times_forwarder: int = 0, early_stop: bool = True)[source]

Bases: object

The engine for complex event processing.

__init__(receiver: bobocep.cep.engine.receiver.receiver.BoboReceiver, decider: bobocep.cep.engine.decider.decider.BoboDecider, producer: bobocep.cep.engine.producer.producer.BoboProducer, forwarder: bobocep.cep.engine.forwarder.forwarder.BoboForwarder, times_receiver: int = 0, times_decider: int = 0, times_producer: int = 0, times_forwarder: int = 0, early_stop: bool = True)[source]
Parameters
  • receiver – The receiver task.

  • decider – The decider task.

  • producer – The producer task.

  • forwarder – The forwarder task.

  • times_receiver – The number of times to run the receiver until moving to the decider task. A value of 0 runs the receiver indefinitely until it no longer performs an update of its internal state.

  • times_decider – The number of times to run the decider until moving to the producer task. A value of 0 runs the decider indefinitely until it no longer performs an update of its internal state.

  • times_producer – The number of times to run the producer until moving to the forwarder task. A value of 0 runs the producer indefinitely until it no longer performs an update of its internal state.

  • times_forwarder – The number of times to run the forwarder until moving to the receiver task. A value of 0 runs the forwarder indefinitely until it no longer performs an update of its internal state.

  • early_stop – If times_* is greater than 0, it will always run the task for the set number of times, even if the task does not update. Setting early_stop to True stops early if no task update occurs.

close() None[source]

Closes the engine.

property decider: bobocep.cep.engine.decider.decider.BoboDecider

Get decider task.

property forwarder: bobocep.cep.engine.forwarder.forwarder.BoboForwarder

Get forwarder task.

is_closed() bool[source]
Returns

True if engine is set to close; False otherwise.

property producer: bobocep.cep.engine.producer.producer.BoboProducer

Get producer task.

property receiver: bobocep.cep.engine.receiver.receiver.BoboReceiver

Get receiver task.

run() None[source]

Runs the engine. This is a blocking operation.

update() bool[source]

Updates the receiver, then the decider, then the producer, and finally to the forwarder. It updates each task n times, depending on how many times were chosen during engine instantiation.

Returns

True if engine is not set to close; False otherwise.