# Copyright (c) 2019-2024 r3w0p
# The following code can be redistributed and/or
# modified under the terms of the MIT License.
"""
Simple event.
"""
from json import loads, dumps
from typing import Any
from bobocep.cep.event.event import BoboEvent
class BoboEventSimple(BoboEvent):
    """
    A simple event.

    Wraps an event ID, a timestamp and a data payload, and provides
    conversion to and from JSON `dict` / `str` representations.
    """

    # Value stored under the EVENT_TYPE key by `to_json_dict`.
    TYPE_SIMPLE = "type_simple"

    def __init__(self, event_id: str, timestamp: int, data: Any):
        """
        :param event_id: The event ID.
        :param timestamp: The event timestamp.
        :param data: The event data.
        """
        # All three values are forwarded unchanged to the parent class.
        super().__init__(event_id=event_id, timestamp=timestamp, data=data)

    def cast(self, dtype: type) -> 'BoboEventSimple':
        """
        :param dtype: The type to which the event's data is cast.
            It is called with the current data as its only argument.
        :return: A new BoboEventSimple instance with its data cast to
            `dtype` and the same event ID and timestamp as this event.
        """
        # NOTE(review): the underscore attributes are presumably set by
        # BoboEvent.__init__ - confirm against the parent class.
        source_id = self._event_id
        source_timestamp = self._timestamp
        converted = dtype(self._data)

        return BoboEventSimple(
            event_id=source_id,
            timestamp=source_timestamp,
            data=converted,
        )

    def to_json_dict(self) -> dict:
        """
        :return: A JSON `dict` representation of the event, holding the
            event type marker, event ID, timestamp and data.
        """
        # NOTE(review): the key constants (EVENT_TYPE, EVENT_ID, TIMESTAMP,
        # DATA) are not defined in this class - presumably inherited from
        # BoboEvent.
        payload = {}
        payload[self.EVENT_TYPE] = self.TYPE_SIMPLE
        payload[self.EVENT_ID] = self.event_id
        payload[self.TIMESTAMP] = self.timestamp
        payload[self.DATA] = self.data
        return payload

    def to_json_str(self) -> str:
        """
        :return: A JSON `str` representation of the event.
        """

        def _encode_nested(o):
            # Fallback for values `dumps` cannot serialise natively:
            # delegate to the value's own `to_json_str` method.
            return o.to_json_str()

        return dumps(self.to_json_dict(), default=_encode_nested)

    @staticmethod
    def from_json_str(j: str) -> 'BoboEventSimple':
        """
        :param j: A JSON `str` representation of the event.
        :return: A new instance of the event type.
        """
        parsed = loads(j)
        return BoboEventSimple.from_json_dict(parsed)

    @staticmethod
    def from_json_dict(d: dict) -> 'BoboEventSimple':
        """
        :param d: A JSON `dict` representation of the event.
            Only the event ID, timestamp and data entries are read;
            the event type entry is not inspected.
        :return: A new instance of the event type.
        :raises KeyError: If `d` lacks the event ID, timestamp or data key.
        """
        event_id = d[BoboEventSimple.EVENT_ID]
        timestamp = d[BoboEventSimple.TIMESTAMP]
        data = d[BoboEventSimple.DATA]

        return BoboEventSimple(
            event_id=event_id,
            timestamp=timestamp,
            data=data,
        )

    def __str__(self) -> str:
        """
        :return: A JSON `str` representation of the event.
        """
        as_json = self.to_json_str()
        return as_json