bobocep.cep.engine.receiver.receiver.BoboReceiver

class bobocep.cep.engine.receiver.receiver.BoboReceiver(validator: BoboValidator, gen_event_id: BoboGenEventID, gen_timestamp: BoboGenTimestamp, gen_event: BoboGenEvent | None = None, max_size: int = 0)[source]

Bases: BoboEngineTask, BoboReceiverPublisher, BoboProducerSubscriber, BoboForwarderSubscriber

A receiver task.

__init__(validator: BoboValidator, gen_event_id: BoboGenEventID, gen_timestamp: BoboGenTimestamp, gen_event: BoboGenEvent | None = None, max_size: int = 0)[source]
Parameters:
  • validator – Incoming data validator.

  • gen_event_id – Event ID generator.

  • gen_timestamp – Timestamp generator.

  • gen_event – Event generator (optional).

  • max_size – Maximum queue size. Default: 0 (unbounded).

add_data(data: Any) None[source]
Parameters:

data – Data to add to the receiver.

Raises:

BoboReceiverError – If receiver queue is full.

close() None[source]

Closes the Receiver.

is_closed() bool[source]
Returns:

True if Receiver is closed; False otherwise.

on_forwarder_update(event: BoboEventAction) None[source]
Parameters:

event – Action event generated by Forwarder.

on_producer_update(event: BoboEventComplex, local: bool) None[source]
Parameters:
  • event – Complex event generated by Producer.

  • localTrue if the complex event was generated using a locally-completed run; False otherwise.

size() int[source]
Returns:

Queue size.

subscribe(subscriber: BoboReceiverSubscriber) None[source]
Parameters:

subscriber – Subscriber to Receiver data.

update() bool[source]

Processes data from its queue, if any. Also processes a generated event if receiver is set to generate any. A BoboEventSimple instance is produced for data if they pass validation and are not already BoboEvent instances. Valid data are then sent to receiver subscribers.

Returns:

True if queue or generated data are processed; False otherwise.