Skip to content

Commit

Permalink
[CELEBORN-1021] Celeborn supports arbitrary Ratis configs and client rp…
Browse files Browse the repository at this point in the history
…c timeout

### What changes were proposed in this pull request?
1. To support arbitrary Ratis configs
2. To support Ratis client rpc timeout

### Why are the changes needed?
After some digging, I found that Celeborn never overrode the default timeout settings of the Ratis client.

### Does this PR introduce _any_ user-facing change?
NO.

### How was this patch tested?
GA and cluster.

Closes #1969 from FMX/CELEBORN-1021.

Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: Shuang <lvshuang.tb@gmail.com>
  • Loading branch information
FMX authored and RexXiong committed Oct 18, 2023
1 parent f6dcfaa commit 69defca
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
// Typed accessors for Ratis RPC / election settings. Each one returns the value of the
// ConfigEntry of the corresponding name declared in the CelebornConf companion object.
def haMasterRatisRetryCacheExpiryTime: Long = get(HA_MASTER_RATIS_SERVER_RETRY_CACHE_EXPIRY_TIME)
def haMasterRatisRpcTimeoutMin: Long = get(HA_MASTER_RATIS_RPC_TIMEOUT_MIN)
def haMasterRatisRpcTimeoutMax: Long = get(HA_MASTER_RATIS_RPC_TIMEOUT_MAX)
// Ratis client request timeout. The entry is declared with timeConf(TimeUnit.SECONDS) and the
// consumer wraps this value in a TimeDuration of TimeUnit.SECONDS.
def haMasterRatisClientRpcTimeout: Long = get(HA_MASTER_RATIS_CLIENT_RPC_TIMEOUT)
// Ratis client watch-request timeout; same unit handling as the request timeout above.
def haMasterRatisClientRpcWatchTimeout: Long = get(HA_MASTER_RATIS_CLIENT_RPC_WATCH_TIMEOUT)
def haMasterRatisFirstElectionTimeoutMin: Long = get(HA_MASTER_RATIS_FIRSTELECTION_TIMEOUT_MIN)
def haMasterRatisFirstElectionTimeoutMax: Long = get(HA_MASTER_RATIS_FIRSTELECTION_TIMEOUT_MAX)
def haMasterRatisNotificationNoLeaderTimeout: Long =
Expand All @@ -625,6 +627,9 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
// Value of HA_MASTER_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD; the Ratis server setup passes it to
// RaftServerConfigKeys.Snapshot.setAutoTriggerThreshold.
def haMasterRatisSnapshotAutoTriggerThreshold: Long =
get(HA_MASTER_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD)
// Value of HA_MASTER_RATIS_SNAPSHOT_RETENTION_FILE_NUM.
def haMasterRatisSnapshotRetentionFileNum: Int = get(HA_MASTER_RATIS_SNAPSHOT_RETENTION_FILE_NUM)
/**
 * Returns every setting whose key lives under the `celeborn.ratis.` namespace, as an
 * immutable snapshot converted to a Java map.
 *
 * Keys are returned unchanged, i.e. still carrying the `celeborn.ratis.` prefix; the caller
 * is responsible for stripping it before handing the entries to Ratis.
 *
 * @return matching key/value pairs; empty when no such setting exists
 */
def haRatisCustomConfigs: JMap[String, String] = {
  // Match on the dotted namespace rather than the bare "celeborn.ratis" text, so that an
  // unrelated key which merely starts with the same characters (e.g. "celeborn.ratisfoo")
  // is not treated as a pass-through Ratis setting.
  settings.asScala.filter { case (key, _) => key.startsWith("celeborn.ratis.") }.toMap.asJava
}

// //////////////////////////////////////////////////////
// Worker //
Expand Down Expand Up @@ -1807,6 +1812,22 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("5s")

// Timeout for Ratis client RPC requests. Internal entry in the "ha" category, available since
// 0.3.2, declared as a time value in seconds with a default of 10s. The value is applied via
// RaftClientConfigKeys.Rpc.setRequestTimeout when the Ratis properties are built.
val HA_MASTER_RATIS_CLIENT_RPC_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.master.ha.ratis.raft.client.rpc.timeout")
.internal
.categories("ha")
.version("0.3.2")
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("10s")

// Timeout for Ratis client watch requests. Internal entry in the "ha" category, available since
// 0.3.2, declared as a time value in seconds with a default of 20s. The value is applied via
// RaftClientConfigKeys.Rpc.setWatchRequestTimeout when the Ratis properties are built.
val HA_MASTER_RATIS_CLIENT_RPC_WATCH_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.master.ha.ratis.raft.client.rpc.watch.timeout")
.internal
.categories("ha")
.version("0.3.2")
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("20s")

val HA_MASTER_RATIS_FIRSTELECTION_TIMEOUT_MIN: ConfigEntry[Long] =
buildConf("celeborn.master.ha.ratis.first.election.timeout.min")
.withAlternative("celeborn.ha.master.ratis.first.election.timeout.min")
Expand Down
6 changes: 6 additions & 0 deletions docs/migration.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ license: |
- Since 0.4.0, Celeborn deprecate `celeborn.worker.storage.baseDir.prefix` and `celeborn.worker.storage.baseDir.number`.
Please use `celeborn.worker.storage.dirs` instead.

## Upgrading from 0.3.1 to 0.3.2

- Since 0.3.2, Celeborn changed the default value of `raft.client.rpc.request.timeout` from `3s` to `10s`.

- Since 0.3.2, Celeborn changed the default value of `raft.client.rpc.watch.request.timeout` from `10s` to `20s`.

## Upgrading from 0.3.0 to 0.3.1

- Since 0.3.1, Celeborn changed the default value of `celeborn.worker.directMemoryRatioToResume` from `0.5` to `0.7`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.netty.NettyConfigKeys;
Expand Down Expand Up @@ -324,6 +325,14 @@ private RaftProperties newRaftProperties(CelebornConf conf) {
RaftServerConfigKeys.Rpc.setFirstElectionTimeoutMin(properties, firstElectionTimeoutMin);
RaftServerConfigKeys.Rpc.setFirstElectionTimeoutMax(properties, firstElectionTimeoutMax);

// Set the Ratis client RPC timeouts: the plain request timeout and the watch-request
// timeout. Both values come from CelebornConf and are interpreted here as seconds.
TimeDuration clientRpcTimeout =
TimeDuration.valueOf(conf.haMasterRatisClientRpcTimeout(), TimeUnit.SECONDS);
TimeDuration clientRpcWatchTimeout =
TimeDuration.valueOf(conf.haMasterRatisClientRpcWatchTimeout(), TimeUnit.SECONDS);
RaftClientConfigKeys.Rpc.setRequestTimeout(properties, clientRpcTimeout);
RaftClientConfigKeys.Rpc.setWatchRequestTimeout(properties, clientRpcWatchTimeout);

// Set the number of maximum cached segments
RaftServerConfigKeys.Log.setSegmentCacheNumMax(properties, 2);

Expand All @@ -348,6 +357,10 @@ private RaftProperties newRaftProperties(CelebornConf conf) {
long snapshotAutoTriggerThreshold = conf.haMasterRatisSnapshotAutoTriggerThreshold();
RaftServerConfigKeys.Snapshot.setAutoTriggerThreshold(properties, snapshotAutoTriggerThreshold);

for (Map.Entry<String, String> ratisEntry : conf.haRatisCustomConfigs().entrySet()) {
properties.set(ratisEntry.getKey().replace("celeborn.ratis.", ""), ratisEntry.getValue());
}

return properties;
}

Expand Down

0 comments on commit 69defca

Please sign in to comment.