bobocep.dist.tcp.BoboDistributedTCP
- class bobocep.dist.tcp.BoboDistributedTCP(urn: str, decider: bobocep.cep.engine.decider.pubsub.BoboDeciderPublisher, devices: List[bobocep.dist.device.BoboDevice], crypto: bobocep.dist.crypto.crypto.BoboDistributedCrypto, max_size_incoming: int = 0, max_size_outgoing: int = 0, period_ping: int = 30, period_resync: int = 60, attempt_stash: int = 5, attempt_ping: int = 5, attempt_resync: int = 10, max_listen: int = 3, timeout_accept: int = 3, timeout_connect: int = 3, timeout_send: int = 3, timeout_receive: int = 3, recv_bytes: int = 2048, flag_reset: bool = True)[source]
Bases:
bobocep.dist.dist.BoboDistributed,bobocep.dist.pubsub.BoboDistributedPublisher,bobocep.cep.engine.decider.pubsub.BoboDeciderSubscriberAn implementation of distributed BoboCEP that uses TCP for data transmission across the network.
- __init__(urn: str, decider: bobocep.cep.engine.decider.pubsub.BoboDeciderPublisher, devices: List[bobocep.dist.device.BoboDevice], crypto: bobocep.dist.crypto.crypto.BoboDistributedCrypto, max_size_incoming: int = 0, max_size_outgoing: int = 0, period_ping: int = 30, period_resync: int = 60, attempt_stash: int = 5, attempt_ping: int = 5, attempt_resync: int = 10, max_listen: int = 3, timeout_accept: int = 3, timeout_connect: int = 3, timeout_send: int = 3, timeout_receive: int = 3, recv_bytes: int = 2048, flag_reset: bool = True)[source]
- Parameters
urn – A URN that is unique across devices in the network.
decider – The Decider used in the local engine.
devices – Devices in the network (including this device).
crypto – Encryption to use for message exchange.
max_size_incoming – Max queue size for incoming data. Default: 0 (unbounded).
max_size_outgoing – Max queue size for outgoing data. Default: 0 (unbounded).
period_ping – Period of inactivity from another device to warrant pinging the device, in seconds. Default: 30.
period_resync – Period of inactivity from another device to warrant resyncing with the device, in seconds. Default: 60.
attempt_stash – How frequently to attempt to send the sync stash if the stash is not empty.
attempt_ping – How frequently to ping another device if it is within the ping period, in seconds. Default: 10.
attempt_resync – How frequently to resync with another device if it is within the resync period, in seconds. Default: 10.
max_listen – Max number of incoming connections to listen for at a given time, in seconds. Default: 3.
timeout_accept – Timeout for accepting a new incoming connection, in seconds. Default: 3.
timeout_connect – Timeout for connecting to a client when sending data, in seconds. Default: 3.
timeout_send – Timeout for sending data, in seconds. Default: 3.
timeout_receive – Timeout for receiving data, in seconds. Default: 3.
recv_bytes – Number of bytes to receive at a time when receiving data. Default: 2048.
flag_reset – If True, the RESET flag is set to indicate to external devices that it should reset its data on this device, which will trigger a resync.
- on_decider_update(completed: List[bobocep.cep.engine.decider.runserial.BoboRunSerial], halted: List[bobocep.cep.engine.decider.runserial.BoboRunSerial], updated: List[bobocep.cep.engine.decider.runserial.BoboRunSerial], local: bool) None[source]
- Parameters
completed – Locally completed runs.
halted – Locally halted runs.
updated – Locally updated runs.
local – True if the Decider update occurred locally; False if the update occurred on a remote (distributed) instance.
- subscribe(subscriber: bobocep.dist.pubsub.BoboDistributedSubscriber) None[source]
- Parameters
subscriber – Subscriber to the distributed instance.