Skip to content

Commit

Permalink
adding intervaltree to manage gaps in topics to prevent OOM (#282)
Browse files Browse the repository at this point in the history
* adding intervaltree to manage gaps in topics to prevent OOM

* adding intervaltree to manage gaps in topics to prevent OOM

* remove old consumer test TPs

Co-authored-by: William Barnhart <williambbarnhart@gmail.com>
  • Loading branch information
patkivikram and wbarnha authored Aug 12, 2022
1 parent 681c2c9 commit 1e7be3a
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 28 deletions.
44 changes: 29 additions & 15 deletions faust/transport/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
from weakref import WeakSet

from aiokafka.errors import ProducerFenced
from intervaltree import IntervalTree
from mode import Service, ServiceT, flight_recorder, get_logger
from mode.threads import MethodQueue, QueueServiceThread
from mode.utils.futures import notify
Expand Down Expand Up @@ -392,7 +393,7 @@ class Consumer(Service, ConsumerT):
consumer_stopped_errors: ClassVar[Tuple[Type[BaseException], ...]] = ()

# Mapping of TP to list of gap in offsets.
_gap: MutableMapping[TP, List[int]]
_gap: MutableMapping[TP, IntervalTree]

# Mapping of TP to list of acked offsets.
_acked: MutableMapping[TP, List[int]]
Expand Down Expand Up @@ -465,7 +466,7 @@ def __init__(
commit_livelock_soft_timeout
or self.app.conf.broker_commit_livelock_soft_timeout
)
self._gap = defaultdict(list)
self._gap = defaultdict(IntervalTree)
self._acked = defaultdict(list)
self._acked_index = defaultdict(set)
self._read_offset = defaultdict(lambda: None)
Expand Down Expand Up @@ -1087,15 +1088,22 @@ def _new_offset(self, tp: TP) -> Optional[int]:
# the return value will be: 37
if acked:
max_offset = max(acked)
gap_for_tp = self._gap[tp]
gap_for_tp: IntervalTree = self._gap[tp]
if gap_for_tp:
gap_index = next(
(i for i, x in enumerate(gap_for_tp) if x > max_offset),
len(gap_for_tp),
)
gaps = gap_for_tp[:gap_index]
acked.extend(gaps)
gap_for_tp[:gap_index] = []
# find all the ranges up to the max of acked, add them in to acked,
# and chop them off the gap.
candidates = gap_for_tp.overlap(0, max_offset)
# note: merge_overlaps will sort the intervaltree and will ensure that
# the intervals left over don't overlap each other. So can sort by their
# start without worrying about ends overlapping.
sorted_candidates = sorted(candidates, key=lambda x: x.begin)
if sorted_candidates:
stuff_to_add = []
for entry in sorted_candidates:
stuff_to_add.extend(range(entry.begin, entry.end))
new_max_offset = max(stuff_to_add[-1], max_offset + 1)
acked.extend(stuff_to_add)
gap_for_tp.chop(0, new_max_offset)
acked.sort()

# We iterate over it until we handle gap in the head of acked queue
Expand Down Expand Up @@ -1123,12 +1131,18 @@ async def on_task_error(self, exc: BaseException) -> None:
"""Call when processing a message failed."""
await self.commit()

def _add_gap(self, tp: TP, offset_from: int, offset_to: int) -> None:
async def _add_gap(self, tp: TP, offset_from: int, offset_to: int) -> None:
committed = self._committed_offset[tp]
gap_for_tp = self._gap[tp]
for offset in range(offset_from, offset_to):
if committed is None or offset > committed:
gap_for_tp.append(offset)
if committed is not None:
offset_from = max(offset_from, committed + 1)
# intervaltree intervals exclude the end
if offset_from <= offset_to:
gap_for_tp.addi(offset_from, offset_to + 1)
# sleep 0 to allow other coroutines to get some loop time
# for example, to answer health checks while building the gap
await asyncio.sleep(0)
gap_for_tp.merge_overlaps()

async def _drain_messages(self, fetcher: ServiceT) -> None: # pragma: no cover
# This is the background thread started by Fetcher, used to
Expand Down Expand Up @@ -1175,7 +1189,7 @@ async def _drain_messages(self, fetcher: ServiceT) -> None: # pragma: no cover
if gap > 1 and r_offset:
acks_enabled = acks_enabled_for(message.topic)
if acks_enabled:
self._add_gap(tp, r_offset + 1, offset)
await self._add_gap(tp, r_offset + 1, offset)
if commit_every is not None:
if self._n_acked >= commit_every:
self._n_acked = 0
Expand Down
1 change: 1 addition & 0 deletions requirements/dist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ tox>=2.3.1
twine
vulture
wheel>=0.29.0
intervaltree
1 change: 1 addition & 0 deletions requirements/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ yarl>=1.0,<2.0
croniter>=0.3.16
mypy_extensions
venusian==3.0.0
intervaltree
1 change: 1 addition & 0 deletions requirements/test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ codecov
bandit==1.6.2
twine
wheel
intervaltree
-r requirements.txt
-r extras/datadog.txt
-r extras/redis.txt
Expand Down
31 changes: 18 additions & 13 deletions tests/unit/transport/test_consumer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio

import pytest
from intervaltree import Interval, IntervalTree
from mode import Service
from mode.threads import MethodQueue
from mode.utils.futures import done_future
Expand Down Expand Up @@ -1075,13 +1076,15 @@ def test_new_offset(self, tp, acked, expected_offset, expected_acked, *, consume
"tp,acked,gaps,expected_offset",
[
(TP1, [], [], None),
(TP1, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], [], 11),
(TP1, [1, 2, 3, 4, 5, 6, 7, 8, 10], [9], 11),
(TP1, [1, 2, 3, 4, 6, 7, 8, 10], [5], 9),
(TP1, [1, 3, 4, 6, 7, 8, 10], [2, 5, 9], 11),
(TP1, [3, 4], [], None),
(TP1, [3, 4], [2], None),
(TP1, [3, 4], [1, 2], 5),
(TP1, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], IntervalTree(), 11),
(TP1, [1, 2, 3, 4, 5, 6, 7, 8, 10], IntervalTree([Interval(9, 10)]), 11),
(TP1, [1, 2, 3, 4, 6, 7, 8, 10], IntervalTree([Interval(5, 6)]), 9),
(
TP1,
[1, 3, 4, 6, 7, 8, 10],
IntervalTree([Interval(2, 3), Interval(5, 6), Interval(9, 10)]),
11,
),
],
)
def test_new_offset_with_gaps(self, tp, acked, gaps, expected_offset, *, consumer):
Expand All @@ -1096,19 +1099,21 @@ async def test_on_task_error(self, *, consumer):
await consumer.on_task_error(KeyError())
consumer.commit.assert_called_once_with()

def test__add_gap(self, *, consumer):
@pytest.mark.asyncio
async def test__add_gap(self, *, consumer):
tp = TP1
consumer._committed_offset[tp] = 299
consumer._add_gap(TP1, 300, 343)
await consumer._add_gap(TP1, 300, 343)

assert consumer._gap[tp] == list(range(300, 343))
assert consumer._gap[tp] == IntervalTree([Interval(300, 344)])

def test__add_gap__previous_to_committed(self, *, consumer):
@pytest.mark.asyncio
async def test__add_gap__previous_to_committed(self, *, consumer):
tp = TP1
consumer._committed_offset[tp] = 400
consumer._add_gap(TP1, 300, 343)
await consumer._add_gap(TP1, 300, 343)

assert consumer._gap[tp] == []
assert consumer._gap[tp] == IntervalTree()

@pytest.mark.asyncio
async def test_commit_handler(self, *, consumer):
Expand Down

0 comments on commit 1e7be3a

Please sign in to comment.