-
-
Notifications
You must be signed in to change notification settings - Fork 7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Avoid unnecessarily recreating processing strategy #62
Conversation
If the assignment or revocation callback is called with no partitions we don't need to close and recreate the processing strategy. This happens frequently with incremental rebalancing.
@@ -79,21 +79,24 @@ def _create_strategy() -> None: | |||
|
|||
def on_partitions_assigned(partitions: Mapping[Partition, int]) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The signature of this method says that partitions cannot be optional. If what you mention is that partitions could be None
then can you update the signatures of these functions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think it would be None
, just an empty dictionary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, it's an empty dictionary
@@ -79,21 +79,24 @@ def _create_strategy() -> None: | |||
|
|||
def on_partitions_assigned(partitions: Mapping[Partition, int]) -> None: | |||
logger.info("New partitions assigned: %r", partitions) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this go inside the if partitions check as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's useful to know when the partition assigned callback gets called even if it's empty
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok. I was under the impression that partitions could be optional. In that case, it would cause an exception.
if partitions: | ||
if self.__processing_strategy is not None: | ||
_close_strategy() | ||
_create_strategy() | ||
|
||
def on_partitions_revoked(partitions: Sequence[Partition]) -> None: | ||
logger.info("Partitions revoked: %r", partitions) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same comment as above
if self.__processing_strategy is not None: | ||
_close_strategy() | ||
_create_strategy() | ||
if partitions: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A hypothetical scenario.
Let's say you get new partitions assigned while the consumer is shutting down. That means you have set shutdown_requested to True over here https://github.com/getsentry/arroyo/blob/main/arroyo/processing/processor.py#L223. Is there still a need to keep creating the strategy again? Or can we check for the flag and do nothing in those scenarios?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I spent some time deliberating about this. I decided not to add it because it shouldn't be necessary. During shutdown all partitions should have be revoked and thus the strategy will not be recreated anyway (since self.__consumer.tell().keys() - set(partitions)
should be falsy.
@nikhars Mind taking another look? |
If the assignment or revocation callback is called with no partitions
we don't need to close and recreate the processing strategy. This
happens frequently with incremental rebalancing.