diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 00000000..02eb405f
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,60 @@
+# copied from https://github.com/confluentinc/kafka-images/blob/ec53840b3673a039a9eae6623f773399ad50c31c/examples/kafka-cluster/docker-compose.yml
+# major changes:
+# - use getsentry's image mirrors to avoid docker rate limits
+# - avoid network_mode: host due to limitations on macos
+# - trim down on zookeeper nodes (just for simplicity, might want to add it
+#   later to test something more specific)
+---
+version: '3'
+name: arroyo
+
+x-zookeeper-common: &zookeeper-common
+  image: ghcr.io/getsentry/image-mirror-confluentinc-cp-zookeeper:6.2.0
+
+x-zookeeper-common-env: &zookeeper-common-env
+  ZOOKEEPER_CLIENT_PORT: 2181
+  ZOOKEEPER_TICK_TIME: 2000
+  ZOOKEEPER_INIT_LIMIT: 5
+  ZOOKEEPER_SYNC_LIMIT: 2
+
+x-kafka-common: &kafka-common
+  image: "ghcr.io/getsentry/image-mirror-confluentinc-cp-kafka:6.2.0"
+  depends_on:
+    - zookeeper-1
+
+x-kafka-common-env: &kafka-common-env
+  KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181
+  KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
+  KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
+  KAFKA_LISTENERS: INTERNAL://0.0.0.0:9093,EXTERNAL://0.0.0.0:9092
+
+services:
+  zookeeper-1:
+    <<: *zookeeper-common
+    environment:
+      <<: *zookeeper-common-env
+      ZOOKEEPER_SERVER_ID: 1
+
+  kafka-1:
+    <<: *kafka-common
+    ports: ["19092:9092"]
+    environment:
+      <<: *kafka-common-env
+      KAFKA_BROKER_ID: 1
+      KAFKA_ADVERTISED_LISTENERS: EXTERNAL://127.0.0.1:19092,INTERNAL://kafka-1:9093
+
+  kafka-2:
+    <<: *kafka-common
+    ports: ["29092:9092"]
+    environment:
+      <<: *kafka-common-env
+      KAFKA_BROKER_ID: 2
+      KAFKA_ADVERTISED_LISTENERS: EXTERNAL://127.0.0.1:29092,INTERNAL://kafka-2:9093
+
+  kafka-3:
+    <<: *kafka-common
+    ports: ["39092:9092"]
+    environment:
+      <<: *kafka-common-env
+      KAFKA_BROKER_ID: 3
+      KAFKA_ADVERTISED_LISTENERS: EXTERNAL://127.0.0.1:39092,INTERNAL://kafka-3:9093
diff --git a/tests/backends/test_kafka.py b/tests/backends/test_kafka.py
index 296766b6..4c82c1e4 100644
--- a/tests/backends/test_kafka.py
+++ b/tests/backends/test_kafka.py
@@ -2,6 +2,7 @@
 import itertools
 import os
 import pickle
+import subprocess
 import time
 import uuid
 from contextlib import closing
@@ -14,7 +15,10 @@
 
 from arroyo.backends.kafka import KafkaConsumer, KafkaPayload, KafkaProducer
 from arroyo.backends.kafka.commit import CommitCodec
-from arroyo.backends.kafka.configuration import build_kafka_configuration
+from arroyo.backends.kafka.configuration import (
+    KafkaBrokerConfig,
+    build_kafka_configuration,
+)
 from arroyo.backends.kafka.consumer import as_kafka_configuration_bool
 from arroyo.commit import IMMEDIATE, Commit
 from arroyo.errors import ConsumerError, EndOfPartition
@@ -70,10 +74,26 @@ def get_topic(
 
 
 class TestKafkaStreams(StreamsTestMixin[KafkaPayload]):
-
-    configuration = build_kafka_configuration(
-        {"bootstrap.servers": os.environ.get("DEFAULT_BROKERS", "localhost:9092")}
-    )
+    distributed = False
+
+    @property
+    def configuration(self) -> KafkaBrokerConfig:
+        if self.distributed:
+            return build_kafka_configuration(
+                {
+                    "bootstrap.servers": os.environ.get(
+                        "DEFAULT_DISTRIBUTED_BROKERS", "127.0.0.1:19092"
+                    )
+                }
+            )
+        else:
+            return build_kafka_configuration(
+                {
+                    "bootstrap.servers": os.environ.get(
+                        "DEFAULT_BROKERS", "127.0.0.1:9092"
+                    )
+                }
+            )
 
     @contextlib.contextmanager
     def get_topic(self, partitions: int = 1) -> Iterator[Topic]:
@@ -193,6 +213,32 @@ def test_consumer_stream_processor_shutdown(self) -> None:
         with pytest.raises(RuntimeError):
             processor.run()
 
+    def test_kafka_broker_died(self) -> None:
+        self.distributed = True
+
+        with self.get_topic(partitions=32) as topic:
+            with closing(self.get_producer()) as producer:
+                payloads = self.get_payloads()
+                for _ in range(100):
+                    producer.produce(topic, next(payloads)).result(5.0)
+
+            with closing(self.get_consumer(auto_offset_reset="earliest")) as consumer:
+                consumer.subscribe([topic])
+
+                value = consumer.poll(10.0)
+                assert isinstance(value, BrokerValue)
+
+                subprocess.check_call(["docker", "kill", "arroyo-kafka-1-1"])
+                subprocess.check_call(["docker", "kill", "arroyo-kafka-2-1"])
+
+                consumer.stage_offsets(value.committable)
+
+                with pytest.raises(Exception):
+                    consumer.commit_offsets()
+
+                with pytest.raises(Exception):
+                    consumer.poll(10.0)
+
 
 def test_commit_codec() -> None:
     commit = Commit(
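
Note (not part of the patch): the broker-failure scenario exercised by `test_kafka_broker_died` can also be reproduced by hand against the compose cluster above. The sketch below is illustrative only; it reuses calls that already appear in the diff (`build_kafka_configuration`, `KafkaProducer.produce`, `Future.result`), while the topic name, payload contents, and the assumption that the topic already exists are hypothetical. The container names assume the compose project name `arroyo` from the file above, matching the `docker kill` calls in the test.

```python
# Minimal sketch, assuming the compose cluster is running (`docker compose up -d`)
# and a topic named "broker-died-demo" already exists; names and payloads are
# illustrative, not part of the change.
import os
import subprocess
from contextlib import closing

from arroyo.backends.kafka import KafkaPayload, KafkaProducer
from arroyo.backends.kafka.configuration import build_kafka_configuration
from arroyo.types import Topic

brokers = os.environ.get("DEFAULT_DISTRIBUTED_BROKERS", "127.0.0.1:19092")
config = build_kafka_configuration({"bootstrap.servers": brokers})
topic = Topic("broker-died-demo")  # hypothetical topic, assumed to exist

with closing(KafkaProducer(config)) as producer:
    # Healthy cluster: deliveries complete within the timeout.
    for i in range(10):
        payload = KafkaPayload(None, f"message {i}".encode("utf-8"), [])
        producer.produce(topic, payload).result(5.0)

    # Take down two of the three brokers, as the test does.
    subprocess.check_call(["docker", "kill", "arroyo-kafka-1-1"])
    subprocess.check_call(["docker", "kill", "arroyo-kafka-2-1"])

    # With most of the cluster gone, delivery futures are expected to fail or
    # time out for partitions whose leaders lived on the killed brokers.
    payload = KafkaPayload(None, b"after failure", [])
    try:
        producer.produce(topic, payload).result(5.0)
    except Exception as exc:
        print(f"produce failed as expected: {exc}")
```

To restore the cluster afterwards, restarting the killed containers (e.g. via `docker compose up -d` again) should be sufficient.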