diff --git a/faust/transport/consumer.py b/faust/transport/consumer.py index 96252e911..4452d85ff 100644 --- a/faust/transport/consumer.py +++ b/faust/transport/consumer.py @@ -848,13 +848,12 @@ async def _commit_livelock_detector(self) -> None: # pragma: no cover await self.sleep(interval) async for sleep_time in self.itertimer(interval, name="livelock"): if not self.app.rebalancing: - await self.app.loop.run_in_executor( - None, self.verify_all_partitions_active - ) + await self.verify_all_partitions_active() - def verify_all_partitions_active(self) -> None: + async def verify_all_partitions_active(self) -> None: now = monotonic() for tp in self.assignment(): + await self.sleep(0) if not self.should_stop: self.verify_event_path(now, tp) diff --git a/tests/unit/transport/test_consumer.py b/tests/unit/transport/test_consumer.py index 6af22e1c8..b4cc668f7 100644 --- a/tests/unit/transport/test_consumer.py +++ b/tests/unit/transport/test_consumer.py @@ -1386,7 +1386,7 @@ async def test_verify_all_partitions_active(self, *, consumer): with patch("faust.transport.consumer.monotonic") as monotonic: now = monotonic.return_value = 391243.231 - consumer.verify_all_partitions_active() + await consumer.verify_all_partitions_active() consumer.verify_event_path.assert_has_calls( [ @@ -1405,13 +1405,13 @@ async def test_verify_all_partitions_active__bail_on_sleep(self, *, consumer): consumer.sleep = AsyncMock() async def on_sleep(secs): - if consumer.sleep.call_count == 2: + if consumer.sleep.call_count == 4: consumer._stopped.set() consumer.sleep.side_effect = on_sleep with patch("faust.transport.consumer.monotonic") as monotonic: now = monotonic.return_value = 391243.231 - consumer.verify_all_partitions_active() + await consumer.verify_all_partitions_active() consumer.verify_event_path.assert_called_with(now, TP3)