diff --git a/faust/__init__.py b/faust/__init__.py index ba5e4e194..ff43ad7b2 100644 --- a/faust/__init__.py +++ b/faust/__init__.py @@ -23,7 +23,7 @@ import typing from typing import Any, Mapping, NamedTuple, Optional, Sequence, Tuple -__version__ = "0.8.5" +__version__ = "0.8.6" __author__ = "Robinhood Markets, Inc." __contact__ = "schrohm@gmail.com, vpatki@wayfair.com" __homepage__ = "https://github.com/faust-streaming/faust" diff --git a/faust/tables/recovery.py b/faust/tables/recovery.py index 83e9f2051..f54afbca8 100644 --- a/faust/tables/recovery.py +++ b/faust/tables/recovery.py @@ -690,12 +690,18 @@ async def _build_offsets( # Offsets may have been compacted, need to get to the recent ones earliest = await consumer.earliest_offsets(*tps) # FIXME To be consistent with the offset -1 logic - earliest = {tp: offset - 1 for tp, offset in earliest.items()} + earliest = { + tp: offset - 1 if offset is not None else None + for tp, offset in earliest.items() + } + for tp in tps: last_value = destination[tp] - new_value = earliest[tp] + new_value = earliest.get(tp, None) - if last_value is None: + if last_value is None and new_value is None: + destination[tp] = -1 + elif last_value is None: destination[tp] = new_value elif new_value is None: destination[tp] = last_value diff --git a/faust/transport/consumer.py b/faust/transport/consumer.py index 4d79d9a86..837d422f5 100644 --- a/faust/transport/consumer.py +++ b/faust/transport/consumer.py @@ -1097,6 +1097,18 @@ def _new_offset(self, tp: TP) -> Optional[int]: acked.extend(gaps) gap_for_tp[:gap_index] = [] acked.sort() + + # We iterate over it until we handle gap in the head of acked queue + # then return the previous committed offset. + # For example if acked[tp] is: + # 34 35 36 37 + # ^-- gap + # self._committed_offset[tp] is 31 + # the return value will be None (the same as 31) + if self._committed_offset[tp]: + if min(acked) - self._committed_offset[tp] > 0: + return None + # Note: acked is always kept sorted. # find first list of consecutive numbers batch = next(consecutive_numbers(acked)) diff --git a/tests/unit/tables/test_recovery.py b/tests/unit/tables/test_recovery.py index e7de15d2e..0412ea6fe 100644 --- a/tests/unit/tables/test_recovery.py +++ b/tests/unit/tables/test_recovery.py @@ -237,6 +237,52 @@ async def test__build_offsets(self, *, recovery): } ) + @pytest.mark.asyncio + async def test__build_offsets_with_none(self, *, recovery, app) -> None: + consumer = Mock( + name="consumer", + earliest_offsets=AsyncMock( + return_value={TP1: 0, TP2: 3, TP3: 5, TP4: None} + ), + ) + tps = {TP1, TP2, TP3, TP4} + destination = {TP1: None, TP2: 1, TP3: 8, TP4: -1} + await recovery._build_offsets(consumer, tps, destination, "some-title") + assert len(destination) == 4 + assert destination[TP1] == -1 + assert destination[TP2] == 2 + assert destination[TP3] == 8 + assert destination[TP4] == -1 + + @pytest.mark.asyncio + async def test__build_offsets_both_none(self, *, recovery, app) -> None: + consumer = Mock( + name="consumer", + earliest_offsets=AsyncMock(return_value={TP1: None}), + ) + tps = {TP1} + destination = {TP1: None} + await recovery._build_offsets(consumer, tps, destination, "some-title") + assert len(destination) == 1 + assert destination[TP1] == -1 + + @pytest.mark.asyncio + async def test__build_offsets_partial_consumer_response( + self, *, recovery, app + ) -> None: + consumer = Mock( + name="consumer", + earliest_offsets=AsyncMock(return_value={TP1: None}), + ) + tps = {TP1} + destination = {TP1: 3, TP2: 4, TP3: 5, TP4: 20} + await recovery._build_offsets(consumer, tps, destination, "some-title") + assert len(destination) == 4 + assert destination[TP1] == 3 + assert destination[TP2] == 4 + assert destination[TP3] == 5 + assert destination[TP4] == 20 + @pytest.mark.asyncio async def test__seek_offsets(self, *, recovery): consumer = Mock( diff --git a/tests/unit/transport/test_consumer.py b/tests/unit/transport/test_consumer.py index 088acfeff..2fe4e8d90 100644 --- a/tests/unit/transport/test_consumer.py +++ b/tests/unit/transport/test_consumer.py @@ -1079,9 +1079,13 @@ def test_new_offset(self, tp, acked, expected_offset, expected_acked, *, consume (TP1, [1, 2, 3, 4, 5, 6, 7, 8, 10], [9], 11), (TP1, [1, 2, 3, 4, 6, 7, 8, 10], [5], 9), (TP1, [1, 3, 4, 6, 7, 8, 10], [2, 5, 9], 11), + (TP1, [3, 4], [], None), + (TP1, [3, 4], [2], None), + (TP1, [3, 4], [1, 2], 5), ], ) def test_new_offset_with_gaps(self, tp, acked, gaps, expected_offset, *, consumer): + consumer._committed_offset[tp] = 1 consumer._acked[tp] = acked consumer._gap[tp] = gaps assert consumer._new_offset(tp) == expected_offset