Skip to content

Commit

Permalink
Fix commited offset is always behind the real offset by 1 (#33)
Browse files Browse the repository at this point in the history
  • Loading branch information
patkivikram authored Nov 16, 2020
2 parents 9fe472f + 277729a commit 18230a7
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 12 deletions.
6 changes: 3 additions & 3 deletions faust/transport/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,7 @@ def ack(self, message: Message) -> bool:
if self.app.topics.acks_enabled_for(message.topic):
committed = self._committed_offset[tp]
try:
if committed is None or offset > committed:
if committed is None or offset >= committed:
acked_index = self._acked_index[tp]
if offset not in acked_index:
self._unacked_messages.discard(message)
Expand Down Expand Up @@ -1028,7 +1028,7 @@ def _new_offset(self, tp: TP) -> Optional[int]:
acked[: len(batch) - 1] = []
self._acked_index[tp].difference_update(batch)
# return the highest commit offset
return batch[-1]
return batch[-1] + 1
return None

async def on_task_error(self, exc: BaseException) -> None:
Expand Down Expand Up @@ -1081,7 +1081,7 @@ async def _drain_messages(self, fetcher: ServiceT) -> None: # pragma: no cover

offset = message.offset
r_offset = get_read_offset(tp)
if r_offset is None or offset > r_offset:
if r_offset is None or offset >= r_offset:
gap = offset - (r_offset or 0)
# We have a gap in income messages
if gap > 1 and r_offset:
Expand Down
19 changes: 10 additions & 9 deletions tests/unit/transport/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -858,7 +858,8 @@ def test_filter_committable_offsets(self, *, consumer):
TP2: 30,
}
assert consumer._filter_committable_offsets({TP1, TP2}) == {
TP2: 36,
TP1: 5,
TP2: 37,
}

@pytest.mark.asyncio
Expand Down Expand Up @@ -1015,10 +1016,10 @@ def test_should_commit(self, tp, offset, committed, should, *, consumer):
"tp,acked,expected_offset",
[
(TP1, [], None),
(TP1, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 10),
(TP1, [1, 2, 3, 4, 5, 6, 7, 8, 10], 8),
(TP1, [1, 2, 3, 4, 6, 7, 8, 10], 4),
(TP1, [1, 3, 4, 6, 7, 8, 10], 1),
(TP1, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 11),
(TP1, [1, 2, 3, 4, 5, 6, 7, 8, 10], 9),
(TP1, [1, 2, 3, 4, 6, 7, 8, 10], 5),
(TP1, [1, 3, 4, 6, 7, 8, 10], 2),
],
)
def test_new_offset(self, tp, acked, expected_offset, *, consumer):
Expand All @@ -1029,10 +1030,10 @@ def test_new_offset(self, tp, acked, expected_offset, *, consumer):
"tp,acked,gaps,expected_offset",
[
(TP1, [], [], None),
(TP1, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], [], 10),
(TP1, [1, 2, 3, 4, 5, 6, 7, 8, 10], [9], 10),
(TP1, [1, 2, 3, 4, 6, 7, 8, 10], [5], 8),
(TP1, [1, 3, 4, 6, 7, 8, 10], [2, 5, 9], 10),
(TP1, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], [], 11),
(TP1, [1, 2, 3, 4, 5, 6, 7, 8, 10], [9], 11),
(TP1, [1, 2, 3, 4, 6, 7, 8, 10], [5], 9),
(TP1, [1, 3, 4, 6, 7, 8, 10], [2, 5, 9], 11),
],
)
def test_new_offset_with_gaps(self, tp, acked, gaps, expected_offset, *, consumer):
Expand Down

0 comments on commit 18230a7

Please sign in to comment.