diff --git a/faust/transport/drivers/aiokafka.py b/faust/transport/drivers/aiokafka.py index 003f0170f..ce08c771b 100644 --- a/faust/transport/drivers/aiokafka.py +++ b/faust/transport/drivers/aiokafka.py @@ -36,6 +36,7 @@ from aiokafka.structs import OffsetAndMetadata, TopicPartition as _TopicPartition from aiokafka.util import parse_kafka_version from kafka import TopicPartition +from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor from kafka.errors import ( NotControllerError, TopicAlreadyExistsError as TopicExistsError, @@ -495,7 +496,11 @@ def _create_worker_consumer( conf = self.app.conf if self.consumer.in_transaction: isolation_level = "read_committed" - self._assignor = self.app.assignor + self._assignor = ( + self.app.assignor + if self.app.conf.table_standby_replicas > 0 + else RoundRobinPartitionAssignor + ) auth_settings = credentials_to_aiokafka_auth( conf.broker_credentials, conf.ssl_context )