feat: Avoid unnecessarily recreating processing strategy #62

Merged (2 commits) on Apr 26, 2022

lynnagara
Member

If the assignment or revocation callback is called with no partitions
we don't need to close and recreate the processing strategy. This
happens frequently with incremental rebalancing.
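
A minimal sketch of the guard described above, assuming a simplified processor. `SketchProcessor`, its counters, and the string partition keys are hypothetical stand-ins for illustration, not arroyo's actual classes, and the revocation path here ignores any check for partitions that remain assigned:

```python
from typing import Mapping, Optional, Sequence


class SketchProcessor:
    """Hypothetical stand-in for a stream processor (not arroyo's real class)."""

    def __init__(self) -> None:
        self.strategy: Optional[object] = None
        self.created = 0  # number of times a strategy was built
        self.closed = 0  # number of times a strategy was torn down

    def _create_strategy(self) -> None:
        self.strategy = object()
        self.created += 1

    def _close_strategy(self) -> None:
        self.strategy = None
        self.closed += 1

    def on_partitions_assigned(self, partitions: Mapping[str, int]) -> None:
        # An empty mapping is falsy, so an empty callback leaves the strategy alone.
        if partitions:
            if self.strategy is not None:
                self._close_strategy()
            self._create_strategy()

    def on_partitions_revoked(self, partitions: Sequence[str]) -> None:
        # Same guard for revocation: an empty sequence is a no-op.
        if partitions:
            if self.strategy is not None:
                self._close_strategy()
            self._create_strategy()


processor = SketchProcessor()
processor.on_partitions_assigned({"events-0": 0})  # builds the first strategy
processor.on_partitions_assigned({})  # empty callback: nothing happens
processor.on_partitions_revoked([])  # empty callback: nothing happens
```

With an incremental rebalance that delivers several empty callbacks, the strategy in this sketch is built once and never torn down.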

@lynnagara lynnagara requested a review from a team as a code owner April 21, 2022 23:34
@@ -79,21 +79,24 @@ def _create_strategy() -> None:

def on_partitions_assigned(partitions: Mapping[Partition, int]) -> None:
Member

The signature of this method says that partitions cannot be optional. If you mean that partitions could be None, can you update the signatures of these functions?

Contributor

I don't think it would be None, just an empty dictionary.

Member Author

Yeah, it's an empty dictionary
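
To illustrate the point about the signature: a `Mapping[...]` annotation already admits an empty dictionary, so `Optional` would only be needed if `None` could be passed. In this sketch, `Partition` and `describe_assignment` are hypothetical stand-ins, not the library's own definitions:

```python
from typing import Mapping, NamedTuple


class Partition(NamedTuple):
    """Hypothetical stand-in for the library's partition type."""

    topic: str
    index: int


def describe_assignment(partitions: Mapping[Partition, int]) -> str:
    # `{}` satisfies Mapping[Partition, int]; no Optional is required for it.
    if not partitions:
        return "empty assignment"
    return f"{len(partitions)} partition(s) assigned"


empty_result = describe_assignment({})
single_result = describe_assignment({Partition("events", 0): 42})
```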

@@ -79,21 +79,24 @@ def _create_strategy() -> None:

def on_partitions_assigned(partitions: Mapping[Partition, int]) -> None:
    logger.info("New partitions assigned: %r", partitions)
Member

Shouldn't this go inside the if partitions check as well?

Member Author

I think it's useful to know when the partition assigned callback gets called even if it's empty

Member

Ok. I was under the impression that partitions could be optional. In that case, it would cause an exception.

    if partitions:
        if self.__processing_strategy is not None:
            _close_strategy()
        _create_strategy()

def on_partitions_revoked(partitions: Sequence[Partition]) -> None:
    logger.info("Partitions revoked: %r", partitions)
Member

Same comment as above

if self.__processing_strategy is not None:
_close_strategy()
_create_strategy()
if partitions:
Member

A hypothetical scenario.

Let's say you get new partitions assigned while the consumer is shutting down. That means you have set shutdown_requested to True over here https://github.com/getsentry/arroyo/blob/main/arroyo/processing/processor.py#L223. Is there still a need to keep creating the strategy again? Or can we check for the flag and do nothing in those scenarios?

Member Author

I spent some time deliberating about this. I decided not to add it because it shouldn't be necessary. During shutdown all partitions should have been revoked, and thus the strategy will not be recreated anyway (since self.__consumer.tell().keys() - set(partitions) should be falsy).
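
The falsy check mentioned above relies on dictionary key views supporting set difference. A small sketch of that expression, assuming string keys in place of real partition objects; `has_remaining_partitions` is a hypothetical helper, and `current_offsets` stands in for whatever the consumer's `tell()` returns:

```python
from typing import Mapping, Sequence


def has_remaining_partitions(
    current_offsets: Mapping[str, int], revoked: Sequence[str]
) -> bool:
    # A keys view is set-like, so subtracting a set yields the keys
    # that are still assigned after the revocation.
    remaining = current_offsets.keys() - set(revoked)
    # An empty set is falsy: nothing left, so no strategy needs recreating.
    return bool(remaining)


all_revoked = has_remaining_partitions({"events-0": 5, "events-1": 9}, ["events-0", "events-1"])
some_left = has_remaining_partitions({"events-0": 5, "events-1": 9}, ["events-0"])
```

Under the assumption that every partition is revoked during shutdown, the difference is empty and the recreation branch is skipped without consulting a shutdown flag.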

@lynnagara
Member Author

@nikhars Mind taking another look?

@lynnagara lynnagara merged commit e21a68d into main Apr 26, 2022
@lynnagara lynnagara deleted the avoid-unnecessarily-recreating-strategy branch April 26, 2022 20:54

3 participants