Source code for bobocep.cep.engine.task

# Copyright (c) 2019-2023 r3w0p
# The following code can be redistributed and/or
# modified under the terms of the MIT License.

"""
CEP engine tasks.
"""

from abc import ABC, abstractmethod

from bobocep import BoboError


[docs]class BoboEngineTaskError(BoboError): """ An engine task error. """
[docs]class BoboEngineTask(ABC): """ An engine task. """
[docs] @abstractmethod def update(self) -> bool: """ :return: `True` if an update occurred in the task; `False` otherwise. """
[docs] @abstractmethod def size(self) -> int: """ :return: The number of events that the task is currently handling. """
[docs] @abstractmethod def close(self) -> None: """ Closes the engine task. """
[docs] @abstractmethod def is_closed(self) -> bool: """ :return: `True` if task is set to close; `False` otherwise. """