From 5d4714057f0dd088025af55803386549f8eb099d Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Wed, 11 Dec 2024 18:34:41 -0300 Subject: [PATCH 1/2] streamer: Serialize all status timestamps as ms --- runner/app/live/streamer/streamer.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/runner/app/live/streamer/streamer.py b/runner/app/live/streamer/streamer.py index ed7f045f..5b6dc836 100644 --- a/runner/app/live/streamer/streamer.py +++ b/runner/app/live/streamer/streamer.py @@ -43,6 +43,19 @@ def update_params(self, params: dict): self.last_params_hash = str(hash(str(sorted(params.items())))) return self + def model_dump(self, **kwargs): + data = super().model_dump(**kwargs) + # Convert all fields ending with _time to milliseconds + for field, value in data.items(): + if field.endswith('_time'): + data[field] = _timestamp_to_ms(value) + return data + + +def _timestamp_to_ms(v: float | None) -> int | None: + return int(v * 1000) if v is not None else None + + class PipelineStreamer: def __init__(self, protocol: StreamProtocol, pipeline: str, input_timeout: int, params: dict): self.protocol = protocol From 83b47fa5e77a58b25acf5df919051714905c64f5 Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Wed, 11 Dec 2024 18:40:30 -0300 Subject: [PATCH 2/2] streamer: Add timestamp to all emitted events --- runner/app/live/streamer/streamer.py | 1 + 1 file changed, 1 insertion(+) diff --git a/runner/app/live/streamer/streamer.py b/runner/app/live/streamer/streamer.py index 5b6dc836..78be4cc7 100644 --- a/runner/app/live/streamer/streamer.py +++ b/runner/app/live/streamer/streamer.py @@ -188,6 +188,7 @@ async def report_status_loop(self): async def _emit_monitoring_event(self, event: dict): """Protected method to emit monitoring event with lock""" + event["timestamp"] = _timestamp_to_ms(time.time()) async with self.report_status_lock: try: await self.protocol.emit_monitoring_event(event)