Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop fetching when flow stops #253

Merged

Conversation

ekerstens
Copy link
Contributor

Description

Changes

  1. During recovery, resume the consumer after the app. When the app flow_control resumes it clears all queues, and resuming the consumer after the flow control makes sure that we don't write data to the app which is then discarded.
  2. Added logic to the consumer to cancel the fetching coroutine during a rebalance.
  3. Moved the wait for flow to resume from consumer._wait_next_records to consumer.getmany to avoid a lock during rebalance
  4. During a rebalance, the app waits for the consumer to cancel fetching before continuing, to avoid potential issues of discarding a fetch which occurs after assignment.

# Ignore records fetched while flow was suspended
if coro is suspend_flow:
records = {}
break
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be continue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reason for break is that even if the fetcher has completed (I have seen cases where both happen around the same time) to ignore the result and return nothing

@codecov-commenter
Copy link

codecov-commenter commented Jan 7, 2022

Codecov Report

Merging #253 (032436b) into master (a0e9a31) will decrease coverage by 0.04%.
The diff coverage is 79.31%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master     #253      +/-   ##
==========================================
- Coverage   94.26%   94.22%   -0.05%     
==========================================
  Files         100      100              
  Lines       10814    10834      +20     
  Branches     1477     1482       +5     
==========================================
+ Hits        10194    10208      +14     
- Misses        549      553       +4     
- Partials       71       73       +2     
Impacted Files Coverage Δ
faust/transport/consumer.py 94.24% <77.77%> (-0.90%) ⬇️
faust/app/base.py 99.51% <100.00%> (+<0.01%) ⬆️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update a0e9a31...032436b. Read the comment docs.

await self.sleep(1)
return records, active_partitions
finally:
self.not_waiting_next_records.set()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we want to process the exception?

@patkivikram patkivikram merged commit 2d8bdb9 into faust-streaming:master Jan 7, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants