KAFKA-17448: New consumer seek should update positions in background thread #17075

Merged
merged 1 commit into apache:trunk
Sep 11, 2024

Conversation

FrankYang0529
Member

AsyncKafkaConsumer#seek uses SubscriptionState#seekUnvalidated, which calls SubscriptionState#assignedState. If it is called on the application thread, it can race with the background thread. This PR moves the SubscriptionState#seekUnvalidated call to the background thread to avoid concurrent writes.
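
As a rough illustration of the idea (not Kafka code), the sketch below lets a single background thread own the mutable position map, while the application thread only enqueues a task and blocks until it completes. The class `BackgroundSeekSketch`, its methods, and the use of a plain string for the partition are hypothetical stand-ins for AsyncKafkaConsumer, SubscriptionState, and TopicPartition.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: all names here are illustrative, not part of Kafka.
class BackgroundSeekSketch {
    // Touched only by the background thread, so a plain HashMap is enough.
    private final Map<String, Long> positions = new HashMap<>();
    // Tasks handed over from the application thread.
    private final BlockingQueue<Runnable> events = new LinkedBlockingQueue<>();
    private final Thread background;

    BackgroundSeekSketch() {
        background = new Thread(() -> {
            try {
                while (true) {
                    events.take().run(); // run queued tasks one at a time
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        background.setDaemon(true);
        background.start();
    }

    // Called on the application thread: enqueue the write and wait for it.
    void seek(String partition, long offset) {
        CompletableFuture<Void> done = new CompletableFuture<>();
        events.add(() -> {
            positions.put(partition, offset);
            done.complete(null);
        });
        done.join();
    }

    // Reads also go through the queue; -1 means no position is known.
    long position(String partition) {
        CompletableFuture<Long> result = new CompletableFuture<>();
        events.add(() -> result.complete(positions.getOrDefault(partition, -1L)));
        return result.join();
    }
}
```

Because every read and write of `positions` happens on the one background thread, no lock is needed around the map in this sketch.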

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@kirktrue kirktrue added consumer KIP-848 The Next Generation of the Consumer Rebalance Protocol ctr Consumer Threading Refactor (KIP-848) labels Sep 3, 2024
Collaborator

@kirktrue kirktrue left a comment

Thanks for the PR, @FrankYang0529!

I think the implementation is sound. I just have the one suggestion to defer the creation of the FetchPosition until the ApplicationEventProcessor.process() invocation. This makes the calls in AsyncKafkaConsumer cleaner (IMO), and also keeps the call to ConsumerMetadata.currentLeader() as close as possible to the seekUnvalidated() call.

Thanks!


public class SeekUnvalidatedEvent extends CompletableApplicationEvent<Void> {
    private final TopicPartition partition;
    private final SubscriptionState.FetchPosition position;
Collaborator

It's just my preference, but I'd rather not pass the FetchPosition around in the event. Could we instead include the following directly in the event:

  • TopicPartition partition
  • long offset
  • Optional<Integer> offsetEpoch

The ApplicationEventProcessor has access to the ConsumerMetadata object, so we could fetch the current leader there, in one place, instead of having it twice in the AsyncKafkaConsumer.
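
A minimal sketch of that shape, assuming simplified stand-in types rather than the real Kafka classes: the event carries only plain values, and the processor resolves the leader at the moment it builds the position. `SeekEventSketch`, `SeekEvent`, `Position`, and the `currentLeader` lookup function are all hypothetical names used for illustration.

```java
import java.util.Optional;
import java.util.function.Function;

// Hypothetical sketch: simplified stand-ins, not the real Kafka classes.
class SeekEventSketch {
    // The event carries only plain values, no pre-built position object.
    static final class SeekEvent {
        final String partition;
        final long offset;
        final Optional<Integer> offsetEpoch;

        SeekEvent(String partition, long offset, Optional<Integer> offsetEpoch) {
            this.partition = partition;
            this.offset = offset;
            this.offsetEpoch = offsetEpoch;
        }
    }

    // Stand-in for a fetch position: offset, optional epoch, and current leader.
    static final class Position {
        final long offset;
        final Optional<Integer> offsetEpoch;
        final Optional<String> leader;

        Position(long offset, Optional<Integer> offsetEpoch, Optional<String> leader) {
            this.offset = offset;
            this.offsetEpoch = offsetEpoch;
            this.leader = leader;
        }
    }

    // The processor looks up the leader in one place, right before building the position.
    static Position process(SeekEvent event, Function<String, Optional<String>> currentLeader) {
        return new Position(event.offset, event.offsetEpoch, currentLeader.apply(event.partition));
    }
}
```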


private void process(final SeekUnvalidatedEvent event) {
    try {
        subscriptions.seekUnvalidated(event.partition(), event.position());
Collaborator

If we update the event class, we can then move more of the logic here:

Suggested change
- subscriptions.seekUnvalidated(event.partition(), event.position());
+ SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition(
+     event.offset(),
+     event.offsetEpoch(),
+     metadata.currentLeader(event.partition())
+ );
+ subscriptions.seekUnvalidated(event.partition(), newPosition);

Comment on lines 828 to 822
SeekUnvalidatedEvent seekUnvalidatedEventEvent = new SeekUnvalidatedEvent(calculateDeadlineMs(timer), partition, newPosition);
applicationEventHandler.addAndGet(seekUnvalidatedEventEvent);
Collaborator

Per the other suggestions, we could make this a bit more succinct:

Suggested change
- SeekUnvalidatedEvent seekUnvalidatedEventEvent = new SeekUnvalidatedEvent(calculateDeadlineMs(timer), partition, newPosition);
- applicationEventHandler.addAndGet(seekUnvalidatedEventEvent);
+ SeekUnvalidatedEvent event = new SeekUnvalidatedEvent(
+     calculateDeadlineMs(timer),
+     partition,
+     offsetAndMetadata.offset(),
+     offsetAndMetadata.leaderEpoch()
+ );
+ applicationEventHandler.addAndGet(event);

Comment on lines 798 to 796
SeekUnvalidatedEvent seekUnvalidatedEventEvent = new SeekUnvalidatedEvent(calculateDeadlineMs(timer), partition, newPosition);
applicationEventHandler.addAndGet(seekUnvalidatedEventEvent);
Collaborator

Per the other suggestions, we could defer the creation of the SubscriptionState.FetchPosition until the ApplicationEventProcessor.process() method to make this call a bit more succinct:

Suggested change
- SeekUnvalidatedEvent seekUnvalidatedEventEvent = new SeekUnvalidatedEvent(calculateDeadlineMs(timer), partition, newPosition);
- applicationEventHandler.addAndGet(seekUnvalidatedEventEvent);
+ SeekUnvalidatedEvent event = new SeekUnvalidatedEvent(
+     calculateDeadlineMs(timer),
+     partition,
+     offset,
+     Optional.empty() // This will ensure we skip validation
+ );
+ applicationEventHandler.addAndGet(event);

@kirktrue
Collaborator

kirktrue commented Sep 3, 2024

Also, could you put a sentence or two description in the event class’ Javadoc? Thanks!

@FrankYang0529
Member Author

@kirktrue, thanks for the review and suggestions. I addressed all comments and added Javadoc for SeekUnvalidatedEvent. Could you take a look when you have time? Thank you.


@Override
protected String toStringBase() {
    return super.toStringBase() + ", partition=" + partition + ", offset=" + offset + ", offsetEpoch=" + offsetEpoch;
Collaborator

Given that offsetEpoch is an Optional object, won't this print something like "offsetEpoch=Optional[10]" or "offsetEpoch=Optional.empty"? If so, I find it confusing.

We could check offsetEpoch.isPresent() and only add "offsetEpoch=..." to the output string if we have a value for it, which seems clearer as output. What do you think?
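
To make the difference concrete, here is a small self-contained sketch (the class and method names are hypothetical, and it is only one possible way to write the conditional form). It compares concatenating the Optional directly with appending the epoch only when it is present.

```java
import java.util.Optional;

// Hypothetical sketch comparing two ways of rendering an optional epoch.
class OptionalToStringSketch {
    // Concatenating the Optional directly exposes its wrapper in the output.
    static String verbose(long offset, Optional<Integer> offsetEpoch) {
        return "offset=" + offset + ", offsetEpoch=" + offsetEpoch;
    }

    // Appending the field only when a value is present keeps the output clean.
    static String concise(long offset, Optional<Integer> offsetEpoch) {
        return "offset=" + offset + offsetEpoch.map(epoch -> ", offsetEpoch=" + epoch).orElse("");
    }
}
```

With an epoch of 10, `verbose` yields `offset=5, offsetEpoch=Optional[10]` while `concise` yields `offset=5, offsetEpoch=10`; with an empty epoch they yield `offset=5, offsetEpoch=Optional.empty` and `offset=5` respectively.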

Member Author

Thanks for the review. Updated it.

@@ -183,6 +186,38 @@ public void testAssignmentChangeEventWithException() {
assertInstanceOf(IllegalStateException.class, e.getCause());
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
Collaborator

Do we get any value from these params? I expect that having a group ID or not does not change the course of this test, which is only about seeking to a position.

Member Author

Yes, I think we don't need to test both situations for this case. Thanks.

}

@ParameterizedTest
@ValueSource(booleans = {true, false})
Collaborator

ditto

@lianetm
Collaborator

lianetm commented Sep 6, 2024

Hey @FrankYang0529 , thanks for the patch! Took a full pass, just some minor comments.

Comment on lines 54 to 56
return offsetEpoch
.map(integer -> super.toStringBase() + ", partition=" + partition + ", offset=" + offset + ", offsetEpoch=" + integer)
.orElseGet(() -> super.toStringBase() + ", partition=" + partition + ", offset=" + offset);
Collaborator

Suggested change
- return offsetEpoch
-     .map(integer -> super.toStringBase() + ", partition=" + partition + ", offset=" + offset + ", offsetEpoch=" + integer)
-     .orElseGet(() -> super.toStringBase() + ", partition=" + partition + ", offset=" + offset);
+ return super.toStringBase()
+     + ", partition=" + partition
+     + ", offset=" + offset
+     + offsetEpoch.map(integer -> ", offsetEpoch=" + integer).orElse("");

What about this, just to avoid duplicating what's common?

Member Author

Agreed, I think it's a better solution. Thanks for the suggestion. Updated it.

import java.util.Optional;

/**
* Event to perform {@link org.apache.kafka.clients.consumer.internals.SubscriptionState#seekUnvalidated(TopicPartition, SubscriptionState.FetchPosition)}
Collaborator

One more detail: we already import SubscriptionState on line 19, so we can simplify this to ...link SubscriptionState#...

Member Author

Updated it. Thank you.

…thread

Signed-off-by: PoAn Yang <payang@apache.org>
Collaborator

@lianetm lianetm left a comment

LGTM. Thanks!

@lianetm lianetm merged commit 3f4c25f into apache:trunk Sep 11, 2024
8 of 9 checks passed
@FrankYang0529 FrankYang0529 deleted the KAFKA-17448 branch September 12, 2024 02:08
Labels
consumer ctr Consumer Threading Refactor (KIP-848) KIP-848 The Next Generation of the Consumer Rebalance Protocol
3 participants