Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dlq): Added produce policy #57

Merged
merged 32 commits into from
Apr 29, 2022
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
c0eff84
Added produce policy
rahul-kumar-saini Apr 6, 2022
1586318
timestamp generated automatically, topic optional
rahul-kumar-saini Apr 7, 2022
c31e7ee
Merge main
rahul-kumar-saini Apr 8, 2022
b80101a
updated produce policy
rahul-kumar-saini Apr 8, 2022
d3bdcf2
Updated count policy to inherit from produce policy
rahul-kumar-saini Apr 8, 2022
3ab4a8a
Merge branch 'main' into feat/produce-policy
rahul-kumar-saini Apr 8, 2022
fa0816d
Fixed count policy
rahul-kumar-saini Apr 8, 2022
5384a62
Produce each invalid message
rahul-kumar-saini Apr 14, 2022
3014ecc
Added reason to invalid messages exception
rahul-kumar-saini Apr 14, 2022
f17f1a3
Merge branch 'main' into feat/produce-policy
rahul-kumar-saini Apr 14, 2022
9a87416
Produce Policy no longer a base class
rahul-kumar-saini Apr 15, 2022
f41f845
Added join method for async tasks
rahul-kumar-saini Apr 15, 2022
808ef0a
Cleanup produce policy
rahul-kumar-saini Apr 15, 2022
1ded612
Prune done futures
rahul-kumar-saini Apr 18, 2022
f6b2a99
Type of message thrown must be str/bytes
rahul-kumar-saini Apr 18, 2022
3613508
Fix tests
rahul-kumar-saini Apr 18, 2022
7de42b9
Introduced InvalidMessage dataclass
rahul-kumar-saini Apr 20, 2022
0733829
Tested Produce policy
rahul-kumar-saini Apr 21, 2022
aa1b713
Tested content of message produced
rahul-kumar-saini Apr 21, 2022
e1d03b6
Fixed type
rahul-kumar-saini Apr 21, 2022
36bf466
Optionals in InvalidMessage and removed message storage getter
rahul-kumar-saini Apr 22, 2022
fe7ded0
Consumer from broker
rahul-kumar-saini Apr 22, 2022
de64e43
original_topic replaced with consumer_group
rahul-kumar-saini Apr 25, 2022
83ea2f1
Base 64 prefix
rahul-kumar-saini Apr 26, 2022
ec0e453
fix datetime encoding, add test for b64 payload
rahul-kumar-saini Apr 27, 2022
154b61a
timeout produce and prune all done futures
rahul-kumar-saini Apr 27, 2022
6e8ebd9
Merge InvalidMessage changes
rahul-kumar-saini Apr 28, 2022
42d0035
Typing
rahul-kumar-saini Apr 28, 2022
3bbe615
Refactored JSONMessageEncoder to policies/abstract.py
rahul-kumar-saini Apr 29, 2022
b79a369
Removed raise TypeError
rahul-kumar-saini Apr 29, 2022
ac71e52
Revert "Removed raise TypeError"
rahul-kumar-saini Apr 29, 2022
e0424b8
__init__
rahul-kumar-saini Apr 29, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions arroyo/processing/strategies/dead_letter_queue/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from .policies.abstract import DeadLetterQueuePolicy, InvalidMessages
from .policies.count import CountInvalidMessagePolicy
from .policies.ignore import IgnoreInvalidMessagePolicy
from .policies.produce import ProduceInvalidMessagePolicy
from .policies.raise_e import RaiseInvalidMessagePolicy

__all__ = [
Expand All @@ -11,4 +12,5 @@
"CountInvalidMessagePolicy",
"IgnoreInvalidMessagePolicy",
"RaiseInvalidMessagePolicy",
"ProduceInvalidMessagePolicy",
]
Original file line number Diff line number Diff line change
@@ -1,18 +1,35 @@
from abc import ABC, abstractmethod
from typing import Any, Sequence
from datetime import datetime
from typing import Any, Optional, Sequence


class InvalidMessages(Exception):
"""
An exception to be thrown to pass bad messages to the DLQ
so they are handled correctly.

If the original topic the message(s) were produced to is known,
that should be passed along via this exception.
"""

def __init__(self, messages: Sequence[Any]):
def __init__(
self,
messages: Sequence[Any],
reason: Optional[str] = None,
rahul-kumar-saini marked this conversation as resolved.
Show resolved Hide resolved
original_topic: Optional[str] = None,
):
self.messages = messages
self.reason = reason or "unknown"
self.topic = original_topic or "unknown"
self.timestamp = str(datetime.now())
rahul-kumar-saini marked this conversation as resolved.
Show resolved Hide resolved

def __str__(self) -> str:
return f"Invalid Message(s): {self.messages}"
return (
f"Invalid Message originally produced to: {self.topic}\n"
f"Reason: {self.reason}\n"
f"Exception thrown at: {self.timestamp}\n"
f"Message(s): {self.messages}"
)


class DeadLetterQueuePolicy(ABC):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@
from time import time
from typing import NamedTuple, Optional, Sequence, Tuple

from arroyo.backends.kafka.consumer import KafkaProducer
from arroyo.processing.strategies.dead_letter_queue.policies.abstract import (
DeadLetterQueuePolicy,
InvalidMessages,
)
from arroyo.processing.strategies.dead_letter_queue.policies.produce import (
ProduceInvalidMessagePolicy,
)
from arroyo.types import Topic
from arroyo.utils.metrics import get_metrics


Expand All @@ -18,7 +22,7 @@ class _Bucket(NamedTuple):
hits: int


class CountInvalidMessagePolicy(DeadLetterQueuePolicy):
class CountInvalidMessagePolicy(ProduceInvalidMessagePolicy):
"""
Ignore invalid messages up to a certain limit per time unit window in seconds.
This window is 1 minute by default. The exception associated with the invalid
Expand All @@ -27,14 +31,22 @@ class CountInvalidMessagePolicy(DeadLetterQueuePolicy):
A saved state in the form `[(<timestamp: int>, <hits: int>), ...]` can be passed
on init to load a previously saved state. This state should be aggregated to
1 second buckets.

If a `KafkaProducer` and a `Topic` are passed to this policy, invalid messages
will not be ignored but will be produced to the topic using the producer instead.
"""

def __init__(
self,
limit: int,
seconds: int = 60,
load_state: Optional[Sequence[Tuple[int, int]]] = None,
producer: Optional[KafkaProducer] = None,
dead_letter_topic: Optional[Topic] = None,
) -> None:
self.__produce = producer is not None and dead_letter_topic is not None
if producer is not None and dead_letter_topic is not None:
super().__init__(producer, dead_letter_topic)
self.__limit = limit
self.__seconds = seconds
self.__metrics = get_metrics()
Expand All @@ -49,7 +61,13 @@ def handle_invalid_messages(self, e: InvalidMessages) -> None:
self._add(e)
if self._count() > self.__limit:
raise e
self.__metrics.increment("dlq.dropped_messages", len(e.messages))
self._produce_or_ignore(e)

def _produce_or_ignore(self, e: InvalidMessages) -> None:
if self.__produce:
super().handle_invalid_messages(e)
else:
self.__metrics.increment("dlq.dropped_messages", len(e.messages))

def _add(self, e: InvalidMessages) -> None:
now = int(time())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import json

from arroyo.backends.kafka.consumer import KafkaPayload, KafkaProducer
from arroyo.processing.strategies.dead_letter_queue.policies.abstract import (
DeadLetterQueuePolicy,
InvalidMessages,
)
from arroyo.types import Topic
from arroyo.utils.metrics import get_metrics


class ProduceInvalidMessagePolicy(DeadLetterQueuePolicy):
rahul-kumar-saini marked this conversation as resolved.
Show resolved Hide resolved
"""
Produces given InvalidMessages to a dead letter topic.

Meant to be used as a baseclass for policies needing to produce
invalid messages to a dead letter topic.
"""

def __init__(self, producer: KafkaProducer, dead_letter_topic: Topic) -> None:
rahul-kumar-saini marked this conversation as resolved.
Show resolved Hide resolved
self.__metrics = get_metrics()
self.__dead_letter_topic = dead_letter_topic
self.__producer = producer

def handle_invalid_messages(self, e: InvalidMessages) -> None:
"""
Produces a message to the given dead letter topic for each
invalid message in the form:
{
"topic": <original topic the bad message was produced to>,
"reason": <why the message(s) are bad>
"timestamp": <time at which exception was thrown>,
"message": <original bad message>
}
"""
for message in e.messages:
data = json.dumps(
{
"topic": e.topic,
"reason": e.reason,
"timestamp": e.timestamp,
"message": message,
}
).encode("utf-8")
payload = KafkaPayload(key=None, value=data, headers=[])
self.__producer.produce(
destination=self.__dead_letter_topic, payload=payload
)
rahul-kumar-saini marked this conversation as resolved.
Show resolved Hide resolved
self.__metrics.increment("dlq.produced_messages")