Messages are dropped by the Consumer when buffer full condition occurs #88

Closed
2 tasks done
bobh66 opened this issue Jan 28, 2021 · 0 comments · Fixed by #89


bobh66 commented Jan 28, 2021

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

Running the following code processes only some of the messages generated by the task. When the stream buffer fills up, the topic is removed from the active partitions. Consumer.getmany(), which is still iterating over the list of already-received messages, then sees that the topic is no longer in the list of active partitions and drops the remaining messages.

The fix is to include self._buffered_partitions in the check in Consumer.getmany() so that it continues to process the already-received messages even when the Consumer is no longer actively fetching new messages from the topic because the buffer is full. Once the buffer-full condition clears, new messages are processed as usual.

import faust

app = faust.App(
    'hello-world3',
    broker='kafka://localhost:29092',
    value_serializer='raw',
    # stream_buffer_maxsize=1000000,  # uncomment to enlarge the stream buffer
)

greetings_topic = app.topic('greetings')

@app.agent(greetings_topic)
async def greet(greetings):
    count = 0
    async for greeting in greetings:
        count += 1
        print(count)

@app.task()
async def say_hello():
    # Send 40,000 messages, enough to fill the stream buffer.
    for count in range(1, 40001):
        await greetings_topic.send(key='key', value=f'hello{count}')
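
The drop described above, and the effect of the proposed check, can be illustrated with a small stand-alone model. This is only a sketch and not Faust's actual Consumer.getmany() code: the function drain, its parameters, and the way backpressure is triggered are all hypothetical simplifications, and the two sets merely stand in for the active partitions and self._buffered_partitions.

```python
from typing import List, Set, Tuple

# (topic, partition); a hypothetical stand-in for Faust's topic-partition type.
TP = Tuple[str, int]


def drain(batch: List[str], tp: TP, maxsize: int,
          include_buffered: bool) -> List[str]:
    """Deliver an already-fetched batch for one partition.

    Simplified model: once `maxsize` messages have been delivered, the
    partition is moved from the active set to the buffered set, mimicking
    backpressure being applied while the batch is still being iterated.
    """
    active: Set[TP] = {tp}
    buffered: Set[TP] = set()
    delivered: List[str] = []
    for message in batch:
        # Old check: only active partitions are allowed.
        # Proposed check: active OR buffered partitions are allowed.
        allowed = active | buffered if include_buffered else active
        if tp not in allowed:
            continue  # in this model the message is simply lost
        delivered.append(message)
        if len(delivered) >= maxsize and tp in active:
            # Buffer "full": stop fetching from this partition.
            active.discard(tp)
            buffered.add(tp)
    return delivered


batch = [f'hello{i}' for i in range(1, 11)]
tp = ('greetings', 0)
old = drain(batch, tp, maxsize=4, include_buffered=False)
new = drain(batch, tp, maxsize=4, include_buffered=True)
print(len(old), len(new))
```

With the old check, everything in the batch after the point where backpressure kicks in is skipped; with the buffered set included, the whole batch is delivered.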

Expected behavior

The agent should continue to receive the already-received messages when the Consumer buffer is considered "full" and backpressure has been applied.

Actual behavior

The Consumer stops sending the already-received messages to the agents/Stream while backpressure is being applied to the topic, so those messages are dropped.

Full traceback

Versions

  • Python version - 3.8
  • Faust version - 0.4.3
  • Operating system - Linux
  • Kafka version - N/A
  • RocksDB version (if applicable) N/A
patkivikram added a commit that referenced this issue Jan 29, 2021
* Restore tox configuration and enable unit tests (#79)

* Fix rocksdb tests

* Fix dropped messages when topic backpressure is enabled (#88) (#89)

Co-authored-by: Vikram Patki <54442035+patkivikram@users.noreply.github.com>