Skip to content

Commit

Permalink
fix for consumer errors in app #166 (#167)
Browse files Browse the repository at this point in the history
* fix for consumer errors in app #166

* fix for consumer errors in app #166
  • Loading branch information
patkivikram authored Jun 29, 2021
1 parent 6f8f0b7 commit 761713b
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 7 deletions.
7 changes: 3 additions & 4 deletions faust/transport/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -848,13 +848,12 @@ async def _commit_livelock_detector(self) -> None: # pragma: no cover
await self.sleep(interval)
async for sleep_time in self.itertimer(interval, name="livelock"):
if not self.app.rebalancing:
await self.app.loop.run_in_executor(
None, self.verify_all_partitions_active
)
await self.verify_all_partitions_active()

def verify_all_partitions_active(self) -> None:
async def verify_all_partitions_active(self) -> None:
now = monotonic()
for tp in self.assignment():
await self.sleep(0)
if not self.should_stop:
self.verify_event_path(now, tp)

Expand Down
6 changes: 3 additions & 3 deletions tests/unit/transport/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -1386,7 +1386,7 @@ async def test_verify_all_partitions_active(self, *, consumer):

with patch("faust.transport.consumer.monotonic") as monotonic:
now = monotonic.return_value = 391243.231
consumer.verify_all_partitions_active()
await consumer.verify_all_partitions_active()

consumer.verify_event_path.assert_has_calls(
[
Expand All @@ -1405,13 +1405,13 @@ async def test_verify_all_partitions_active__bail_on_sleep(self, *, consumer):
consumer.sleep = AsyncMock()

async def on_sleep(secs):
if consumer.sleep.call_count == 2:
if consumer.sleep.call_count == 4:
consumer._stopped.set()

consumer.sleep.side_effect = on_sleep

with patch("faust.transport.consumer.monotonic") as monotonic:
now = monotonic.return_value = 391243.231
consumer.verify_all_partitions_active()
await consumer.verify_all_partitions_active()

consumer.verify_event_path.assert_called_with(now, TP3)

0 comments on commit 761713b

Please sign in to comment.