-
Notifications
You must be signed in to change notification settings - Fork 13.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-17448: New consumer seek should update positions in background thread #17075
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR, @FrankYang0529!
I think the implementation is sound. I just have the one suggestion to defer the creation of the FetchPosition
until the ApplicationEventProcessor.process()
invocation. This makes the calls in AsyncKafkaConsumer
cleaner (IMO), and also keeps the call to ConsumerMetadata.currentLeader()
as close as possible to the seekUnvalidated()
call.
Thanks!
|
||
public class SeekUnvalidatedEvent extends CompletableApplicationEvent<Void> { | ||
private final TopicPartition partition; | ||
private final SubscriptionState.FetchPosition position; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's just my preference, but I'd rather not pass the FetchPosition
around in the event. Could we instead include the following directly in the event:
TopicPartition partition
long offset
Optional<Integer> offsetEpoch
The ApplicationEventProcessor
has access to the ConsumerMetadata
object, so we could fetch the current leader there, in one place, instead of having it twice in the AsyncKafkaConsumer
.
|
||
private void process(final SeekUnvalidatedEvent event) { | ||
try { | ||
subscriptions.seekUnvalidated(event.partition(), event.position()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we update the event class, we can then move more of the logic here:
subscriptions.seekUnvalidated(event.partition(), event.position()); | |
SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition( | |
event.offset(), | |
event.offsetEpoch(), | |
metadata.currentLeader(event.partition()) | |
); | |
subscriptions.seekUnvalidated(event.partition(), newPosition); |
SeekUnvalidatedEvent seekUnvalidatedEventEvent = new SeekUnvalidatedEvent(calculateDeadlineMs(timer), partition, newPosition); | ||
applicationEventHandler.addAndGet(seekUnvalidatedEventEvent); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Per the other suggestions, we could make this a bit more succinct:
SeekUnvalidatedEvent seekUnvalidatedEventEvent = new SeekUnvalidatedEvent(calculateDeadlineMs(timer), partition, newPosition); | |
applicationEventHandler.addAndGet(seekUnvalidatedEventEvent); | |
SeekUnvalidatedEvent event = new SeekUnvalidatedEvent( | |
calculateDeadlineMs(timer), | |
partition, | |
offsetAndMetadata.offset(), | |
offsetAndMetadata.leaderEpoch() | |
); | |
applicationEventHandler.addAndGet(event); |
SeekUnvalidatedEvent seekUnvalidatedEventEvent = new SeekUnvalidatedEvent(calculateDeadlineMs(timer), partition, newPosition); | ||
applicationEventHandler.addAndGet(seekUnvalidatedEventEvent); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Per the other suggestions, we could defer the creation of the SubscriptionState.FetchPosition
until the ApplicationEventProcessor
processor method to make this call a bit more succinct:
SeekUnvalidatedEvent seekUnvalidatedEventEvent = new SeekUnvalidatedEvent(calculateDeadlineMs(timer), partition, newPosition); | |
applicationEventHandler.addAndGet(seekUnvalidatedEventEvent); | |
SeekUnvalidatedEvent event = new SeekUnvalidatedEvent( | |
calculateDeadlineMs(timer), | |
partition, | |
offset, | |
Optional.empty(), // This will ensure we skip validation | |
); | |
applicationEventHandler.addAndGet(event); |
Also, could you put a sentence or two description in the event class’ Javadoc? Thanks! |
9b3c61c
to
a986d50
Compare
@kirktrue, thanks for reviewing and suggestions. I addressed all comments and add Javadoc for the |
|
||
@Override | ||
protected String toStringBase() { | ||
return super.toStringBase() + ", partition=" + partition + ", offset=" + offset + ", offsetEpoch=" + offsetEpoch; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
given that offsetEpoch is an Optional object, won't this print something like "offsetEpoch=Optional(10)" or "offsetEpoch=Optional.empty"? If so, I find it confusing.
We could check offsetEpoch.isPresent, and only add "offsetsEpoch=.." to the output string if we have a value for it, seems clearer as output. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the review. Updated it.
@@ -183,6 +186,38 @@ public void testAssignmentChangeEventWithException() { | |||
assertInstanceOf(IllegalStateException.class, e.getCause()); | |||
} | |||
|
|||
@ParameterizedTest | |||
@ValueSource(booleans = {true, false}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we get any value from these params? I expect that having or not a group ID does not change the course of this test that is only about seeking to a position
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I think we don't need to test both situations for this case. Thanks.
} | ||
|
||
@ParameterizedTest | ||
@ValueSource(booleans = {true, false}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
Hey @FrankYang0529 , thanks for the patch! Took a full pass, just some minor comments. |
a986d50
to
7f56043
Compare
return offsetEpoch | ||
.map(integer -> super.toStringBase() + ", partition=" + partition + ", offset=" + offset + ", offsetEpoch=" + integer) | ||
.orElseGet(() -> super.toStringBase() + ", partition=" + partition + ", offset=" + offset); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return offsetEpoch | |
.map(integer -> super.toStringBase() + ", partition=" + partition + ", offset=" + offset + ", offsetEpoch=" + integer) | |
.orElseGet(() -> super.toStringBase() + ", partition=" + partition + ", offset=" + offset); | |
return super.toStringBase() | |
+ ", partition=" + partition | |
+ ", offset=" + offset | |
+ offsetEpoch.map(integer -> ", offsetEpoch=" + integer).orElse(""); |
What about this, just to avoid duplicating what's common?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree, I think it's better solution. Thanks for the suggestion. Updated it.
7f56043
to
5b0727a
Compare
import java.util.Optional; | ||
|
||
/** | ||
* Event to perform {@link org.apache.kafka.clients.consumer.internals.SubscriptionState#seekUnvalidated(TopicPartition, SubscriptionState.FetchPosition)} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
one more detail, we already import SubscriptionState on ln 19, so we can simplify here to ...link SubscriptionState#....
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated it. Thank you.
…thread Signed-off-by: PoAn Yang <payang@apache.org>
5b0727a
to
2ef030b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Thanks!
The
AsyncKafkaConsumer#seek
usesSubscriptionState#seekUnvalidated
which callsSubscriptionState#assignedState
. If we call it in app thread, it may have race condition with background thread. MoveSubscriptionState#seekUnvalidated
to background thread to avoid concurrent write.Committer Checklist (excluded from commit message)