diff --git a/faust/tables/recovery.py b/faust/tables/recovery.py index ecd60b449..e833706d8 100644 --- a/faust/tables/recovery.py +++ b/faust/tables/recovery.py @@ -304,10 +304,6 @@ async def _resume_streams(self, generation_id: int = 0) -> None: app = self.app consumer = app.consumer await app.on_rebalance_complete.send() - # Resume partitions and start fetching. - self.log.info("Resuming flow...") - consumer.resume_flow() - app.flow_control.resume() assignment = consumer.assignment() if self.app.consumer_generation_id != generation_id: self.log.warning("Recovery rebalancing again") @@ -324,6 +320,10 @@ async def _resume_streams(self, generation_id: int = 0) -> None: else: self.log.info("Resuming streams with empty assignment") self.completed.set() + # Resume partitions and start fetching. + self.log.info("Resuming flow...") + consumer.resume_flow() + app.flow_control.resume() # finally make sure the fetcher is running. await cast(_App, app)._fetcher.maybe_start() self.tables.on_actives_ready()