
Implement split enumeration back pressure for FTE #15514

Merged · 1 commit into trinodb:master · Jan 10, 2023

Conversation

arhimondr (Contributor):

Description

Prevent the scheduler from creating more task descriptors than necessary

Additional context and related issues

TODO

Release notes

(X) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
( ) Release notes are required, with the following suggested text:

# Section
* Fix some things. ({issue}`issuenumber`)

@@ -974,6 +935,28 @@ private StateChangeListener<TaskStatus> createExchangeSinkInstanceHandleUpdateRe
};
}

private void loadMoreTaskDescriptorsIfNecessary()
{
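// Back pressure: pull more task descriptors from the task source only while
// fewer than 100 non-speculative tasks are waiting in the scheduling queue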
if (schedulingQueue.size() - schedulingQueue.getSpeculativeCount() < 100) {
Member:

nit: make 100 configurable
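A minimal sketch of how the threshold could be made configurable, following the airlift @Config style used for Trino configuration classes (the property name, accessors, and host class below are hypothetical, not necessarily what was merged):

import io.airlift.configuration.Config;

public class FaultTolerantSchedulingConfig
{
    // Hypothetical knob replacing the hardcoded 100
    private int minPendingNonSpeculativeTasks = 100;

    public int getMinPendingNonSpeculativeTasks()
    {
        return minPendingNonSpeculativeTasks;
    }

    @Config("fault-tolerant-execution-min-pending-non-speculative-tasks")
    public FaultTolerantSchedulingConfig setMinPendingNonSpeculativeTasks(int value)
    {
        this.minPendingNonSpeculativeTasks = value;
        return this;
    }
}

The check above would then compare against the injected value instead of the literal 100.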

Member:

Correct me if I am wrong here. If we have an arbitrary-distribution stage with a replicated source, then we do not really have much control over how many "speculative" task descriptors we will produce before we enumerate all the splits for the replicated source.
Only after split enumeration for the replicated source completes will the task priority change from "speculative" to "non-speculative".
Probably a corner case that is not worth worrying about, but I would like to be sure I understand the logic here correctly.

Contributor (author):

Yeah, you are right. I think there's an even more general problem: currently we do not account for memory used by task descriptors that are not complete. It is hard to say how big this problem is in practice. The scheduler should not schedule downstream stages before upstream stages are done (at least done with scheduling tasks), so in theory the time it takes to enumerate splits for broadcast tasks shouldn't be long. However, today it is inherently racy, and it is possible for broadcast split enumeration to be slow. We may need to further improve this algorithm to have more control over it at some point.

}

- public synchronized void start()
+ public synchronized ListenableFuture<AssignmentResult> process()
Member:

If this method is called twice, and the second call happens before the future returned by the first call is done, then we would register the same splits twice in the assigner. I think we should guard against it.

Member:

Maybe you just need to verify that getSplitBatchAndAdvance is never called twice.

Contributor (author):

Added a check to make sure process is not called again before the previous invocation has finished.
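A minimal sketch of what such a guard could look like, assuming a field that tracks the in-flight future (names here are illustrative, not the exact merged code):

import static com.google.common.base.Preconditions.checkState;

import com.google.common.util.concurrent.ListenableFuture;

private ListenableFuture<AssignmentResult> inFlightProcess;

public synchronized ListenableFuture<AssignmentResult> process()
{
    // An overlapping call would feed the same split batches to the assigner twice
    checkState(inFlightProcess == null || inFlightProcess.isDone(),
            "process() called before the previous invocation finished");
    inFlightProcess = doProcess(); // hypothetical helper performing the actual work
    return inFlightProcess;
}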

return immediateFuture(assigner.finish());
}

ListenableFuture<IdempotentSplitSource.SplitBatchReference> firstCompleted = whenAnyComplete(futures);
Member:

This attaches a callback to each future on each call to process. If some sources are slow, the list of callbacks can grow large.

You can avoid that by keeping a SettableFuture<AssignmentResult> in a field, completed when new splits are discovered. But I think that gets more complicated overall (you cannot easily exploit IdempotentSplitSource, I think).
Maybe this is not a big deal.

Contributor (author):

Yeah, that's a good point. Added a proxy future as an attempt to address the accumulating-callbacks problem. Not sure that's the cleanest solution, though. Please take a look and let me know what you think.
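Roughly, the proxy-future approach: each long-lived source future gets a single callback registered once, and that callback completes a short-lived proxy that the scheduler waits on, so callbacks no longer pile up per process() call. A sketch under those assumptions (field and method names are illustrative):

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;

private SettableFuture<Void> splitsDiscovered = SettableFuture.create();

// Registered once per source future, not once per process() call
private synchronized void onSourceProgress()
{
    splitsDiscovered.set(null);
}

// process() waits on this proxy; once it fires, a fresh one replaces it
private synchronized ListenableFuture<Void> whenSplitsDiscovered()
{
    if (splitsDiscovered.isDone()) {
        splitsDiscovered = SettableFuture.create();
    }
    return splitsDiscovered;
}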

Member:

Tricky, but it should work. I really hope I will not need to touch this code in the future :)

@@ -272,116 +269,19 @@ public void stressTest()
testStageTaskSourceSuccess(sourceHandles, remoteSources, splits);
}

@Test(invocationCount = INVOCATION_COUNT)
public void testFailures()
Member:

how is that tested now?

Contributor (author):

The failure handling logic is now trivial (there's no custom error handling code; the assumption is that Future#transform handles failures as expected). I thought it might not be worth maintaining this extra complexity.
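For context, the behavior being relied on here is standard Guava semantics: when the input future fails, Futures.transform skips the transform function and propagates the failure to the derived future. A self-contained illustration (not code from this PR):

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;

import java.util.concurrent.ExecutionException;

public class TransformFailureDemo
{
    public static void main(String[] args)
            throws InterruptedException
    {
        SettableFuture<Integer> splits = SettableFuture.create();
        ListenableFuture<String> assignment = Futures.transform(
                splits, count -> "assigned " + count + " splits", MoreExecutors.directExecutor());

        splits.setException(new RuntimeException("split enumeration failed"));

        try {
            assignment.get();
        }
        catch (ExecutionException e) {
            // The transform function never ran; the original failure surfaces here
            System.out.println("propagated: " + e.getCause().getMessage());
        }
    }
}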

losipiuk (Member) left a comment:

As often, I had a hard time reading this (it's complex by nature). It looks good. Some comments.

linzebing (Member):

It's definitely very hard to digest here given the complexity of the scheduler. Can you elaborate more on how we achieve back pressure here?

arhimondr (Contributor, author):

@linzebing:
> Can you elaborate more on how we achieve back pressure here?

Before, split enumeration was launched in the background and a set of callbacks was invoked whenever a new task was added. With such an implementation there was no way for the scheduler to tell the task factory to "suspend". In this PR the model is changed to more of a "pull" model: whenever the scheduler decides it needs more tasks, it asks the task factory to create more.
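Schematically (the interfaces below are illustrative, not the actual Trino types; TaskDescriptor and AssignmentResult stand in for the scheduler's own types):

import java.util.function.Consumer;

import com.google.common.util.concurrent.ListenableFuture;

// Before: push model. The source enumerates splits in the background and
// fires a callback per task; the scheduler has no way to say "pause".
interface PushTaskSource
{
    void start(Consumer<TaskDescriptor> onTaskCreated);
}

// After: pull model. The scheduler calls process() only when its queue of
// pending task descriptors runs low; enumeration stops advancing when the
// scheduler stops asking, which is the back pressure.
interface PullTaskSource
{
    ListenableFuture<AssignmentResult> process();

    boolean isFinished();
}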

@arhimondr arhimondr force-pushed the limit-task-descriptor-buffer branch 3 times, most recently from b8b8f15 to 05ab2f1 Compare January 4, 2023 17:57
@arhimondr arhimondr changed the title [WIP] Implement split enumeration back pressure for FTE Implement split enumeration back pressure for FTE Jan 4, 2023
arhimondr (Contributor, author):

Updated and ready for review

}
splitSource.close();
boolean result = delegate().cancel(mayInterruptIfRunning);
propagateIfNecessary();
Member:

Why do you need this here, when propagateIfNecessary is already attached to the delegate?

Contributor (author):

Hmm, good point. Let me remove the override.

@arhimondr arhimondr force-pushed the limit-task-descriptor-buffer branch from 05ab2f1 to 2675557 Compare January 10, 2023 15:54
@arhimondr arhimondr merged commit 3fa94ea into trinodb:master Jan 10, 2023
@arhimondr arhimondr deleted the limit-task-descriptor-buffer branch January 10, 2023 18:44
@github-actions github-actions bot added this to the 406 milestone Jan 10, 2023