KAFKA-17066: new consumer updateFetchPositions all in background thread #16885
Conversation
Thanks for this PR, @lianetm! Very tricky 😄
I think it's the correct approach, but I'm still wrapping my head around it. I left some basic feedback, but I'll do a follow-up review in a day or two.
Thanks.
// has been truncated due to a leader change. This will trigger an OffsetForLeaderEpoch
// request, retrieve the partition end offsets, and validate the current position
// against it. It will throw an exception if log truncation is detected.
requestManagers.offsetsRequestManager.validatePositionsIfNeeded();
validatePositionsIfNeeded() returns a CompletableFuture. We need to wait for it to be completed before continuing, right?
Yes, I think it should be chained. Also, I don't think it does throw an exception if log truncation is detected. Completing the future exceptionally is not quite the same thing.
It throws an exception (here) when it checks for the partitions it needs to validate. Even though it does return a future, this is one of the parts I was trying to simplify with this refactoring. This is my reasoning:
- Validating positions is an operation whose only purpose is to detect log truncation. It fires a request, and when it gets a response it looks for truncation. If it detects truncation, it saves the exception to be thrown on the next call to validate (this concept and behaviour are common to both consumers).
- Based on that definition, the classic consumer triggers it as an async operation and does not wait for a response before moving on to reset positions with committed offsets or partition offsets.
So, with the async consumer now doing all the updates in the background, it seemed easy to simplify and do the same: trigger validation asynchronously (without waiting for the result future to complete), carry on with the reset, and throw a log-truncation error, if any, on the next call.
Note that one fair concern with not chaining the validate request is ensuring it won't storm the broker with requests. That does not happen, because the pace of requests is already set based on the subscriptionState allowedRetries (see here): whenever a validation request is sent, it waits up to the requestTimeout before sending a new one.
Does that make sense? I could be missing something, but it seems to me we already get the behaviour we want without having to play with the futures here.
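For illustration, here is a minimal sketch of that fire-and-check-later pattern. The class, the cachedTruncationException field, and sendValidationRequest() are hypothetical stand-ins for this discussion, not the actual OffsetsRequestManager code:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.LogTruncationException;

class AsyncPositionValidationSketch {
    // Hypothetical field: truncation error detected by a previous async validation round.
    private final AtomicReference<LogTruncationException> cachedTruncationException = new AtomicReference<>();

    void validatePositionsIfNeeded() {
        // Throw the error found by an earlier validation, if any (mirrors the classic consumer behaviour).
        LogTruncationException previousError = cachedTruncationException.getAndSet(null);
        if (previousError != null)
            throw previousError;

        // Fire the validation request without blocking; the response handler caches any
        // truncation error so the next call to this method surfaces it.
        sendValidationRequest().whenComplete((ignored, error) -> {
            if (error instanceof LogTruncationException)
                cachedTruncationException.set((LogTruncationException) error);
        });
    }

    // Placeholder for the real OffsetsForLeaderEpoch request.
    private CompletableFuture<Void> sendValidationRequest() {
        return CompletableFuture.completedFuture(null);
    }
}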
@lianetm, thanks for the explanation. We don't want to require the position validation to finish before performing the rest of the logic.
Out of curiosity, regarding "storming the broker with requests", does the OffsetsRequestManager already handle duplicate concurrent requests?
/**
 * OffsetFetch request triggered to update fetch positions. The request is kept. It will be
 * cleared every time a response with the committed offsets is received and used to update
 * fetch positions. If the response cannot be used because the UpdateFetchPositions expired,
 * it will be kept to be used on the next attempt to update fetch positions if partitions
 * remain the same.
 */
private FetchCommittedOffsetsEvent pendingOffsetFetchEvent;
We've introduced a lot of logic and state in this class. I'd really like to consider moving the bulk of process() and initWithPartitionOffsetsIfNeeded() to a dedicated class. I'd prefer to keep ApplicationEventProcessor focused on dispatching events to their corresponding RequestManager method(s).
Yeah, totally agree; I debated that too. It would be a new UpdatePositionsEventProcessor that still "dispatches events to their corresponding managers", but I agree that in this case more logic is needed to achieve that.
I didn't go with a separate class from the beginning just to see how others felt about it. I saw these options:
- Leave it in the same processor where all events are indeed processed.
- Have a new UpdatePositionsEventProcessor (just to avoid tying it to any specific manager, and mix in all the ones needed here).
- Have it in the OffsetsRequestManager, given that most of the logic to update positions is about partition offsets, mixing in a bit of committed offsets.
I do like the third one, mainly because I find it consistent with the MembershipManager, which does what it needs and may use the CommitRequestManager. I could be biased here of course :), but @AndrewJSchofield also suggested option 3, so if it makes sense to everyone I'll go for it.
Option 3 makes the most sense. I don't think we need to be afraid of making a dedicated class for this, if needed.
Done: all moved to the OffsetsRequestManager (option 3), and it looks much better indeed. Let me know.
// (with potentially a longer timeout) and stored. The event is used for the first attempt, but in the
// case it times out, subsequent attempts will also use the event in order to wait for the results.
if (!canReusePendingOffsetFetchEvent(initializingPartitions)) {
    final long deadlineMs = Math.max(updateFetchPositionsEvent.deadlineMs(), updateFetchPositionsEvent.fetchOffsetsDeadlineMs());
It seems like we could possibly calculate the fetch offsets deadline here instead of including two deadlines in the event. Sorry, I'm kind of hung up on the two deadlines in the event 😞
I totally get your point. The alternative was to get the config for "defaultApiTimeout" in the UpdateFetchPositionsEvent (that's what we need for the 2nd deadline). This is changing now anyway, because of the changes to address other comments, so we won't have it anymore.
        }
    });
} catch (Exception e) {
    updateFetchPositionsEvent.future().completeExceptionally(e);
Do we need to wrap this exception?
Hmm, I wasn't completely sure about this, but after checking in detail it seems to make sense to wrap here. I was concerned that we might mistakenly wrap something that was expected from the API, given that we now perform in the background lots of operations that were in the app thread before. But we do handle the expected errors in the updatePositions flow and requests, and use them to complete the future. So this would really be for unexpected errors in the flow, and it's ok to wrap. Done.
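A sketch of that wrap-on-unexpected-error idea (the helper below is purely illustrative; the PR itself relies on the consumer's existing wrapping utility):

import java.util.concurrent.CompletableFuture;
import org.apache.kafka.common.KafkaException;

class CompleteEventExceptionallySketch {
    // Expected API errors are already KafkaExceptions and are completed as-is;
    // anything unexpected from the background flow is wrapped for consistency.
    static void completeWithError(CompletableFuture<Void> eventFuture, Throwable t) {
        Throwable toPropagate = (t instanceof KafkaException) ? t : new KafkaException(t);
        eventFuture.completeExceptionally(toPropagate);
    }
}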
Thanks for the PR. This is definitely a nice design change to make in one of the more tricky areas of the consumer.
I wonder whether the timeout handling needs to work quite like this. There's the amount of time that the consumer's application thread is prepared to wait, and there's the amount of time that the offsets request manager is prepared to spend fetching and validating offsets. Unless the set of partitions to process changes, I can't see why it wouldn't just continue until it has an answer, which it can cache. We are trying to guard against a situation where KafkaConsumer.poll(Duration) is being issued repeatedly with a timeout so short that the offset management cannot complete. Couldn't the OffsetsRequestManager simply make the sequence of RPCs to complete the little dance, regardless of a timeout? I don't like caching the event, because that seems to me to apply to a single invocation from the AsyncKafkaConsumer, rather than being an embodiment of the operation in hand.
Thanks for the updates @lianetm! I did another really quick review to read through your responses. I have not yet gotten as far as reviewing the tests 😞
Hey! I just pushed all the requested changes. The most important change is addressing @AndrewJSchofield's comment regarding timeout handling; totally agree.
Yes, I think we could, and I went ahead and did it. I'm currently adding more tests, but all existing tests (unit and integration) pass, so please take a look and let me know. Thank you both!
@lianetm—thanks for all of the updates! I'm scraping the bottom of the barrel with most of my change requests, sorry 😞
applicationEventHandler.addAndGet(new ResetPositionsEvent(calculateDeadlineMs(timer)));
return true;
UpdateFetchPositionsEvent updateFetchPositionsEvent = new UpdateFetchPositionsEvent(calculateDeadlineMs(timer));
wakeupTrigger.setActiveTask(updateFetchPositionsEvent.future());
Why do we need to involve the wakeup trigger for this case?
To ensure that we throw the WakeupException if we get a wakeup call while waiting for the event to complete while polling (part of the contract of poll).
This logic was already in place btw (here), closer to the OffsetFetch, but since it's all in one event now it got moved here. Does that make sense?
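As a rough model of that contract, a simplified sketch (not the real WakeupTrigger class) looks like this:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.common.errors.WakeupException;

class WakeupTriggerSketch {
    private final AtomicReference<CompletableFuture<?>> activeTask = new AtomicReference<>();

    // Registered before the application thread blocks waiting on the event result.
    void setActiveTask(CompletableFuture<?> task) {
        activeTask.set(task);
    }

    // Invoked from KafkaConsumer.wakeup(): completing the active future exceptionally
    // makes the blocked poll() surface a WakeupException, as the poll contract requires.
    void wakeup() {
        CompletableFuture<?> task = activeTask.getAndSet(null);
        if (task != null)
            task.completeExceptionally(new WakeupException());
    }
}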
private boolean canReusePendingOffsetFetchEvent(Set<TopicPartition> partitions) {
    if (pendingOffsetFetchEvent == null) {
        return false;
    }

    return pendingOffsetFetchEvent.requestedPartitions.equals(partitions);
}
The corresponding code in AsyncKafkaConsumer.canReusePendingOffsetFetchEvent also checked that the pendingOffsetFetchEvent's deadline hadn't passed. Is that no longer a concern after the refactor?
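For reference, a deadline-aware reuse check could look roughly like this; the deadlineMs field and canBeReusedFor method are assumptions for illustration, not necessarily what the PR ends up keeping:

import java.util.Set;
import org.apache.kafka.common.TopicPartition;

class PendingOffsetFetchSketch {
    final Set<TopicPartition> requestedPartitions;
    final long deadlineMs; // assumed: deadline attached to the pending OffsetFetch

    PendingOffsetFetchSketch(Set<TopicPartition> requestedPartitions, long deadlineMs) {
        this.requestedPartitions = requestedPartitions;
        this.deadlineMs = deadlineMs;
    }

    boolean canBeReusedFor(Set<TopicPartition> partitions, long currentTimeMs) {
        if (currentTimeMs > deadlineMs)
            return false; // pending request already expired; build a fresh one
        return requestedPartitions.equals(partitions);
    }
}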
private MockTime time;
private final Time time = mock(Time.class);
Out of curiosity, what prompted this change?
Just a simplification: I noticed none of the tests really needs to advance time (which is when we usually bring in the custom Time implementation that MockTime provides), so a mock is enough.
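A quick illustration of the difference, assuming Mockito is on the test classpath as it is for these consumer tests:

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;

class TimeInTestsSketch {
    // MockTime is needed when a test has to advance the clock explicitly.
    void withMockTime() {
        MockTime time = new MockTime();
        time.sleep(1000); // advances the fake clock by 1s without really sleeping
    }

    // When nothing advances time, a plain Mockito mock with a fixed value is enough.
    void withMockedTime() {
        Time time = mock(Time.class);
        when(time.milliseconds()).thenReturn(0L);
    }
}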
Thanks for the update. I do like putting the logic into the offsets request manager although it's getting a bit hard to follow now.
// Need to generate a new request to fetch committed offsets
final long fetchCommittedDeadlineMs = Math.max(deadlineMs, time.milliseconds() + defaultApiTimeoutMs);
fetchCommittedFuture = commitRequestManager.fetchOffsets(initializingPartitions, fetchCommittedDeadlineMs);
pendingOffsetFetchEvent = new PendingFetchCommittedRequest(initializingPartitions,
nit: This would be more legible as a single line.
@@ -194,14 +402,15 @@ public CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> fetchO
 * an error is received in the response, it will be saved to be thrown on the next call to
 * this function (ex. {@link org.apache.kafka.common.errors.TopicAuthorizationException})
 */
public CompletableFuture<Void> resetPositionsIfNeeded() {
protected CompletableFuture<Void> resetPositionsIfNeeded() {
I always feel that protected has an implication of subclassing. I don't believe this is being subclassed, so I'd prefer to see package-private, which should give accessibility for testing. Of course, this is a matter of taste so feel free to ignore this :)
I totally agree with you on the subclassing expectation; I missed it here. Changed.
// positions.
applicationEventHandler.addAndGet(new ResetPositionsEvent(calculateDeadlineMs(timer)));
return true;
UpdateFetchPositionsEvent updateFetchPositionsEvent = new UpdateFetchPositionsEvent(calculateDeadlineMs(timer));
I believe that this will create an UpdateFetchPositionsEvent for every iteration of the inner loop in AsyncKafkaConsumer.poll(Duration). Previously, cachedSubscriptionHasAllFetchPositions was an attempt at optimising this area, which was probably not entirely satisfactory, but it was well intentioned. Generally, once the positions have been validated, there's no need for any of this unless something nasty happens. I feel there's an optimisation waiting here: we know when the set of assigned partitions changed, we know when an error such as log truncation occurred, and we know when an UpdateFetchPositionsEvent completed successfully. So I think we can probably be a bit more deliberate about creating the event only when it's needed.
Good point. The trick is that we do need to generate an event on every iteration of poll, just because we need to go to the background thread (even if we end up finding that we already have all the positions). This is to:
- Get errors that may have been found in the background on a previous update-positions attempt.
- Trigger validate positions (we do it on every poll, like the classic consumer, and the common validation logic internally determines whether it really needs to do something or not).
- Check subscriptions.hasAllFetchPositions() (we want to do this in the background to avoid races on the assignment that it checks).
And I'll stop here, just to point out that this is what we expect will happen on every poll iteration under happy/regular scenarios (all partitions have valid positions). This is what the UpdateFetchPositionsEvent is needed for: not really updating anything, but doing what it needs to determine that it doesn't need to. It's probably more of a MaybeUpdateFetchPositionsEvent (weird, or better?).
Then, to cater for the scenarios where something fails or changes, the same event gets more interesting. If it finds that it does not "hasAllFetchPositions", it will attempt to retrieve committed offsets or partition offsets as needed, which is the meaty part.
This is the reasoning behind having an event on every poll. What do you think?
Sounds good to me. Thanks for the explanation. We're well on the way to removing the SubscriptionState references from the application thread entirely.
return true;
UpdateFetchPositionsEvent updateFetchPositionsEvent = new UpdateFetchPositionsEvent(calculateDeadlineMs(timer));
wakeupTrigger.setActiveTask(updateFetchPositionsEvent.future());
cachedSubscriptionHasAllFetchPositions = applicationEventHandler.addAndGet(updateFetchPositionsEvent);
Previously, this variable was being set based on subscriptions.hasAllFetchPositions() and did not rely on successfully retrieving the result of the new event. I wonder whether it has become stickier as a result, which might not be desirable. Perhaps you should set it to false before waiting for the result of the event so that it's false if the result does not arrive. What do you think the "default" value would be here?
Good point. The default is false, but it could definitely stick to a true result if the UpdateFetchPositions fails afterwards (which would be wrong). I added your suggestion of setting it to false before waiting; that seems sensible to me.
    return false;
}

public CompletableFuture<Boolean> updateFetchPositions(long deadlineMs) {
This CompletableFuture<Boolean> is a slightly tricky thing. The boolean, I think, refers to whether we have all of the offset information we need. But there are a lot of "results" flying around, and I'm finding it tricky to follow this boolean through the various methods. I wonder whether a specific name for the future that carries this flag, used at all points whether it's currently a variable or a method argument, would help my tiny brain.
Yeah, I agree it's not straightforward. The boolean only indicates whether the event hasAllFetchPositions when it starts (meaning that it didn't have to fetch offsets). The results flying around really have no direct impact on that result (only an indirect one: they set a position, so the next hasAllFetchPositions check will find it).
Definitely, having specific var names for the result will help; going for it now...
@lianetm this is a nice patch, and I have only a few minor comments :)
    return false;
}

return pendingOffsetFetchEvent.requestedPartitions.equals(partitions);
Could we use containsAll instead of equals?
It makes sense to me. Just for the record, when the changes for caching the request were initially implemented (in the app thread), a separate Jira was created for this (https://issues.apache.org/jira/browse/KAFKA-16966). Still, I'm ok with changing to containsAll here, and keeping that other Jira maybe only to review this in the classic consumer (I won't get there in this PR).
I get squeamish about changing things like this as part of a refactor. I'm OK to change it here, but as @lianetm mentioned, KAFKA-16966 was split out separately as an optimization.
Yes, we can address that in KAFKA-16966.
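For clarity, this is the behavioural difference between the two checks (a standalone illustration, not code from the PR):

import java.util.Set;

public class SetReuseCheckExample {
    public static void main(String[] args) {
        Set<String> pendingPartitions = Set.of("topic-0", "topic-1");
        Set<String> requestedPartitions = Set.of("topic-0");

        // containsAll would allow reusing the pending fetch for a subset of the partitions...
        System.out.println(pendingPartitions.containsAll(requestedPartitions)); // true
        // ...whereas equals only reuses it when the sets match exactly (the behaviour kept here).
        System.out.println(pendingPartitions.equals(requestedPartitions));      // false
    }
}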
// been assigned a position manually)
if (error == null) {
    Map<TopicPartition, OffsetAndMetadata> offsetsToApply = offsetsForInitializingPartitions(offsets);
    refreshCommittedOffsets(offsetsToApply, metadata, subscriptionState);
Is it legal to call refreshCommittedOffsets multiple times when there are many calls reusing the "first attempt"? i.e. we register many actions via whenComplete.
Good catch. I would say it's harmless because there is a check that makes it a no-op if the partitions already got positions, but it's definitely conceptually wrong. We want to trigger the OffsetFetch, keep it as pending, and call refreshCommittedOffsets only once when we get a response (what we need to do "multiple times" is not the refreshCommittedOffsets, it's only to call "complete" on all the attempts that may be waiting for that operation). I made the change.
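A minimal sketch of that shape, with placeholder names (the real code lives in OffsetsRequestManager): the refresh is chained once onto the OffsetFetch response, and later attempts only attach their own completion to the shared result.

import java.util.concurrent.CompletableFuture;

class SharedPendingFetchSketch {
    // Assumed field: the single "refresh once, then complete" future shared across attempts.
    private CompletableFuture<Void> pendingResult;

    CompletableFuture<Void> firstAttempt(CompletableFuture<Void> offsetFetchResponse) {
        // The refreshCommittedOffsets-equivalent runs exactly once, when the response arrives.
        pendingResult = offsetFetchResponse.thenRun(this::applyOffsetsToPositions);
        return pendingResult;
    }

    void laterAttempt(CompletableFuture<Void> attemptResult) {
        // Subsequent attempts just wait on the shared result; no extra refresh is registered.
        pendingResult.whenComplete((ignored, error) -> {
            if (error == null)
                attemptResult.complete(null);
            else
                attemptResult.completeExceptionally(error);
        });
    }

    private void applyOffsetsToPositions() { /* update the subscription state once */ }
}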
Hey @chia7712, thanks for the review! All comments addressed.
@lianetm thanks for the updated patch!
    return false;
}

return pendingOffsetFetchEvent.requestedPartitions.containsAll(partitions);
As discussed (#16885 (comment)), could you please keep using equals 😄
// this case, on the first attempt to fetch the committed offsets, a FetchCommittedOffsetsEvent is created
// (with potentially a longer timeout) and stored. The event is used for the first attempt, but in the
// case it times out, subsequent attempts will also use the event in order to wait for the results.
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> refreshWithCommitted;
It seems we don't need refreshWithCommitted:
if (!canReusePendingOffsetFetchEvent(initializingPartitions)) {
// Generate a new OffsetFetch request and update positions when a response is received
final long fetchCommittedDeadlineMs = Math.max(deadlineMs, time.milliseconds() + defaultApiTimeoutMs);
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchOffsets =
commitRequestManager.fetchOffsets(initializingPartitions, fetchCommittedDeadlineMs);
pendingOffsetFetchEvent = new PendingFetchCommittedRequest(initializingPartitions,
refreshOffsetsAndCompleteResultOnResponseReceived(fetchOffsets, result));
} else {
// Reuse pending OffsetFetch request
pendingOffsetFetchEvent.result.whenComplete((__, error) -> {
if (error == null) {
result.complete(null);
} else {
result.completeExceptionally(error);
}
});
}
This was to ensure that we only move into resetting positions (with partition offsets) after refreshing offsets (not merely after receiving the response to the OffsetFetch).
That being said, I did remove refreshWithCommitted and followed your suggestion here, considering the above, and it did lead to what seems to me a better version; the flow seems clearer. Thanks! Let me know.
        final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchCommittedFuture,
        final CompletableFuture<Void> result) {
    return fetchCommittedFuture.whenComplete((offsets, error) -> {
        pendingOffsetFetchEvent = null;
If fetchCommittedFuture is an already-completed future, whenComplete will be evaluated before returning. That means pendingOffsetFetchEvent will never be null.
Yes, this changed shape with the latest changes, so I believe we don't have that situation anymore. Now the pending event goes to null here (even if the OffsetFetch future is somehow done already).
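The pitfall being pointed out is easy to reproduce in isolation: whenComplete on an already-completed future runs the callback synchronously, before the next statement.

import java.util.concurrent.CompletableFuture;

public class WhenCompleteTimingExample {
    public static void main(String[] args) {
        CompletableFuture<String> done = CompletableFuture.completedFuture("offsets");

        // The future is already complete, so this callback runs immediately on this thread.
        done.whenComplete((value, error) -> System.out.println("callback: " + value));
        System.out.println("after whenComplete");

        // Prints "callback: offsets" before "after whenComplete". This is why clearing a
        // pending-request field inside the callback and assigning the field afterwards
        // could leave it permanently non-null.
    }
}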
Hello @chia7712, thanks for the comments! All addressed.
Hey @chia7712, I just added a small fix (2957cc4) after noticing some suspicious failures in the tests after the previous changes. I was indeed missing the fact that we need to allow resetting positions after refreshing committed offsets (even if no offset-fetch request is generated). That was affecting several tests (probably the best one to see the flow would be ...).
The last build completed with 3 unrelated failures. I just rebased and will check the next one too, but I would say this is ready for another look when you have a chance, @chia7712. Thanks a lot!
    refreshOffsetsAndResetPositionsStillMissing(offsets, error, result);
});
pendingOffsetFetchEvent = new PendingFetchCommittedRequest(initializingPartitions, fetchOffsets);
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchOffsetsAndRefresh =
I introduced this var here just to make sure that, when reusing a fetch, we only complete the whole operation and move on to resetting positions once we have applied the retrieved offsets. Without this var, I was adding two whenComplete calls to the same future (ln 376 and ln 384), and those would execute in reverse order of addition.
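To make the ordering concern concrete: the JDK gives no ordering guarantee for independent callbacks registered on the same future (in practice they tend to run in reverse registration order), whereas chaining makes the order explicit. A sketch with placeholder method names:

import java.util.concurrent.CompletableFuture;

class CallbackOrderingSketch {
    // Two independent callbacks on the same future: no guaranteed execution order.
    static void unordered(CompletableFuture<Void> fetch) {
        fetch.whenComplete((r, e) -> applyOffsets());
        fetch.whenComplete((r, e) -> resetRemainingPositions());
    }

    // Chaining enforces the intended order: apply offsets first, then reset positions.
    static CompletableFuture<Void> ordered(CompletableFuture<Void> fetch) {
        return fetch
                .whenComplete((r, e) -> applyOffsets())
                .whenComplete((r, e) -> resetRemainingPositions());
    }

    private static void applyOffsets() { /* refresh committed offsets */ }
    private static void resetRemainingPositions() { /* fall back to partition offsets */ }
}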
@lianetm thanks for the updates.
Map<TopicPartition, SubscriptionState.FetchPosition> partitionsToValidate =
        offsetFetcherUtils.getPartitionsToValidate();
void validatePositionsIfNeeded() {
    Map<TopicPartition, SubscriptionState.FetchPosition> partitionsToValidate = offsetFetcherUtils.getPartitionsToValidate();
offsetFetcherUtils.getPartitionsToValidate can throw an exception from the cache, so should we propagate the exception via the CompletableFuture?
You're right, and that's the intention, but it's done in the catch further down (an exception here short-circuits the flow as we want, and we complete exceptionally in the catch).
kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
Line 261 in 021a290
result.completeExceptionally(maybeWrapAsKafkaException(e));
Does that make sense?
}

return sendOffsetsForLeaderEpochRequestsAndValidatePositions(partitionsToValidate);
sendOffsetsForLeaderEpochRequestsAndValidatePositions(partitionsToValidate);
The return value of sendOffsetsForLeaderEpochRequestsAndValidatePositions is unused now. Could you please do a bit of cleanup for it?
Sure, done.
Map<TopicPartition, SubscriptionState.FetchPosition> partitionsToValidate =
        offsetFetcherUtils.getPartitionsToValidate();
void validatePositionsIfNeeded() {
    Map<TopicPartition, SubscriptionState.FetchPosition> partitionsToValidate = offsetFetcherUtils.getPartitionsToValidate();
    if (partitionsToValidate.isEmpty()) {
How do we make sure all partitions eventually get validated? partitionsToValidate could be empty if there are in-flight OffsetsForLeaderEpochRequests, right?
kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
Line 202 in b436499
.partitionsNeedingValidation(time.milliseconds())
You're totally right that here we could have empty partitions to validate (only because we may have some, but there's already an OffsetsForLeaderEpochRequest in flight for them). But my expectation is that even in that case, we make sure all partitions end up being validated because of how we handle the OffsetsForLeaderEpochRequest response:
- If it succeeds, the partitions get validated.
- If it fails, the next allowed retry is updated for those partitions, so they are validated on the next poll/updateFetchPositions after the backoff expires (partitionsToValidate won't be empty on that next run).
kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
Line 396 in 02e3f7c
subscriptionState.requestFailed(fetchPositions.keySet(), time.milliseconds() + retryBackoffMs);
Does that make sense?
Hello @chia7712, thanks for the review! All comments addressed.
LGTM
Thanks @chia7712! The build completed with 3 unrelated failures. (I filed https://issues.apache.org/jira/browse/KAFKA-17554 for the one in ConsumerNetworkClientTest, classic consumer, which I see has been flaky for a while but wasn't reported yet.)
Fix for the known issue that the logic for updating fetch positions in the new consumer was being performed partly in the app thread and partly in the background thread, potentially leading to race conditions on the subscription state.
This PR moves the logic for updateFetchPositions to the background thread as a single event (instead of triggering separate events to validate, fetchOffsets, listOffsets). A new UpdateFetchPositionsEvent is triggered from the app thread and processed in the background, where it performs those same operations and updates the subscription state accordingly, without blocking the background thread.
This PR maintains the existing logic for keeping a pendingOffsetFetchRequest that does not complete within the lifetime of the updateFetchPositions attempt, and may be used on the next call to updateFetchPositions.
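For readers skimming the description, the background flow roughly follows this shape. This is a hedged sketch with placeholder methods standing in for the real OffsetsRequestManager/CommitRequestManager calls, not the actual implementation:

import java.util.concurrent.CompletableFuture;

class UpdateFetchPositionsFlowSketch {
    CompletableFuture<Boolean> updateFetchPositions(long deadlineMs) {
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        try {
            // Async validation; throws if a previous round already detected log truncation.
            validatePositionsIfNeeded();

            if (hasAllFetchPositions()) {
                result.complete(true);
                return result;
            }

            fetchCommittedOffsets(deadlineMs)                    // OffsetFetch for initializing partitions
                    .thenRun(this::applyCommittedOffsets)        // update positions from the response
                    .thenCompose(v -> resetPositionsIfNeeded())  // ListOffsets for whatever is still missing
                    .whenComplete((v, error) -> {
                        if (error == null)
                            result.complete(false);
                        else
                            result.completeExceptionally(error);
                    });
        } catch (Exception e) {
            result.completeExceptionally(e);
        }
        return result;
    }

    // Placeholders standing in for the real request-manager interactions.
    private void validatePositionsIfNeeded() { }
    private boolean hasAllFetchPositions() { return false; }
    private CompletableFuture<Void> fetchCommittedOffsets(long deadlineMs) { return CompletableFuture.completedFuture(null); }
    private void applyCommittedOffsets() { }
    private CompletableFuture<Void> resetPositionsIfNeeded() { return CompletableFuture.completedFuture(null); }
}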