test(kafka): Add test for partial cluster failure
For INC-599, in follow-up SNS-2603, add a test that ensures that the
consumer reliably crashes when a broker dies. I will work a bit more on
the CI setup, but basically you need to run `docker-compose up`, run
that one test using `pytest tests/backends/test_kafka.py -k died -s`,
and observe that it passes.

This is a problem, though, because it means we cannot reproduce the
failure we saw in that incident, where the consumer hangs after a
broker failure.
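
A rough sketch of how the crash-vs-hang distinction could be checked in a test (not part of this commit; `assert_fails_fast` is a hypothetical helper): run the blocking call in a worker thread and treat a timeout as a hang.

```python
# Illustrative only. Runs a blocking call in a worker thread; a prompt exception
# passes, a timeout is reported as a hang, and a normal return is a failure.
import concurrent.futures
from typing import Callable


def assert_fails_fast(fn: Callable[[], object], timeout: float = 30.0) -> None:
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    future = pool.submit(fn)
    try:
        future.result(timeout=timeout)
    except concurrent.futures.TimeoutError:
        # Neither returned nor raised within the deadline: treat as a hang.
        # (A truly stuck worker thread will still linger until process exit.)
        raise AssertionError("call hung instead of raising") from None
    except Exception:
        # The call raised promptly, which is the behavior this commit asserts.
        return
    finally:
        pool.shutdown(wait=False)
    raise AssertionError("call unexpectedly succeeded")
```

Usage would look like `assert_fails_fast(lambda: consumer.poll(10.0))`. The test added below instead relies on `pytest.raises`, which already fails fast when the consumer raises; a helper like this would only matter for reproducing the hang itself.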
untitaker committed Jan 19, 2024
1 parent 7f2eed8 commit 13e9bf3
Showing 2 changed files with 111 additions and 5 deletions.
60 changes: 60 additions & 0 deletions docker-compose.yml
@@ -0,0 +1,60 @@
# copied from https://github.com/confluentinc/kafka-images/blob/ec53840b3673a039a9eae6623f773399ad50c31c/examples/kafka-cluster/docker-compose.yml
# major changes:
# - use getsentry's image mirrors to avoid docker rate limits
# - avoid network_mode: host due to limitations on macos
# - trim down on zookeeper nodes (just for simplicity, might want to add it
# later to test something more specific)
---
version: '3'
name: arroyo

x-zookeeper-common: &zookeeper-common
  image: ghcr.io/getsentry/image-mirror-confluentinc-cp-zookeeper:6.2.0

x-zookeeper-common-env: &zookeeper-common-env
  ZOOKEEPER_CLIENT_PORT: 2181
  ZOOKEEPER_TICK_TIME: 2000
  ZOOKEEPER_INIT_LIMIT: 5
  ZOOKEEPER_SYNC_LIMIT: 2

x-kafka-common: &kafka-common
  image: "ghcr.io/getsentry/image-mirror-confluentinc-cp-kafka:6.2.0"
  depends_on:
    - zookeeper-1

x-kafka-common-env: &kafka-common-env
  KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181
  KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
  KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
  KAFKA_LISTENERS: INTERNAL://0.0.0.0:9093,EXTERNAL://0.0.0.0:9092

services:
  zookeeper-1:
    <<: *zookeeper-common
    environment:
      <<: *zookeeper-common-env
      ZOOKEEPER_SERVER_ID: 1

  kafka-1:
    <<: *kafka-common
    ports: ["19092:9092"]
    environment:
      <<: *kafka-common-env
      KAFKA_BROKER_ID: 1
      KAFKA_ADVERTISED_LISTENERS: EXTERNAL://127.0.0.1:19092,INTERNAL://kafka-1:9093

  kafka-2:
    <<: *kafka-common
    ports: ["29092:9092"]
    environment:
      <<: *kafka-common-env
      KAFKA_BROKER_ID: 2
      KAFKA_ADVERTISED_LISTENERS: EXTERNAL://127.0.0.1:29092,INTERNAL://kafka-2:9093

  kafka-3:
    <<: *kafka-common
    ports: ["39092:9092"]
    environment:
      <<: *kafka-common-env
      KAFKA_BROKER_ID: 3
      KAFKA_ADVERTISED_LISTENERS: EXTERNAL://127.0.0.1:39092,INTERNAL://kafka-3:9093
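
Each broker is published to the host on its own port via the EXTERNAL listener (19092, 29092, 39092), while brokers address each other through the INTERNAL listener on the compose network. A minimal sketch of connecting to this cluster from the host, assuming the `confluent-kafka` Python client and that topic auto-creation is enabled (the Kafka default); this snippet is not part of the commit:

```python
from confluent_kafka import Consumer, Producer

# All three brokers, as advertised on the EXTERNAL listeners above.
BOOTSTRAP = "127.0.0.1:19092,127.0.0.1:29092,127.0.0.1:39092"

producer = Producer({"bootstrap.servers": BOOTSTRAP})
producer.produce("smoke-test", b"hello")  # topic name is arbitrary here
producer.flush(10.0)

consumer = Consumer(
    {
        "bootstrap.servers": BOOTSTRAP,
        "group.id": "smoke-test",
        "auto.offset.reset": "earliest",
    }
)
consumer.subscribe(["smoke-test"])
message = consumer.poll(10.0)  # may be None while partition assignment completes
print(message.value() if message is not None else "no message yet")
consumer.close()
```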
56 changes: 51 additions & 5 deletions tests/backends/test_kafka.py
@@ -2,6 +2,7 @@
import itertools
import os
import pickle
import subprocess
import time
import uuid
from contextlib import closing
@@ -14,7 +15,10 @@

from arroyo.backends.kafka import KafkaConsumer, KafkaPayload, KafkaProducer
from arroyo.backends.kafka.commit import CommitCodec
from arroyo.backends.kafka.configuration import build_kafka_configuration
from arroyo.backends.kafka.configuration import (
    KafkaBrokerConfig,
    build_kafka_configuration,
)
from arroyo.backends.kafka.consumer import as_kafka_configuration_bool
from arroyo.commit import IMMEDIATE, Commit
from arroyo.errors import ConsumerError, EndOfPartition
@@ -70,10 +74,26 @@ def get_topic(


class TestKafkaStreams(StreamsTestMixin[KafkaPayload]):

    configuration = build_kafka_configuration(
        {"bootstrap.servers": os.environ.get("DEFAULT_BROKERS", "localhost:9092")}
    )
    distributed = False

    @property
    def configuration(self) -> KafkaBrokerConfig:
        if self.distributed:
            return build_kafka_configuration(
                {
                    "bootstrap.servers": os.environ.get(
                        "DEFAULT_DISTRIBUTED_BROKERS", "127.0.0.1:19092"
                    )
                }
            )
        else:
            return build_kafka_configuration(
                {
                    "bootstrap.servers": os.environ.get(
                        "DEFAULT_BROKERS", "127.0.0.1:9092"
                    )
                }
            )

    @contextlib.contextmanager
    def get_topic(self, partitions: int = 1) -> Iterator[Topic]:
@@ -193,6 +213,32 @@ def test_consumer_stream_processor_shutdown(self) -> None:
        with pytest.raises(RuntimeError):
            processor.run()

    def test_kafka_broker_died(self) -> None:
        self.distributed = True

        with self.get_topic(partitions=32) as topic:
            with closing(self.get_producer()) as producer:
                payloads = self.get_payloads()
                for _ in range(100):
                    producer.produce(topic, next(payloads)).result(5.0)

            with closing(self.get_consumer(auto_offset_reset="earliest")) as consumer:
                consumer.subscribe([topic])

                value = consumer.poll(10.0)
                assert isinstance(value, BrokerValue)

                subprocess.check_call(["docker", "kill", "arroyo-kafka-1-1"])
                subprocess.check_call(["docker", "kill", "arroyo-kafka-2-1"])

                consumer.stage_offsets(value.committable)

                with pytest.raises(Exception):
                    consumer.commit_offsets()

                with pytest.raises(Exception):
                    consumer.poll(10.0)


def test_commit_codec() -> None:
    commit = Commit(

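The new test kills two of the three brokers, so most partitions become unavailable and both the staged-offset commit and the next poll are expected to raise rather than block. The container names passed to `docker kill` (`arroyo-kafka-1-1`, `arroyo-kafka-2-1`) follow from the compose project name `arroyo`; as a hedged alternative (not part of this commit, `kill_broker` is a hypothetical helper), the names could be resolved at runtime instead of hardcoded:

```python
import subprocess


def kill_broker(service: str) -> None:
    # Resolve the container ID for a compose service (project "arroyo", as set
    # in docker-compose.yml) instead of hardcoding the generated container name.
    container_id = subprocess.check_output(
        ["docker", "compose", "-p", "arroyo", "ps", "-q", service], text=True
    ).strip()
    subprocess.check_call(["docker", "kill", container_id])


# e.g. kill_broker("kafka-1"); kill_broker("kafka-2")
```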