Handle interruption in RetryDriver and TaskRunner #18964

Merged

@@ -139,6 +139,11 @@ public <V> V run(String callableName, Callable<V> callable)
return callable.call();
}
catch (Exception e) {
+ // Immediately stop retry attempts once an interrupt has been received
+ if (e instanceof InterruptedException || Thread.currentThread().isInterrupted()) {
ajaygeorge marked this conversation as resolved.

Sorry for the late comment. Checking isInterrupted will clear the interrupt flag on the thread; is that expected? I am not sure who is responsible for clearing the interrupt flag in the Presto codebase.

The sleep on line 163 rightly tries to set the interrupted flag.

My bad, isInterrupted does not clear the flag, so this is fine. But there are still differences in how the InterruptedException is handled.
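For reference, the distinction this comment corrects can be checked with a small standalone sketch. The class and method names below are hypothetical and not part of the PR; only the `java.lang.Thread` calls are real API. The instance method `isInterrupted()` only reads the flag, while the static `Thread.interrupted()` reads and clears it.

```java
public class InterruptFlagDemo {
    /** Returns what three consecutive checks observe on the current thread. */
    static boolean[] observe() {
        // Set the interrupt flag on the current thread
        Thread.currentThread().interrupt();
        // Instance method: reads the flag and leaves it set
        boolean viaInstance = Thread.currentThread().isInterrupted();
        // Static method: reads the flag and clears it
        boolean viaStatic = Thread.interrupted();
        // The flag is no longer set after the static call
        boolean afterStatic = Thread.currentThread().isInterrupted();
        return new boolean[] {viaInstance, viaStatic, afterStatic};
    }

    public static void main(String[] args) {
        boolean[] r = observe();
        System.out.println(r[0] + " " + r[1] + " " + r[2]);
    }
}
```

This is why the `isInterrupted()` check in the retry path leaves the flag intact for callers further up the stack.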

Contributor Author

We’re rethrowing InterruptedException (or whatever other exception was thrown if the current thread is interrupted), not “handling” it, other than to ensure that we stop retrying and let the exception propagate.

Thanks, this makes sense.
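The behavior described in this thread (stop retrying on interrupt, rethrow, leave the flag alone) might look roughly like the following simplified loop. This is a sketch under stated assumptions, not the actual RetryDriver: `InterruptAwareRetry` and its `run(int, Callable)` signature are hypothetical, and backoff, exception mapping, and the stop-on list are omitted.

```java
import java.util.concurrent.Callable;

public class InterruptAwareRetry {
    /** Hypothetical stand-in for a retry driver; retries up to maxAttempts unless interrupted. */
    static <V> V run(int maxAttempts, Callable<V> callable) throws Exception {
        int attempt = 0;
        while (true) {
            attempt++;
            try {
                return callable.call();
            }
            catch (Exception e) {
                // Stop retrying once an interrupt is observed; isInterrupted() leaves the flag set for the caller
                if (e instanceof InterruptedException || Thread.currentThread().isInterrupted()) {
                    throw e;
                }
                // Give up once the attempt budget is exhausted
                if (attempt >= maxAttempts) {
                    throw e;
                }
                // A real driver would sleep with backoff here before the next attempt
            }
        }
    }
}
```

The second half of the condition matters when a library translates the interrupt into another exception type: the loop still stops because the thread's flag is set, even though the exception is not an InterruptedException.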

+ addSuppressed(e, suppressedExceptions);
+ throw e;
+ }
e = exceptionMapper.apply(e);
for (Class<? extends Exception> clazz : exceptionWhiteList) {
if (clazz.isInstance(e)) {
@@ -665,7 +665,7 @@ private ObjectMetadata getS3ObjectMetadata(Path path, String bucketName, String
return retry()
.maxAttempts(maxAttempts)
.exponentialBackoff(BACKOFF_MIN_SLEEP, maxBackoffTime, maxRetryTime, 2.0)
- .stopOn(InterruptedException.class, UnrecoverableS3OperationException.class)
+ .stopOn(InterruptedException.class, UnrecoverableS3OperationException.class, AbortedException.class)
.onRetry(STATS::newGetMetadataRetry)
.run("getS3ObjectMetadata", () -> {
try {
@@ -931,7 +931,7 @@ public int read(long position, byte[] buffer, int offset, int length)
return retry()
.maxAttempts(maxAttempts)
.exponentialBackoff(BACKOFF_MIN_SLEEP, maxBackoffTime, maxRetryTime, 2.0)
- .stopOn(InterruptedException.class, UnrecoverableS3OperationException.class, EOFException.class, FileNotFoundException.class)
+ .stopOn(InterruptedException.class, UnrecoverableS3OperationException.class, EOFException.class, FileNotFoundException.class, AbortedException.class)
.onRetry(STATS::newGetObjectRetry)
.run("getS3Object", () -> {
InputStream stream;
@@ -1101,7 +1101,7 @@ private InputStream openStream(Path path, long start)
return retry()
.maxAttempts(maxAttempts)
.exponentialBackoff(BACKOFF_MIN_SLEEP, maxBackoffTime, maxRetryTime, 2.0)
- .stopOn(InterruptedException.class, UnrecoverableS3OperationException.class, FileNotFoundException.class)
+ .stopOn(InterruptedException.class, UnrecoverableS3OperationException.class, FileNotFoundException.class, AbortedException.class)
.onRetry(STATS::newGetObjectRetry)
.run("getS3Object", () -> {
try {
@@ -13,6 +13,7 @@
*/
package com.facebook.presto.hive.s3select;

+ import com.amazonaws.AbortedException;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CompressionType;
import com.amazonaws.services.s3.model.InputSerialization;
@@ -180,7 +181,7 @@ private int readLine(Text value)
return retry()
.maxAttempts(maxAttempts)
.exponentialBackoff(BACKOFF_MIN_SLEEP, maxBackoffTime, maxRetryTime, 2.0)
- .stopOn(InterruptedException.class, UnrecoverableS3OperationException.class)
+ .stopOn(InterruptedException.class, UnrecoverableS3OperationException.class, AbortedException.class)
.run("readRecordsContentStream", () -> {
if (isFirstLine) {
recordsFromS3 = 0;
@@ -642,6 +642,17 @@ public void run()
}
splitFinished(split);
}
+ finally {
+ // Clear the interrupted flag on the current thread, driver cancellation may have triggered an interrupt
+ // without TaskExecutor being shut down
+ if (Thread.interrupted()) {
+ if (closed) {
+ // reset interrupted flag if TaskExecutor was closed, since that interrupt may have been the
+ // shutdown signal to this TaskRunner thread
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
ajaygeorge marked this conversation as resolved.
}
}
finally {
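The clear-then-maybe-restore pattern in the TaskRunner hunk above can be exercised in isolation with a sketch like the one below. `InterruptCleanup`, `runSplit`, and the `AtomicBoolean closed` parameter are assumptions made for illustration; the diff only shows a `closed` flag being read, not how it is declared.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class InterruptCleanup {
    /** Runs one unit of work, then drops a stray interrupt unless the (hypothetical) executor is closed. */
    static void runSplit(Runnable work, AtomicBoolean closed) {
        try {
            work.run();
        }
        finally {
            // Thread.interrupted() reads and clears the flag in one step
            if (Thread.interrupted() && closed.get()) {
                // The interrupt may have been the shutdown signal, so restore it for the enclosing loop
                Thread.currentThread().interrupt();
            }
        }
    }
}
```

With `closed` false, an interrupt raised while the work ran (for example by a cancelled driver) does not leak into the next split on the same thread. With `closed` true, the flag stays set so the worker loop can observe the shutdown.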