Skip to content

Commit

Permalink
Merge branch 'master' into fix-twisted-doc-link
Browse files Browse the repository at this point in the history
  • Loading branch information
wbarnha authored Jul 14, 2022
2 parents 3420634 + db6a3ae commit 7cb9936
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 4 deletions.
2 changes: 1 addition & 1 deletion faust/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import typing
from typing import Any, Mapping, NamedTuple, Optional, Sequence, Tuple

__version__ = "0.8.5"
__version__ = "0.8.6"
__author__ = "Robinhood Markets, Inc."
__contact__ = "schrohm@gmail.com, vpatki@wayfair.com"
__homepage__ = "https://github.com/faust-streaming/faust"
Expand Down
12 changes: 9 additions & 3 deletions faust/tables/recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -690,12 +690,18 @@ async def _build_offsets(
# Offsets may have been compacted, need to get to the recent ones
earliest = await consumer.earliest_offsets(*tps)
# FIXME To be consistent with the offset -1 logic
earliest = {tp: offset - 1 for tp, offset in earliest.items()}
earliest = {
tp: offset - 1 if offset is not None else None
for tp, offset in earliest.items()
}

for tp in tps:
last_value = destination[tp]
new_value = earliest[tp]
new_value = earliest.get(tp, None)

if last_value is None:
if last_value is None and new_value is None:
destination[tp] = -1
elif last_value is None:
destination[tp] = new_value
elif new_value is None:
destination[tp] = last_value
Expand Down
12 changes: 12 additions & 0 deletions faust/transport/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -1097,6 +1097,18 @@ def _new_offset(self, tp: TP) -> Optional[int]:
acked.extend(gaps)
gap_for_tp[:gap_index] = []
acked.sort()

# We iterate over it until we handle gap in the head of acked queue
# then return the previous committed offset.
# For example if acked[tp] is:
# 34 35 36 37
# ^-- gap
# self._committed_offset[tp] is 31
# the return value will be None (the same as 31)
if self._committed_offset[tp]:
if min(acked) - self._committed_offset[tp] > 0:
return None

# Note: acked is always kept sorted.
# find first list of consecutive numbers
batch = next(consecutive_numbers(acked))
Expand Down
46 changes: 46 additions & 0 deletions tests/unit/tables/test_recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,52 @@ async def test__build_offsets(self, *, recovery):
}
)

@pytest.mark.asyncio
async def test__build_offsets_with_none(self, *, recovery, app) -> None:
consumer = Mock(
name="consumer",
earliest_offsets=AsyncMock(
return_value={TP1: 0, TP2: 3, TP3: 5, TP4: None}
),
)
tps = {TP1, TP2, TP3, TP4}
destination = {TP1: None, TP2: 1, TP3: 8, TP4: -1}
await recovery._build_offsets(consumer, tps, destination, "some-title")
assert len(destination) == 4
assert destination[TP1] == -1
assert destination[TP2] == 2
assert destination[TP3] == 8
assert destination[TP4] == -1

@pytest.mark.asyncio
async def test__build_offsets_both_none(self, *, recovery, app) -> None:
consumer = Mock(
name="consumer",
earliest_offsets=AsyncMock(return_value={TP1: None}),
)
tps = {TP1}
destination = {TP1: None}
await recovery._build_offsets(consumer, tps, destination, "some-title")
assert len(destination) == 1
assert destination[TP1] == -1

@pytest.mark.asyncio
async def test__build_offsets_partial_consumer_response(
self, *, recovery, app
) -> None:
consumer = Mock(
name="consumer",
earliest_offsets=AsyncMock(return_value={TP1: None}),
)
tps = {TP1}
destination = {TP1: 3, TP2: 4, TP3: 5, TP4: 20}
await recovery._build_offsets(consumer, tps, destination, "some-title")
assert len(destination) == 4
assert destination[TP1] == 3
assert destination[TP2] == 4
assert destination[TP3] == 5
assert destination[TP4] == 20

@pytest.mark.asyncio
async def test__seek_offsets(self, *, recovery):
consumer = Mock(
Expand Down
4 changes: 4 additions & 0 deletions tests/unit/transport/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -1079,9 +1079,13 @@ def test_new_offset(self, tp, acked, expected_offset, expected_acked, *, consume
(TP1, [1, 2, 3, 4, 5, 6, 7, 8, 10], [9], 11),
(TP1, [1, 2, 3, 4, 6, 7, 8, 10], [5], 9),
(TP1, [1, 3, 4, 6, 7, 8, 10], [2, 5, 9], 11),
(TP1, [3, 4], [], None),
(TP1, [3, 4], [2], None),
(TP1, [3, 4], [1, 2], 5),
],
)
def test_new_offset_with_gaps(self, tp, acked, gaps, expected_offset, *, consumer):
consumer._committed_offset[tp] = 1
consumer._acked[tp] = acked
consumer._gap[tp] = gaps
assert consumer._new_offset(tp) == expected_offset
Expand Down

0 comments on commit 7cb9936

Please sign in to comment.