From a887571e143f875c3ab4df488964e2ebde6dc5d2 Mon Sep 17 00:00:00 2001 From: Matthew Drago Date: Tue, 19 Jul 2022 16:44:55 +0200 Subject: [PATCH] Fixed filter not acking filtered out messages. (#208) * Fixed filter not acking filtered out messages. * Removed debug print from test. * Added Cython implementation for the filter fix. Co-authored-by: Matthew Drago Co-authored-by: Vikram Patki <54442035+patkivikram@users.noreply.github.com> Co-authored-by: Taybin Rutkin --- faust/_cython/streams.pyx | 3 ++- faust/streams.py | 7 ++++++- tests/functional/test_streams.py | 13 +++++++++++++ 3 files changed, 21 insertions(+), 2 deletions(-) diff --git a/faust/_cython/streams.pyx b/faust/_cython/streams.pyx index 42736c6f6..30b51460c 100644 --- a/faust/_cython/streams.pyx +++ b/faust/_cython/streams.pyx @@ -109,7 +109,8 @@ cdef class StreamIterator: object consumer consumer = self.consumer last_stream_to_ack = False - if do_ack and event is not None: + # if do_ack and event is not None: + if event is not None and (do_ack or event.value is self._skipped_value): message = event.message if not message.acked: refcount = message.refcount diff --git a/faust/streams.py b/faust/streams.py index 1de5afb57..2e45b6ace 100644 --- a/faust/streams.py +++ b/faust/streams.py @@ -1113,6 +1113,8 @@ async def _py_aiter(self) -> AsyncIterator[T_co]: value = await _maybe_async(processor(value)) value = await on_merge(value) except Skip: + # We want to ack the filtered message + # otherwise the lag would increase value = skipped_value try: @@ -1121,7 +1123,9 @@ async def _py_aiter(self) -> AsyncIterator[T_co]: yield value finally: self.current_event = None - if do_ack and event is not None: + # We want to ack the filtered out message + # otherwise the lag would increase + if event is not None and (do_ack or value is skipped_value): # This inlines self.ack last_stream_to_ack = event.ack() message = event.message @@ -1130,6 +1134,7 @@ async def _py_aiter(self) -> AsyncIterator[T_co]: on_stream_event_out(tp, offset, self, event, sensor_state) if last_stream_to_ack: on_message_out(tp, offset, message) + except StopAsyncIteration: # We are not allowed to propagate StopAsyncIteration in __aiter__ # (if we do, it'll be converted to RuntimeError by CPython). diff --git a/tests/functional/test_streams.py b/tests/functional/test_streams.py index 84ae325e8..145037c5e 100644 --- a/tests/functional/test_streams.py +++ b/tests/functional/test_streams.py @@ -222,6 +222,19 @@ async def myfilter(value): assert i == 3 +@pytest.mark.asyncio +async def test_stream_filter_acks_filtered_out_messages(app, event_loop): + """ + Test the filter function acknowledges the filtered out + messages regardless of the ack setting. + """ + values = [1000, 3000, 99, 5000, 3, 9999] + async with app.stream(values).filter(lambda x: x > 1000) as stream: + async for event in stream.events(): + assert event.value > 1000 + assert len(app.consumer.unacked) == 0 + + @pytest.mark.asyncio async def test_events(app): async with new_stream(app) as stream: