From 06755bc76c901e67b6f5c02e6a3e90373e445b64 Mon Sep 17 00:00:00 2001 From: Vikram Patki 24489 Date: Fri, 6 Nov 2020 20:04:02 -0500 Subject: [PATCH] fixed recovery hang and undefined set_close method in aiokafka --- faust/tables/recovery.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/faust/tables/recovery.py b/faust/tables/recovery.py index 1e4f60564..e1b19cb2d 100644 --- a/faust/tables/recovery.py +++ b/faust/tables/recovery.py @@ -246,6 +246,7 @@ def on_partitions_revoked(self, revoked: Set[TP]) -> None: T = traced_from_parent_span() T(self.flush_buffers)() self.signal_recovery_reset.set() + self.signal_recovery_start.set() async def on_rebalance( self, assigned: Set[TP], revoked: Set[TP], newly_assigned: Set[TP] @@ -294,7 +295,7 @@ async def on_rebalance( child_of=rebalancing_span, ) app._span_add_default_tags(self._recovery_span) - self.signal_recovery_reset.clear() + self.signal_recovery_reset.set() self.signal_recovery_start.set() async def _resume_streams(self) -> None: