[SPARK-42090] Introduce sasl retry count in RetryingBlockTransferor #39611

Closed · wants to merge 1 commit
RetryingBlockTransferor.java

@@ -25,6 +25,7 @@
import java.util.concurrent.TimeUnit;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
import org.slf4j.Logger;
@@ -87,7 +88,16 @@ void createAndStart(String[] blockIds, BlockTransferListener listener)
/** Number of times we've attempted to retry so far. */
private int retryCount = 0;

private boolean saslTimeoutSeen;
// Number of times SASL timeout has been retried without success.
// If we see maxRetries consecutive failures, the request is failed.
// On the other hand, if sasl succeeds and we are able to send other requests subsequently,
// we reduce the SASL failures from retryCount (since SASL failures were part of
// connection bootstrap - which ended up being successful).
// spark.network.auth.rpcTimeout is much lower than spark.network.timeout and others -
// and so sasl is more susceptible to failures when remote service
// (like external shuffle service) is under load: but once it succeeds, we do not want to
// include it as part of request retries.
private int saslRetryCount = 0;

/**
* Set of all block ids which have not been transferred successfully or with a non-IO Exception.
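To make the new counter's accounting concrete, here is a minimal, self-contained sketch of how retryCount and saslRetryCount interact in this diff. It is illustrative only, not Spark code: the class name, the MAX_RETRIES constant (standing in for spark.shuffle.io.maxRetries), and the boolean-parameter signatures are invented for the example.

```java
// Illustrative sketch only (not part of the patch): SASL timeouts bump both counters,
// and the first non-SASL failure after a successful SASL bootstrap subtracts the SASL
// share back out of the overall retry budget.
public class SaslRetryAccountingSketch {
  private static final int MAX_RETRIES = 3; // stand-in for spark.shuffle.io.maxRetries
  private int retryCount = 0;
  private int saslRetryCount = 0;

  // Roughly what initiateRetry(Throwable) does in the diff.
  void recordRetry(boolean isSaslTimeout) {
    if (isSaslTimeout) {
      saslRetryCount += 1;
    }
    retryCount += 1;
  }

  // Roughly what shouldRetry(Throwable) does in the diff.
  boolean shouldRetry(boolean isSaslTimeout) {
    if (!isSaslTimeout && saslRetryCount > 0) {
      // Earlier SASL timeouts were part of a bootstrap that eventually succeeded,
      // so they no longer count against this request's retry budget.
      retryCount -= saslRetryCount;
      saslRetryCount = 0;
    }
    return retryCount < MAX_RETRIES;
  }

  public static void main(String[] args) {
    SaslRetryAccountingSketch s = new SaslRetryAccountingSketch();
    s.recordRetry(true);  // SASL timeout #1 -> retryCount = 1, saslRetryCount = 1
    s.recordRetry(true);  // SASL timeout #2 -> retryCount = 2, saslRetryCount = 2
    // A later IOException forgives the SASL share before checking the budget,
    // so the IO failure still gets the full MAX_RETRIES attempts.
    System.out.println(s.shouldRetry(false)); // prints "true"; retryCount is back to 0
  }
}
```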
@@ -123,7 +133,7 @@ public RetryingBlockTransferor(
this.currentListener = new RetryingBlockTransferListener();
this.errorHandler = errorHandler;
this.enableSaslRetries = conf.enableSaslRetries();
this.saslTimeoutSeen = false;
this.saslRetryCount = 0;
}

public RetryingBlockTransferor(
@@ -167,7 +177,7 @@ private void transferAllOutstanding() {
numRetries > 0 ? "(after " + numRetries + " retries)" : ""), e);

if (shouldRetry(e)) {
initiateRetry();
initiateRetry(e);
} else {
for (String bid : blockIdsToTransfer) {
listener.onBlockTransferFailure(bid, e);
@@ -180,7 +190,10 @@ private void transferAllOutstanding() {
* Lightweight method which initiates a retry in a different thread. The retry will involve
* calling transferAllOutstanding() after a configured wait time.
*/
private synchronized void initiateRetry() {
private synchronized void initiateRetry(Throwable e) {
if (enableSaslRetries && e instanceof SaslTimeoutException) {
saslRetryCount += 1;
}
retryCount += 1;
currentListener = new RetryingBlockTransferListener();

@@ -203,16 +216,17 @@ private synchronized boolean shouldRetry(Throwable e) {
boolean isIOException = e instanceof IOException
|| e.getCause() instanceof IOException;
boolean isSaslTimeout = enableSaslRetries && e instanceof SaslTimeoutException;
if (!isSaslTimeout && saslTimeoutSeen) {
retryCount = 0;
saslTimeoutSeen = false;
// If this is a non SASL request failure, reduce earlier SASL failures from retryCount
// since some subsequent SASL attempt was successful
if (!isSaslTimeout && saslRetryCount > 0) {
Preconditions.checkState(retryCount >= saslRetryCount,
"retryCount must be greater than or equal to saslRetryCount");
retryCount -= saslRetryCount;

Review thread:

Reviewer: Should we also verify that retryCount >= saslRetryCount? Just so that we don't ever end up in some funky state where we end up with negative values (I don't think this can ever happen, but it would be good to add the check!). :)

Author: I added a runtime exception. But as you said, a subset should never be bigger than the whole.

Reviewer: looks good, thanks
saslRetryCount = 0;
}
boolean hasRemainingRetries = retryCount < maxRetries;
boolean shouldRetry = (isSaslTimeout || isIOException) &&
hasRemainingRetries && errorHandler.shouldRetryError(e);
if (shouldRetry && isSaslTimeout) {
this.saslTimeoutSeen = true;
}
return shouldRetry;
}
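For reference on the "runtime exception" mentioned in the review thread above: Guava's Preconditions.checkState throws an IllegalStateException carrying the supplied message when its condition is false, so an (expected-to-be-impossible) retryCount < saslRetryCount state fails loudly rather than leaving a negative count. A minimal sketch, with an invented class name and deliberately inconsistent values:

```java
import com.google.common.base.Preconditions;

// Illustrative only: what the added checkState guard does when the invariant is violated.
public class CheckStateSketch {
  public static void main(String[] args) {
    int retryCount = 1;
    int saslRetryCount = 2; // deliberately broken invariant
    // Throws java.lang.IllegalStateException with the message below.
    Preconditions.checkState(retryCount >= saslRetryCount,
      "retryCount must be greater than or equal to saslRetryCount");
  }
}
```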

@@ -236,9 +250,13 @@ private void handleBlockTransferSuccess(String blockId, ManagedBuffer data) {
if (this == currentListener && outstandingBlocksIds.contains(blockId)) {
outstandingBlocksIds.remove(blockId);
shouldForwardSuccess = true;
if (saslTimeoutSeen) {
retryCount = 0;
saslTimeoutSeen = false;
// If there were SASL failures earlier, remove them from retryCount, as there was
// a SASL success (and some other request post bootstrap was also successful).
if (saslRetryCount > 0) {
Preconditions.checkState(retryCount >= saslRetryCount,
"retryCount must be greater than or equal to saslRetryCount");
retryCount -= saslRetryCount;
saslRetryCount = 0;
}
}
}
@@ -256,7 +274,7 @@ private void handleBlockTransferFailure(String blockId, Throwable exception) {
synchronized (RetryingBlockTransferor.this) {
if (this == currentListener && outstandingBlocksIds.contains(blockId)) {
if (shouldRetry(exception)) {
initiateRetry();
initiateRetry(exception);
} else {
if (errorHandler.shouldLogError(exception)) {
logger.error(
RetryingBlockTransferorSuite.java

@@ -58,10 +58,12 @@ public class RetryingBlockTransferorSuite {
private static Map<String, String> configMap;
private static RetryingBlockTransferor _retryingBlockTransferor;

private static final int MAX_RETRIES = 2;

@Before
public void initMap() {
configMap = new HashMap<String, String>() {{
put("spark.shuffle.io.maxRetries", "2");
put("spark.shuffle.io.maxRetries", Integer.toString(MAX_RETRIES));
put("spark.shuffle.io.retryWait", "0");
}};
}
@@ -309,7 +311,7 @@ public void testRepeatedSaslRetryFailures() throws IOException, InterruptedException
verify(listener, timeout(5000)).onBlockTransferFailure("b0", saslTimeoutException);
verify(listener, times(3)).getTransferType();
verifyNoMoreInteractions(listener);
assert(_retryingBlockTransferor.getRetryCount() == 2);
assert(_retryingBlockTransferor.getRetryCount() == MAX_RETRIES);
}

@Test
@@ -341,6 +343,35 @@ public void testBlockTransferFailureAfterSasl() throws IOException, InterruptedException
assert(_retryingBlockTransferor.getRetryCount() == 1);
}

@Test
public void testIOExceptionFailsConnectionEvenWithSaslException()

Review thread:

Reviewer: Do you think we should also expose saslRetryCount as @VisibleForTesting and assert the count in the tests?

Author: See the change w.r.t. your 3rd comment. With that change, I think we don't need to expose saslRetryCount. In the future, we can use a map (type of exception -> outstanding retry count for that type) where we can think more about asserting on the retry count for each type.

Reviewer: makes sense. I had a similar idea here: 3440f12. But we ended up opting to go with a simple flag for now. I agree with using a map in the future where we might have retry counts for each type.

A sketch of the map-based idea discussed here appears at the end of this page.
throws IOException, InterruptedException {
BlockFetchingListener listener = mock(BlockFetchingListener.class);

SaslTimeoutException saslExceptionInitial = new SaslTimeoutException("initial",
new TimeoutException());
SaslTimeoutException saslExceptionFinal = new SaslTimeoutException("final",
new TimeoutException());
IOException ioException = new IOException();
List<? extends Map<String, Object>> interactions = Arrays.asList(
ImmutableMap.of("b0", saslExceptionInitial),
ImmutableMap.of("b0", ioException),
ImmutableMap.of("b0", saslExceptionInitial),
ImmutableMap.of("b0", ioException),
ImmutableMap.of("b0", saslExceptionFinal),
// will not get invoked because the connection fails
ImmutableMap.of("b0", ioException),
// will not get invoked
ImmutableMap.of("b0", block0)
);
configMap.put("spark.shuffle.sasl.enableRetries", "true");
performInteractions(interactions, listener);
verify(listener, timeout(5000)).onBlockTransferFailure("b0", saslExceptionFinal);
verify(listener, atLeastOnce()).getTransferType();
verifyNoMoreInteractions(listener);
assert(_retryingBlockTransferor.getRetryCount() == MAX_RETRIES);
}

/**
* Performs a set of interactions in response to block requests from a RetryingBlockFetcher.
* Each interaction is a Map from BlockId to either ManagedBuffer or Exception. This interaction
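Finally, here is a hypothetical sketch of the "map from exception type to outstanding retry count" idea raised in the review thread above. Nothing like this exists in the PR; the class and method names are invented to illustrate how the single saslRetryCount field could generalize to one counter per retriable failure type.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical generalization of saslRetryCount: one outstanding-retry counter per
// exception type, so each type's share can be forgiven independently once it recovers.
public class PerExceptionRetryCounts {
  private final Map<Class<? extends Throwable>, Integer> outstanding = new HashMap<>();

  // Record that a retry was spent on this kind of failure.
  void recordRetry(Throwable t) {
    outstanding.merge(t.getClass(), 1, Integer::sum);
  }

  // Clear the counter for one exception type and return how many retries it had consumed,
  // e.g. to subtract them from an overall retryCount (as the diff does for SASL timeouts).
  int forgive(Class<? extends Throwable> type) {
    Integer count = outstanding.remove(type);
    return count == null ? 0 : count;
  }
}
```

Per-type counters would also make it straightforward for tests to assert on the retry count consumed by each exception type, which is the point raised in the thread.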