Skip to content

Commit

Permalink
Merge remote-tracking branch 'faust-streaming/master' into fix/buffer…
Browse files Browse the repository at this point in the history
…-unit-tests
  • Loading branch information
cbrand committed Feb 27, 2022
2 parents fb31a73 + 89c1614 commit 5a68732
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 12 deletions.
2 changes: 1 addition & 1 deletion faust/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import typing
from typing import Any, Mapping, NamedTuple, Optional, Sequence, Tuple

__version__ = "0.8.3"
__version__ = "0.8.5"
__author__ = "Robinhood Markets, Inc."
__contact__ = "schrohm@gmail.com, vpatki@wayfair.com"
__homepage__ = "https://github.com/faust-streaming/faust"
Expand Down
1 change: 1 addition & 0 deletions faust/transport/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -1047,6 +1047,7 @@ async def _commit_offsets(
start_new_transaction=start_new_transaction,
)
else:
await self.app.producer.flush()
did_commit = await self._commit(committable_offsets)
on_timeout.info("-consumer.commit()")
if did_commit:
Expand Down
7 changes: 5 additions & 2 deletions faust/transport/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,11 @@ async def flush_atmost(self, max_messages: Optional[int]) -> int:
(max_messages is None or flushed_messages < max_messages)
):
self.message_sent.clear()
await self.message_sent.wait()
flushed_messages += 1
try:
await asyncio.wait_for(self.message_sent.wait(), timeout=0.1)
flushed_messages += 1
except asyncio.TimeoutError:
return flushed_messages
else:
return flushed_messages

Expand Down
5 changes: 5 additions & 0 deletions tests/unit/transport/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -840,6 +840,7 @@ async def test_force_commit(self, *, consumer):
async def test_commit_tps(self, *, consumer):
consumer._handle_attached = AsyncMock(name="_handle_attached")
consumer._commit_offsets = AsyncMock(name="_commit_offsets")
consumer.app.producer.flush = AsyncMock()
consumer._filter_committable_offsets = Mock(name="filt")
consumer._filter_committable_offsets.return_value = {
TP1: 4,
Expand Down Expand Up @@ -942,6 +943,7 @@ async def test_handle_attached(self, *, consumer):
async def test_commit_offsets(self, *, consumer):
consumer._commit = AsyncMock(name="_commit")
consumer.current_assignment.update({TP1, TP2})
consumer.app.producer.flush = AsyncMock()
await consumer._commit_offsets(
{
TP1: 3003,
Expand All @@ -959,6 +961,7 @@ async def test_commit_offsets(self, *, consumer):
async def test_commit_offsets__did_not_commit(self, *, consumer):
consumer.in_transaction = False
consumer._commit = AsyncMock(return_value=False)
consumer.app.producer.flush = AsyncMock()
consumer.current_assignment.update({TP1, TP2})
consumer.app.tables = Mock(name="app.tables")
await consumer._commit_offsets(
Expand All @@ -973,6 +976,7 @@ async def test_commit_offsets__did_not_commit(self, *, consumer):
@pytest.mark.asyncio
async def test_commit_offsets__in_transaction(self, *, consumer):
consumer.in_transaction = True
consumer.app.producer.flush = AsyncMock()
consumer.transactions.commit = AsyncMock()
consumer.current_assignment.update({TP1, TP2})
ret = await consumer._commit_offsets(
Expand All @@ -991,6 +995,7 @@ async def test_commit_offsets__in_transaction(self, *, consumer):
@pytest.mark.asyncio
async def test_commit_offsets__no_committable_offsets(self, *, consumer):
consumer.current_assignment.clear()
consumer.app.producer.flush = AsyncMock()
assert not await consumer._commit_offsets(
{
TP1: 3003,
Expand Down
70 changes: 61 additions & 9 deletions tests/unit/transport/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,24 +83,27 @@ async def flush_atmost(max_messages: Optional[int]) -> int:
task = asyncio.create_task(buf.wait_until_ebb())
await asyncio.sleep(0)
assert flush_atmost_call_count == 1
assert (
not task.done()
), "The wait_until_ebb has been finished even though flush atmost did not return"
assert not task.done(), (
"The wait_until_ebb has been finished even "
"though flush atmost did not return"
)

buf.__class__.size = PropertyMock(return_value=10)
await asyncio.sleep(0)
assert (
task.done()
), "The wait_until_ebb did not complete even though the size is beneath the max size"
assert task.done(), (
"The wait_until_ebb did not complete even "
"though the size is beneath the max size"
)

assert (
flush_atmost_call_count > 0
), "The wait_until_ebb did not call the flush_atmost function"
task = asyncio.create_task(buf.wait_until_ebb())
await asyncio.sleep(0)
assert (
task.done()
), "The wait_until_ebb function did not finish even though the buffer is small enough"
assert task.done(), (
"The wait_until_ebb function did not finish even "
"though the buffer is small enough"
)
finally:
buf.__class__.size = original_size
await buf.stop()
Expand Down Expand Up @@ -154,6 +157,55 @@ async def _inner(*args: Any):
finally:
await buf.stop()

@pytest.mark.asyncio
async def test_flush_atmost_with_simulated_threaded_behavior(self, *, buf):
def create_send_pending_mock(max_messages):
sent_messages = 0

async def _inner(*args: Any):
nonlocal sent_messages
if sent_messages < max_messages:
sent_messages += 1
return
else:
await asyncio.Future()

return _inner

buf._send_pending = create_send_pending_mock(10)

class WaitForEverEvent(asyncio.Event):
test_stopped: bool = False

def stop_test(self) -> None:
self.test_stopped = True

async def wait(self) -> None:
while not self.test_stopped:
await asyncio.sleep(1.0)

wait_for_event = buf.message_sent = WaitForEverEvent()
await buf.start()

try:
original_size_property = buf.__class__.size
buf.__class__.size = PropertyMock(return_value=10)
waiting = buf.flush_atmost(10)
loop = asyncio.get_event_loop()
task = loop.create_task(waiting)
await asyncio.sleep(0)
assert (
not task.done()
), "Task has completed even though not all events have been issued"
buf.__class__.size = PropertyMock(return_value=0)
await asyncio.sleep(0.2)
assert task.done(), "Task has not been completed even though queue is None"
assert isinstance(task.result(), int)
finally:
wait_for_event.stop_test()
buf.__class__.size = original_size_property
await buf.stop()


class ProducerTests:
@pytest.mark.asyncio
Expand Down

0 comments on commit 5a68732

Please sign in to comment.