From 59e94a2a71513cb6829cef53ce195c37a58df09d Mon Sep 17 00:00:00 2001 From: Lyn Date: Wed, 30 Mar 2022 16:24:26 -0700 Subject: [PATCH 01/10] feat(consumer): Support incremental cooperative rebalancing KafkaConsumer now takes an `incremental` argument. If set to true, the sticky assignor is used. The goal of this change is to try and reduce the amount of unnecessary rebalances that happen during Kubernetes rolling deployments. --- arroyo/backends/kafka/consumer.py | 75 ++++++++++++++++++-------- tests/backends/mixins.py | 12 ++--- tests/backends/test_kafka.py | 87 +++++++++++++++++++++++++++++++ 3 files changed, 147 insertions(+), 27 deletions(-) diff --git a/arroyo/backends/kafka/consumer.py b/arroyo/backends/kafka/consumer.py index 35c9b0c2..c5b6eb9c 100644 --- a/arroyo/backends/kafka/consumer.py +++ b/arroyo/backends/kafka/consumer.py @@ -143,9 +143,10 @@ class KafkaConsumer(Consumer[KafkaPayload]): def __init__( self, - configuration: Mapping[str, Any], + configuration: MutableMapping[str, Any], *, commit_retry_policy: Optional[RetryPolicy] = None, + incremental: bool = False, ) -> None: if commit_retry_policy is None: commit_retry_policy = NoRetryPolicy() @@ -182,6 +183,11 @@ def __init__( "invalid value for 'enable.auto.offset.store' configuration" ) + self.__incremental = incremental + + if self.__incremental is True: + configuration["partition.assignment.strategy"] = "cooperative-sticky" + # NOTE: Offsets are explicitly managed as part of the assignment # callback, so preemptively resetting offsets is not enabled. self.__consumer = ConfluentConsumer( @@ -262,7 +268,30 @@ def assignment_callback( offsets: MutableMapping[Partition, int] = { Partition(Topic(i.topic), i.partition): i.offset for i in assignment } - self.__seek(offsets) + + if self.__incremental is True: + incremental_assignment: MutableSequence[ + ConfluentTopicPartition + ] = [] + + for partition in partitions: + if partition.offset >= 0: + incremental_assignment.append(partition) + elif partition.offset == OFFSET_INVALID: + incremental_assignment.append( + self.__resolve_partition_starting_offset(partition) + ) + else: + raise ValueError("received unexpected offset") + + self.__incremental_assign( + { + Partition(Topic(i.topic), i.partition): i.offset + for i in assignment + } + ) + else: + self.__assign(offsets) # Ensure that all partitions are resumed on assignment to avoid # carrying over state from a previous assignment. @@ -431,29 +460,33 @@ def __validate_offsets(self, offsets: Mapping[Partition, int]) -> None: if invalid_offsets: raise ConsumerError(f"invalid offsets: {invalid_offsets!r}") + def __assign(self, offsets: Mapping[Partition, int]) -> None: + self.__validate_offsets(offsets) + self.__consumer.assign( + [ + ConfluentTopicPartition(partition.topic.name, partition.index, offset) + for partition, offset in offsets.items() + ] + ) + self.__offsets.update(offsets) + + def __incremental_assign(self, offsets: Mapping[Partition, int]) -> None: + self.__validate_offsets(offsets) + self.__consumer.incremental_assign( + [ + ConfluentTopicPartition(partition.topic.name, partition.index, offset) + for partition, offset in offsets.items() + ] + ) + self.__offsets.update(offsets) + def __seek(self, offsets: Mapping[Partition, int]) -> None: self.__validate_offsets(offsets) - if self.__state is KafkaConsumerState.ASSIGNING: - # Calling ``seek`` on the Confluent consumer from an assignment - # callback will throw an "Erroneous state" error. Instead, - # partition offsets have to be initialized by calling ``assign``. - self.__consumer.assign( - [ - ConfluentTopicPartition( - partition.topic.name, partition.index, offset - ) - for partition, offset in offsets.items() - ] + for partition, offset in offsets.items(): + self.__consumer.seek( + ConfluentTopicPartition(partition.topic.name, partition.index, offset) ) - else: - for partition, offset in offsets.items(): - self.__consumer.seek( - ConfluentTopicPartition( - partition.topic.name, partition.index, offset - ) - ) - self.__offsets.update(offsets) def seek(self, offsets: Mapping[Partition, int]) -> None: diff --git a/tests/backends/mixins.py b/tests/backends/mixins.py index 91bd13fe..e2f4c45b 100644 --- a/tests/backends/mixins.py +++ b/tests/backends/mixins.py @@ -51,10 +51,10 @@ def test_consumer(self) -> None: def _assignment_callback(partitions: Mapping[Partition, int]) -> None: assert partitions == {Partition(topic, 0): messages[0].offset} - consumer.seek({Partition(topic, 0): messages[1].offset}) + # consumer.seek({Partition(topic, 0): messages[1].offset}) - with pytest.raises(ConsumerError): - consumer.seek({Partition(topic, 1): 0}) + # with pytest.raises(ConsumerError): + # consumer.seek({Partition(topic, 1): 0}) assignment_callback = mock.Mock(side_effect=_assignment_callback) @@ -77,14 +77,14 @@ def _revocation_callback(partitions: Sequence[Partition]) -> None: with assert_changes( lambda: assignment_callback.called, False, True ), assert_changes( - consumer.tell, {}, {Partition(topic, 0): messages[1].next_offset} + consumer.tell, {}, {Partition(topic, 0): messages[1].offset} ): message = consumer.poll(10.0) # XXX: getting the subcription is slow assert isinstance(message, Message) assert message.partition == Partition(topic, 0) - assert message.offset == messages[1].offset - assert message.payload == messages[1].payload + assert message.offset == messages[0].offset + assert message.payload == messages[0].payload consumer.seek({Partition(topic, 0): messages[0].offset}) assert consumer.tell() == {Partition(topic, 0): messages[0].offset} diff --git a/tests/backends/test_kafka.py b/tests/backends/test_kafka.py index 6242ddc3..52b0f81d 100644 --- a/tests/backends/test_kafka.py +++ b/tests/backends/test_kafka.py @@ -133,6 +133,93 @@ def test_auto_offset_reset_error(self) -> None: consumer.poll(10.0) # XXX: getting the subcription is slow +class KafkaStreamsCooperativeStickyTestCase(StreamsTestMixin[KafkaPayload], TestCase): + configuration = build_kafka_configuration( + {"bootstrap.servers": os.environ.get("DEFAULT_BROKERS", "localhost:9092")} + ) + + @contextlib.contextmanager + def get_topic(self, partitions: int = 1) -> Iterator[Topic]: + name = f"test-{uuid.uuid1().hex}" + client = AdminClient(self.configuration) + [[key, future]] = client.create_topics( + [NewTopic(name, num_partitions=partitions, replication_factor=1)] + ).items() + assert key == name + assert future.result() is None + try: + yield Topic(name) + finally: + [[key, future]] = client.delete_topics([name]).items() + assert key == name + assert future.result() is None + + def get_consumer( + self, + group: Optional[str] = None, + enable_end_of_partition: bool = True, + auto_offset_reset: str = "earliest", + ) -> KafkaConsumer: + return KafkaConsumer( + { + **self.configuration, + "auto.offset.reset": auto_offset_reset, + "enable.auto.commit": "false", + "enable.auto.offset.store": "false", + "enable.partition.eof": enable_end_of_partition, + "group.id": group if group is not None else uuid.uuid1().hex, + "session.timeout.ms": 10000, + }, + incremental=True, + ) + + def get_producer(self) -> KafkaProducer: + return KafkaProducer(self.configuration) + + def get_payloads(self) -> Iterator[KafkaPayload]: + for i in itertools.count(): + yield KafkaPayload(None, f"{i}".encode("utf8"), []) + + def test_auto_offset_reset_earliest(self) -> None: + with self.get_topic() as topic: + with closing(self.get_producer()) as producer: + producer.produce(topic, next(self.get_payloads())).result(5.0) + + with closing(self.get_consumer(auto_offset_reset="earliest")) as consumer: + consumer.subscribe([topic]) + + message = consumer.poll(10.0) + assert isinstance(message, Message) + assert message.offset == 0 + + def test_auto_offset_reset_latest(self) -> None: + with self.get_topic() as topic: + with closing(self.get_producer()) as producer: + producer.produce(topic, next(self.get_payloads())).result(5.0) + + with closing(self.get_consumer(auto_offset_reset="latest")) as consumer: + consumer.subscribe([topic]) + + try: + consumer.poll(10.0) # XXX: getting the subcription is slow + except EndOfPartition as error: + assert error.partition == Partition(topic, 0) + assert error.offset == 1 + else: + raise AssertionError("expected EndOfPartition error") + + def test_auto_offset_reset_error(self) -> None: + with self.get_topic() as topic: + with closing(self.get_producer()) as producer: + producer.produce(topic, next(self.get_payloads())).result(5.0) + + with closing(self.get_consumer(auto_offset_reset="error")) as consumer: + consumer.subscribe([topic]) + + with pytest.raises(ConsumerError): + consumer.poll(10.0) # XXX: getting the subcription is slow + + def test_commit_codec() -> None: commit = Commit("group", Partition(Topic("topic"), 0), 0, datetime.now()) assert commit_codec.decode(commit_codec.encode(commit)) == commit From 61e51191671a49029ffcd7c204998d99a0ba2ebd Mon Sep 17 00:00:00 2001 From: Lyn Date: Wed, 30 Mar 2022 22:30:52 -0700 Subject: [PATCH 02/10] updates --- arroyo/backends/kafka/consumer.py | 74 +++++++------- tests/backends/test_kafka.py | 164 ++++++++++++++---------------- 2 files changed, 113 insertions(+), 125 deletions(-) diff --git a/arroyo/backends/kafka/consumer.py b/arroyo/backends/kafka/consumer.py index c5b6eb9c..a7baa025 100644 --- a/arroyo/backends/kafka/consumer.py +++ b/arroyo/backends/kafka/consumer.py @@ -252,63 +252,63 @@ def assignment_callback( ) -> None: self.__state = KafkaConsumerState.ASSIGNING - try: - assignment: MutableSequence[ConfluentTopicPartition] = [] + if self.__incremental is True: + incremental_assignment: MutableSequence[ConfluentTopicPartition] = [] - for partition in self.__consumer.committed(partitions): + for partition in partitions: if partition.offset >= 0: - assignment.append(partition) + incremental_assignment.append(partition) elif partition.offset == OFFSET_INVALID: - assignment.append( + incremental_assignment.append( self.__resolve_partition_starting_offset(partition) ) else: raise ValueError("received unexpected offset") - offsets: MutableMapping[Partition, int] = { - Partition(Topic(i.topic), i.partition): i.offset for i in assignment - } + self.__incremental_assign( + { + Partition(Topic(i.topic), i.partition): i.offset + for i in incremental_assignment + } + ) - if self.__incremental is True: - incremental_assignment: MutableSequence[ - ConfluentTopicPartition - ] = [] + else: + try: + assignment: MutableSequence[ConfluentTopicPartition] = [] - for partition in partitions: + for partition in self.__consumer.committed(partitions): if partition.offset >= 0: - incremental_assignment.append(partition) + assignment.append(partition) elif partition.offset == OFFSET_INVALID: - incremental_assignment.append( + assignment.append( self.__resolve_partition_starting_offset(partition) ) else: raise ValueError("received unexpected offset") - self.__incremental_assign( - { - Partition(Topic(i.topic), i.partition): i.offset - for i in assignment - } - ) - else: + offsets: MutableMapping[Partition, int] = { + Partition(Topic(i.topic), i.partition): i.offset + for i in assignment + } + self.__assign(offsets) - # Ensure that all partitions are resumed on assignment to avoid - # carrying over state from a previous assignment. - self.__consumer.resume( - [ - ConfluentTopicPartition( - partition.topic.name, partition.index, offset - ) - for partition, offset in offsets.items() - ] - ) + # Ensure that all partitions are resumed on assignment to avoid + # carrying over state from a previous assignment. + self.__consumer.resume( + [ + ConfluentTopicPartition( + partition.topic.name, partition.index, offset + ) + for partition, offset in offsets.items() + ] + ) - for partition in offsets: - self.__paused.discard(partition) - except Exception: - self.__state = KafkaConsumerState.ERROR - raise + for partition in offsets: + self.__paused.discard(partition) + except Exception: + self.__state = KafkaConsumerState.ERROR + raise try: if on_assign is not None: diff --git a/tests/backends/test_kafka.py b/tests/backends/test_kafka.py index 52b0f81d..d2a6c586 100644 --- a/tests/backends/test_kafka.py +++ b/tests/backends/test_kafka.py @@ -6,7 +6,7 @@ from contextlib import closing from datetime import datetime from pickle import PickleBuffer -from typing import Iterator, MutableSequence, Optional +from typing import Any, Iterator, Mapping, MutableSequence, Optional from unittest import TestCase import pytest @@ -46,6 +46,25 @@ def test_payload_pickle_out_of_band() -> None: assert pickle.loads(data, buffers=[b.raw() for b in buffers]) == payload +@contextlib.contextmanager +def get_topic( + configuration: Mapping[str, Any], partitions_count: int +) -> Iterator[Topic]: + name = f"test-{uuid.uuid1().hex}" + client = AdminClient(configuration) + [[key, future]] = client.create_topics( + [NewTopic(name, num_partitions=partitions_count, replication_factor=1)] + ).items() + assert key == name + assert future.result() is None + try: + yield Topic(name) + finally: + [[key, future]] = client.delete_topics([name]).items() + assert key == name + assert future.result() is None + + class KafkaStreamsTestCase(StreamsTestMixin[KafkaPayload], TestCase): configuration = build_kafka_configuration( @@ -54,19 +73,11 @@ class KafkaStreamsTestCase(StreamsTestMixin[KafkaPayload], TestCase): @contextlib.contextmanager def get_topic(self, partitions: int = 1) -> Iterator[Topic]: - name = f"test-{uuid.uuid1().hex}" - client = AdminClient(self.configuration) - [[key, future]] = client.create_topics( - [NewTopic(name, num_partitions=partitions, replication_factor=1)] - ).items() - assert key == name - assert future.result() is None - try: - yield Topic(name) - finally: - [[key, future]] = client.delete_topics([name]).items() - assert key == name - assert future.result() is None + with get_topic(self.configuration, partitions) as topic: + try: + yield topic + finally: + pass def get_consumer( self, @@ -133,91 +144,68 @@ def test_auto_offset_reset_error(self) -> None: consumer.poll(10.0) # XXX: getting the subcription is slow -class KafkaStreamsCooperativeStickyTestCase(StreamsTestMixin[KafkaPayload], TestCase): +def test_cooperative_rebalancing() -> None: configuration = build_kafka_configuration( {"bootstrap.servers": os.environ.get("DEFAULT_BROKERS", "localhost:9092")} ) - @contextlib.contextmanager - def get_topic(self, partitions: int = 1) -> Iterator[Topic]: - name = f"test-{uuid.uuid1().hex}" - client = AdminClient(self.configuration) - [[key, future]] = client.create_topics( - [NewTopic(name, num_partitions=partitions, replication_factor=1)] - ).items() - assert key == name - assert future.result() is None - try: - yield Topic(name) - finally: - [[key, future]] = client.delete_topics([name]).items() - assert key == name - assert future.result() is None - - def get_consumer( - self, - group: Optional[str] = None, - enable_end_of_partition: bool = True, - auto_offset_reset: str = "earliest", - ) -> KafkaConsumer: - return KafkaConsumer( - { - **self.configuration, - "auto.offset.reset": auto_offset_reset, - "enable.auto.commit": "false", - "enable.auto.offset.store": "false", - "enable.partition.eof": enable_end_of_partition, - "group.id": group if group is not None else uuid.uuid1().hex, - "session.timeout.ms": 10000, - }, - incremental=True, - ) - - def get_producer(self) -> KafkaProducer: - return KafkaProducer(self.configuration) - - def get_payloads(self) -> Iterator[KafkaPayload]: - for i in itertools.count(): - yield KafkaPayload(None, f"{i}".encode("utf8"), []) - - def test_auto_offset_reset_earliest(self) -> None: - with self.get_topic() as topic: - with closing(self.get_producer()) as producer: - producer.produce(topic, next(self.get_payloads())).result(5.0) - - with closing(self.get_consumer(auto_offset_reset="earliest")) as consumer: - consumer.subscribe([topic]) + partitions_count = 2 + + group_id = uuid.uuid1().hex + producer = KafkaProducer(configuration) + + consumer_a = KafkaConsumer( + { + **configuration, + "auto.offset.reset": "earliest", + "enable.auto.commit": False, + "enable.auto.offset.store": False, + "group.id": group_id, + "session.timeout.ms": 10000, + }, + incremental=True, + ) + consumer_b = KafkaConsumer( + { + **configuration, + "auto.offset.reset": "earliest", + "enable.auto.commit": False, + "enable.auto.offset.store": False, + "group.id": group_id, + "session.timeout.ms": 10000, + }, + incremental=True, + ) - message = consumer.poll(10.0) - assert isinstance(message, Message) - assert message.offset == 0 + with get_topic(configuration, partitions_count) as topic, closing( + producer + ), closing(consumer_a), closing(consumer_b): + for i in range(10): + for j in range(partitions_count): + producer.produce( + Partition(topic, 1), + KafkaPayload(None, f"{j}-{i}".encode("utf8"), []), + ) - def test_auto_offset_reset_latest(self) -> None: - with self.get_topic() as topic: - with closing(self.get_producer()) as producer: - producer.produce(topic, next(self.get_payloads())).result(5.0) + consumer_a.subscribe([topic]) - with closing(self.get_consumer(auto_offset_reset="latest")) as consumer: - consumer.subscribe([topic]) + assert consumer_a.poll(10.0) is not None - try: - consumer.poll(10.0) # XXX: getting the subcription is slow - except EndOfPartition as error: - assert error.partition == Partition(topic, 0) - assert error.offset == 1 - else: - raise AssertionError("expected EndOfPartition error") + # Consumer A has 2 partitions assigned, B has none + assert len(consumer_a.tell()) == 2 + assert len(consumer_b.tell()) == 0 - def test_auto_offset_reset_error(self) -> None: - with self.get_topic() as topic: - with closing(self.get_producer()) as producer: - producer.produce(topic, next(self.get_payloads())).result(5.0) + consumer_b.subscribe([topic]) + consumer_a.pause([Partition(topic, 0), Partition(topic, 1)]) - with closing(self.get_consumer(auto_offset_reset="error")) as consumer: - consumer.subscribe([topic]) + # At some point, 1 partition will move to consumer B + for i in range(10): + assert consumer_a.poll(0) is None # attempt to force session timeout + if consumer_b.poll(1.0) is not None: + break - with pytest.raises(ConsumerError): - consumer.poll(10.0) # XXX: getting the subcription is slow + assert len(consumer_a.tell()) == 1 + assert len(consumer_b.tell()) == 1 def test_commit_codec() -> None: From 515e409d9c65b5873b53621b80aca2175f4b1ddf Mon Sep 17 00:00:00 2001 From: Lyn Date: Wed, 30 Mar 2022 22:36:01 -0700 Subject: [PATCH 03/10] revert test changes, rename --- arroyo/backends/kafka/consumer.py | 8 ++++---- tests/backends/mixins.py | 12 ++++++------ tests/backends/test_kafka.py | 4 ++-- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/arroyo/backends/kafka/consumer.py b/arroyo/backends/kafka/consumer.py index a7baa025..b63c20eb 100644 --- a/arroyo/backends/kafka/consumer.py +++ b/arroyo/backends/kafka/consumer.py @@ -146,7 +146,7 @@ def __init__( configuration: MutableMapping[str, Any], *, commit_retry_policy: Optional[RetryPolicy] = None, - incremental: bool = False, + incremental_cooperative: bool = False, ) -> None: if commit_retry_policy is None: commit_retry_policy = NoRetryPolicy() @@ -183,9 +183,9 @@ def __init__( "invalid value for 'enable.auto.offset.store' configuration" ) - self.__incremental = incremental + self.__incremental_cooperative = incremental_cooperative - if self.__incremental is True: + if self.__incremental_cooperative is True: configuration["partition.assignment.strategy"] = "cooperative-sticky" # NOTE: Offsets are explicitly managed as part of the assignment @@ -252,7 +252,7 @@ def assignment_callback( ) -> None: self.__state = KafkaConsumerState.ASSIGNING - if self.__incremental is True: + if self.__incremental_cooperative is True: incremental_assignment: MutableSequence[ConfluentTopicPartition] = [] for partition in partitions: diff --git a/tests/backends/mixins.py b/tests/backends/mixins.py index e2f4c45b..91bd13fe 100644 --- a/tests/backends/mixins.py +++ b/tests/backends/mixins.py @@ -51,10 +51,10 @@ def test_consumer(self) -> None: def _assignment_callback(partitions: Mapping[Partition, int]) -> None: assert partitions == {Partition(topic, 0): messages[0].offset} - # consumer.seek({Partition(topic, 0): messages[1].offset}) + consumer.seek({Partition(topic, 0): messages[1].offset}) - # with pytest.raises(ConsumerError): - # consumer.seek({Partition(topic, 1): 0}) + with pytest.raises(ConsumerError): + consumer.seek({Partition(topic, 1): 0}) assignment_callback = mock.Mock(side_effect=_assignment_callback) @@ -77,14 +77,14 @@ def _revocation_callback(partitions: Sequence[Partition]) -> None: with assert_changes( lambda: assignment_callback.called, False, True ), assert_changes( - consumer.tell, {}, {Partition(topic, 0): messages[1].offset} + consumer.tell, {}, {Partition(topic, 0): messages[1].next_offset} ): message = consumer.poll(10.0) # XXX: getting the subcription is slow assert isinstance(message, Message) assert message.partition == Partition(topic, 0) - assert message.offset == messages[0].offset - assert message.payload == messages[0].payload + assert message.offset == messages[1].offset + assert message.payload == messages[1].payload consumer.seek({Partition(topic, 0): messages[0].offset}) assert consumer.tell() == {Partition(topic, 0): messages[0].offset} diff --git a/tests/backends/test_kafka.py b/tests/backends/test_kafka.py index d2a6c586..a8fbb8a6 100644 --- a/tests/backends/test_kafka.py +++ b/tests/backends/test_kafka.py @@ -163,7 +163,7 @@ def test_cooperative_rebalancing() -> None: "group.id": group_id, "session.timeout.ms": 10000, }, - incremental=True, + incremental_cooperative=True, ) consumer_b = KafkaConsumer( { @@ -174,7 +174,7 @@ def test_cooperative_rebalancing() -> None: "group.id": group_id, "session.timeout.ms": 10000, }, - incremental=True, + incremental_cooperative=True, ) with get_topic(configuration, partitions_count) as topic, closing( From b86f5aad2da752cfc224c5f13d45976d4597d6ef Mon Sep 17 00:00:00 2001 From: Lyn Date: Thu, 31 Mar 2022 10:41:28 -0700 Subject: [PATCH 04/10] set the error status on exception, resume on assignment --- arroyo/backends/kafka/consumer.py | 39 ++++++++++++++++++++----------- 1 file changed, 25 insertions(+), 14 deletions(-) diff --git a/arroyo/backends/kafka/consumer.py b/arroyo/backends/kafka/consumer.py index b63c20eb..0acb33dd 100644 --- a/arroyo/backends/kafka/consumer.py +++ b/arroyo/backends/kafka/consumer.py @@ -253,24 +253,35 @@ def assignment_callback( self.__state = KafkaConsumerState.ASSIGNING if self.__incremental_cooperative is True: - incremental_assignment: MutableSequence[ConfluentTopicPartition] = [] + try: + incremental_assignment: MutableSequence[ + ConfluentTopicPartition + ] = [] - for partition in partitions: - if partition.offset >= 0: - incremental_assignment.append(partition) - elif partition.offset == OFFSET_INVALID: - incremental_assignment.append( - self.__resolve_partition_starting_offset(partition) - ) - else: - raise ValueError("received unexpected offset") + for partition in partitions: + if partition.offset >= 0: + incremental_assignment.append(partition) + elif partition.offset == OFFSET_INVALID: + incremental_assignment.append( + self.__resolve_partition_starting_offset(partition) + ) + else: + raise ValueError("received unexpected offset") - self.__incremental_assign( - { + offsets = { Partition(Topic(i.topic), i.partition): i.offset for i in incremental_assignment } - ) + + self.__incremental_assign(offsets) + + # Ensure that all partitions are resumed on assignment to avoid + # carrying over state from a previous assignment. + self.resume([p for p in offsets]) + + except Exception: + self.__state = KafkaConsumerState.ERROR + raise else: try: @@ -286,7 +297,7 @@ def assignment_callback( else: raise ValueError("received unexpected offset") - offsets: MutableMapping[Partition, int] = { + offsets = { Partition(Topic(i.topic), i.partition): i.offset for i in assignment } From ba121631b31e090dffa3274ef9638b4f3188c0a3 Mon Sep 17 00:00:00 2001 From: Lyn Date: Thu, 31 Mar 2022 10:54:51 -0700 Subject: [PATCH 05/10] produce to both partitions --- tests/backends/test_kafka.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/backends/test_kafka.py b/tests/backends/test_kafka.py index a8fbb8a6..0b6378be 100644 --- a/tests/backends/test_kafka.py +++ b/tests/backends/test_kafka.py @@ -183,7 +183,7 @@ def test_cooperative_rebalancing() -> None: for i in range(10): for j in range(partitions_count): producer.produce( - Partition(topic, 1), + Partition(topic, j), KafkaPayload(None, f"{j}-{i}".encode("utf8"), []), ) From a7cfb465285bf721c715681cce057c5d65fd0339 Mon Sep 17 00:00:00 2001 From: Lyn Date: Thu, 31 Mar 2022 11:53:24 -0700 Subject: [PATCH 06/10] assert each consumer got a message --- tests/backends/test_kafka.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/tests/backends/test_kafka.py b/tests/backends/test_kafka.py index 0b6378be..e400fb0d 100644 --- a/tests/backends/test_kafka.py +++ b/tests/backends/test_kafka.py @@ -151,7 +151,7 @@ def test_cooperative_rebalancing() -> None: partitions_count = 2 - group_id = uuid.uuid1().hex + group_id = uuid.uuid4().hex producer = KafkaProducer(configuration) consumer_a = KafkaConsumer( @@ -196,9 +196,9 @@ def test_cooperative_rebalancing() -> None: assert len(consumer_b.tell()) == 0 consumer_b.subscribe([topic]) - consumer_a.pause([Partition(topic, 0), Partition(topic, 1)]) # At some point, 1 partition will move to consumer B + consumer_a.pause([p for p in consumer_a.tell()]) for i in range(10): assert consumer_a.poll(0) is None # attempt to force session timeout if consumer_b.poll(1.0) is not None: @@ -207,6 +207,11 @@ def test_cooperative_rebalancing() -> None: assert len(consumer_a.tell()) == 1 assert len(consumer_b.tell()) == 1 + # Resume A and assert that both consumer_a and consumer_b are getting messages + consumer_a.resume([p for p in consumer_a.tell()]) + assert consumer_a.poll(1.0) is not None + assert consumer_b.poll(1.0) is not None + def test_commit_codec() -> None: commit = Commit("group", Partition(Topic("topic"), 0), 0, datetime.now()) From 5c793b174e3c34d717c6a8c7cb074e7529028fce Mon Sep 17 00:00:00 2001 From: Lyn Date: Thu, 31 Mar 2022 12:01:44 -0700 Subject: [PATCH 07/10] inline __seek function --- arroyo/backends/kafka/consumer.py | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/arroyo/backends/kafka/consumer.py b/arroyo/backends/kafka/consumer.py index 0acb33dd..5c3b43ae 100644 --- a/arroyo/backends/kafka/consumer.py +++ b/arroyo/backends/kafka/consumer.py @@ -491,15 +491,6 @@ def __incremental_assign(self, offsets: Mapping[Partition, int]) -> None: ) self.__offsets.update(offsets) - def __seek(self, offsets: Mapping[Partition, int]) -> None: - self.__validate_offsets(offsets) - - for partition, offset in offsets.items(): - self.__consumer.seek( - ConfluentTopicPartition(partition.topic.name, partition.index, offset) - ) - self.__offsets.update(offsets) - def seek(self, offsets: Mapping[Partition, int]) -> None: """ Change the read offsets for the provided partitions. @@ -512,7 +503,13 @@ def seek(self, offsets: Mapping[Partition, int]) -> None: if offsets.keys() - self.__offsets.keys(): raise ConsumerError("cannot seek on unassigned partitions") - self.__seek(offsets) + self.__validate_offsets(offsets) + + for partition, offset in offsets.items(): + self.__consumer.seek( + ConfluentTopicPartition(partition.topic.name, partition.index, offset) + ) + self.__offsets.update(offsets) def pause(self, partitions: Sequence[Partition]) -> None: """ From 02af23dac4ee718a24409c30449579102e2f7783 Mon Sep 17 00:00:00 2001 From: Lyn Date: Thu, 31 Mar 2022 12:04:23 -0700 Subject: [PATCH 08/10] reuse the resume function --- arroyo/backends/kafka/consumer.py | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/arroyo/backends/kafka/consumer.py b/arroyo/backends/kafka/consumer.py index 5c3b43ae..3dd96521 100644 --- a/arroyo/backends/kafka/consumer.py +++ b/arroyo/backends/kafka/consumer.py @@ -306,17 +306,8 @@ def assignment_callback( # Ensure that all partitions are resumed on assignment to avoid # carrying over state from a previous assignment. - self.__consumer.resume( - [ - ConfluentTopicPartition( - partition.topic.name, partition.index, offset - ) - for partition, offset in offsets.items() - ] - ) + self.resume([p for p in offsets]) - for partition in offsets: - self.__paused.discard(partition) except Exception: self.__state = KafkaConsumerState.ERROR raise From 62ad5375b88d506522be23a3e4f54f7fbd27f604 Mon Sep 17 00:00:00 2001 From: Lyn Date: Thu, 31 Mar 2022 12:44:37 -0700 Subject: [PATCH 09/10] pass strategy via configuration --- arroyo/backends/kafka/consumer.py | 10 ++++------ tests/backends/test_kafka.py | 4 ++-- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/arroyo/backends/kafka/consumer.py b/arroyo/backends/kafka/consumer.py index 3dd96521..6a9bb597 100644 --- a/arroyo/backends/kafka/consumer.py +++ b/arroyo/backends/kafka/consumer.py @@ -143,10 +143,9 @@ class KafkaConsumer(Consumer[KafkaPayload]): def __init__( self, - configuration: MutableMapping[str, Any], + configuration: Mapping[str, Any], *, commit_retry_policy: Optional[RetryPolicy] = None, - incremental_cooperative: bool = False, ) -> None: if commit_retry_policy is None: commit_retry_policy = NoRetryPolicy() @@ -183,10 +182,9 @@ def __init__( "invalid value for 'enable.auto.offset.store' configuration" ) - self.__incremental_cooperative = incremental_cooperative - - if self.__incremental_cooperative is True: - configuration["partition.assignment.strategy"] = "cooperative-sticky" + self.__incremental_cooperative = ( + configuration.get("partition.assignment.strategy") == "cooperative-sticky" + ) # NOTE: Offsets are explicitly managed as part of the assignment # callback, so preemptively resetting offsets is not enabled. diff --git a/tests/backends/test_kafka.py b/tests/backends/test_kafka.py index e400fb0d..55388ed4 100644 --- a/tests/backends/test_kafka.py +++ b/tests/backends/test_kafka.py @@ -157,24 +157,24 @@ def test_cooperative_rebalancing() -> None: consumer_a = KafkaConsumer( { **configuration, + "partition.assignment.strategy": "cooperative-sticky", "auto.offset.reset": "earliest", "enable.auto.commit": False, "enable.auto.offset.store": False, "group.id": group_id, "session.timeout.ms": 10000, }, - incremental_cooperative=True, ) consumer_b = KafkaConsumer( { **configuration, + "partition.assignment.strategy": "cooperative-sticky", "auto.offset.reset": "earliest", "enable.auto.commit": False, "enable.auto.offset.store": False, "group.id": group_id, "session.timeout.ms": 10000, }, - incremental_cooperative=True, ) with get_topic(configuration, partitions_count) as topic, closing( From 7001ec9268742c0f2736eda6700e4ac8244b3563 Mon Sep 17 00:00:00 2001 From: Lyn Date: Fri, 1 Apr 2022 10:13:30 -0700 Subject: [PATCH 10/10] update docstring --- arroyo/backends/kafka/consumer.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/arroyo/backends/kafka/consumer.py b/arroyo/backends/kafka/consumer.py index 6a9bb597..6b615faa 100644 --- a/arroyo/backends/kafka/consumer.py +++ b/arroyo/backends/kafka/consumer.py @@ -106,7 +106,8 @@ def as_kafka_configuration_bool(value: Any) -> bool: class KafkaConsumer(Consumer[KafkaPayload]): """ - The behavior of this consumer differs slightly from the Confluent + If a non-cooperative partition assignment strategy is selected, + the behavior of this consumer differs slightly from the Confluent consumer during rebalancing operations. Whenever a partition is assigned to this consumer, offsets are *always* automatically reset to the committed offset for that partition (or if no offsets have been committed @@ -117,6 +118,12 @@ class KafkaConsumer(Consumer[KafkaPayload]): prevent uncommitted messages from being consumed multiple times, ``commit`` should be called in the partition revocation callback. + If the `cooperative-sticky` strategy is used, this won't happen as + only the incremental partitions are passed to the callback during + rebalancing, and any previously assigned partitions will continue + from their previous position without being reset to the last committed + position. + The behavior of ``auto.offset.reset`` also differs slightly from the Confluent consumer as well: offsets are only reset during initial assignment or subsequent rebalancing operations. Any other circumstances