-
-
Notifications
You must be signed in to change notification settings - Fork 7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Avoid unnecessarily recreating processing strategy #62
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -79,21 +79,24 @@ def _create_strategy() -> None: | |
|
||
def on_partitions_assigned(partitions: Mapping[Partition, int]) -> None: | ||
logger.info("New partitions assigned: %r", partitions) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't this go inside the if partitions check as well? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's useful to know when the partition assigned callback gets called even if it's empty There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok. I was under the impression that partitions could be optional. In that case, it would cause an exception. |
||
if self.__processing_strategy is not None: | ||
_close_strategy() | ||
_create_strategy() | ||
if partitions: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A hypothetical scenario. Let's say you get new partitions assigned while the consumer is shutting down. That means you have set shutdown_requested to True over here https://github.com/getsentry/arroyo/blob/main/arroyo/processing/processor.py#L223. Is there still a need to keep creating the strategy again? Or can we check for the flag and do nothing in those scenarios? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I spent some time deliberating about this. I decided not to add it because it shouldn't be necessary. During shutdown all partitions should have be revoked and thus the strategy will not be recreated anyway (since |
||
if self.__processing_strategy is not None: | ||
_close_strategy() | ||
_create_strategy() | ||
|
||
def on_partitions_revoked(partitions: Sequence[Partition]) -> None: | ||
logger.info("Partitions revoked: %r", partitions) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same comment as above |
||
_close_strategy() | ||
|
||
# Recreate the strategy if the consumer still has other partitions | ||
# assigned and is not closed or errored | ||
try: | ||
if self.__consumer.tell().keys() - set(partitions): | ||
_create_strategy() | ||
except RuntimeError: | ||
pass | ||
if partitions: | ||
_close_strategy() | ||
|
||
# Recreate the strategy if the consumer still has other partitions | ||
# assigned and is not closed or errored | ||
try: | ||
if self.__consumer.tell().keys() - set(partitions): | ||
_create_strategy() | ||
except RuntimeError: | ||
pass | ||
|
||
self.__consumer.subscribe( | ||
[topic], on_assign=on_partitions_assigned, on_revoke=on_partitions_revoked | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The signature of this method says that partitions cannot be optional. If what you mention is that partitions could be
None
then can you update the signatures of these functionsThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think it would be
None
, just an empty dictionary.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, it's an empty dictionary