bobocep.dist.tcp.BoboDistributedTCP

class bobocep.dist.tcp.BoboDistributedTCP(urn: str, decider: bobocep.cep.engine.decider.pubsub.BoboDeciderPublisher, devices: List[bobocep.dist.device.BoboDevice], crypto: bobocep.dist.crypto.crypto.BoboDistributedCrypto, max_size_incoming: int = 0, max_size_outgoing: int = 0, period_ping: int = 30, period_resync: int = 60, attempt_stash: int = 5, attempt_ping: int = 5, attempt_resync: int = 10, max_listen: int = 3, timeout_accept: int = 3, timeout_connect: int = 3, timeout_send: int = 3, timeout_receive: int = 3, recv_bytes: int = 2048, flag_reset: bool = True)

Bases: bobocep.dist.dist.BoboDistributed, bobocep.dist.pubsub.BoboDistributedPublisher, bobocep.cep.engine.decider.pubsub.BoboDeciderSubscriber

An implementation of distributed BoboCEP that uses TCP for data transmission across the network.

__init__(urn: str, decider: bobocep.cep.engine.decider.pubsub.BoboDeciderPublisher, devices: List[bobocep.dist.device.BoboDevice], crypto: bobocep.dist.crypto.crypto.BoboDistributedCrypto, max_size_incoming: int = 0, max_size_outgoing: int = 0, period_ping: int = 30, period_resync: int = 60, attempt_stash: int = 5, attempt_ping: int = 5, attempt_resync: int = 10, max_listen: int = 3, timeout_accept: int = 3, timeout_connect: int = 3, timeout_send: int = 3, timeout_receive: int = 3, recv_bytes: int = 2048, flag_reset: bool = True)
Parameters
  • urn – A URN that is unique across devices in the network.

  • decider – The Decider used in the local engine.

  • devices – Devices in the network (including this device).

  • crypto – Encryption to use for message exchange.

  • max_size_incoming – Max queue size for incoming data. Default: 0 (unbounded).

  • max_size_outgoing – Max queue size for outgoing data. Default: 0 (unbounded).

  • period_ping – Period of inactivity from another device to warrant pinging the device, in seconds. Default: 30.

  • period_resync – Period of inactivity from another device to warrant resyncing with the device, in seconds. Default: 60.

  • attempt_stash – How frequently to attempt to send the sync stash if the stash is not empty. Default: 5.

  • attempt_ping – How frequently to ping another device if it is within the ping period, in seconds. Default: 5.

  • attempt_resync – How frequently to resync with another device if it is within the resync period, in seconds. Default: 10.

  • max_listen – Max number of incoming connections to listen for at a given time. Default: 3.

  • timeout_accept – Timeout for accepting a new incoming connection, in seconds. Default: 3.

  • timeout_connect – Timeout for connecting to a client when sending data, in seconds. Default: 3.

  • timeout_send – Timeout for sending data, in seconds. Default: 3.

  • timeout_receive – Timeout for receiving data, in seconds. Default: 3.

  • recv_bytes – Number of bytes to receive at a time when receiving data. Default: 2048.

  • flag_reset – If True, the RESET flag is set to tell external devices to reset the data they hold about this device, which triggers a resync. Default: True.
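The recv_bytes and timeout_receive parameters describe a chunked, time-limited read from a TCP socket. The following is a minimal sketch of that general technique using only the standard library. It is not BoboCEP's implementation: the function name receive_all and the use of socket.socketpair() for the demonstration are assumptions made for illustration.

```python
import socket


def receive_all(conn: socket.socket,
                recv_bytes: int = 2048,
                timeout_receive: float = 3.0) -> bytes:
    """Hypothetical helper: read until the peer closes the connection,
    at most recv_bytes at a time."""
    # If no data arrives within timeout_receive seconds, recv() raises
    # socket.timeout instead of blocking forever.
    conn.settimeout(timeout_receive)
    chunks = []
    while True:
        chunk = conn.recv(recv_bytes)
        if not chunk:  # empty bytes: the peer closed its side
            break
        chunks.append(chunk)
    return b"".join(chunks)


# Demonstration with a connected local socket pair (no network needed).
sender, receiver = socket.socketpair()
payload = b"x" * 5000  # larger than recv_bytes, so several reads are needed
sender.sendall(payload)
sender.close()
received = receive_all(receiver)
receiver.close()
```

A smaller recv_bytes means more recv() calls per message; a larger value reads more per call at the cost of a bigger buffer.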

close() → None

Closes the distributed instance.

is_closed() → bool
Returns

True if the distributed instance is closed; False otherwise.

join() → None

Joins with the incoming and outgoing threads.
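The relationship between run(), close(), is_closed() and join() can be illustrated with a small stand-in class. This is a sketch under stated assumptions, not BoboCEP's code: DistributedStandIn, its send() method and its single worker thread are hypothetical, and the real class manages separate incoming and outgoing threads over TCP.

```python
import queue
import threading


class DistributedStandIn:
    """Hypothetical stand-in that mirrors the lifecycle method names only."""

    def __init__(self) -> None:
        self._closed = threading.Event()
        self._outgoing: "queue.Queue[str]" = queue.Queue()
        self.sent = []
        self._thread = threading.Thread(target=self._loop_outgoing)

    def _loop_outgoing(self) -> None:
        # Keep working until close() was called AND the queue is drained.
        while not (self._closed.is_set() and self._outgoing.empty()):
            try:
                item = self._outgoing.get(timeout=0.05)
            except queue.Empty:
                continue
            self.sent.append(item)  # a real instance would transmit here

    def run(self) -> None:
        self._thread.start()

    def send(self, item: str) -> None:  # hypothetical, for the demo only
        self._outgoing.put(item)

    def close(self) -> None:
        self._closed.set()

    def is_closed(self) -> bool:
        return self._closed.is_set()

    def join(self) -> None:
        self._thread.join()


dist = DistributedStandIn()
dist.run()
dist.send("sync")
dist.close()  # request shutdown
dist.join()   # wait for the worker thread to finish
```

In this sketch, close() only signals shutdown; join() is what blocks until the worker thread has actually stopped.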

on_decider_update(completed: List[bobocep.cep.engine.decider.runserial.BoboRunSerial], halted: List[bobocep.cep.engine.decider.runserial.BoboRunSerial], updated: List[bobocep.cep.engine.decider.runserial.BoboRunSerial], local: bool) → None
Parameters
  • completed – Locally completed runs.

  • halted – Locally halted runs.

  • updated – Locally updated runs.

  • local – True if the Decider update occurred locally; False if the update occurred on a remote (distributed) instance.
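One plausible use of the local flag is to forward only locally produced updates to other devices, so that updates received from a remote instance are not echoed back. The sketch below assumes that behaviour; the documentation above does not confirm it. ForwardLocalUpdates is a hypothetical class, and plain strings stand in for BoboRunSerial objects.

```python
from typing import List, Tuple


class ForwardLocalUpdates:
    """Hypothetical subscriber with the same callback signature."""

    def __init__(self) -> None:
        # Batches that would be sent to other devices.
        self.outgoing: List[Tuple[List[str], List[str], List[str]]] = []

    def on_decider_update(self,
                          completed: List[str],
                          halted: List[str],
                          updated: List[str],
                          local: bool) -> None:
        if not local:
            # Assumption: remote updates are already known to the network.
            return
        if completed or halted or updated:
            self.outgoing.append(
                (list(completed), list(halted), list(updated)))


sub = ForwardLocalUpdates()
sub.on_decider_update(["run-1"], [], [], local=True)   # queued
sub.on_decider_update(["run-2"], [], [], local=False)  # ignored
```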

run() → None

Runs the distributed instance.
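While the instance runs, the period_ping, period_resync, attempt_ping and attempt_resync parameters govern when a quiet device is contacted. The function below is one reading of those parameter descriptions, offered as a sketch only. The function next_action, its arguments and its return values are hypothetical, and the library's actual scheduling may differ.

```python
def next_action(idle: float,
                since_last_attempt: float,
                period_ping: int = 30,
                period_resync: int = 60,
                attempt_ping: int = 5,
                attempt_resync: int = 10) -> str:
    """Decide what to do about a device that has been silent for
    `idle` seconds, given the seconds since the last ping/resync attempt."""
    if idle >= period_resync:
        # Silent long enough to warrant a resync, retried every attempt_resync s.
        return "resync" if since_last_attempt >= attempt_resync else "wait"
    if idle >= period_ping:
        # Silent long enough to warrant a ping, retried every attempt_ping s.
        return "ping" if since_last_attempt >= attempt_ping else "wait"
    return "none"  # device was heard from recently


action_recent = next_action(idle=10, since_last_attempt=100)
action_ping = next_action(idle=35, since_last_attempt=6)
action_resync = next_action(idle=70, since_last_attempt=12)
```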

size_incoming() → int
Returns

Size of incoming queue.

size_outgoing() → int
Returns

Size of outgoing queue.
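The "0 (unbounded)" default of max_size_incoming and max_size_outgoing matches the convention of Python's queue.Queue, where maxsize=0 means no limit. Assuming the incoming and outgoing queues follow that convention (the documentation does not say which queue type is used), the reported sizes would behave as in this sketch:

```python
import queue

# maxsize=0: the queue never reports Full.
unbounded = queue.Queue(maxsize=0)
for i in range(1000):
    unbounded.put_nowait(i)

# maxsize=2: a third non-blocking put raises queue.Full.
bounded = queue.Queue(maxsize=2)
bounded.put_nowait("a")
bounded.put_nowait("b")
try:
    bounded.put_nowait("c")
    overflowed = False
except queue.Full:
    overflowed = True

size_unbounded = unbounded.qsize()
size_bounded = bounded.qsize()
```

A bounded queue caps memory use at the cost of rejecting or blocking new items when the network falls behind.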

subscribe(subscriber: bobocep.dist.pubsub.BoboDistributedSubscriber) → None
Parameters

subscriber – Subscriber to the distributed instance.
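The subscribe method follows the publisher/subscriber pattern. A generic sketch of that pattern is shown below. The Publisher and Subscriber classes and the notify and publish method names are hypothetical; they are not the interface of BoboDistributedSubscriber, whose callbacks are not listed on this page.

```python
from typing import List


class Subscriber:
    """Hypothetical subscriber that records what it is told."""

    def __init__(self) -> None:
        self.received: List[str] = []

    def notify(self, message: str) -> None:
        self.received.append(message)


class Publisher:
    """Hypothetical publisher that fans messages out to subscribers."""

    def __init__(self) -> None:
        self._subscribers: List[Subscriber] = []

    def subscribe(self, subscriber: Subscriber) -> None:
        # Ignore duplicate registrations of the same object.
        if subscriber not in self._subscribers:
            self._subscribers.append(subscriber)

    def publish(self, message: str) -> None:
        for subscriber in self._subscribers:
            subscriber.notify(message)


publisher = Publisher()
listener = Subscriber()
publisher.subscribe(listener)
publisher.subscribe(listener)  # second registration has no effect
publisher.publish("run completed")
```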