feat: Avoid unnecessarily recreating processing strategy #62

Merged
merged 2 commits into from
Apr 26, 2022
25 changes: 14 additions & 11 deletions arroyo/processing/processor.py
@@ -79,21 +79,24 @@ def _create_strategy() -> None:

         def on_partitions_assigned(partitions: Mapping[Partition, int]) -> None:
Member

The signature of this method says that partitions cannot be optional. If you mean that partitions could be None, can you update the signatures of these functions?

Contributor

I don't think it would be None, just an empty dictionary.

Member Author

Yeah, it's an empty dictionary
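
To illustrate the point about empty assignments, here is a minimal sketch. It assumes a hypothetical `handle_assignment` helper that stands in for the callback (it is not part of arroyo) and uses plain strings in place of `Partition` objects. It shows that an empty mapping is falsy, so an `if partitions:` guard skips the strategy work without any `None` handling.

```python
from typing import List, Mapping


def handle_assignment(partitions: Mapping[str, int], events: List[str]) -> None:
    # Hypothetical stand-in for on_partitions_assigned; strings replace Partition.
    events.append("assigned")  # recorded on every call, even for an empty assignment
    if partitions:  # an empty mapping is falsy, so this branch is skipped
        events.append("recreate")


events: List[str] = []
handle_assignment({}, events)               # empty assignment: only recorded
handle_assignment({"topic-0": 42}, events)  # non-empty: recorded and recreated
```

With this shape the signature can stay non-optional, because the empty case is handled by ordinary truthiness.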

             logger.info("New partitions assigned: %r", partitions)
Member

Shouldn't this go inside the if partitions check as well?

Member Author

I think it's useful to know when the partition assigned callback gets called, even if the assignment is empty.

Member

Ok. I was under the impression that partitions could be optional. In that case, it would cause an exception.

-            if self.__processing_strategy is not None:
-                _close_strategy()
-            _create_strategy()
+            if partitions:
Member

A hypothetical scenario.

Let's say you get new partitions assigned while the consumer is shutting down. That means you have set shutdown_requested to True over here https://github.com/getsentry/arroyo/blob/main/arroyo/processing/processor.py#L223. Is there still a need to keep creating the strategy again? Or can we check for the flag and do nothing in those scenarios?

Member Author

I spent some time deliberating about this. I decided not to add it because it shouldn't be necessary. During shutdown all partitions should have been revoked, and thus the strategy will not be recreated anyway (since self.__consumer.tell().keys() - set(partitions) should be falsy).
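
The set-difference check mentioned here can be sketched in isolation. This is only an illustration: the `offsets` dict is a stand-in for what `self.__consumer.tell()` returns, and strings replace `Partition` objects. Subtracting a set from a dict keys view yields a plain set, which is empty (falsy) once every assigned partition has been revoked.

```python
# Stand-in for consumer.tell(): a mapping of partition -> offset (assumed shape).
offsets = {"topic-0": 10, "topic-1": 20}

# Revoking only one partition leaves another assigned, so the result is truthy.
remaining_partial = offsets.keys() - set(["topic-0"])

# Revoking every assigned partition leaves an empty set, which is falsy.
remaining_none = offsets.keys() - set(["topic-0", "topic-1"])

should_recreate_partial = bool(remaining_partial)
should_recreate_none = bool(remaining_none)
```

Under that assumption, a full revocation during shutdown never reaches the recreate branch, which is the argument for not adding a separate shutdown flag check.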

+                if self.__processing_strategy is not None:
+                    _close_strategy()
+                _create_strategy()

         def on_partitions_revoked(partitions: Sequence[Partition]) -> None:
             logger.info("Partitions revoked: %r", partitions)
Member

Same comment as above

-            _close_strategy()

-            # Recreate the strategy if the consumer still has other partitions
-            # assigned and is not closed or errored
-            try:
-                if self.__consumer.tell().keys() - set(partitions):
-                    _create_strategy()
-            except RuntimeError:
-                pass
+            if partitions:
+                _close_strategy()
+
+                # Recreate the strategy if the consumer still has other partitions
+                # assigned and is not closed or errored
+                try:
+                    if self.__consumer.tell().keys() - set(partitions):
+                        _create_strategy()
+                except RuntimeError:
+                    pass

         self.__consumer.subscribe(
             [topic], on_assign=on_partitions_assigned, on_revoke=on_partitions_revoked