Skip to content

Commit

Permalink
Merge branch 'master' into remove-3.6-support
Browse files Browse the repository at this point in the history
  • Loading branch information
patkivikram authored Aug 16, 2022
2 parents 0eb434f + b0f927d commit a6ab7c9
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 42 deletions.
16 changes: 9 additions & 7 deletions .github/workflows/dist.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,25 @@ on:
- 'master'

jobs:
build_wheels_macos:
name: 'Build wheels for macOS'
runs-on: macos-11
build_wheels:
name: Build wheels on ${{ matrix.os }}
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-20.04, windows-2019, macos-11]

steps:
- uses: actions/checkout@v3

- name: Build wheels
uses: pypa/cibuildwheel@v2.8.1
env:
CIBW_ARCHS_MACOS: x86_64 arm64
CIBW_PLATFORM: macos
CIBW_MANYLINUX_X86_64_IMAGE: 'manylinux2014'
CIBW_ARCHS: auto64
CIBW_BUILD: 'cp3*'
CIBW_BEFORE_BUILD: pip3 install Cython

- uses: actions/upload-artifact@v3
name: 'Upload build artifacts'
with:
path: ./wheelhouse/*.whl

Expand Down Expand Up @@ -53,7 +55,7 @@ jobs:

upload_pypi:
name: 'Upload packages'
needs: ['build_wheels_macos', 'build_sdist']
needs: ['build_wheels', 'build_sdist']
runs-on: 'ubuntu-latest'
if: github.event_name == 'release' && github.event.action == 'created'
steps:
Expand Down
12 changes: 6 additions & 6 deletions docs/contributing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ Bugs for a package in the Faust ecosystem should be reported to the relevant
issue tracker.

* :pypi:`Faust` - https://github.com/faust-streaming/faust/issues
* :pypi:`Mode` - https://github.com/ask/mode/issues
* :pypi:`Mode` - https://github.com/faust-streaming/mode/issues

If you're unsure of the origin of the bug you can ask the
:ref:`mailing-list`, or just use the Faust issue tracker.
Expand Down Expand Up @@ -797,16 +797,16 @@ Packages
:git: https://github.com/faust-streaming/faust
:CI: http://travis-ci.org/#!/robinhood/faust
:Windows-CI: https://ci.appveyor.com/project/ask/faust
:PyPI: :pypi:`faust`
:PyPI: :pypi:`faust-streaming`
:docs: https://fauststream.com/en/latest/

``Mode``
--------

:git: https://github.com/ask/mode
:CI: http://travis-ci.org/#!/ask/mode
:Windows-CI: https://ci.appveyor.com/project/ask/mode
:PyPI: :pypi:`Mode`
:git: https://github.com/faust-streaming/mode/
:CI: https://www.travis-ci.com/#!/faust-streaming/mode
:Windows-CI: https://ci.appveyor.com/project/faust-streaming/mode
:PyPI: :pypi:`mode-streaming`
:docs: http://mode.readthedocs.io/

.. _release-procedure:
Expand Down
2 changes: 1 addition & 1 deletion faust/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import typing
from typing import Any, Mapping, NamedTuple, Optional, Sequence, Tuple

__version__ = "0.8.7"
__version__ = "0.8.8"
__author__ = "Robinhood Markets, Inc."
__contact__ = "schrohm@gmail.com, vpatki@wayfair.com"
__homepage__ = "https://github.com/faust-streaming/faust"
Expand Down
1 change: 1 addition & 0 deletions faust/tables/wrappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ def __init__(
key_type=self.table.key_type,
window=None,
)
self.table.app.tables.add(self.key_index_table)
self._get_relative_timestamp = self._relative_handler(relative_to)

def clone(self, relative_to: RelativeArg) -> WindowWrapperT:
Expand Down
44 changes: 29 additions & 15 deletions faust/transport/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
from weakref import WeakSet

from aiokafka.errors import ProducerFenced
from intervaltree import IntervalTree
from mode import Service, ServiceT, flight_recorder, get_logger
from mode.threads import MethodQueue, QueueServiceThread
from mode.utils.futures import notify
Expand Down Expand Up @@ -392,7 +393,7 @@ class Consumer(Service, ConsumerT):
consumer_stopped_errors: ClassVar[Tuple[Type[BaseException], ...]] = ()

# Mapping of TP to list of gap in offsets.
_gap: MutableMapping[TP, List[int]]
_gap: MutableMapping[TP, IntervalTree]

# Mapping of TP to list of acked offsets.
_acked: MutableMapping[TP, List[int]]
Expand Down Expand Up @@ -465,7 +466,7 @@ def __init__(
commit_livelock_soft_timeout
or self.app.conf.broker_commit_livelock_soft_timeout
)
self._gap = defaultdict(list)
self._gap = defaultdict(IntervalTree)
self._acked = defaultdict(list)
self._acked_index = defaultdict(set)
self._read_offset = defaultdict(lambda: None)
Expand Down Expand Up @@ -1087,15 +1088,22 @@ def _new_offset(self, tp: TP) -> Optional[int]:
# the return value will be: 37
if acked:
max_offset = max(acked)
gap_for_tp = self._gap[tp]
gap_for_tp: IntervalTree = self._gap[tp]
if gap_for_tp:
gap_index = next(
(i for i, x in enumerate(gap_for_tp) if x > max_offset),
len(gap_for_tp),
)
gaps = gap_for_tp[:gap_index]
acked.extend(gaps)
gap_for_tp[:gap_index] = []
# find all the ranges up to the max of acked, add them in to acked,
# and chop them off the gap.
candidates = gap_for_tp.overlap(0, max_offset)
# note: merge_overlaps will sort the intervaltree and will ensure that
# the intervals left over don't overlap each other. So can sort by their
# start without worrying about ends overlapping.
sorted_candidates = sorted(candidates, key=lambda x: x.begin)
if sorted_candidates:
stuff_to_add = []
for entry in sorted_candidates:
stuff_to_add.extend(range(entry.begin, entry.end))
new_max_offset = max(stuff_to_add[-1], max_offset + 1)
acked.extend(stuff_to_add)
gap_for_tp.chop(0, new_max_offset)
acked.sort()

# We iterate over it until we handle gap in the head of acked queue
Expand Down Expand Up @@ -1123,12 +1131,18 @@ async def on_task_error(self, exc: BaseException) -> None:
"""Call when processing a message failed."""
await self.commit()

def _add_gap(self, tp: TP, offset_from: int, offset_to: int) -> None:
async def _add_gap(self, tp: TP, offset_from: int, offset_to: int) -> None:
committed = self._committed_offset[tp]
gap_for_tp = self._gap[tp]
for offset in range(offset_from, offset_to):
if committed is None or offset > committed:
gap_for_tp.append(offset)
if committed is not None:
offset_from = max(offset_from, committed + 1)
# intervaltree intervals exclude the end
if offset_from <= offset_to:
gap_for_tp.addi(offset_from, offset_to + 1)
# sleep 0 to allow other coroutines to get some loop time
# for example, to answer health checks while building the gap
await asyncio.sleep(0)
gap_for_tp.merge_overlaps()

async def _drain_messages(self, fetcher: ServiceT) -> None: # pragma: no cover
# This is the background thread started by Fetcher, used to
Expand Down Expand Up @@ -1175,7 +1189,7 @@ async def _drain_messages(self, fetcher: ServiceT) -> None: # pragma: no cover
if gap > 1 and r_offset:
acks_enabled = acks_enabled_for(message.topic)
if acks_enabled:
self._add_gap(tp, r_offset + 1, offset)
await self._add_gap(tp, r_offset + 1, offset)
if commit_every is not None:
if self._n_acked >= commit_every:
self._n_acked = 0
Expand Down
1 change: 1 addition & 0 deletions requirements/dist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ tox>=2.3.1
twine
vulture
wheel>=0.29.0
intervaltree
1 change: 1 addition & 0 deletions requirements/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ yarl>=1.0,<2.0
croniter>=0.3.16
mypy_extensions
venusian==3.0.0
intervaltree
1 change: 1 addition & 0 deletions requirements/test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ codecov
bandit==1.6.2
twine
wheel
intervaltree
-r requirements.txt
-r extras/datadog.txt
-r extras/redis.txt
Expand Down
31 changes: 18 additions & 13 deletions tests/unit/transport/test_consumer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio

import pytest
from intervaltree import Interval, IntervalTree
from mode import Service
from mode.threads import MethodQueue
from mode.utils.futures import done_future
Expand Down Expand Up @@ -1075,13 +1076,15 @@ def test_new_offset(self, tp, acked, expected_offset, expected_acked, *, consume
"tp,acked,gaps,expected_offset",
[
(TP1, [], [], None),
(TP1, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], [], 11),
(TP1, [1, 2, 3, 4, 5, 6, 7, 8, 10], [9], 11),
(TP1, [1, 2, 3, 4, 6, 7, 8, 10], [5], 9),
(TP1, [1, 3, 4, 6, 7, 8, 10], [2, 5, 9], 11),
(TP1, [3, 4], [], None),
(TP1, [3, 4], [2], None),
(TP1, [3, 4], [1, 2], 5),
(TP1, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], IntervalTree(), 11),
(TP1, [1, 2, 3, 4, 5, 6, 7, 8, 10], IntervalTree([Interval(9, 10)]), 11),
(TP1, [1, 2, 3, 4, 6, 7, 8, 10], IntervalTree([Interval(5, 6)]), 9),
(
TP1,
[1, 3, 4, 6, 7, 8, 10],
IntervalTree([Interval(2, 3), Interval(5, 6), Interval(9, 10)]),
11,
),
],
)
def test_new_offset_with_gaps(self, tp, acked, gaps, expected_offset, *, consumer):
Expand All @@ -1096,19 +1099,21 @@ async def test_on_task_error(self, *, consumer):
await consumer.on_task_error(KeyError())
consumer.commit.assert_called_once_with()

def test__add_gap(self, *, consumer):
@pytest.mark.asyncio
async def test__add_gap(self, *, consumer):
tp = TP1
consumer._committed_offset[tp] = 299
consumer._add_gap(TP1, 300, 343)
await consumer._add_gap(TP1, 300, 343)

assert consumer._gap[tp] == list(range(300, 343))
assert consumer._gap[tp] == IntervalTree([Interval(300, 344)])

def test__add_gap__previous_to_committed(self, *, consumer):
@pytest.mark.asyncio
async def test__add_gap__previous_to_committed(self, *, consumer):
tp = TP1
consumer._committed_offset[tp] = 400
consumer._add_gap(TP1, 300, 343)
await consumer._add_gap(TP1, 300, 343)

assert consumer._gap[tp] == []
assert consumer._gap[tp] == IntervalTree()

@pytest.mark.asyncio
async def test_commit_handler(self, *, consumer):
Expand Down

0 comments on commit a6ab7c9

Please sign in to comment.