feat(consumer): Support incremental cooperative rebalancing #53
Changes from 4 commits
@@ -143,9 +143,10 @@ class KafkaConsumer(Consumer[KafkaPayload]):

    def __init__(
        self,
        configuration: Mapping[str, Any],
        configuration: MutableMapping[str, Any],
        *,
        commit_retry_policy: Optional[RetryPolicy] = None,
        incremental_cooperative: bool = False,
    ) -> None:
        if commit_retry_policy is None:
            commit_retry_policy = NoRetryPolicy()

@@ -182,6 +183,11 @@ def __init__(
                "invalid value for 'enable.auto.offset.store' configuration"
            )

        self.__incremental_cooperative = incremental_cooperative

        if self.__incremental_cooperative is True:
            configuration["partition.assignment.strategy"] = "cooperative-sticky"

        # NOTE: Offsets are explicitly managed as part of the assignment
        # callback, so preemptively resetting offsets is not enabled.
        self.__consumer = ConfluentConsumer(
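For readers skimming the diff: the new flag only toggles one librdkafka setting, which is also why the `configuration` annotation changes from `Mapping` to `MutableMapping` (the mapping is now mutated in place). A rough sketch of the resulting configuration, with placeholder broker address and group id that are not part of this PR:

```python
# Roughly what the constructor does with the flag before handing the mapping
# to the underlying Confluent consumer (broker address and group id are
# placeholders).
configuration = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "example-group",
    "enable.auto.commit": False,
    "enable.auto.offset.store": False,
}

incremental_cooperative = True
if incremental_cooperative:
    # Opt this group member into incremental cooperative rebalancing.
    configuration["partition.assignment.strategy"] = "cooperative-sticky"
```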
@@ -246,40 +252,74 @@ def assignment_callback(
        ) -> None:
            self.__state = KafkaConsumerState.ASSIGNING

            try:
                assignment: MutableSequence[ConfluentTopicPartition] = []

                for partition in self.__consumer.committed(partitions):
                    if partition.offset >= 0:
                        assignment.append(partition)
                    elif partition.offset == OFFSET_INVALID:
                        assignment.append(
                            self.__resolve_partition_starting_offset(partition)
                        )
                    else:
                        raise ValueError("received unexpected offset")

                offsets: MutableMapping[Partition, int] = {
                    Partition(Topic(i.topic), i.partition): i.offset for i in assignment
                }
                self.__seek(offsets)

                # Ensure that all partitions are resumed on assignment to avoid
                # carrying over state from a previous assignment.
                self.__consumer.resume(
                    [
                        ConfluentTopicPartition(
                            partition.topic.name, partition.index, offset
                        )
                        for partition, offset in offsets.items()
                    ]
                )
            if self.__incremental_cooperative is True:
This diff got pretty weird because of indentation. This block from here to line 273 is new though, and could use a close review. It's quite different to what happens with eager rebalancing (lines 276-305), which is essentially unchanged.
                try:
                    incremental_assignment: MutableSequence[
                        ConfluentTopicPartition
                    ] = []

                    for partition in partitions:
If I am getting this right, you are not asking the broker anymore for the last committed offset.

I don't think you can receive a partition you already owned, but in any case I don't quite get why we would need to check the offset, since the offset you are receiving should already be the correct one. The other issue I noticed with

This will be important to verify.

I don't think this is true. The offsets passed to the assignment callback are already the committed ones, not just the ones that are consumed. Kafka guarantees that

The reason for committing offsets during revoke is to avoid double processing them, so the new consumer doesn't get that same offset again. But we wouldn't be skipping it either way - only processing it either once or twice.

Are you sure? I thought this was the reason for making this consumer behave differently from the standard Kafka consumer https://github.com/getsentry/arroyo/blob/main/arroyo/backends/kafka/consumer.py#L109-L118. Anyway, if Kafka guarantees that only new partitions are passed and not previously owned ones, we should be good.

Yes, but this only applies to the partitions that are being newly assigned to the consumer. Previously all of the partitions (including those already assigned to the consumer) would be provided in the callback; now only the incremental ones are. So we are no longer rewinding back those partitions which the consumer is maintaining across a rebalance like we used to, those just continue from the same place.
                        if partition.offset >= 0:
                            incremental_assignment.append(partition)
                        elif partition.offset == OFFSET_INVALID:
                            incremental_assignment.append(
                                self.__resolve_partition_starting_offset(partition)
                            )
                        else:
                            raise ValueError("received unexpected offset")

                    offsets = {
                        Partition(Topic(i.topic), i.partition): i.offset
                        for i in incremental_assignment
                    }

                    self.__incremental_assign(offsets)

                    # Ensure that all partitions are resumed on assignment to avoid
                    # carrying over state from a previous assignment.
                    self.resume([p for p in offsets])

                except Exception:
                    self.__state = KafkaConsumerState.ERROR
                    raise

                for partition in offsets:
                    self.__paused.discard(partition)
            except Exception:
                self.__state = KafkaConsumerState.ERROR
                raise
            else:
                try:
                    assignment: MutableSequence[ConfluentTopicPartition] = []

                    for partition in self.__consumer.committed(partitions):
                        if partition.offset >= 0:
                            assignment.append(partition)
                        elif partition.offset == OFFSET_INVALID:
                            assignment.append(
                                self.__resolve_partition_starting_offset(partition)
                            )
                        else:
                            raise ValueError("received unexpected offset")

                    offsets = {
                        Partition(Topic(i.topic), i.partition): i.offset
                        for i in assignment
                    }

                    self.__assign(offsets)

                    # Ensure that all partitions are resumed on assignment to avoid
                    # carrying over state from a previous assignment.
                    self.__consumer.resume(
                        [
                            ConfluentTopicPartition(
                                partition.topic.name, partition.index, offset
                            )
                            for partition, offset in offsets.items()
                        ]
                    )

                    for partition in offsets:
                        self.__paused.discard(partition)
Any reason for relying on the

No reason, I just mostly avoided touching this implementation (it's just indented differently so it shows up here). But yes it's cleaner to reuse
                except Exception:
                    self.__state = KafkaConsumerState.ERROR
                    raise

            try:
                if on_assign is not None:
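To make the behaviour discussed in the comments above concrete: with `partition.assignment.strategy` set to `cooperative-sticky`, librdkafka hands the assignment callback only the partitions being added by the current rebalance, and the application adds them on top of its existing assignment with `incremental_assign` rather than replacing everything with `assign`. A minimal standalone sketch of that wiring using confluent-kafka-python directly (not arroyo; topic, group, and broker names are placeholders, and it assumes a client version that ships `incremental_assign`, which I believe is 1.7.0 or later):

```python
from confluent_kafka import Consumer

consumer = Consumer(
    {
        "bootstrap.servers": "localhost:9092",
        "group.id": "cooperative-example",
        "partition.assignment.strategy": "cooperative-sticky",
        "auto.offset.reset": "earliest",
    }
)


def on_assign(consumer: Consumer, partitions: list) -> None:
    # With cooperative-sticky, ``partitions`` contains only the partitions
    # being *added* by this rebalance; partitions the member already owns are
    # not revoked and keep consuming from their current position.
    print("incrementally assigned:", partitions)
    # Add the new partitions on top of the existing assignment. An application
    # that wants custom starting offsets (as the KafkaConsumer above does)
    # would set each partition's offset before this call.
    consumer.incremental_assign(partitions)


def on_revoke(consumer: Consumer, partitions: list) -> None:
    # Symmetrically, only the partitions actually being taken away show up here.
    print("incrementally revoked:", partitions)
    consumer.incremental_unassign(partitions)


consumer.subscribe(["example-topic"], on_assign=on_assign, on_revoke=on_revoke)

for _ in range(10):
    consumer.poll(1.0)

consumer.close()
```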
@@ -431,29 +471,33 @@ def __validate_offsets(self, offsets: Mapping[Partition, int]) -> None:
        if invalid_offsets:
            raise ConsumerError(f"invalid offsets: {invalid_offsets!r}")

    def __assign(self, offsets: Mapping[Partition, int]) -> None:
        self.__validate_offsets(offsets)
        self.__consumer.assign(
            [
                ConfluentTopicPartition(partition.topic.name, partition.index, offset)
                for partition, offset in offsets.items()
            ]
        )
        self.__offsets.update(offsets)

    def __incremental_assign(self, offsets: Mapping[Partition, int]) -> None:
        self.__validate_offsets(offsets)
        self.__consumer.incremental_assign(
            [
                ConfluentTopicPartition(partition.topic.name, partition.index, offset)
                for partition, offset in offsets.items()
            ]
        )
        self.__offsets.update(offsets)

    def __seek(self, offsets: Mapping[Partition, int]) -> None:
        self.__validate_offsets(offsets)

        if self.__state is KafkaConsumerState.ASSIGNING:
            # Calling ``seek`` on the Confluent consumer from an assignment
            # callback will throw an "Erroneous state" error. Instead,
            # partition offsets have to be initialized by calling ``assign``.
            self.__consumer.assign(
                [
                    ConfluentTopicPartition(
                        partition.topic.name, partition.index, offset
                    )
                    for partition, offset in offsets.items()
                ]
        for partition, offset in offsets.items():
            self.__consumer.seek(
                ConfluentTopicPartition(partition.topic.name, partition.index, offset)
Why this change? Are we never going to get to this method during assignment?

I found it pretty unintuitive that the same

Also, now this method is only called by the

I was avoiding touching a lot of code earlier, but yes it is better inline since no one else calls it now. Updated.
            )
        else:
            for partition, offset in offsets.items():
                self.__consumer.seek(
                    ConfluentTopicPartition(
                        partition.topic.name, partition.index, offset
                    )
                )

        self.__offsets.update(offsets)

    def seek(self, offsets: Mapping[Partition, int]) -> None:
@@ -6,7 +6,7 @@
from contextlib import closing
from datetime import datetime
from pickle import PickleBuffer
from typing import Iterator, MutableSequence, Optional
from typing import Any, Iterator, Mapping, MutableSequence, Optional
from unittest import TestCase

import pytest
@@ -46,6 +46,25 @@ def test_payload_pickle_out_of_band() -> None:
    assert pickle.loads(data, buffers=[b.raw() for b in buffers]) == payload


@contextlib.contextmanager
def get_topic(
    configuration: Mapping[str, Any], partitions_count: int
) -> Iterator[Topic]:
    name = f"test-{uuid.uuid1().hex}"
    client = AdminClient(configuration)
    [[key, future]] = client.create_topics(
        [NewTopic(name, num_partitions=partitions_count, replication_factor=1)]
    ).items()
    assert key == name
    assert future.result() is None
    try:
        yield Topic(name)
    finally:
        [[key, future]] = client.delete_topics([name]).items()
        assert key == name
        assert future.result() is None


class KafkaStreamsTestCase(StreamsTestMixin[KafkaPayload], TestCase):

    configuration = build_kafka_configuration(
@@ -54,19 +73,11 @@ class KafkaStreamsTestCase(StreamsTestMixin[KafkaPayload], TestCase):

    @contextlib.contextmanager
    def get_topic(self, partitions: int = 1) -> Iterator[Topic]:
        name = f"test-{uuid.uuid1().hex}"
        client = AdminClient(self.configuration)
        [[key, future]] = client.create_topics(
            [NewTopic(name, num_partitions=partitions, replication_factor=1)]
        ).items()
        assert key == name
        assert future.result() is None
        try:
            yield Topic(name)
        finally:
            [[key, future]] = client.delete_topics([name]).items()
            assert key == name
            assert future.result() is None
        with get_topic(self.configuration, partitions) as topic:
            try:
                yield topic
            finally:
                pass

    def get_consumer(
        self,
@@ -133,6 +144,70 @@ def test_auto_offset_reset_error(self) -> None:
        consumer.poll(10.0)  # XXX: getting the subcription is slow

def test_cooperative_rebalancing() -> None:
    configuration = build_kafka_configuration(
        {"bootstrap.servers": os.environ.get("DEFAULT_BROKERS", "localhost:9092")}
    )

    partitions_count = 2

    group_id = uuid.uuid1().hex
    producer = KafkaProducer(configuration)

    consumer_a = KafkaConsumer(
        {
            **configuration,
            "auto.offset.reset": "earliest",
            "enable.auto.commit": False,
            "enable.auto.offset.store": False,
            "group.id": group_id,
            "session.timeout.ms": 10000,
        },
        incremental_cooperative=True,
    )
    consumer_b = KafkaConsumer(
        {
            **configuration,
            "auto.offset.reset": "earliest",
            "enable.auto.commit": False,
            "enable.auto.offset.store": False,
            "group.id": group_id,
            "session.timeout.ms": 10000,
        },
        incremental_cooperative=True,
    )
Thanks.

We can't combine cooperative and non-cooperative consumers in the same group. I'm not sure of other solutions apart from stopping them all and restarting them all with the new configuration.

Ok, let's do it one day I am on PTO.
    with get_topic(configuration, partitions_count) as topic, closing(
        producer
    ), closing(consumer_a), closing(consumer_b):
        for i in range(10):
            for j in range(partitions_count):
                producer.produce(
                    Partition(topic, 1),
                    KafkaPayload(None, f"{j}-{i}".encode("utf8"), []),
                )

        consumer_a.subscribe([topic])
Why not provide callbacks so that you can assert that only one partition is transferred?

I guess that could've worked too. Still, this method of checking the assigned partitions via consumer.tell() works well enough in checking the same thing, and it's copied from the test of the eager consumer.

Right, but there is no way to tell whether all partitions were revoked and then reassigned or whether the incremental assignment actually works.

Yeah, this was not about testing Kafka consumer internals, just that we end up with one partition assigned to each consumer and consuming on both works ok.
        assert consumer_a.poll(10.0) is not None

        # Consumer A has 2 partitions assigned, B has none
        assert len(consumer_a.tell()) == 2
        assert len(consumer_b.tell()) == 0

        consumer_b.subscribe([topic])
        consumer_a.pause([Partition(topic, 0), Partition(topic, 1)])

        # At some point, 1 partition will move to consumer B
        for i in range(10):
            assert consumer_a.poll(0) is None  # attempt to force session timeout
            if consumer_b.poll(1.0) is not None:
                break

        assert len(consumer_a.tell()) == 1
        assert len(consumer_b.tell()) == 1
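Picking up the reviewer's suggestion above about asserting the handover through callbacks: a hypothetical sketch of how the two subscribe calls in the test could record what moved, assuming arroyo's `subscribe(topics, on_assign=..., on_revoke=...)` interface (on_assign receiving a Partition-to-offset mapping, on_revoke a sequence of Partitions); the tracking lists and callback names are made up, and `consumer_a`, `consumer_b`, and `topic` are the objects from the test body:

```python
from typing import Mapping, MutableSequence, Sequence

from arroyo.types import Partition

# Hypothetical tracking lists the rebalance callbacks append to.
revoked_from_a: MutableSequence[Partition] = []
assigned_to_b: MutableSequence[Partition] = []


def on_revoke_a(partitions: Sequence[Partition]) -> None:
    revoked_from_a.extend(partitions)


def on_assign_b(offsets: Mapping[Partition, int]) -> None:
    assigned_to_b.extend(offsets)


# The two subscribe calls in the test would then become:
#   consumer_a.subscribe([topic], on_revoke=on_revoke_a)
#   consumer_b.subscribe([topic], on_assign=on_assign_b)
# and once the rebalance settles the test could assert that exactly one
# partition was revoked from A and assigned to B:
#   assert len(revoked_from_a) == 1
#   assert len(assigned_to_b) == 1
```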
def test_commit_codec() -> None:
    commit = Commit("group", Partition(Topic("topic"), 0), 0, datetime.now())
    assert commit_codec.decode(commit_codec.encode(commit)) == commit
Why provide incremental_cooperative as a dedicated flag instead of passing it through the configuration mapping like all the other config parameters?

I had it the other way originally, but figured it was easier to pass a boolean than remember these strings and then check for it to decide whether to apply incremental_assign. Thinking about it again, I might switch back to avoid changing the interface of this class though.

Also, we would not reintroduce two separate ways to provide Kafka config that took us long to clean up.

There was only one way. Passing the flag was the only way that would have worked.

I am talking about all those fields we were passing via CLI that we are slowly moving to settings.

Unfortunately passing via CLI is still the easiest option currently, as we don't have a mechanism yet to provide different settings for consumers/producers of a topic via settings, and this is a consumer-only configuration.
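For what it's worth, the alternative being discussed (deriving the behaviour from the configuration mapping instead of a dedicated constructor flag) could look roughly like this; `uses_cooperative_rebalancing` is a made-up helper name, not something in the PR:

```python
from typing import Any, Mapping


def uses_cooperative_rebalancing(configuration: Mapping[str, Any]) -> bool:
    # Hypothetical: inspect the librdkafka setting the caller already provides
    # and let the consumer decide from it whether to use incremental_assign.
    strategy = configuration.get("partition.assignment.strategy", "")
    return "cooperative-sticky" in str(strategy)


configuration = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "example-group",
    "partition.assignment.strategy": "cooperative-sticky",
}
assert uses_cooperative_rebalancing(configuration)
```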