From 0ddc8f2390c6837f575b5bc7752d15e189b05b43 Mon Sep 17 00:00:00 2001 From: Lyn Date: Thu, 12 Oct 2023 14:37:47 -0700 Subject: [PATCH 1/3] feat: Add optional received_p99 timestamp to commit log The value from the received field can be used in the future for subscription scheduling if this is provided. This is better than the `orig_message_ts` field as `received` is assigned at the very start of the pipeline when Sentry receives the event (as opposed to when Snuba gets the event). Switching to this field means any delays in ingestion will be properly accounted for when determining the window on which to schedule subscriptions. This PR also: - deprecates the legacy decoder since we have fully switched over to the new format - switches orig_message_ts from datetime to float. Converting between the two in encode/decode is pointless, and it introduces the possibility of timezone issues. Simpler to just keep it a unix timestamp everywhere. --- arroyo/backends/kafka/commit.py | 41 ++++++++------------------------- arroyo/commit.py | 6 ++--- tests/backends/test_commit.py | 14 ++++------- 3 files changed, 17 insertions(+), 44 deletions(-) diff --git a/arroyo/backends/kafka/commit.py b/arroyo/backends/kafka/commit.py index b15ead84..151ea0b4 100644 --- a/arroyo/backends/kafka/commit.py +++ b/arroyo/backends/kafka/commit.py @@ -1,15 +1,10 @@ import json -from datetime import datetime from arroyo.backends.kafka import KafkaPayload from arroyo.commit import Commit from arroyo.types import Partition, Topic from arroyo.utils.codecs import Codec -# Kept in decode method for backward compatibility. Will be -# remove in a future release of Arroyo -DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ" - class CommitCodec(Codec[KafkaPayload, Commit]): def encode(self, value: Commit) -> KafkaPayload: @@ -18,7 +13,8 @@ def encode(self, value: Commit) -> KafkaPayload: payload = json.dumps( { "offset": value.offset, - "orig_message_ts": datetime.timestamp(value.orig_message_ts), + "orig_message_ts": value.orig_message_ts, + "received_p99": value.received_p99, } ).encode("utf-8") @@ -30,28 +26,6 @@ def encode(self, value: Commit) -> KafkaPayload: [], ) - def decode_legacy(self, value: KafkaPayload) -> Commit: - key = value.key - if not isinstance(key, bytes): - raise TypeError("payload key must be a bytes object") - - val = value.value - if not isinstance(val, bytes): - raise TypeError("payload value must be a bytes object") - - headers = {k: v for (k, v) in value.headers} - orig_message_ts = datetime.strptime( - headers["orig_message_ts"].decode("utf-8"), DATETIME_FORMAT - ) - topic_name, partition_index, group = key.decode("utf-8").split(":", 3) - offset = int(val.decode("utf-8")) - return Commit( - group, - Partition(Topic(topic_name), int(partition_index)), - offset, - orig_message_ts, - ) - def decode(self, value: KafkaPayload) -> Commit: key = value.key if not isinstance(key, bytes): @@ -63,12 +37,14 @@ def decode(self, value: KafkaPayload) -> Commit: payload = val.decode("utf-8") - if payload.isnumeric(): - return self.decode_legacy(value) - decoded = json.loads(payload) offset = decoded["offset"] - orig_message_ts = datetime.fromtimestamp(decoded["orig_message_ts"]) + orig_message_ts = decoded["orig_message_ts"] + + if decoded.get("received_p99"): + received_ts = decoded["received_p99"] + else: + received_ts = None topic_name, partition_index, group = key.decode("utf-8").split(":", 3) @@ -77,4 +53,5 @@ def decode(self, value: KafkaPayload) -> Commit: Partition(Topic(topic_name), int(partition_index)), offset, orig_message_ts, + received_ts, ) diff --git a/arroyo/commit.py b/arroyo/commit.py index 691305ea..25b97577 100644 --- a/arroyo/commit.py +++ b/arroyo/commit.py @@ -2,7 +2,6 @@ import time from dataclasses import dataclass, field -from datetime import datetime from typing import Mapping, MutableMapping, Optional from arroyo.types import Partition @@ -61,9 +60,10 @@ def did_commit(self, now: float, offsets: Mapping[Partition, int]) -> None: @dataclass(frozen=True) class Commit: - __slots__ = ["group", "partition", "offset", "orig_message_ts"] + __slots__ = ["group", "partition", "offset", "orig_message_ts", "received_p99"] group: str partition: Partition offset: int - orig_message_ts: datetime + orig_message_ts: float + received_p99: Optional[float] diff --git a/tests/backends/test_commit.py b/tests/backends/test_commit.py index 2e1d958d..6269ff99 100644 --- a/tests/backends/test_commit.py +++ b/tests/backends/test_commit.py @@ -1,7 +1,6 @@ -from datetime import datetime +from datetime import datetime, timedelta from arroyo.backends.kafka.commit import CommitCodec -from arroyo.backends.kafka import KafkaPayload from arroyo.commit import Commit from arroyo.types import Partition, Topic @@ -12,19 +11,16 @@ def test_encode_decode() -> None: offset_to_commit = 5 + now = datetime.now() + commit = Commit( "leader-a", Partition(topic, 0), offset_to_commit, - datetime.now(), + now, + now - timedelta(seconds=5), ) encoded = commit_codec.encode(commit) assert commit_codec.decode(encoded) == commit - -def test_decode_legacy() -> None: - legacy = KafkaPayload(b"topic:0:leader-a", b"5", [('orig_message_ts', b'2023-09-26T21:58:14.191325Z')]) - decoded = CommitCodec().decode(legacy) - assert decoded.offset == 5 - assert decoded.group == "leader-a" From 4a498f13f01fb877f57b6845e2c3a44a4bb2e762 Mon Sep 17 00:00:00 2001 From: Lyn Date: Mon, 16 Oct 2023 13:31:02 -0700 Subject: [PATCH 2/3] fix test --- tests/backends/test_commit.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/backends/test_commit.py b/tests/backends/test_commit.py index 6269ff99..43888ab2 100644 --- a/tests/backends/test_commit.py +++ b/tests/backends/test_commit.py @@ -1,4 +1,4 @@ -from datetime import datetime, timedelta +import time from arroyo.backends.kafka.commit import CommitCodec from arroyo.commit import Commit @@ -11,14 +11,14 @@ def test_encode_decode() -> None: offset_to_commit = 5 - now = datetime.now() + now = time.time() commit = Commit( "leader-a", Partition(topic, 0), offset_to_commit, now, - now - timedelta(seconds=5), + now - 5, ) encoded = commit_codec.encode(commit) From 8e10c87dfd8cf599ec603a7191b07218b643f65d Mon Sep 17 00:00:00 2001 From: Lyn Date: Mon, 16 Oct 2023 13:48:35 -0700 Subject: [PATCH 3/3] update another test --- tests/backends/test_kafka.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/backends/test_kafka.py b/tests/backends/test_kafka.py index 56c99278..296766b6 100644 --- a/tests/backends/test_kafka.py +++ b/tests/backends/test_kafka.py @@ -2,9 +2,9 @@ import itertools import os import pickle +import time import uuid from contextlib import closing -from datetime import datetime from pickle import PickleBuffer from typing import Any, Iterator, Mapping, MutableSequence, Optional from unittest import mock @@ -195,7 +195,9 @@ def test_consumer_stream_processor_shutdown(self) -> None: def test_commit_codec() -> None: - commit = Commit("group", Partition(Topic("topic"), 0), 0, datetime.now()) + commit = Commit( + "group", Partition(Topic("topic"), 0), 0, time.time(), time.time() - 5 + ) assert commit_codec.decode(commit_codec.encode(commit)) == commit