From 5e799c2e6746745da07e67ae35fa6abf3279bb0c Mon Sep 17 00:00:00 2001 From: Bob Haddleton Date: Fri, 5 Feb 2021 15:38:26 -0600 Subject: [PATCH] fix commit exceptions (#94) (#95) * fix partition assignment issues (#93) and commit exceptions (#94) * fix partition assignment issues (#93) and commit exceptions (#94) * fix black formatting * fix unused variable in test * fix unit test * fix unused import * revert partition assignor changes --- faust/transport/consumer.py | 12 ++++++++++-- faust/transport/drivers/aiokafka.py | 6 ++++-- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/faust/transport/consumer.py b/faust/transport/consumer.py index 1037c890f..baad160df 100644 --- a/faust/transport/consumer.py +++ b/faust/transport/consumer.py @@ -613,6 +613,14 @@ async def on_partitions_revoked(self, revoked: Set[TP]) -> None: if self._active_partitions is not None: self._active_partitions.difference_update(revoked) self._paused_partitions.difference_update(revoked) + # Remove the revoked partitions from local data structures + for tp in revoked: + self._gap.pop(tp, None) + self._acked.pop(tp, None) + self._acked_index.pop(tp, None) + self._read_offset.pop(tp, None) + self._committed_offset.pop(tp, None) + await T(self._on_partitions_revoked, partitions=revoked)(revoked) @Service.transitions_to(CONSUMER_PARTITIONS_ASSIGNED) @@ -1019,11 +1027,11 @@ def _new_offset(self, tp: TP) -> Optional[int]: # then return the offset before that. # For example if acked[tp] is: # 1 2 3 4 5 6 7 8 9 - # the return value will be: 9 + # the return value will be: 10 # If acked[tp] is: # 34 35 36 40 41 42 43 44 # ^--- gap - # the return value will be: 36 + # the return value will be: 37 if acked: max_offset = max(acked) gap_for_tp = self._gap[tp] diff --git a/faust/transport/drivers/aiokafka.py b/faust/transport/drivers/aiokafka.py index d05511de4..6351c3c1f 100644 --- a/faust/transport/drivers/aiokafka.py +++ b/faust/transport/drivers/aiokafka.py @@ -663,9 +663,11 @@ async def _commit(self, offsets: Mapping[TP, int]) -> bool: now = monotonic() try: aiokafka_offsets = { - tp: OffsetAndMetadata(offset, "") for tp, offset in offsets.items() + tp: OffsetAndMetadata(offset, "") + for tp, offset in offsets.items() + if tp in self.assignment() } - self.tp_last_committed_at.update({tp: now for tp in offsets}) + self.tp_last_committed_at.update({tp: now for tp in aiokafka_offsets}) await consumer.commit(aiokafka_offsets) except CommitFailedError as exc: if "already rebalanced" in str(exc):