-
Notifications
You must be signed in to change notification settings - Fork 28.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-42090] Introduce sasl retry count in RetryingBlockTransferor #39611
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -58,10 +58,12 @@ public class RetryingBlockTransferorSuite { | |
private static Map<String, String> configMap; | ||
private static RetryingBlockTransferor _retryingBlockTransferor; | ||
|
||
private static final int MAX_RETRIES = 2; | ||
|
||
@Before | ||
public void initMap() { | ||
configMap = new HashMap<String, String>() {{ | ||
put("spark.shuffle.io.maxRetries", "2"); | ||
put("spark.shuffle.io.maxRetries", Integer.toString(MAX_RETRIES)); | ||
put("spark.shuffle.io.retryWait", "0"); | ||
}}; | ||
} | ||
|
@@ -309,7 +311,7 @@ public void testRepeatedSaslRetryFailures() throws IOException, InterruptedExcep | |
verify(listener, timeout(5000)).onBlockTransferFailure("b0", saslTimeoutException); | ||
verify(listener, times(3)).getTransferType(); | ||
verifyNoMoreInteractions(listener); | ||
assert(_retryingBlockTransferor.getRetryCount() == 2); | ||
assert(_retryingBlockTransferor.getRetryCount() == MAX_RETRIES); | ||
} | ||
|
||
@Test | ||
|
@@ -341,6 +343,35 @@ public void testBlockTransferFailureAfterSasl() throws IOException, InterruptedE | |
assert(_retryingBlockTransferor.getRetryCount() == 1); | ||
} | ||
|
||
@Test | ||
public void testIOExceptionFailsConnectionEvenWithSaslException() | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Do you think we should also expose [inline code reference missing from the extracted text]? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. See the change w.r.t. your 3rd comment. In the future, we can use a map (type of exception -> outstanding retry count for that type), at which point we can think more about asserting on the retry count for each type. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Makes sense. I had a similar idea here: 3440f12 |
||
throws IOException, InterruptedException { | ||
BlockFetchingListener listener = mock(BlockFetchingListener.class); | ||
|
||
SaslTimeoutException saslExceptionInitial = new SaslTimeoutException("initial", | ||
new TimeoutException()); | ||
SaslTimeoutException saslExceptionFinal = new SaslTimeoutException("final", | ||
new TimeoutException()); | ||
IOException ioException = new IOException(); | ||
List<? extends Map<String, Object>> interactions = Arrays.asList( | ||
ImmutableMap.of("b0", saslExceptionInitial), | ||
ImmutableMap.of("b0", ioException), | ||
ImmutableMap.of("b0", saslExceptionInitial), | ||
ImmutableMap.of("b0", ioException), | ||
ImmutableMap.of("b0", saslExceptionFinal), | ||
// will not get invoked because the connection fails | ||
ImmutableMap.of("b0", ioException), | ||
// will not get invoked | ||
ImmutableMap.of("b0", block0) | ||
); | ||
configMap.put("spark.shuffle.sasl.enableRetries", "true"); | ||
performInteractions(interactions, listener); | ||
verify(listener, timeout(5000)).onBlockTransferFailure("b0", saslExceptionFinal); | ||
verify(listener, atLeastOnce()).getTransferType(); | ||
verifyNoMoreInteractions(listener); | ||
assert(_retryingBlockTransferor.getRetryCount() == MAX_RETRIES); | ||
} | ||
|
||
/** | ||
* Performs a set of interactions in response to block requests from a RetryingBlockFetcher. | ||
* Each interaction is a Map from BlockId to either ManagedBuffer or Exception. This interaction | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we also verify that `retryCount >= saslRetryCount`? Just so that we don't ever end up in some funky state where we end up with negative values (I don't think this can ever happen, but it would be good to add the check!). :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added a runtime exception.
But as you said, a subset should never be bigger than the whole.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks good, thanks