[CELEBORN-1757] Add retry when sending RPC to LifecycleManager #3008
base: main
Conversation
docs/configuration/network.md
@@ -29,7 +29,7 @@ license: |
  | celeborn.<module>.io.enableVerboseMetrics | false | false | Whether to track Netty memory detailed metrics. If true, the detailed metrics of Netty PoolByteBufAllocator will be gotten, otherwise only general memory usage will be tracked. |
  | celeborn.<module>.io.lazyFD | true | false | Whether to initialize FileDescriptor lazily or not. If true, file descriptors are created only when data is going to be transferred. This can reduce the number of open files. If setting <module> to `fetch`, it works for worker fetch server. |
  | celeborn.<module>.io.maxRetries | 3 | false | Max number of times we will try IO exceptions (such as connection timeouts) per request. If set to 0, we will not do any retries. If setting <module> to `data`, it works for shuffle client push and fetch data. If setting <module> to `replicate`, it works for replicate client of worker replicating data to peer worker. If setting <module> to `push`, it works for Flink shuffle client push data. |
- | celeborn.<module>.io.mode | EPOLL | false | Netty EventLoopGroup backend, available options: NIO, EPOLL. If epoll mode is available, the default IO mode is EPOLL; otherwise, the default is NIO. |
+ | celeborn.<module>.io.mode | NIO | false | Netty EventLoopGroup backend, available options: NIO, EPOLL. If epoll mode is available, the default IO mode is EPOLL; otherwise, the default is NIO. |
cc @SteNicholas Seems the doc generation depends on the developer environment
It cannot pass the GA; we need to revert it.
Codecov Report
Attention: Patch coverage is
Additional details and impacted files:

@@ Coverage Diff @@
##             main    #3008      +/-   ##
==========================================
+ Coverage   32.88%   33.02%   +0.14%
==========================================
  Files         331      331
  Lines       19800    19851      +51
  Branches     1780     1787       +7
==========================================
+ Hits         6510     6554      +44
- Misses      12929    12934       +5
- Partials      361      363       +2

View full report in Codecov by Sentry.
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
    initDataClientFactoryIfNeeded();
  }

  public <T> T callLifecycleManagerWithTimeoutRetry(Callable<T> callable, String name)
Instead of making changes everywhere, do we want to simply change askSync/askAsync to become retry-aware, with the number of retries passed in as a param (for specific cases where we don't want retries, for example)?
I agree with changing askSync/askAsync. There are a lot of exception changes caused by the fact that setupLifecycleManagerRef will throw RpcTimeoutException, which we need to catch. I changed the exception type to RuntimeException.
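To make the retry-aware shape concrete, here is a minimal standalone sketch of what a timeout-only retry wrapper like the PR's `callLifecycleManagerWithTimeoutRetry` could look like. The class name, the stand-in `RpcTimeoutException`, and the constructor parameters (mirroring the `maxRetries` and retry-wait configs discussed in this thread) are assumptions for illustration, not the actual Celeborn API:

```java
import java.util.concurrent.Callable;

public class LifecycleManagerRetry {

    // Stand-in for Celeborn's RpcTimeoutException (assumed shape).
    public static class RpcTimeoutException extends RuntimeException {
        public RpcTimeoutException(String msg) { super(msg); }
    }

    private final int maxRetries;
    private final long retryWaitMs;

    public LifecycleManagerRetry(int maxRetries, long retryWaitMs) {
        this.maxRetries = maxRetries;
        this.retryWaitMs = retryWaitMs;
    }

    // Retries only on RpcTimeoutException; any other failure propagates
    // immediately, wrapped in RuntimeException as the PR discussion suggests.
    public <T> T callWithTimeoutRetry(Callable<T> callable, String name) {
        int attempt = 0;
        while (true) {
            try {
                return callable.call();
            } catch (RpcTimeoutException e) {
                attempt++;
                if (attempt > maxRetries) {
                    throw new RuntimeException(
                        name + " failed after " + maxRetries + " retries", e);
                }
                try {
                    // Back off before the next attempt.
                    Thread.sleep(retryWaitMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(ie);
                }
            } catch (Exception e) {
                throw new RuntimeException(name + " failed", e);
            }
        }
    }

    public static void main(String[] args) {
        LifecycleManagerRetry retry = new LifecycleManagerRetry(3, 10);
        int[] calls = {0};
        // Simulate two dispatcher timeouts, then success on the third call.
        String result = retry.callWithTimeoutRetry(() -> {
            if (++calls[0] < 3) {
                throw new RpcTimeoutException("simulated dispatcher backlog");
            }
            return "registered";
        }, "setupLifecycleManagerRef");
        System.out.println(result + " after " + calls[0] + " attempts");
        // → registered after 3 attempts
    }
}
```

Making askSync/askAsync themselves accept a retry count, as suggested above, would centralize this loop instead of wrapping each call site.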
      .withAlternative("celeborn.callLifecycleManager.maxRetries")
      .categories("client")
      .version("0.6.0")
      .doc("Max retry times for client to reserve slots.")
"to reserve slots." — seems it is not only for reserving slots.
    buildConf("celeborn.rpc.timeoutRetryWait")
      .categories("network")
      .version("0.6.0")
      .doc("Wait time before next retry if RpcTimeoutException.")
nit: "if RpcTimeoutException" => "on RpcTimeoutException".
    try {
      limitZeroInFlight(mapKey, pushState);

unnecessary change.
@@ -1700,13 +1700,12 @@ private void mapEndInternal(
      throws IOException {
    final String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId);
    PushState pushState = getPushState(mapKey);

unnecessary change.
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.*;
Seems an unnecessary change? I do not see a new concurrent class involved in this class.
  val CLIENT_CALL_LIFECYCLEMANAGER_MAX_RETRIES: ConfigEntry[Int] =
    buildConf("celeborn.client.callLifecycleManager.maxRetries")
      .withAlternative("celeborn.callLifecycleManager.maxRetries")
Seems there is no legacy config celeborn.callLifecycleManager.maxRetries, so we do not need withAlternative?
Is it possible to reuse CLIENT_RPC_MAX_RETIRES?

  val CLIENT_RPC_MAX_RETIRES: ConfigEntry[Int] =
    buildConf("celeborn.client.rpc.maxRetries")
      .categories("client")
      .version("0.3.2")
      .doc("Max RPC retry times in LifecycleManager.")
      .intConf
      .createWithDefault(3)
Too many parameters are not easy to maintain.
Maybe we can at least fall back the specific client config to a default client config item.
@@ -4884,6 +4886,23 @@ object CelebornConf extends Logging {
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("3s")

  val RPC_TIMEOUT_RETRY_WAIT: ConfigEntry[Long] =
    buildConf("celeborn.rpc.timeoutRetryWait")
Seems celeborn.rpc.retryWait is enough.
Has been updated.
What changes were proposed in this pull request?
Retry sending RPC to LifecycleManager on TimeoutException.

Why are the changes needed?
RPC messages are processed by Dispatcher.threadpool, whose numThreads depends on numUsableCores. In some cases (e.g. on Kubernetes) the LifecycleManager does not have enough dispatcher threads while there are many RPCs, so TimeoutExceptions occur. Add retry when there are TimeoutExceptions.

Does this PR introduce any user-facing change?
No.
Another way is to adjust the configuration celeborn.lifecycleManager.rpc.dispatcher.threads to increase numThreads; that approach is more effective.

How was this patch tested?
Cluster testing.
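For reference, the alternative mitigation mentioned in the description — increasing the LifecycleManager's dispatcher threads — would be a one-line config change. The file name and the value below are illustrative assumptions, not a recommendation from this PR:

```properties
# Increase LifecycleManager dispatcher threads so queued RPCs are drained
# faster; 64 is only an illustrative value, tune it to your deployment.
celeborn.lifecycleManager.rpc.dispatcher.threads = 64
```

Retrying on timeout and raising the thread count are complementary: the retry covers transient dispatcher backlog, while the config addresses sustained load.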