Skip to content

Commit

Permalink
Merge branch 'master' into issue-2173
Browse files Browse the repository at this point in the history
  • Loading branch information
zhengchenyu authored Oct 22, 2024
2 parents 3a68303 + 87f9b6f commit ccd9f2e
Show file tree
Hide file tree
Showing 32 changed files with 317 additions and 221 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.RetryUtils;
import org.apache.uniffle.hadoop.shim.HadoopShimImpl;
import org.apache.uniffle.proto.RssProtos.MergeContext;
import org.apache.uniffle.storage.util.StorageType;

import static org.apache.hadoop.mapreduce.RssMRConfig.RSS_REMOTE_MERGE_CLASS_LOADER;
Expand Down Expand Up @@ -285,17 +286,20 @@ public Thread newThread(Runnable r) {
RssMRConfig.toRssConf(conf)
.get(MAX_CONCURRENCY_PER_PARTITION_TO_WRITE),
0,
remoteMergeEnable ? conf.getMapOutputKeyClass().getName() : null,
remoteMergeEnable
? conf.getMapOutputValueClass().getName()
: null,
remoteMergeEnable
? conf.getOutputKeyComparator().getClass().getName()
: null,
conf.getInt(
RssMRConfig.RSS_MERGED_BLOCK_SZIE,
RssMRConfig.RSS_MERGED_BLOCK_SZIE_DEFAULT),
conf.get(RSS_REMOTE_MERGE_CLASS_LOADER)));
? MergeContext.newBuilder()
.setKeyClass(conf.getMapOutputKeyClass().getName())
.setValueClass(conf.getMapOutputValueClass().getName())
.setComparatorClass(
conf.getOutputKeyComparator().getClass().getName())
.setMergedBlockSize(
conf.getInt(
RssMRConfig.RSS_MERGED_BLOCK_SZIE,
RssMRConfig.RSS_MERGED_BLOCK_SZIE_DEFAULT))
.setMergeClassLoader(
conf.get(RSS_REMOTE_MERGE_CLASS_LOADER, ""))
.build()
: null));
LOG.info(
"Finish register shuffle with "
+ (System.currentTimeMillis() - start)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.uniffle.common.serializer.SerializerInstance;
import org.apache.uniffle.common.serializer.SerializerUtils;
import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.proto.RssProtos;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
Expand Down Expand Up @@ -721,11 +722,7 @@ public void registerShuffle(
ShuffleDataDistributionType distributionType,
int maxConcurrencyPerPartitionToWrite,
int stageAttemptNumber,
String keyClassName,
String valueClassName,
String comparatorClassName,
int mergedBlockSize,
String mergeClassLoader) {}
RssProtos.MergeContext mergeContext) {}

@Override
public boolean sendCommit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.hadoop.shim.HadoopShimImpl;
import org.apache.uniffle.proto.RssProtos;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -507,11 +508,7 @@ public void registerShuffle(
ShuffleDataDistributionType distributionType,
int maxConcurrencyPerPartitionToWrite,
int stageAttemptNumber,
String keyClassName,
String valueClassName,
String comparatorClassName,
int mergedBlockSize,
String mergeClassLoader) {}
RssProtos.MergeContext mergeContext) {}

@Override
public boolean sendCommit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1028,10 +1028,6 @@ protected void registerShuffleServers(
ShuffleDataDistributionType.NORMAL,
maxConcurrencyPerPartitionToWrite,
stageAttemptNumber,
null,
null,
null,
-1,
null);
});
LOG.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.RetryUtils;
import org.apache.uniffle.proto.RssProtos;

import static org.apache.uniffle.common.config.RssClientConf.MAX_CONCURRENCY_PER_PARTITION_TO_WRITE;

Expand Down Expand Up @@ -305,13 +306,23 @@ public ShuffleAssignmentsInfo run() throws Exception {
RssTezConfig.toRssConf(conf)
.get(MAX_CONCURRENCY_PER_PARTITION_TO_WRITE),
0,
keyClassName,
valueClassName,
comparatorClassName,
conf.getInt(
RssTezConfig.RSS_MERGED_BLOCK_SZIE,
RssTezConfig.RSS_MERGED_BLOCK_SZIE_DEFAULT),
conf.get(RssTezConfig.RSS_REMOTE_MERGE_CLASS_LOADER)));
StringUtils.isBlank(keyClassName)
? null
: RssProtos.MergeContext.newBuilder()
.setKeyClass(keyClassName)
.setValueClass(valueClassName)
.setComparatorClass(comparatorClassName)
.setMergedBlockSize(
conf.getInt(
RssTezConfig.RSS_MERGED_BLOCK_SZIE,
RssTezConfig
.RSS_MERGED_BLOCK_SZIE_DEFAULT))
.setMergeClassLoader(
conf.get(
RssTezConfig
.RSS_REMOTE_MERGE_CLASS_LOADER,
""))
.build()));
LOG.info(
"Finish register shuffle with "
+ (System.currentTimeMillis() - start)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.apache.uniffle.common.serializer.SerializerInstance;
import org.apache.uniffle.common.serializer.SerializerUtils;
import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.proto.RssProtos;
import org.apache.uniffle.storage.util.StorageType;

import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -715,11 +716,7 @@ public void registerShuffle(
ShuffleDataDistributionType dataDistributionType,
int maxConcurrencyPerPartitionToWrite,
int stageAttemptNumber,
String keyClassName,
String valueClassName,
String comparatorClassName,
int mergedBlockSize,
String mergeClassLoader) {}
RssProtos.MergeContext mergeContext) {}

@Override
public boolean sendCommit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.proto.RssProtos.MergeContext;

public interface ShuffleWriteClient {

Expand Down Expand Up @@ -72,10 +73,6 @@ default void registerShuffle(
dataDistributionType,
maxConcurrencyPerPartitionToWrite,
0,
null,
null,
null,
-1,
null);
}

Expand All @@ -88,11 +85,7 @@ void registerShuffle(
ShuffleDataDistributionType dataDistributionType,
int maxConcurrencyPerPartitionToWrite,
int stageAttemptNumber,
String keyClassName,
String valueClassName,
String comparatorClassName,
int mergedBlockSize,
String mergeClassLoader);
MergeContext mergeContext);

boolean sendCommit(
Set<ShuffleServerInfo> shuffleServerInfoSet, String appId, int shuffleId, int numMaps);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
import org.apache.uniffle.common.util.BlockIdLayout;
import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.ThreadUtils;
import org.apache.uniffle.proto.RssProtos.MergeContext;

public class ShuffleWriteClientImpl implements ShuffleWriteClient {

Expand Down Expand Up @@ -564,11 +565,7 @@ public void registerShuffle(
ShuffleDataDistributionType dataDistributionType,
int maxConcurrencyPerPartitionToWrite,
int stageAttemptNumber,
String keyClassName,
String valueClassName,
String comparatorClassName,
int mergedBlockSize,
String mergeClassLoader) {
MergeContext mergeContext) {
String user = null;
try {
user = UserGroupInformation.getCurrentUser().getShortUserName();
Expand All @@ -586,11 +583,7 @@ public void registerShuffle(
dataDistributionType,
maxConcurrencyPerPartitionToWrite,
stageAttemptNumber,
keyClassName,
valueClassName,
comparatorClassName,
mergedBlockSize,
mergeClassLoader);
mergeContext);
RssRegisterShuffleResponse response =
getShuffleServerClient(shuffleServerInfo).registerShuffle(request);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.proto.RssProtos;

public class MockedShuffleWriteClient implements ShuffleWriteClient {

Expand Down Expand Up @@ -63,11 +64,7 @@ public void registerShuffle(
ShuffleDataDistributionType dataDistributionType,
int maxConcurrencyPerPartitionToWrite,
int stageAttemptNumber,
String keyClassName,
String valueClassName,
String comparatorClassName,
int mergedBlockSize,
String mergeClassLoader) {}
RssProtos.MergeContext mergeContext) {}

@Override
public boolean sendCommit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,18 @@ public class RssBaseConf extends RssConf {
+ " first combining the username and the password with a colon (uniffle:uniffle123)"
+ ", and then by encoding the resulting string in base64 (dW5pZmZsZTp1bmlmZmxlMTIz).");

/**
 * Config option {@code rss.storage.write.dataBufferSize}: the buffer size used to cache write
 * data content. The value is held as a string and defaults to {@code "8k"}.
 *
 * <p>NOTE(review): the string is presumably parsed as a byte-size expression (e.g. "8k") by the
 * storage writer that reads this option; the parsing is not visible here, so confirm the accepted
 * unit suffixes against the consumer.
 */
public static final ConfigOption<String> RSS_STORAGE_WRITE_DATA_BUFFER_SIZE =
ConfigOptions.key("rss.storage.write.dataBufferSize")
.stringType()
.defaultValue("8k")
.withDescription("The buffer size to cache the write data content.");

/**
 * Config option {@code rss.storage.write.indexBufferSize}: the buffer size used to cache write
 * index content. The value is held as a string and defaults to {@code "8k"}.
 *
 * <p>NOTE(review): the string is presumably parsed as a byte-size expression (e.g. "8k") by the
 * storage writer that reads this option; the parsing is not visible here, so confirm the accepted
 * unit suffixes against the consumer.
 */
public static final ConfigOption<String> RSS_STORAGE_WRITE_INDEX_BUFFER_SIZE =
ConfigOptions.key("rss.storage.write.indexBufferSize")
.stringType()
.defaultValue("8k")
.withDescription("The buffer size to cache the write index content.");

public boolean loadConfFromFile(String fileName, List<ConfigOption<Object>> configOptions) {
Map<String, String> properties = RssUtils.getPropertiesFromFile(fileName);
if (properties == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,21 @@ private static Map<Integer, List<ShuffleBlockInfo>> decodePartitionData(ByteBuf
int lengthOfShuffleBlocks = byteBuf.readInt();
List<ShuffleBlockInfo> shuffleBlockInfoList = Lists.newArrayList();
for (int j = 0; j < lengthOfShuffleBlocks; j++) {
shuffleBlockInfoList.add(Decoders.decodeShuffleBlockInfo(byteBuf));
try {
shuffleBlockInfoList.add(Decoders.decodeShuffleBlockInfo(byteBuf));
} catch (Throwable t) {
// Decoding a block can throw an OutOfDirectMemoryError when direct memory reaches its limit.
// An OutOfDirectMemoryError does not cause the JVM to exit, but it may lead to direct memory
// leaks, so the data of every block decoded so far is released before the error is rethrown.
// Note: refer to docs/server_guide.md to set MAX_DIRECT_MEMORY_SIZE to a
// reasonable value.
shuffleBlockInfoList.forEach(sbi -> sbi.getData().release());
partitionToBlocks.forEach(
(integer, shuffleBlockInfos) -> {
shuffleBlockInfos.forEach(sbi -> sbi.getData().release());
});
throw t;
}
}
partitionToBlocks.put(partitionId, shuffleBlockInfoList);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,8 @@ private ServerNode toServerNode(ShuffleServerHeartBeatRequest request) {
request.getServerId().getJettyPort(),
request.getStartTimeMs(),
request.getVersion(),
request.getGitCommitId());
request.getGitCommitId(),
request.getApplicationInfoList());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,18 @@

package org.apache.uniffle.coordinator;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import org.apache.uniffle.common.ServerStatus;
import org.apache.uniffle.common.storage.StorageInfo;
import org.apache.uniffle.proto.RssProtos;
import org.apache.uniffle.proto.RssProtos.ShuffleServerId;

public class ServerNode implements Comparable<ServerNode> {
Expand All @@ -46,6 +50,7 @@ public class ServerNode implements Comparable<ServerNode> {
private long startTime = -1;
private String version;
private String gitCommitId;
Map<String, RssProtos.ApplicationInfo> appIdToInfos;

public ServerNode(String id) {
this(id, "", 0, 0, 0, 0, 0, Sets.newHashSet(), ServerStatus.EXCLUDED);
Expand Down Expand Up @@ -181,7 +186,8 @@ public ServerNode(
jettyPort,
startTime,
"",
"");
"",
Collections.EMPTY_LIST);
}

public ServerNode(
Expand All @@ -199,7 +205,8 @@ public ServerNode(
int jettyPort,
long startTime,
String version,
String gitCommitId) {
String gitCommitId,
List<RssProtos.ApplicationInfo> appInfos) {
this.id = id;
this.ip = ip;
this.grpcPort = grpcPort;
Expand All @@ -221,6 +228,8 @@ public ServerNode(
this.startTime = startTime;
this.version = version;
this.gitCommitId = gitCommitId;
this.appIdToInfos = new ConcurrentHashMap<>();
appInfos.forEach(appInfo -> appIdToInfos.put(appInfo.getAppId(), appInfo));
}

public ShuffleServerId convertToGrpcProto() {
Expand Down
17 changes: 7 additions & 10 deletions dashboard/src/main/webapp/src/pages/ApplicationPage.vue
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,13 @@
:formatter="dateFormatter"
sortable
/>
<el-table-column
prop="version"
label="Version"
min-width="180"
/>
<el-table-column
prop="gitCommitId"
label="GitCommitId"
min-width="180"
/>
<el-table-column label="Version" min-width="180">
<template v-slot="{ row }">
<div class="version">
{{ row.version }}_{{ row.gitCommitId }}
</div>
</template>
</el-table-column>
</el-table>
</div>
</div>
Expand Down
Loading

0 comments on commit ccd9f2e

Please sign in to comment.