Implement split enumeration back pressure for FTE #15514
Conversation
@@ -974,6 +935,28 @@ private StateChangeListener<TaskStatus> createExchangeSinkInstanceHandleUpdateRe
        };
    }

    private void loadMoreTaskDescriptorsIfNecessary()
    {
        if (schedulingQueue.size() - schedulingQueue.getSpeculativeCount() < 100) {
nit: make 100 configurable
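A minimal sketch of what this nit could look like, using an Airlift-style configuration setter as elsewhere in Trino; the config class name and property name below are assumptions for illustration, not the actual ones:

```java
import io.airlift.configuration.Config;

// Hypothetical config class and property name; the real Trino names may differ.
public class FaultTolerantSchedulingConfig
{
    private int minNonSpeculativeTaskDescriptors = 100;

    public int getMinNonSpeculativeTaskDescriptors()
    {
        return minNonSpeculativeTaskDescriptors;
    }

    @Config("fault-tolerant-execution-min-non-speculative-task-descriptors")
    public FaultTolerantSchedulingConfig setMinNonSpeculativeTaskDescriptors(int value)
    {
        this.minNonSpeculativeTaskDescriptors = value;
        return this;
    }
}
```

The scheduler would then compare `schedulingQueue.size() - schedulingQueue.getSpeculativeCount()` against the injected value instead of the literal 100.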
Correct me if I am wrong here. If we have an arbitrary distribution stage with a replicated source, then we do not really have much control over how many "speculative" task descriptors we would produce before we enumerate all the splits for the replicated source.
Only after the split enumeration for the replicated source completes will the priority of the tasks change from "speculative" to "non-speculative".
Probably a corner case that is not worth bothering about, but I would like to be sure I understand the logic here correctly.
Yeah, you are right. I think there's an even more generic problem: currently we do not account memory for task descriptors that are not complete. It is hard to say how big this problem is in practice. The scheduler should not schedule downstream stages before upstream stages are done (at least done with scheduling tasks), so in theory the time it takes to enumerate splits for broadcast tasks shouldn't be long. However, today it is inherently racy and it is possible for broadcast split enumeration to be slow. We may need to further improve this algorithm to have more control over it at some point.
Resolved (outdated) comments on core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenTaskSource.java
    }

-   public synchronized void start()
+   public synchronized ListenableFuture<AssignmentResult> process()
If this method is called twice, and the second call happens before the future returned by the first call is done, then we would register the same splits twice in the assigner. I think we should guard against it.
Maybe you just need to verify that getSplitBatchAndAdvance is never called twice.
Added a check to make sure process is not called before the previous one is finished.
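A minimal sketch of such a guard, with hypothetical names (GuardedTaskSource, activeProcessFuture, doProcess); this is not the exact code in the PR:

```java
import static com.google.common.base.Preconditions.checkState;

import com.google.common.util.concurrent.ListenableFuture;

// Hypothetical names for illustration only.
class GuardedTaskSource<T>
{
    private ListenableFuture<T> activeProcessFuture;

    public synchronized ListenableFuture<T> process()
    {
        // Reject overlapping calls: registering the same split batches twice in the
        // assigner would corrupt the assignment state.
        checkState(activeProcessFuture == null || activeProcessFuture.isDone(),
                "process() called before the previous invocation completed");
        activeProcessFuture = doProcess();
        return activeProcessFuture;
    }

    private ListenableFuture<T> doProcess()
    {
        // the real split enumeration / assignment work would go here
        throw new UnsupportedOperationException("sketch only");
    }
}
```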
        return immediateFuture(assigner.finish());
    }

    ListenableFuture<IdempotentSplitSource.SplitBatchReference> firstCompleted = whenAnyComplete(futures);
This attaches a callback to each future on each call to process. If some sources are slow, the list of callbacks can grow large.
You can avoid that by keeping a SettableFuture<AssignmentResult> field that is completed when new splits are discovered. But I think it overall gets more complicated (you cannot easily exploit IdempotentSplitSource, I think).
Maybe this is not a big deal.
Yeah, that's a good point. Added a proxy future as an attempt to address the accumulating callbacks problem. Not sure if that's the cleanest solution though. Please take a look and let me know what you think.
Tricky, but should work. I really hope I will not need to touch this code in the future :)
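For readers following along, a rough sketch of the proxy-future idea discussed above: a single long-lived callback per source completes a shared future, so callbacks no longer pile up on slow sources across repeated process() calls. All names are hypothetical and this is not the code that was merged:

```java
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;

import static com.google.common.util.concurrent.MoreExecutors.directExecutor;

// Hypothetical illustration of the idea, not the PR code.
class SplitArrivalProxy
{
    // Completed when any source produces a batch; a full implementation would swap in a
    // fresh future once the batch is consumed (re-arming is not shown in this sketch).
    private final SettableFuture<Void> splitsAvailable = SettableFuture.create();

    // Registered once per source future, not once per process() call, so callbacks do
    // not accumulate on slow sources across repeated process() invocations.
    void listenTo(ListenableFuture<?> sourceFuture)
    {
        Futures.addCallback(sourceFuture, new FutureCallback<Object>()
        {
            @Override
            public void onSuccess(Object result)
            {
                splitsAvailable.set(null);
            }

            @Override
            public void onFailure(Throwable failure)
            {
                splitsAvailable.setException(failure);
            }
        }, directExecutor());
    }

    // process() would wait on this shared future instead of calling whenAnyComplete(futures).
    ListenableFuture<Void> whenSplitsAvailable()
    {
        return splitsAvailable;
    }
}
```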
@@ -272,116 +269,19 @@ public void stressTest()
        testStageTaskSourceSuccess(sourceHandles, remoteSources, splits);
    }

    @Test(invocationCount = INVOCATION_COUNT)
    public void testFailures()
how is that tested now?
The failure handling logic is now trivial (there's no custom error handling code; the assumption is that Future#transform handles failures as expected). Thought it might not be worth maintaining this extra complexity.
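A small standalone illustration of the assumption being relied on here, taking "Future#transform" to mean Guava's Futures.transform: a failure on the input future propagates to the derived future without any custom error handling.

```java
import com.google.common.util.concurrent.ListenableFuture;

import java.util.concurrent.ExecutionException;

import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
import static com.google.common.util.concurrent.Futures.transform;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;

public class TransformFailurePropagation
{
    public static void main(String[] args) throws InterruptedException
    {
        ListenableFuture<Integer> failed = immediateFailedFuture(new RuntimeException("split enumeration failed"));

        // The transformation function is never invoked for a failed input;
        // the original exception is propagated to the derived future.
        ListenableFuture<Integer> derived = transform(failed, value -> value + 1, directExecutor());

        try {
            derived.get();
        }
        catch (ExecutionException e) {
            System.out.println("propagated: " + e.getCause().getMessage());
        }
    }
}
```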
As often, I had a hard time reading this (it's complex by nature). It looks good. Some comments.
It's definitely very hard to digest here given the complexity of the scheduler. Can you elaborate more on how we achieve back pressure here?
Before, the process of enumerating splits was launched in the background and a set of callbacks was invoked whenever a new task was created. With such an implementation there was no way for the scheduler to tell the task factory to "suspend". In this PR the model is changed to more of a "pull" model: whenever the scheduler decides it needs more tasks, it asks the task factory to create more.
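A very small sketch of the contrast, in hypothetical interface form (the real EventDrivenTaskSource and scheduler are considerably more involved):

```java
import com.google.common.util.concurrent.ListenableFuture;

import java.util.function.Consumer;

// Push model (before): splits are enumerated in the background and a callback fires
// for every new task descriptor, so the scheduler cannot tell the source to slow down.
interface PushTaskSource<T>
{
    void start(Consumer<T> onNewTaskDescriptor);
}

// Pull model (this PR): the scheduler calls process() only when its queue of
// non-speculative task descriptors runs low (see loadMoreTaskDescriptorsIfNecessary
// above), so task-descriptor creation follows the scheduler's demand.
interface PullTaskSource<R>
{
    ListenableFuture<R> process();
}
```

The back pressure point is simply that the scheduler stops calling process() while its queue is full enough.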
Force-pushed b8b8f15 to 05ab2f1
Updated and ready for review
    }
    splitSource.close();
    boolean result = delegate().cancel(mayInterruptIfRunning);
    propagateIfNecessary();
Why do you need this when propagateIfNecessary is already attached to the delegate?
Hmm, good point. Let me remove the override
Force-pushed 05ab2f1 to 2675557
Description
Prevent the scheduler from creating more task descriptors than necessary
Additional context and related issues
TODO
Release notes
(X) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
( ) Release notes are required, with the following suggested text: