From be1e6dbe2507fd068a40391dcd2ddd436372661e Mon Sep 17 00:00:00 2001 From: Vikram Patki <54442035+patkivikram@users.noreply.github.com> Date: Mon, 30 Nov 2020 11:14:27 -0500 Subject: [PATCH] Fix recovery issue in transaction and reprocessing message in consumer (#49) * Fixing issues https://github.com/faust-streaming/faust/issues/47 and https://github.com/faust-streaming/faust/issues/48 * fix linting --- faust/tables/recovery.py | 29 ++++++++++++++++++++++++++--- faust/transport/consumer.py | 17 +++++++++++------ 2 files changed, 37 insertions(+), 9 deletions(-) diff --git a/faust/tables/recovery.py b/faust/tables/recovery.py index ea03d3214..e83e6942f 100644 --- a/faust/tables/recovery.py +++ b/faust/tables/recovery.py @@ -712,7 +712,12 @@ async def _slurp_changelogs(self) -> None: buffer_sizes = self.buffer_sizes processing_times = self._processing_times - def _maybe_signal_recovery_end() -> None: + async def _maybe_signal_recovery_end(timeout=False, timeout_count=0) -> None: + # lets wait at least 2 consecutive cycles for the queue to be + # empty to avoid race conditions between + # the aiokafka consumer position and draining of the queue + if timeout and self.app.in_transaction and timeout_count > 1: + await detect_aborted_tx() if not self.active_remaining_total(): # apply anything stuck in the buffers self.flush_buffers() @@ -722,6 +727,20 @@ def _maybe_signal_recovery_end() -> None: logger.debug("Setting recovery end") self.signal_recovery_end.set() + async def detect_aborted_tx(): + highwaters = self.active_highwaters + offsets = self.active_offsets + for tp, highwater in highwaters.items(): + if ( + highwater is not None + and offsets[tp] is not None + and offsets[tp] < highwater + ): + if await self.app.consumer.position(tp) >= highwater: + logger.info(f"Aborted tx until highwater for {tp}") + offsets[tp] = highwater + + timeout_count = 0 while not self.should_stop: self.signal_recovery_end.clear() try: @@ -729,11 +748,15 @@ def _maybe_signal_recovery_end() -> None: changelog_queue.get(), timeout=5.0 ) except asyncio.TimeoutError: + timeout_count += 1 if self.should_stop: return - _maybe_signal_recovery_end() + await _maybe_signal_recovery_end( + timeout=True, timeout_count=timeout_count + ) continue now = monotonic() + timeout_count = 0 message = event.message tp = message.tp offset = message.offset @@ -781,7 +804,7 @@ def _maybe_signal_recovery_end() -> None: processing_times.popleft() self._last_active_event_processed_at = now_after - _maybe_signal_recovery_end() + await _maybe_signal_recovery_end() if not self.standby_remaining_total(): logger.debug("Completed standby partition fetch") diff --git a/faust/transport/consumer.py b/faust/transport/consumer.py index 6b8be6e43..3337542f5 100644 --- a/faust/transport/consumer.py +++ b/faust/transport/consumer.py @@ -71,6 +71,7 @@ ) from weakref import WeakSet +from aiokafka.errors import ProducerFenced from mode import Service, ServiceT, flight_recorder, get_logger from mode.threads import MethodQueue, QueueServiceThread from mode.utils.futures import notify @@ -334,11 +335,15 @@ async def commit( by_transactional_id[transactional_id][tp] = offset if by_transactional_id: - await producer.commit_transactions( - by_transactional_id, - group_id, - start_new_transaction=start_new_transaction, - ) + try: + await producer.commit_transactions( + by_transactional_id, + group_id, + start_new_transaction=start_new_transaction, + ) + except ProducerFenced as pf: + logger.warning(f"ProducerFenced {pf}") + await self.app.crash(pf) return True def key_partition(self, topic: str, key: bytes) -> TP: @@ -1031,7 +1036,7 @@ def _new_offset(self, tp: TP) -> Optional[int]: acked[: len(batch)] = [] self._acked_index[tp].difference_update(batch) # return the highest commit offset - return batch[-1] + return batch[-1] + 1 return None async def on_task_error(self, exc: BaseException) -> None: