From 6d3b11a1f8bb317bfeb10891910e9a7455f33b35 Mon Sep 17 00:00:00 2001 From: Vikram Patki 24489 Date: Fri, 28 Oct 2022 11:33:42 -0400 Subject: [PATCH] using aiokafka's default partition strategy when table_standy_replicas is 0 --- faust/transport/drivers/aiokafka.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/faust/transport/drivers/aiokafka.py b/faust/transport/drivers/aiokafka.py index 003f0170f..ce08c771b 100644 --- a/faust/transport/drivers/aiokafka.py +++ b/faust/transport/drivers/aiokafka.py @@ -36,6 +36,7 @@ from aiokafka.structs import OffsetAndMetadata, TopicPartition as _TopicPartition from aiokafka.util import parse_kafka_version from kafka import TopicPartition +from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor from kafka.errors import ( NotControllerError, TopicAlreadyExistsError as TopicExistsError, @@ -495,7 +496,11 @@ def _create_worker_consumer( conf = self.app.conf if self.consumer.in_transaction: isolation_level = "read_committed" - self._assignor = self.app.assignor + self._assignor = ( + self.app.assignor + if self.app.conf.table_standby_replicas > 0 + else RoundRobinPartitionAssignor + ) auth_settings = credentials_to_aiokafka_auth( conf.broker_credentials, conf.ssl_context )