diff --git a/faust/tables/recovery.py b/faust/tables/recovery.py index f54afbca8..1adbcf4b5 100644 --- a/faust/tables/recovery.py +++ b/faust/tables/recovery.py @@ -470,6 +470,14 @@ async def _restart_recovery(self) -> None: T(self.app.flow_control.resume)() T(consumer.resume_flow)() self._set_recovery_ended() + + # The changelog partitions only in the active_tps set need to be resumed + active_only_partitions = active_tps - standby_tps + if active_only_partitions: + T(consumer.resume_partitions)(active_only_partitions) + T(self.app.flow_control.resume)() + T(consumer.resume_flow)() + self.log.info("Recovery complete") if span: span.set_tag("Recovery-Completed", True) diff --git a/tests/bench/base.py b/tests/bench/base.py index f686feb11..9bad097d6 100644 --- a/tests/bench/base.py +++ b/tests/bench/base.py @@ -94,7 +94,7 @@ async def produce(self, max_latency: float, max_messages: int, **kwargs): time_start = monotonic() time_1st = monotonic() - def on_published(meta): + def on_published(meta, time_1st=time_1st): print(f"1ST OK: {meta} AFTER {monotonic() - time_1st}s") callback = on_published