Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle interruption in RetryDriver and TaskRunner #15803

Merged
merged 2 commits into from
Jan 23, 2023

Conversation

pettyjamesm
Copy link
Member

@pettyjamesm pettyjamesm commented Jan 21, 2023

Description

This PR includes two changes to Thread interrupt handling:

  1. In trino-hive, modify RetryDriver to stop attempting retries if InterruptedException is caught or the current thread is interrupted and immediately throw the current exception. Otherwise, RetryDriver might interfere with drivers terminating in a timely manner after tasks are terminated via a cancel, failure, or abort.
  2. In trino-main, check and clear thread interrupt status at the end of processing a split in TaskRunner before looping back to start processing a new split. Otherwise, a split that is interrupted (eg: because the task was cancelled) would leave the thread interrupted flag set which would cause the TaskRunner to stop processing and unnecessarily submit a new instance from which could cause additional threads to be created in the cached thread pool executor.

Release notes

(x) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
( ) Release notes are required, with the following suggested text:

@cla-bot cla-bot bot added the cla-signed label Jan 21, 2023
@pettyjamesm pettyjamesm requested a review from arhimondr January 21, 2023 19:06
@pettyjamesm pettyjamesm force-pushed the stop-retrydriver-on-interrupt branch from 64ca8d3 to 879d6e6 Compare January 21, 2023 20:40
Stop attempting retries in Hive's RetryDriver if an InterruptedException
is caught or when the current thread is interrupted and immediately throw
the current exception. Otherwise, RetryDriver might interfere with drivers
terminating in a timely manner after tasks are terminated via a
cancel, failure, or abort.
@pettyjamesm pettyjamesm force-pushed the stop-retrydriver-on-interrupt branch from 879d6e6 to 9f70de7 Compare January 21, 2023 20:43
Drivers may leave the TaskRunner thread's interrupt flag set during the
course of processing, but doing so should not result in the TaskRunner
terminating its own processing loop until the TaskExecutor is closed.

Instead of allowing the interrupt to terminate the current task runner's
loop and re-creating a new runner to replace the interrupted one, we can
clear the interrupt flag after each iteration. Otherwise, TaskRunners
that were interrupted would have to replace themselves and end up
creating unnecessary threads in the cached threadpool executor in the
process.
@pettyjamesm pettyjamesm force-pushed the stop-retrydriver-on-interrupt branch from 6570643 to 46fcbd6 Compare January 22, 2023 00:09
@pettyjamesm pettyjamesm changed the title Stop RetryDriver when interrupted Handle interruption in RetryDriver and TaskRunner Jan 23, 2023
@pettyjamesm pettyjamesm requested a review from sopel39 January 23, 2023 18:36
@@ -561,6 +561,15 @@ public void run()
}
splitFinished(split);
}
finally {
// Clear the interrupted flag on the current thread, driver cancellation may have triggered an interrupt
if (Thread.interrupted()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the motivation of this change?

Based on my understanding terminating a runner tread upon interruption is by design (see line 503, while (!closed && !Thread.currentThread().isInterrupted()) {)

Copy link
Member Author

@pettyjamesm pettyjamesm Jan 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Terminating a runner isn't necessary if the TaskExecutor is not closed. Essentially there are two sources of interrupts that can occur:

  1. TaskExecutor#shutdown() sets closed = true, calls executor.shutdownNow() which will interrupt all threads. If they're blocked in a call to waitingSplits.take(), they will wake up and throw InterruptedException and the runner will terminate (without adding a replacement for itself). This happens exactly once during the whole lifecycle of the JVM in production, but in tests obviously it can happen multiple times.
  2. The active driver is closed while the TaskRunner is running it. The task runner thread is interrupted and may or may not throw an InterruptedException as a result since it's in the middle of a call to split.process() and we can't know for sure that an interruptable method will be called. Alternatively, some code inside of the driver interrupts the current thread for some other reason. Regardless of when/why that happens, the runner will terminate on the next iteration if we leave the flag set (because of !Thread.currentThread().isInterrupted()) but if TaskExecutor#shutdown() was not called then there's no need to let that happen since it will just re-queue a new TaskRunner to replace itself via line 576 in the finally block.

So we can see that there are essentially two points to interrupting the thread: 1. stop all TaskRunners because we're shutting down (rare) or 2. stop a single driver that happens to be running on a given TaskRunner. In the second case, pushing a new runner to replace the one that will terminate into the executor will create a new thread (because it's a cached threadpool) if an extra thread doesn't already exist there (note: the current thread is still "running", so it the runner can't replace itself on the same carrier thread), which isn't necessary and does have a real runtime memory and performance cost.

So basically we're taking all interrupts that occur from single driver "stop" commands, and clearing them away and leaving only interrupts originating from scenario 1 set on the runner thread after we finish processing each split. Technically, we could strip that !Thread.currentThread().isInterrupted() out of the loop condition since now the only interrupts that should terminate the runner happen as part of waitingSplits.take() which has it's own exit path handling.

@@ -130,6 +130,11 @@ public <V> V run(String callableName, Callable<V> callable)
return callable.call();
}
catch (Exception e) {
// Immediately stop retry attempts once an interrupt has been received
if (e instanceof InterruptedException || Thread.currentThread().isInterrupted()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should recover the Interrupted flag while also stopping retries.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was on the fence about whether to do that here. I think since we're re-throwing InterruptedException directly, the caller is supposed to be responsible for making that decision- otherwise you can end up setting an "extra" interrupts on the current thread (perhaps inappropriately?).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nevermind, I think I misread. You are re-throwing the InterruptedException and not suppressing it. No need to recover the flag.

@arhimondr arhimondr merged commit 08522e5 into trinodb:master Jan 23, 2023
@pettyjamesm pettyjamesm deleted the stop-retrydriver-on-interrupt branch January 23, 2023 22:03
@github-actions github-actions bot added this to the 406 milestone Jan 24, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Development

Successfully merging this pull request may close these issues.

2 participants