Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
lynnagara committed Jan 8, 2025
1 parent b2ac7c2 commit 13e9ee0
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 34 deletions.
17 changes: 6 additions & 11 deletions arroyo/backends/kafka/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,14 +157,12 @@ def __init__(
KafkaError.REQUEST_TIMED_OUT,
KafkaError.NOT_COORDINATOR,
KafkaError._WAIT_COORD,
KafkaError.STALE_MEMBER_EPOCH # kip-848
),
)

configuration = dict(configuration)
self.__is_incremental = (
configuration.get("partition.assignment.strategy") == "cooperative-sticky"
or configuration.get("group.protocol") == "consumer"
)
self.__is_cooperative_sticky = configuration.get("partition.assignment.strategy") == "cooperative-sticky"
auto_offset_reset = configuration.get("auto.offset.reset", "largest")

# This is a special flag that controls the auto offset behavior for
Expand Down Expand Up @@ -317,16 +315,13 @@ def revocation_callback(
) -> None:
self.__state = KafkaConsumerState.REVOKING

arroyo_partitions = [Partition(Topic(i.topic), i.partition) for i in partitions]
partitions = [Partition(Topic(i.topic), i.partition) for i in partitions]

try:
if on_revoke is not None:
on_revoke(arroyo_partitions)
on_revoke(partitions)
finally:
if self.__is_incremental:
self.__consumer.incremental_unassign(partitions)

for partition in arroyo_partitions:
for partition in partitions:
# Staged offsets are deleted during partition revocation to
# prevent later committing offsets for partitions that are
# no longer owned by this consumer.
Expand Down Expand Up @@ -466,7 +461,7 @@ def __assign(self, offsets: Mapping[Partition, int]) -> None:
ConfluentTopicPartition(partition.topic.name, partition.index, offset)
for partition, offset in offsets.items()
]
if self.__is_incremental:
if self.__is_cooperative_sticky:
self.__consumer.incremental_assign(partitions)
else:
self.__consumer.assign(partitions)
Expand Down
24 changes: 1 addition & 23 deletions tests/test_kip848_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
import contextlib
from contextlib import closing
import os
import signal
import threading
import logging
from typing import Iterator, Mapping

Expand Down Expand Up @@ -73,7 +71,7 @@ def create_with_partitions(self, commit: Commit, partitions: Mapping[Partition,

with closing(producer):
for i in range(30):
message = KafkaPayload(None, i.to_bytes(1, "big"), [])
message = KafkaPayload(None, i.to_bytes(), [])
producer.produce(topic, message).result()

consumer_config = build_kafka_consumer_configuration(
Expand All @@ -82,7 +80,6 @@ def create_with_partitions(self, commit: Commit, partitions: Mapping[Partition,
)

consumer_config["group.protocol"] = "consumer"
consumer_config["group.remote.assignor"] = "range"
consumer_config.pop("session.timeout.ms", None)
consumer_config.pop("max.poll.interval.ms", None)
consumer_config.pop("partition.assignment.strategy", None)
Expand All @@ -95,23 +92,4 @@ def create_with_partitions(self, commit: Commit, partitions: Mapping[Partition,
consumer=consumer, topic=Topic(TOPIC), processor_factory=Factory()
)

def handler(signum: int, frame: Any) -> None:
processor.signal_shutdown()

signal.signal(signal.SIGINT, handler)
signal.signal(signal.SIGTERM, handler)

def shutdown() -> None:
for i in range(100):
time.sleep(0.1)
if counter == 30:
break
print("shutting down")
processor.signal_shutdown()

t = threading.Thread(target=shutdown, daemon=True)
t.start()

processor.run()

assert counter == 30

0 comments on commit 13e9ee0

Please sign in to comment.