-
Notifications
You must be signed in to change notification settings - Fork 3.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Handle interruption in RetryDriver and TaskRunner #15803
Handle interruption in RetryDriver and TaskRunner #15803
Conversation
64ca8d3
to
879d6e6
Compare
Stop attempting retries in Hive's RetryDriver if an InterruptedException is caught or when the current thread is interrupted and immediately throw the current exception. Otherwise, RetryDriver might interfere with drivers terminating in a timely manner after tasks are terminated via a cancel, failure, or abort.
879d6e6
to
9f70de7
Compare
Drivers may leave the TaskRunner thread's interrupt flag set during the course of processing, but doing so should not result in the TaskRunner terminating its own processing loop until the TaskExecutor is closed. Instead of allowing the interrupt to terminate the current task runner's loop and re-creating a new runner to replace the interrupted one, we can clear the interrupt flag after each iteration. Otherwise, TaskRunners that were interrupted would have to replace themselves and end up creating unnecessary threads in the cached threadpool executor in the process.
6570643
to
46fcbd6
Compare
@@ -561,6 +561,15 @@ public void run() | |||
} | |||
splitFinished(split); | |||
} | |||
finally { | |||
// Clear the interrupted flag on the current thread, driver cancellation may have triggered an interrupt | |||
if (Thread.interrupted()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the motivation of this change?
Based on my understanding terminating a runner tread upon interruption is by design (see line 503, while (!closed && !Thread.currentThread().isInterrupted()) {
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Terminating a runner isn't necessary if the TaskExecutor
is not closed. Essentially there are two sources of interrupts that can occur:
TaskExecutor#shutdown()
setsclosed = true
, callsexecutor.shutdownNow()
which will interrupt all threads. If they're blocked in a call towaitingSplits.take()
, they will wake up and throwInterruptedException
and the runner will terminate (without adding a replacement for itself). This happens exactly once during the whole lifecycle of the JVM in production, but in tests obviously it can happen multiple times.- The active driver is closed while the
TaskRunner
is running it. The task runner thread is interrupted and may or may not throw anInterruptedException
as a result since it's in the middle of a call tosplit.process()
and we can't know for sure that an interruptable method will be called. Alternatively, some code inside of the driver interrupts the current thread for some other reason. Regardless of when/why that happens, the runner will terminate on the next iteration if we leave the flag set (because of!Thread.currentThread().isInterrupted()
) but ifTaskExecutor#shutdown()
was not called then there's no need to let that happen since it will just re-queue a newTaskRunner
to replace itself via line 576 in the finally block.
So we can see that there are essentially two points to interrupting the thread: 1. stop all TaskRunner
s because we're shutting down (rare) or 2. stop a single driver that happens to be running on a given TaskRunner
. In the second case, pushing a new runner to replace the one that will terminate into the executor will create a new thread (because it's a cached threadpool) if an extra thread doesn't already exist there (note: the current thread is still "running", so it the runner can't replace itself on the same carrier thread), which isn't necessary and does have a real runtime memory and performance cost.
So basically we're taking all interrupts that occur from single driver "stop" commands, and clearing them away and leaving only interrupts originating from scenario 1 set on the runner thread after we finish processing each split. Technically, we could strip that !Thread.currentThread().isInterrupted()
out of the loop condition since now the only interrupts that should terminate the runner happen as part of waitingSplits.take()
which has it's own exit path handling.
@@ -130,6 +130,11 @@ public <V> V run(String callableName, Callable<V> callable) | |||
return callable.call(); | |||
} | |||
catch (Exception e) { | |||
// Immediately stop retry attempts once an interrupt has been received | |||
if (e instanceof InterruptedException || Thread.currentThread().isInterrupted()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should recover the Interrupted
flag while also stopping retries.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was on the fence about whether to do that here. I think since we're re-throwing InterruptedException
directly, the caller is supposed to be responsible for making that decision- otherwise you can end up setting an "extra" interrupts on the current thread (perhaps inappropriately?).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nevermind, I think I misread. You are re-throwing the InterruptedException
and not suppressing it. No need to recover the flag.
Description
This PR includes two changes to Thread interrupt handling:
trino-hive
, modifyRetryDriver
to stop attempting retries ifInterruptedException
is caught or the current thread is interrupted and immediately throw the current exception. Otherwise,RetryDriver
might interfere with drivers terminating in a timely manner after tasks are terminated via a cancel, failure, or abort.trino-main
, check and clear thread interrupt status at the end of processing a split inTaskRunner
before looping back to start processing a new split. Otherwise, a split that is interrupted (eg: because the task was cancelled) would leave the thread interrupted flag set which would cause theTaskRunner
to stop processing and unnecessarily submit a new instance from which could cause additional threads to be created in the cached thread pool executor.Release notes
(x) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
( ) Release notes are required, with the following suggested text: