[CELEBORN-1757] Add retry when sending RPC to LifecycleManager #3008

Open · wants to merge 21 commits into main

Conversation

@zaynt4606 zaynt4606 (Contributor) commented Dec 19, 2024

What changes were proposed in this pull request?

Retry sending RPCs to the LifecycleManager on TimeoutException.

Why are the changes needed?

RPC messages are processed by the Dispatcher's thread pool, whose numThreads depends on numUsableCores.
In some environments (e.g. Kubernetes), the LifecycleManager does not have enough dispatcher threads while the number of RPCs is large, so TimeoutExceptions occur.
This PR adds a retry when such TimeoutExceptions happen.
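For illustration, here is a minimal, self-contained Scala sketch of the retry pattern described above. The object and parameter names (`RpcRetrySketch`, `maxRetries`, `retryWaitMs`) are placeholders, not the PR's actual code; the real change reads these values from the CelebornConf entries discussed below.

```scala
import java.util.concurrent.TimeoutException

import scala.util.{Failure, Success, Try}

// Minimal sketch (not the PR's actual code): retry a single RPC call on
// TimeoutException, waiting retryWaitMs between attempts.
object RpcRetrySketch {
  def withTimeoutRetry[T](name: String, maxRetries: Int, retryWaitMs: Long)(call: () => T): T =
    Try(call()) match {
      case Success(result) => result
      case Failure(e: TimeoutException) if maxRetries > 0 =>
        // Back off briefly so the LifecycleManager dispatcher can drain its queue,
        // then send the same RPC (identified by `name` for logging) again.
        Thread.sleep(retryWaitMs)
        withTimeoutRetry(name, maxRetries - 1, retryWaitMs)(call)
      case Failure(e) =>
        // Out of retries, or a non-timeout failure: propagate.
        throw e
    }
}
```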

Does this PR introduce any user-facing change?

No.

Another way is to increase numThreads by adjusting the configuration celeborn.lifecycleManager.rpc.dispatcher.threads. This is more effective.

How was this patch tested?

Cluster testing.

@@ -29,7 +29,7 @@ license: |
| celeborn.<module>.io.enableVerboseMetrics | false | false | Whether to track Netty memory detailed metrics. If true, the detailed metrics of Netty PoolByteBufAllocator will be gotten, otherwise only general memory usage will be tracked. | | |
| celeborn.<module>.io.lazyFD | true | false | Whether to initialize FileDescriptor lazily or not. If true, file descriptors are created only when data is going to be transferred. This can reduce the number of open files. If setting <module> to `fetch`, it works for worker fetch server. | | |
| celeborn.<module>.io.maxRetries | 3 | false | Max number of times we will try IO exceptions (such as connection timeouts) per request. If set to 0, we will not do any retries. If setting <module> to `data`, it works for shuffle client push and fetch data. If setting <module> to `replicate`, it works for replicate client of worker replicating data to peer worker. If setting <module> to `push`, it works for Flink shuffle client push data. | | |
| celeborn.<module>.io.mode | EPOLL | false | Netty EventLoopGroup backend, available options: NIO, EPOLL. If epoll mode is available, the default IO mode is EPOLL; otherwise, the default is NIO. | | |
| celeborn.<module>.io.mode | NIO | false | Netty EventLoopGroup backend, available options: NIO, EPOLL. If epoll mode is available, the default IO mode is EPOLL; otherwise, the default is NIO. | | |
Member

cc @SteNicholas Seems the doc generation depends on the developer environment

Member

It cannot pass the GA check; this needs to be reverted.

codecov bot commented Dec 19, 2024

Codecov Report

Attention: Patch coverage is 88.88889% with 1 line in your changes missing coverage. Please review.

Project coverage is 33.02%. Comparing base (4aabe37) to head (f8cd555).
Report is 16 commits behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...cala/org/apache/celeborn/common/CelebornConf.scala | 88.89% | 1 Missing ⚠️ |
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3008      +/-   ##
==========================================
+ Coverage   32.88%   33.02%   +0.14%     
==========================================
  Files         331      331              
  Lines       19800    19851      +51     
  Branches     1780     1787       +7     
==========================================
+ Hits         6510     6554      +44     
- Misses      12929    12934       +5     
- Partials      361      363       +2     


initDataClientFactoryIfNeeded();
}

public <T> T callLifecycleManagerWithTimeoutRetry(Callable<T> callable, String name)
Contributor

Instead of making changes everywhere, do we want to simply make askSync/askAsync retry-aware, with the number of retries passed in as a parameter (for specific cases where we don't want retries, for example)?

@zaynt4606 zaynt4606 (Contributor Author) Dec 23, 2024

I agree with changing askSync/askAsync.
There are many exception-handling changes because setupLifecycleManagerRef will throw RpcTimeoutExceptions that we need to catch; I changed the exception type to RuntimeException.
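For illustration, a rough Scala sketch of that retry-aware ask, with the retry count passed as a parameter and an exhausted timeout surfaced as a RuntimeException. `AskableRef` and the local `RpcTimeoutException` are stand-ins for Celeborn's real RPC types, not the actual API.

```scala
import java.util.concurrent.TimeoutException

import scala.concurrent.duration.FiniteDuration
import scala.reflect.ClassTag

// Stand-in for the real RPC timeout exception type.
class RpcTimeoutException(message: String) extends TimeoutException(message)

// Stand-in for an RPC endpoint reference that supports synchronous ask.
trait AskableRef {
  def askSync[T: ClassTag](message: Any, timeout: FiniteDuration): T
}

object RetryAwareAsk {
  // Retry-aware askSync: callers that must not retry can pass retries = 0.
  // On exhaustion the timeout is rethrown as a RuntimeException, matching the
  // exception-type change mentioned above.
  def askSyncWithRetry[T: ClassTag](
      ref: AskableRef,
      message: Any,
      timeout: FiniteDuration,
      retries: Int,
      retryWaitMs: Long): T = {
    try {
      ref.askSync[T](message, timeout)
    } catch {
      case e: RpcTimeoutException if retries > 0 =>
        // Wait before retrying the same message.
        Thread.sleep(retryWaitMs)
        askSyncWithRetry[T](ref, message, timeout, retries - 1, retryWaitMs)
      case e: RpcTimeoutException =>
        throw new RuntimeException(s"RPC $message to LifecycleManager timed out after all retries", e)
    }
  }
}
```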

.withAlternative("celeborn.callLifecycleManager.maxRetries")
.categories("client")
.version("0.6.0")
.doc("Max retry times for client to reserve slots.")
Member

"to reserve slots."

Seems this is not only for reserving slots.

buildConf("celeborn.rpc.timeoutRetryWait")
.categories("network")
.version("0.6.0")
.doc("Wait time before next retry if RpcTimeoutException.")
Member

nit: if RpcTimeoutException => on RpcTimeoutException.

try {
limitZeroInFlight(mapKey, pushState);

Member

unnecessary change.

@@ -1700,13 +1700,12 @@ private void mapEndInternal(
throws IOException {
final String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId);
PushState pushState = getPushState(mapKey);

Member

unnecessary change.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
Member

Seems like an unnecessary change? I do not see a new concurrent class used in this class.


val CLIENT_CALL_LIFECYCLEMANAGER_MAX_RETRIES: ConfigEntry[Int] =
buildConf("celeborn.client.callLifecycleManager.maxRetries")
.withAlternative("celeborn.callLifecycleManager.maxRetries")
Member

Seems there is no legacy config celeborn.callLifecycleManager.maxRetries, so withAlternative is not needed?

Member

Is it possible to reuse CLIENT_RPC_MAX_RETIRES?

  val CLIENT_RPC_MAX_RETIRES: ConfigEntry[Int] =
    buildConf("celeborn.client.rpc.maxRetries")
      .categories("client")
      .version("0.3.2")
      .doc("Max RPC retry times in LifecycleManager.")
      .intConf
      .createWithDefault(3)

@turboFei turboFei (Member) Dec 23, 2024

Too many parameters are hard to maintain.

@turboFei turboFei (Member) Dec 23, 2024

Maybe we can at least fall back from the specific client config to a client default config item.
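For illustration, a simplified sketch of such a fallback: the config names are the ones in this PR, but the lookup uses a plain Map instead of Celeborn's ConfigEntry machinery, and the default of 3 mirrors the CLIENT_RPC_MAX_RETIRES default shown above.

```scala
// Illustrative fallback sketch: prefer the specific config if the user set it,
// otherwise fall back to the general client RPC retry setting, then to its default.
object ConfFallbackSketch {
  private val ClientRpcMaxRetriesDefault = 3

  def callLifecycleManagerMaxRetries(userConf: Map[String, String]): Int =
    userConf.get("celeborn.client.callLifecycleManager.maxRetries")
      .orElse(userConf.get("celeborn.client.rpc.maxRetries"))
      .map(_.toInt)
      .getOrElse(ClientRpcMaxRetriesDefault)
}
```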

@@ -4884,6 +4886,23 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("3s")

val RPC_TIMEOUT_RETRY_WAIT: ConfigEntry[Long] =
buildConf("celeborn.rpc.timeoutRetryWait")
Member

Seems celeborn.rpc.retryWait is enough.

Contributor Author

has been updated~
