Skip to content

Commit

Permalink
streamer: Improve timestamp logic on status reporting (#354)
Browse files Browse the repository at this point in the history
* streamer: Serialize all status timestamps as ms

* streamer: Add timestamp to all emitted events
  • Loading branch information
victorges authored Dec 12, 2024
1 parent 49c8462 commit 23a10c7
Showing 1 changed file with 14 additions and 0 deletions.
14 changes: 14 additions & 0 deletions runner/app/live/streamer/streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,19 @@ def update_params(self, params: dict):
self.last_params_hash = str(hash(str(sorted(params.items()))))
return self

def model_dump(self, **kwargs):
data = super().model_dump(**kwargs)
# Convert all fields ending with _time to milliseconds
for field, value in data.items():
if field.endswith('_time'):
data[field] = _timestamp_to_ms(value)
return data


def _timestamp_to_ms(v: float | None) -> int | None:
return int(v * 1000) if v is not None else None


class PipelineStreamer:
def __init__(self, protocol: StreamProtocol, pipeline: str, input_timeout: int, params: dict):
self.protocol = protocol
Expand Down Expand Up @@ -175,6 +188,7 @@ async def report_status_loop(self):

async def _emit_monitoring_event(self, event: dict):
"""Protected method to emit monitoring event with lock"""
event["timestamp"] = _timestamp_to_ms(time.time())
async with self.report_status_lock:
try:
await self.protocol.emit_monitoring_event(event)
Expand Down

0 comments on commit 23a10c7

Please sign in to comment.