diff --git a/faust/__init__.py b/faust/__init__.py index 4333e61b0..5ad783884 100644 --- a/faust/__init__.py +++ b/faust/__init__.py @@ -23,7 +23,7 @@ import typing from typing import Any, Mapping, NamedTuple, Optional, Sequence, Tuple -__version__ = "0.8.3" +__version__ = "0.8.4" __author__ = "Robinhood Markets, Inc." __contact__ = "schrohm@gmail.com, vpatki@wayfair.com" __homepage__ = "https://github.com/faust-streaming/faust" diff --git a/faust/transport/consumer.py b/faust/transport/consumer.py index 4531da79c..e533beff4 100644 --- a/faust/transport/consumer.py +++ b/faust/transport/consumer.py @@ -1047,6 +1047,7 @@ async def _commit_offsets( start_new_transaction=start_new_transaction, ) else: + await self.app.producer.flush() did_commit = await self._commit(committable_offsets) on_timeout.info("-consumer.commit()") if did_commit: diff --git a/tests/unit/transport/test_consumer.py b/tests/unit/transport/test_consumer.py index 485e47610..088acfeff 100644 --- a/tests/unit/transport/test_consumer.py +++ b/tests/unit/transport/test_consumer.py @@ -840,6 +840,7 @@ async def test_force_commit(self, *, consumer): async def test_commit_tps(self, *, consumer): consumer._handle_attached = AsyncMock(name="_handle_attached") consumer._commit_offsets = AsyncMock(name="_commit_offsets") + consumer.app.producer.flush = AsyncMock() consumer._filter_committable_offsets = Mock(name="filt") consumer._filter_committable_offsets.return_value = { TP1: 4, @@ -942,6 +943,7 @@ async def test_handle_attached(self, *, consumer): async def test_commit_offsets(self, *, consumer): consumer._commit = AsyncMock(name="_commit") consumer.current_assignment.update({TP1, TP2}) + consumer.app.producer.flush = AsyncMock() await consumer._commit_offsets( { TP1: 3003, @@ -959,6 +961,7 @@ async def test_commit_offsets(self, *, consumer): async def test_commit_offsets__did_not_commit(self, *, consumer): consumer.in_transaction = False consumer._commit = AsyncMock(return_value=False) + consumer.app.producer.flush = AsyncMock() consumer.current_assignment.update({TP1, TP2}) consumer.app.tables = Mock(name="app.tables") await consumer._commit_offsets( @@ -973,6 +976,7 @@ async def test_commit_offsets__did_not_commit(self, *, consumer): @pytest.mark.asyncio async def test_commit_offsets__in_transaction(self, *, consumer): consumer.in_transaction = True + consumer.app.producer.flush = AsyncMock() consumer.transactions.commit = AsyncMock() consumer.current_assignment.update({TP1, TP2}) ret = await consumer._commit_offsets( @@ -991,6 +995,7 @@ async def test_commit_offsets__in_transaction(self, *, consumer): @pytest.mark.asyncio async def test_commit_offsets__no_committable_offsets(self, *, consumer): consumer.current_assignment.clear() + consumer.app.producer.flush = AsyncMock() assert not await consumer._commit_offsets( { TP1: 3003,