From 601e594458ed1b2b5f2ae22b5d47e535ca2a3a2b Mon Sep 17 00:00:00 2001 From: Lyn Date: Thu, 21 Apr 2022 16:17:55 -0700 Subject: [PATCH] feat: Avoid unnecessarily recreating processing strategy If the assignment or revocation callback is called with no partitions we don't need to close and recreate the processing strategy. This happens frequently with incremental rebalancing. --- arroyo/processing/processor.py | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/arroyo/processing/processor.py b/arroyo/processing/processor.py index 7be94509..edf95446 100644 --- a/arroyo/processing/processor.py +++ b/arroyo/processing/processor.py @@ -79,21 +79,24 @@ def _create_strategy() -> None: def on_partitions_assigned(partitions: Mapping[Partition, int]) -> None: logger.info("New partitions assigned: %r", partitions) - if self.__processing_strategy is not None: - _close_strategy() - _create_strategy() + if partitions: + if self.__processing_strategy is not None: + _close_strategy() + _create_strategy() def on_partitions_revoked(partitions: Sequence[Partition]) -> None: logger.info("Partitions revoked: %r", partitions) - _close_strategy() - # Recreate the strategy if the consumer still has other partitions - # assigned and is not closed or errored - try: - if self.__consumer.tell().keys() - set(partitions): - _create_strategy() - except RuntimeError: - pass + if partitions: + _close_strategy() + + # Recreate the strategy if the consumer still has other partitions + # assigned and is not closed or errored + try: + if self.__consumer.tell().keys() - set(partitions): + _create_strategy() + except RuntimeError: + pass self.__consumer.subscribe( [topic], on_assign=on_partitions_assigned, on_revoke=on_partitions_revoked