Commit 354a5b7
fix: Rebalancing fixes for kip-848 (#419)
Changes to support KIP-848:
- Previously we were calling the wrong API when `group.protocol=consumer` was used: it should be `consumer.assign()`, not `consumer.incremental_assign()` (see the sketch after this list).
- Retry if we get the new stale member epoch error on commit.
- Adds tests.
- The TestKafkaStreamsKip848 tests are now unskipped, except for test_pause_resume_rebalancing, which still needs to be fixed.
- The Kafka version running in CI is updated to support this.
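
A minimal sketch of the corrected dispatch (the flag name follows the consumer.py diff below; the surrounding wrapper class is elided, so treat this as illustrative rather than the exact implementation):

# Sketch, assuming a confluent_kafka.Consumer and arroyo's consumer config dict.
# Only the classic protocol with the cooperative-sticky assignor may use
# incremental_assign(); under KIP-848 (group.protocol=consumer) the broker
# coordinates incremental rebalancing itself, so plain assign() is correct.
is_cooperative_sticky = (
    configuration.get("partition.assignment.strategy") == "cooperative-sticky"
)

if is_cooperative_sticky:
    consumer.incremental_assign(partitions)
else:
    consumer.assign(partitions)  # classic eager protocol, and now kip-848 too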
lynnagara authored Jan 10, 2025
1 parent 472c657 commit 354a5b7
Showing 9 changed files with 163 additions and 35 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -56,7 +56,7 @@ jobs:
       - uses: actions/setup-python@v2
         with:
           python-version: ${{ matrix.python }}
-      - name: Run Zookeeper and Kafka
+      - name: Run Kafka
         run: sh scripts/run-kafka.sh
       - name: Install dependencies
         run: |
2 changes: 1 addition & 1 deletion .github/workflows/rust-ci.yml
@@ -26,7 +26,7 @@ jobs:
     steps:
       - uses: actions/checkout@v3
         name: Checkout code
-      - name: Run Zookeeper and Kafka
+      - name: Run Kafka
         run: sh scripts/run-kafka.sh
       - name: Run tests
         run: cargo test
6 changes: 3 additions & 3 deletions arroyo/backends/kafka/consumer.py
@@ -157,13 +157,13 @@ def __init__(
                 KafkaError.REQUEST_TIMED_OUT,
                 KafkaError.NOT_COORDINATOR,
                 KafkaError._WAIT_COORD,
+                KafkaError.STALE_MEMBER_EPOCH,  # kip-848
             ),
         )

         configuration = dict(configuration)
-        self.__is_incremental = (
+        self.__is_cooperative_sticky = (
             configuration.get("partition.assignment.strategy") == "cooperative-sticky"
-            or configuration.get("group.protocol") == "consumer"
         )
         auto_offset_reset = configuration.get("auto.offset.reset", "largest")

@@ -463,7 +463,7 @@ def __assign(self, offsets: Mapping[Partition, int]) -> None:
             ConfluentTopicPartition(partition.topic.name, partition.index, offset)
             for partition, offset in offsets.items()
         ]
-        if self.__is_incremental:
+        if self.__is_cooperative_sticky:
             self.__consumer.incremental_assign(partitions)
         else:
             self.__consumer.assign(partitions)
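
The STALE_MEMBER_EPOCH code added above goes into the consumer's set of retryable commit errors. A rough sketch of the behavior this enables (the helper name commit_with_retries is hypothetical; arroyo's real retry logic lives inside KafkaConsumer rather than in a free function):

from confluent_kafka import KafkaError, KafkaException

RETRYABLE = frozenset(
    [
        KafkaError.REQUEST_TIMED_OUT,
        KafkaError.NOT_COORDINATOR,
        KafkaError._WAIT_COORD,
        KafkaError.STALE_MEMBER_EPOCH,  # kip-848
    ]
)


def commit_with_retries(consumer, offsets, attempts=3):
    # Under KIP-848 a commit carrying an outdated member epoch is rejected by
    # the broker, but the next heartbeat refreshes the epoch, so retrying the
    # same commit usually succeeds.
    for attempt in range(attempts):
        try:
            return consumer.commit(offsets=offsets, asynchronous=False)
        except KafkaException as err:
            if err.args[0].code() not in RETRYABLE or attempt == attempts - 1:
                raise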
2 changes: 1 addition & 1 deletion requirements.txt
@@ -1 +1 @@
-confluent-kafka>=2.3.0
+confluent-kafka>=2.7.0
1 change: 1 addition & 0 deletions rust-arroyo/src/backends/kafka/mod.rs
@@ -563,6 +563,7 @@ mod tests {
     #[test]
     fn test_tell() {
         let topic = TestTopic::create("test-tell");
+        sleep(Duration::from_secs(1));
         let configuration = KafkaConfig::new_consumer_config(
             vec![std::env::var("DEFAULT_BROKERS").unwrap_or("127.0.0.1:9092".to_string())],
             "my-group-1".to_string(),
26 changes: 13 additions & 13 deletions scripts/run-kafka.sh
100644 → 100755
@@ -1,18 +1,18 @@
 #!/bin/sh

 docker run \
-  --name sentry_zookeeper \
+  --name arroyo_kafka \
   -d --network host \
-  -e ZOOKEEPER_CLIENT_PORT=2181 \
-  confluentinc/cp-zookeeper:6.2.0
-
-docker run \
-  --name sentry_kafka \
-  -d --network host \
-  -e KAFKA_ZOOKEEPER_CONNECT=127.0.0.1:2181 \
-  -e KAFKA_LISTENERS=INTERNAL://0.0.0.0:9093,EXTERNAL://0.0.0.0:9092 \
-  -e KAFKA_ADVERTISED_LISTENERS=INTERNAL://127.0.0.1:9093,EXTERNAL://127.0.0.1:9092 \
-  -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT \
-  -e KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL \
+  -e KAFKA_PROCESS_ROLES=broker,controller \
+  -e KAFKA_CONTROLLER_QUORUM_VOTERS=1@127.0.0.1:9093 \
+  -e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
+  -e KAFKA_NODE_ID=1 \
+  -e CLUSTER_ID=MkU3OEVBNTcwNTJENDM2Qk \
+  -e KAFKA_LISTENERS=PLAINTEXT://127.0.0.1:9092,CONTROLLER://127.0.0.1:9093 \
+  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 \
+  -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT \
+  -e KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
   -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
-  confluentinc/cp-kafka:6.2.0
+  -e KAFKA_GROUP_COORDINATOR_REBALANCE_PROTOCOLS=classic,consumer \
+  -e KAFKA_TRANSACTION_PARTITION_VERIFICATION_ENABLE=false \
+  confluentinc/cp-kafka:7.8.0
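
The new broker container can take a moment to come up (note the one-second sleep added to the Rust test above). A readiness probe is a more deliberate alternative to fixed sleeps; this sketch is not part of the commit and simply polls the broker started by scripts/run-kafka.sh until it answers metadata requests:

import os
import time

from confluent_kafka.admin import AdminClient


def wait_for_broker(bootstrap: str = "127.0.0.1:9092", timeout: float = 30.0) -> None:
    # Poll the single-node KRaft broker until list_topics() stops raising,
    # i.e. until the broker is accepting metadata requests.
    client = AdminClient(
        {"bootstrap.servers": os.environ.get("DEFAULT_BROKERS", bootstrap)}
    )
    deadline = time.monotonic() + timeout
    while True:
        try:
            client.list_topics(timeout=1.0)
            return
        except Exception:
            if time.monotonic() > deadline:
                raise
            time.sleep(0.5)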
40 changes: 25 additions & 15 deletions tests/backends/mixins.py
@@ -15,6 +15,7 @@

 class StreamsTestMixin(ABC, Generic[TStrategyPayload]):
     cooperative_sticky = False
+    kip_848 = False

     @abstractmethod
     def get_topic(self, partitions: int = 1) -> ContextManager[Topic]:
@@ -421,7 +422,7 @@ def test_pause_resume_rebalancing(self) -> None:
         def wait_until_rebalancing(
             from_consumer: Consumer[Any], to_consumer: Consumer[Any]
         ) -> None:
-            for _ in range(10):
+            for _ in range(20):
                 assert from_consumer.poll(0) is None
                 if to_consumer.poll(1.0) is not None:
                     return
@@ -453,9 +454,10 @@ def wait_until_rebalancing(

         wait_until_rebalancing(consumer_a, consumer_b)

-        if self.cooperative_sticky:
+        if self.cooperative_sticky or self.kip_848:
             # within incremental rebalancing, only one partition should have been reassigned to the consumer_b, and consumer_a should remain paused
-            assert consumer_a.paused() == [Partition(topic, 1)]
+            # Either partition 0 or 1 might be the paused one
+            assert len(consumer_a.paused()) == 1
             assert consumer_a.poll(10.0) is None
         else:
             # The first consumer should have had its offsets rolled back, as
@@ -481,20 +483,28 @@ def wait_until_rebalancing(

         assert len(consumer_b.tell()) == 2

-        if self.cooperative_sticky:
+        if self.cooperative_sticky or self.kip_848:
+            consumer_a_on_assign.assert_has_calls(
+                [
+                    mock.call({Partition(topic, 0): 0, Partition(topic, 1): 0}),
+                ]
+            )

-            assert consumer_a_on_assign.mock_calls == [
-                mock.call({Partition(topic, 0): 0, Partition(topic, 1): 0}),
-            ]
-            assert consumer_a_on_revoke.mock_calls == [
-                mock.call([Partition(topic, 0)]),
-                mock.call([Partition(topic, 1)]),
-            ]
+            consumer_a_on_revoke.assert_has_calls(
+                [
+                    mock.call([Partition(topic, 0)]),
+                    mock.call([Partition(topic, 1)]),
+                ],
+                any_order=True,
+            )

-            assert consumer_b_on_assign.mock_calls == [
-                mock.call({Partition(topic, 0): 0}),
-                mock.call({Partition(topic, 1): 0}),
-            ]
+            consumer_b_on_assign.assert_has_calls(
+                [
+                    mock.call({Partition(topic, 0): 0}),
+                    mock.call({Partition(topic, 1): 0}),
+                ],
+                any_order=True,
+            )
             assert consumer_b_on_revoke.mock_calls == []
         else:
             assert consumer_a_on_assign.mock_calls == [
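
The switch from exact mock_calls comparisons to assert_has_calls(..., any_order=True) is the heart of this hunk: with cooperative or broker-driven rebalancing, which partition moves first is nondeterministic, so the test can only assert that both calls happened, not their order. A standalone illustration of the unittest.mock semantics:

from unittest import mock

m = mock.Mock()
m("partition-0")
m("partition-1")

# Passes regardless of which call happened first.
m.assert_has_calls(
    [mock.call("partition-1"), mock.call("partition-0")], any_order=True
)

# Without any_order=True, assert_has_calls requires the given calls to appear
# as a consecutive sequence in this exact order, so the reversed list above
# would raise AssertionError.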
5 changes: 4 additions & 1 deletion tests/backends/test_kafka.py
@@ -271,10 +271,13 @@ class TestKafkaStreamsIncrementalRebalancing(TestKafkaStreams):
     cooperative_sticky = True


-@pytest.mark.skip("kip-848 not functional yet")
 class TestKafkaStreamsKip848(TestKafkaStreams):
     kip_848 = True
+
+    @pytest.mark.xfail(reason="To be fixed")
+    def test_pause_resume_rebalancing(self) -> None:
+        super().test_pause_resume_rebalancing()


 def test_commit_codec() -> None:
     commit = Commit(
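
Note that pytest.mark.xfail on test_pause_resume_rebalancing alone, rather than pytest.mark.skip on the whole class, keeps the rest of the KIP-848 suite running while recording the one known failure.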
114 changes: 114 additions & 0 deletions tests/test_kip848_e2e.py
@@ -0,0 +1,114 @@
from typing import Any

import time
import contextlib
from contextlib import closing
import os
import threading
import logging
from typing import Iterator, Mapping

from confluent_kafka.admin import AdminClient, NewTopic
from arroyo.types import Commit, Message, Partition, Topic
from arroyo.backends.kafka.configuration import build_kafka_consumer_configuration
from arroyo.backends.kafka.consumer import KafkaConsumer, KafkaPayload
from arroyo.processing.strategies import RunTask, CommitOffsets, ProcessingStrategy
from arroyo.processing.strategies.abstract import ProcessingStrategyFactory
from arroyo.processing.processor import StreamProcessor
from arroyo.backends.kafka import KafkaProducer

logging.basicConfig(level=logging.INFO)

TOPIC = "test-kip848"


@contextlib.contextmanager
def get_topic(
    configuration: Mapping[str, Any], partitions_count: int
) -> Iterator[Topic]:
    name = TOPIC
    configuration = dict(configuration)
    client = AdminClient(configuration)
    [[key, future]] = client.create_topics(
        [NewTopic(name, num_partitions=partitions_count, replication_factor=1)]
    ).items()
    assert key == name
    assert future.result() is None
    try:
        yield Topic(name)
    finally:
        [[key, future]] = client.delete_topics([name]).items()
        assert key == name
        assert future.result() is None


def test_kip848_e2e() -> None:
    counter = 0

    def print_msg(message: Message[Any]) -> Message[Any]:
        nonlocal counter
        ((partition, offset),) = message.committable.items()
        print(f"message: {partition.index}-{offset}")
        counter += 1
        return message

    class Strat(RunTask[Any, Any]):
        def join(self, *args: Any, **kwargs: Any) -> None:
            print("joining strategy, sleeping 5 seconds")
            time.sleep(5)
            print("joining strategy, sleeping 5 seconds -- DONE")
            return super().join(*args, **kwargs)

    class Factory(ProcessingStrategyFactory[KafkaPayload]):
        def create_with_partitions(
            self, commit: Commit, partitions: Mapping[Partition, int]
        ) -> ProcessingStrategy[KafkaPayload]:
            print("assign: ", [p.index for p in partitions])
            return Strat(print_msg, CommitOffsets(commit))

    default_config = {
        "bootstrap.servers": os.environ.get("DEFAULT_BROKERS", "localhost:9092")
    }

    with get_topic(default_config, 2) as topic:
        producer = KafkaProducer(default_config)

        with closing(producer):
            for i in range(30):
                message = KafkaPayload(None, i.to_bytes(1, "big"), [])
                producer.produce(topic, message).result()

        consumer_config = build_kafka_consumer_configuration(
            default_config,
            group_id="kip848",
        )

        consumer_config["group.protocol"] = "consumer"
        consumer_config.pop("session.timeout.ms", None)
        consumer_config.pop("max.poll.interval.ms", None)
        consumer_config.pop("partition.assignment.strategy", None)
        consumer_config.pop("group.protocol.type", None)
        consumer_config.pop("heartbeat.interval.ms", None)

        consumer = KafkaConsumer(consumer_config)

        processor = StreamProcessor(
            consumer=consumer, topic=Topic(TOPIC), processor_factory=Factory()
        )

        def shutdown() -> None:
            for i in range(100):
                time.sleep(0.1)
                if counter == 30:
                    break
            print("shutting down")
            processor.signal_shutdown()

        t = threading.Thread(target=shutdown)
        t.start()

        processor.run()

        assert counter == 30

        t.join()