Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dlq): Added produce policy #57

Merged
merged 32 commits into from
Apr 29, 2022
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
c0eff84
Added produce policy
rahul-kumar-saini Apr 6, 2022
1586318
timestamp generated automatically, topic optional
rahul-kumar-saini Apr 7, 2022
c31e7ee
Merge main
rahul-kumar-saini Apr 8, 2022
b80101a
updated produce policy
rahul-kumar-saini Apr 8, 2022
d3bdcf2
Updated count policy to inherit from produce policy
rahul-kumar-saini Apr 8, 2022
3ab4a8a
Merge branch 'main' into feat/produce-policy
rahul-kumar-saini Apr 8, 2022
fa0816d
Fixed count policy
rahul-kumar-saini Apr 8, 2022
5384a62
Produce each invalid message
rahul-kumar-saini Apr 14, 2022
3014ecc
Added reason to invalid messages exception
rahul-kumar-saini Apr 14, 2022
f17f1a3
Merge branch 'main' into feat/produce-policy
rahul-kumar-saini Apr 14, 2022
9a87416
Produce Policy no longer a base class
rahul-kumar-saini Apr 15, 2022
f41f845
Added join method for async tasks
rahul-kumar-saini Apr 15, 2022
808ef0a
Cleanup produce policy
rahul-kumar-saini Apr 15, 2022
1ded612
Prune done futures
rahul-kumar-saini Apr 18, 2022
f6b2a99
Type of message thrown must be str/bytes
rahul-kumar-saini Apr 18, 2022
3613508
Fix tests
rahul-kumar-saini Apr 18, 2022
7de42b9
Introduced InvalidMessage dataclass
rahul-kumar-saini Apr 20, 2022
0733829
Tested Produce policy
rahul-kumar-saini Apr 21, 2022
aa1b713
Tested content of message produced
rahul-kumar-saini Apr 21, 2022
e1d03b6
Fixed type
rahul-kumar-saini Apr 21, 2022
36bf466
Optionals in InvalidMessage and removed message storage getter
rahul-kumar-saini Apr 22, 2022
fe7ded0
Consumer from broker
rahul-kumar-saini Apr 22, 2022
de64e43
original_topic replaced with consumer_group
rahul-kumar-saini Apr 25, 2022
83ea2f1
Base 64 prefix
rahul-kumar-saini Apr 26, 2022
ec0e453
fix datetime encoding, add test for b64 payload
rahul-kumar-saini Apr 27, 2022
154b61a
timeout produce and prune all done futures
rahul-kumar-saini Apr 27, 2022
6e8ebd9
Merge InvalidMessage changes
rahul-kumar-saini Apr 28, 2022
42d0035
Typing
rahul-kumar-saini Apr 28, 2022
3bbe615
Refactored JSONMessageEncoder to policies/abstract.py
rahul-kumar-saini Apr 29, 2022
b79a369
Removed raise TypeError
rahul-kumar-saini Apr 29, 2022
ac71e52
Revert "Removed raise TypeError"
rahul-kumar-saini Apr 29, 2022
e0424b8
__init__
rahul-kumar-saini Apr 29, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions arroyo/processing/strategies/dead_letter_queue/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from .policies.abstract import DeadLetterQueuePolicy, InvalidMessages
from .policies.count import CountInvalidMessagePolicy
from .policies.ignore import IgnoreInvalidMessagePolicy
from .policies.produce import ProduceInvalidMessagePolicy
from .policies.raise_e import RaiseInvalidMessagePolicy

__all__ = [
Expand All @@ -11,4 +12,5 @@
"CountInvalidMessagePolicy",
"IgnoreInvalidMessagePolicy",
"RaiseInvalidMessagePolicy",
"ProduceInvalidMessagePolicy",
]
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,6 @@ def terminate(self) -> None:
self.__next_step.terminate()

def join(self, timeout: Optional[float] = None) -> None:
    """
    Flush this strategy: first let the DLQ policy finish any pending
    work (e.g. in-flight async produces), then close and join the
    downstream processing step.

    :param timeout: Optional number of seconds to wait; the same value
        is passed to both the policy and the next step.
    """
    # Order matters: the policy must drain its invalid-message handling
    # before the next step is closed and joined.
    self.__policy.join(timeout)
    self.__next_step.close()
    self.__next_step.join(timeout)
Original file line number Diff line number Diff line change
@@ -1,18 +1,37 @@
from abc import ABC, abstractmethod
from typing import Any, Sequence
from datetime import datetime
from typing import Optional, Sequence, Union

# Messages handed to the DLQ must already be serialized: either text
# or raw bytes (bytes are base64-encoded before being produced).
Serializable = Union[str, bytes]


class InvalidMessages(Exception):
    """
    An exception to be thrown to pass bad messages to the DLQ
    so they are handled correctly.

    If the original topic the message(s) were produced to is known,
    that should be passed along via this exception.
    """

    def __init__(
        self,
        messages: Sequence[Serializable],
        reason: Optional[str] = None,
        original_topic: Optional[str] = None,
    ):
        self.messages = messages
        # Fall back to "unknown" so downstream consumers of the DLQ
        # payload never see None for these fields.
        self.reason = reason or "unknown"
        self.topic = original_topic or "unknown"
        # NOTE(review): naive local time stamped at construction —
        # presumably UTC is wanted here; confirm with the DLQ consumers.
        self.timestamp = str(datetime.now())

    def __str__(self) -> str:
        return (
            f"Invalid Message originally produced to: {self.topic}\n"
            f"Reason: {self.reason}\n"
            f"Exception thrown at: {self.timestamp}\n"
            f"Message(s): {self.messages}"
        )


class DeadLetterQueuePolicy(ABC):
Expand All @@ -25,4 +44,11 @@ def handle_invalid_messages(self, e: InvalidMessages) -> None:
"""
Decide what to do with invalid messages.
"""
pass
raise NotImplementedError()

@abstractmethod
def join(self, timeout: Optional[float]) -> None:
    """
    Cleanup any asynchronous tasks that may be running.

    :param timeout: Optional number of seconds to wait for cleanup
        before giving up.
    """
    raise NotImplementedError()
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
DeadLetterQueuePolicy,
InvalidMessages,
)
from arroyo.utils.metrics import get_metrics


class _Bucket(NamedTuple):
Expand All @@ -20,9 +19,14 @@ class _Bucket(NamedTuple):

class CountInvalidMessagePolicy(DeadLetterQueuePolicy):
"""
Ignore invalid messages up to a certain limit per time unit window in seconds.
This window is 1 minute by default. The exception associated with the invalid
messages is raised for all incoming invalid messages which go past this limit.
Does not raise invalid messages up to a certain limit per time unit window
in seconds. This window is 1 minute by default. The exception associated with
the invalid messages is raised for all incoming invalid messages which go past
this limit.

A `next_policy` (a DLQ Policy) must be passed to handle any exception which
remains within the per second limit. This gives full control over what happens
to exceptions within the limit.

A saved state in the form `[(<timestamp: int>, <hits: int>), ...]` can be passed
on init to load a previously saved state. This state should be aggregated to
Expand All @@ -31,13 +35,14 @@ class CountInvalidMessagePolicy(DeadLetterQueuePolicy):

def __init__(
self,
next_policy: DeadLetterQueuePolicy,
limit: int,
seconds: int = 60,
load_state: Optional[Sequence[Tuple[int, int]]] = None,
) -> None:
self.__limit = limit
self.__seconds = seconds
self.__metrics = get_metrics()
self.__next_policy = next_policy
if load_state is None:
load_state = []
self.__hits = deque(
Expand All @@ -49,7 +54,7 @@ def handle_invalid_messages(self, e: InvalidMessages) -> None:
self._add(e)
if self._count() > self.__limit:
raise e
self.__metrics.increment("dlq.dropped_messages", len(e.messages))
self.__next_policy.handle_invalid_messages(e)

def _add(self, e: InvalidMessages) -> None:
now = int(time())
Expand All @@ -63,3 +68,6 @@ def _add(self, e: InvalidMessages) -> None:
def _count(self) -> int:
    """Return the number of hits recorded within the rolling time window."""
    window_start = int(time()) - self.__seconds
    total = 0
    for bucket in self.__hits:
        if bucket.timestamp >= window_start:
            total += bucket.hits
    return total

def join(self, timeout: Optional[float]) -> None:
    """Delegate cleanup to the wrapped next policy."""
    self.__next_policy.join(timeout)
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Optional

from arroyo.processing.strategies.dead_letter_queue.policies.abstract import (
DeadLetterQueuePolicy,
InvalidMessages,
Expand All @@ -11,3 +13,6 @@ def __init__(self) -> None:

def handle_invalid_messages(self, e: InvalidMessages) -> None:
self.__metrics.increment("dlq.dropped_messages", len(e.messages))

def join(self, timeout: Optional[float]) -> None:
return
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import base64
import json
import time
from collections import deque
from concurrent.futures import Future
from typing import Deque, Optional

from arroyo.backends.kafka.consumer import KafkaPayload, KafkaProducer
from arroyo.processing.strategies.dead_letter_queue.policies.abstract import (
DeadLetterQueuePolicy,
InvalidMessages,
Serializable,
)
from arroyo.types import Message, Topic
from arroyo.utils.metrics import get_metrics

MAX_QUEUE_SIZE = 5000


class ProduceInvalidMessagePolicy(DeadLetterQueuePolicy):
    """
    Produces given InvalidMessages to a dead letter topic.
    """

    def __init__(self, producer: KafkaProducer, dead_letter_topic: Topic) -> None:
        """
        :param producer: Kafka producer used to write the invalid messages.
        :param dead_letter_topic: Topic the invalid messages are produced to.
        """
        self.__metrics = get_metrics()
        self.__dead_letter_topic = dead_letter_topic
        self.__producer = producer
        # Pending async produce results; bounded via MAX_QUEUE_SIZE by
        # the pruning done in _produce.
        self.__futures: Deque[Future[Message[KafkaPayload]]] = deque()

def handle_invalid_messages(self, e: InvalidMessages) -> None:
    """
    Asynchronously produce one message to the configured dead letter
    topic per invalid message. Each produced payload is a JSON object
    of the form:

        {
            "topic": <original topic the bad message was produced to>,
            "reason": <why the message(s) are bad>
            "timestamp": <time at which exception was thrown>,
            "message": <original bad message>
        }
    """
    for invalid_message in e.messages:
        self._produce(self._build_payload(e, invalid_message))
    self.__metrics.increment("dlq.produced_messages", len(e.messages))

def _build_payload(self, e: InvalidMessages, message: Serializable) -> KafkaPayload:
    """
    Wrap a single invalid message, together with the metadata carried
    on the exception, into a JSON-encoded KafkaPayload.

    Raw bytes cannot be embedded in JSON directly, so byte messages
    are base64-encoded first.
    """
    if isinstance(message, bytes):
        message = base64.b64encode(message).decode("utf-8")
    data = json.dumps(
        {
            "topic": e.topic,
            "reason": e.reason,
            "timestamp": e.timestamp,
            "message": message,
        }
    ).encode("utf-8")
    return KafkaPayload(key=None, value=data, headers=[])

def _produce(self, payload: KafkaPayload) -> None:
    """
    Prune done futures from the queue if it is filled, then
    asynchronously produce the message, adding the pending future
    to the queue.
    """
    self._prune_done_futures()
    self.__futures.append(
        self.__producer.produce(
            destination=self.__dead_letter_topic, payload=payload
        )
    )

def _prune_done_futures(self) -> None:
    """
    Drop completed futures once the queue has reached MAX_QUEUE_SIZE.

    Normally a single filtering pass suffices; in the rare case that
    every future is still running, the loop repeats (busy-waiting)
    until enough of them finish to bring the queue under the limit.
    """
    while len(self.__futures) >= MAX_QUEUE_SIZE:
        self.__futures = deque(
            pending for pending in self.__futures if not pending.done()
        )

rahul-kumar-saini marked this conversation as resolved.
Show resolved Hide resolved
def join(self, timeout: Optional[float] = None) -> None:
    """
    Wait for pending produce futures to complete, pruning them as they
    finish. Returns once the queue is drained, or early once ``timeout``
    seconds (if given) have elapsed; unfinished futures stay queued.
    """
    # Only Future is imported at module level; pull in the exception
    # types needed to block on a future with a deadline.
    from concurrent.futures import CancelledError
    from concurrent.futures import TimeoutError as FutureTimeoutError

    start = time.perf_counter()
    while self.__futures:
        remaining: Optional[float] = None
        if timeout is not None:
            remaining = timeout - (time.perf_counter() - start)
            if remaining <= 0:
                break
        head = self.__futures[0]
        try:
            # Block until the head future finishes (or the deadline
            # passes) instead of busy-spinning on .done(), which pegged
            # a CPU core — forever, when timeout was None.
            head.exception(timeout=remaining)
        except FutureTimeoutError:
            break
        except CancelledError:
            pass  # a cancelled future counts as done, same as .done()
        self.__futures.popleft()
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Optional

from arroyo.processing.strategies.dead_letter_queue.policies.abstract import (
DeadLetterQueuePolicy,
InvalidMessages,
Expand All @@ -7,3 +9,6 @@
class RaiseInvalidMessagePolicy(DeadLetterQueuePolicy):
def handle_invalid_messages(self, e: InvalidMessages) -> None:
raise e

def join(self, timeout: Optional[float]) -> None:
return
43 changes: 28 additions & 15 deletions tests/processing/strategies/test_dead_letter_queue.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import time
from datetime import datetime
from typing import MutableSequence, Optional, Tuple
Expand Down Expand Up @@ -30,16 +31,7 @@ class FakeProcessingStep(ProcessingStrategy[KafkaPayload]):
"""

def poll(self) -> None:
    """Unconditionally raise InvalidMessages to exercise DLQ handling."""
    raise InvalidMessages(["a bad message"])

def join(self, timeout: Optional[float] = None) -> None:
    # No-op: this fake processing step has nothing to flush or wait on.
    pass
Expand All @@ -55,7 +47,19 @@ def submit(self, message: Message[KafkaPayload]) -> None:
Valid message is one with a key.
"""
if message.payload.key is None:
raise InvalidMessages([message])
raise InvalidMessages(
[
json.dumps(
{
"payload": str(message.payload),
"partition": message.partition.index,
"topic": message.partition.topic.name,
"offset": message.offset,
"timestamp": str(message.timestamp),
}
).encode("utf-8")
]
)


class FakeBatchingProcessingStep(FakeProcessingStep):
Expand All @@ -76,7 +80,9 @@ def _submit_multiple(self) -> None:
Valid message is one with a key.
"""
bad_messages = [
message for message in self._batch if message.payload.key is None
json.dumps(str(message.payload))
for message in self._batch
if message.payload.key is None
]
self._batch = []
if bad_messages:
Expand Down Expand Up @@ -133,7 +139,8 @@ def test_count(
processing_step: FakeProcessingStep,
) -> None:
dlq_count: DeadLetterQueue[KafkaPayload] = DeadLetterQueue(
processing_step, CountInvalidMessagePolicy(5)
processing_step,
CountInvalidMessagePolicy(next_policy=IgnoreInvalidMessagePolicy(), limit=5),
)
dlq_count.submit(valid_message)
for _ in range(5):
Expand All @@ -148,7 +155,10 @@ def test_count_short(
processing_step: FakeProcessingStep,
) -> None:
dlq_count_short: DeadLetterQueue[KafkaPayload] = DeadLetterQueue(
processing_step, CountInvalidMessagePolicy(5, 1)
processing_step,
CountInvalidMessagePolicy(
next_policy=IgnoreInvalidMessagePolicy(), limit=5, seconds=1
),
)
dlq_count_short.submit(valid_message)
for _ in range(5):
Expand All @@ -172,6 +182,7 @@ def test_stateful_count(
dlq_count_load_state: DeadLetterQueue[KafkaPayload] = DeadLetterQueue(
processing_step,
CountInvalidMessagePolicy(
next_policy=IgnoreInvalidMessagePolicy(),
limit=5,
load_state=state,
),
Expand All @@ -192,7 +203,9 @@ def test_multiple_invalid_messages(
invalid_message: Message[KafkaPayload],
) -> None:
fake_batching_processor = FakeBatchingProcessingStep()
count_policy = CountInvalidMessagePolicy(5)
count_policy = CountInvalidMessagePolicy(
next_policy=IgnoreInvalidMessagePolicy(), limit=5
)
dlq_count: DeadLetterQueue[KafkaPayload] = DeadLetterQueue(
fake_batching_processor, count_policy
)
Expand Down