Source code for bobocep.cep.event.factory

# Copyright (c) 2019-2024 r3w0p
# The following code can be redistributed and/or
# modified under the terms of the MIT License.

"""
Event factory.
"""

from json import loads

from bobocep.cep.event.event import BoboEventError, BoboEvent


class BoboEventFactoryError(BoboEventError):
    """
    Error raised by the event factory, e.g. when the JSON being
    deserialized has a missing or unknown event type.
    """
class BoboEventFactory:
    """
    A BoboEvent factory that generates instances from JSON representations
    of events.
    """

    @staticmethod
    def from_json_str(j: str) -> BoboEvent:
        """
        Build the concrete event instance described by a JSON string.

        The value stored under the `EVENT_TYPE` key selects which event
        class deserializes the rest of the JSON object.

        :param j: A JSON `str` representation of the event.
        :return: A new instance of the event type.

        :raises BoboEventFactoryError: If the JSON is not an object, or
            if `EVENT_TYPE` key is missing from JSON.
        :raises BoboEventFactoryError: If `EVENT_TYPE` value is an
            unknown event type.
        :raises json.JSONDecodeError: If `j` is not valid JSON; the error
            from `loads` is propagated unchanged.
        """
        # Imported at call time rather than at module level
        # (presumably to avoid a circular import - TODO confirm).
        from bobocep.cep.event.action import BoboEventAction
        from bobocep.cep.event.complex import BoboEventComplex
        from bobocep.cep.event.simple import BoboEventSimple

        d = loads(j)

        # loads() may return any JSON value (list, str, number, bool, None).
        # Without the isinstance guard, a number/bool/null makes the `in`
        # test raise TypeError, and a string turns it into a substring
        # match followed by a failing subscript. Report all of these as a
        # missing key instead.
        if not isinstance(d, dict) or BoboEvent.EVENT_TYPE not in d:
            raise BoboEventFactoryError(
                "Missing key '{}'.".format(BoboEvent.EVENT_TYPE))

        event_type = d[BoboEvent.EVENT_TYPE]

        if event_type == BoboEventSimple.TYPE_SIMPLE:
            return BoboEventSimple.from_json_dict(d)

        if event_type == BoboEventComplex.TYPE_COMPLEX:
            return BoboEventComplex.from_json_dict(d)

        if event_type == BoboEventAction.TYPE_ACTION:
            return BoboEventAction.from_json_dict(d)

        raise BoboEventFactoryError(
            "Unknown event type '{}'.".format(event_type))