From e3bd128e5631657c58d301943754b9921d3cbe95 Mon Sep 17 00:00:00 2001 From: ekerstens <49325583+ekerstens@users.noreply.github.com> Date: Wed, 17 Nov 2021 07:33:26 -0800 Subject: [PATCH] Fix Recovery._resume_streams (#217) Co-authored-by: Eric Kerstens --- faust/tables/recovery.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/faust/tables/recovery.py b/faust/tables/recovery.py index ecd60b449..e833706d8 100644 --- a/faust/tables/recovery.py +++ b/faust/tables/recovery.py @@ -304,10 +304,6 @@ async def _resume_streams(self, generation_id: int = 0) -> None: app = self.app consumer = app.consumer await app.on_rebalance_complete.send() - # Resume partitions and start fetching. - self.log.info("Resuming flow...") - consumer.resume_flow() - app.flow_control.resume() assignment = consumer.assignment() if self.app.consumer_generation_id != generation_id: self.log.warning("Recovery rebalancing again") @@ -324,6 +320,10 @@ async def _resume_streams(self, generation_id: int = 0) -> None: else: self.log.info("Resuming streams with empty assignment") self.completed.set() + # Resume partitions and start fetching. + self.log.info("Resuming flow...") + consumer.resume_flow() + app.flow_control.resume() # finally make sure the fetcher is running. await cast(_App, app)._fetcher.maybe_start() self.tables.on_actives_ready()