Skip to content

Commit

Permalink
[MINOR]improvement(client/server): (RemoteMerger) Refactor to use mer…
Browse files Browse the repository at this point in the history
…geContext collect the arguments related (#2195)

### What changes were proposed in this pull request?

Refactor to use mergeContext collect the arguments related to remote merger

### Why are the changes needed?

- Make code clean and friendly to other developer who do not attention to `Remote Merger`.
- Without api change while extends the `mergeContext`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

No need, just refactor.
  • Loading branch information
maobaolong authored Oct 22, 2024
1 parent 3a35b0f commit 87f9b6f
Show file tree
Hide file tree
Showing 17 changed files with 145 additions and 195 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.RetryUtils;
import org.apache.uniffle.hadoop.shim.HadoopShimImpl;
import org.apache.uniffle.proto.RssProtos.MergeContext;
import org.apache.uniffle.storage.util.StorageType;

import static org.apache.hadoop.mapreduce.RssMRConfig.RSS_REMOTE_MERGE_CLASS_LOADER;
Expand Down Expand Up @@ -285,17 +286,20 @@ public Thread newThread(Runnable r) {
RssMRConfig.toRssConf(conf)
.get(MAX_CONCURRENCY_PER_PARTITION_TO_WRITE),
0,
remoteMergeEnable ? conf.getMapOutputKeyClass().getName() : null,
remoteMergeEnable
? conf.getMapOutputValueClass().getName()
: null,
remoteMergeEnable
? conf.getOutputKeyComparator().getClass().getName()
: null,
conf.getInt(
RssMRConfig.RSS_MERGED_BLOCK_SZIE,
RssMRConfig.RSS_MERGED_BLOCK_SZIE_DEFAULT),
conf.get(RSS_REMOTE_MERGE_CLASS_LOADER)));
? MergeContext.newBuilder()
.setKeyClass(conf.getMapOutputKeyClass().getName())
.setValueClass(conf.getMapOutputValueClass().getName())
.setComparatorClass(
conf.getOutputKeyComparator().getClass().getName())
.setMergedBlockSize(
conf.getInt(
RssMRConfig.RSS_MERGED_BLOCK_SZIE,
RssMRConfig.RSS_MERGED_BLOCK_SZIE_DEFAULT))
.setMergeClassLoader(
conf.get(RSS_REMOTE_MERGE_CLASS_LOADER, ""))
.build()
: null));
LOG.info(
"Finish register shuffle with "
+ (System.currentTimeMillis() - start)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.uniffle.common.serializer.SerializerInstance;
import org.apache.uniffle.common.serializer.SerializerUtils;
import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.proto.RssProtos;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
Expand Down Expand Up @@ -722,11 +723,7 @@ public void registerShuffle(
ShuffleDataDistributionType distributionType,
int maxConcurrencyPerPartitionToWrite,
int stageAttemptNumber,
String keyClassName,
String valueClassName,
String comparatorClassName,
int mergedBlockSize,
String mergeClassLoader) {}
RssProtos.MergeContext mergeContext) {}

@Override
public boolean sendCommit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.hadoop.shim.HadoopShimImpl;
import org.apache.uniffle.proto.RssProtos;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -507,11 +508,7 @@ public void registerShuffle(
ShuffleDataDistributionType distributionType,
int maxConcurrencyPerPartitionToWrite,
int stageAttemptNumber,
String keyClassName,
String valueClassName,
String comparatorClassName,
int mergedBlockSize,
String mergeClassLoader) {}
RssProtos.MergeContext mergeContext) {}

@Override
public boolean sendCommit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1028,10 +1028,6 @@ protected void registerShuffleServers(
ShuffleDataDistributionType.NORMAL,
maxConcurrencyPerPartitionToWrite,
stageAttemptNumber,
null,
null,
null,
-1,
null);
});
LOG.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.RetryUtils;
import org.apache.uniffle.proto.RssProtos;

import static org.apache.uniffle.common.config.RssClientConf.MAX_CONCURRENCY_PER_PARTITION_TO_WRITE;

Expand Down Expand Up @@ -305,13 +306,23 @@ public ShuffleAssignmentsInfo run() throws Exception {
RssTezConfig.toRssConf(conf)
.get(MAX_CONCURRENCY_PER_PARTITION_TO_WRITE),
0,
keyClassName,
valueClassName,
comparatorClassName,
conf.getInt(
RssTezConfig.RSS_MERGED_BLOCK_SZIE,
RssTezConfig.RSS_MERGED_BLOCK_SZIE_DEFAULT),
conf.get(RssTezConfig.RSS_REMOTE_MERGE_CLASS_LOADER)));
StringUtils.isBlank(keyClassName)
? null
: RssProtos.MergeContext.newBuilder()
.setKeyClass(keyClassName)
.setValueClass(valueClassName)
.setComparatorClass(comparatorClassName)
.setMergedBlockSize(
conf.getInt(
RssTezConfig.RSS_MERGED_BLOCK_SZIE,
RssTezConfig
.RSS_MERGED_BLOCK_SZIE_DEFAULT))
.setMergeClassLoader(
conf.get(
RssTezConfig
.RSS_REMOTE_MERGE_CLASS_LOADER,
""))
.build()));
LOG.info(
"Finish register shuffle with "
+ (System.currentTimeMillis() - start)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import org.apache.uniffle.common.serializer.SerializerInstance;
import org.apache.uniffle.common.serializer.SerializerUtils;
import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.proto.RssProtos;
import org.apache.uniffle.storage.util.StorageType;

import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -719,11 +720,7 @@ public void registerShuffle(
ShuffleDataDistributionType dataDistributionType,
int maxConcurrencyPerPartitionToWrite,
int stageAttemptNumber,
String keyClassName,
String valueClassName,
String comparatorClassName,
int mergedBlockSize,
String mergeClassLoader) {}
RssProtos.MergeContext mergeContext) {}

@Override
public boolean sendCommit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.proto.RssProtos.MergeContext;

public interface ShuffleWriteClient {

Expand Down Expand Up @@ -72,10 +73,6 @@ default void registerShuffle(
dataDistributionType,
maxConcurrencyPerPartitionToWrite,
0,
null,
null,
null,
-1,
null);
}

Expand All @@ -88,11 +85,7 @@ void registerShuffle(
ShuffleDataDistributionType dataDistributionType,
int maxConcurrencyPerPartitionToWrite,
int stageAttemptNumber,
String keyClassName,
String valueClassName,
String comparatorClassName,
int mergedBlockSize,
String mergeClassLoader);
MergeContext mergeContext);

boolean sendCommit(
Set<ShuffleServerInfo> shuffleServerInfoSet, String appId, int shuffleId, int numMaps);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
import org.apache.uniffle.common.util.BlockIdLayout;
import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.ThreadUtils;
import org.apache.uniffle.proto.RssProtos.MergeContext;

public class ShuffleWriteClientImpl implements ShuffleWriteClient {

Expand Down Expand Up @@ -564,11 +565,7 @@ public void registerShuffle(
ShuffleDataDistributionType dataDistributionType,
int maxConcurrencyPerPartitionToWrite,
int stageAttemptNumber,
String keyClassName,
String valueClassName,
String comparatorClassName,
int mergedBlockSize,
String mergeClassLoader) {
MergeContext mergeContext) {
String user = null;
try {
user = UserGroupInformation.getCurrentUser().getShortUserName();
Expand All @@ -586,11 +583,7 @@ public void registerShuffle(
dataDistributionType,
maxConcurrencyPerPartitionToWrite,
stageAttemptNumber,
keyClassName,
valueClassName,
comparatorClassName,
mergedBlockSize,
mergeClassLoader);
mergeContext);
RssRegisterShuffleResponse response =
getShuffleServerClient(shuffleServerInfo).registerShuffle(request);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.proto.RssProtos;

public class MockedShuffleWriteClient implements ShuffleWriteClient {

Expand Down Expand Up @@ -63,11 +64,7 @@ public void registerShuffle(
ShuffleDataDistributionType dataDistributionType,
int maxConcurrencyPerPartitionToWrite,
int stageAttemptNumber,
String keyClassName,
String valueClassName,
String comparatorClassName,
int mergedBlockSize,
String mergeClassLoader) {}
RssProtos.MergeContext mergeContext) {}

@Override
public boolean sendCommit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.uniffle.common.util.BlockIdLayout;
import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.proto.RssProtos;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.server.buffer.ShuffleBufferType;
import org.apache.uniffle.storage.util.StorageType;
Expand Down Expand Up @@ -174,11 +175,13 @@ public void remoteMergeWriteReadTest(String classes) throws Exception {
ShuffleDataDistributionType.NORMAL,
0,
-1,
keyClass.getName(),
valueClass.getName(),
comparator.getClass().getName(),
-1,
null);
RssProtos.MergeContext.newBuilder()
.setKeyClass(keyClass.getName())
.setValueClass(valueClass.getName())
.setComparatorClass(comparator.getClass().getName())
.setMergedBlockSize(-1)
.setMergeClassLoader("")
.build());

// 3 report shuffle result
// task 0 attempt 0 generate three blocks
Expand Down Expand Up @@ -337,11 +340,13 @@ public void remoteMergeWriteReadTestWithCombine(String classes) throws Exception
ShuffleDataDistributionType.NORMAL,
0,
-1,
keyClass.getName(),
valueClass.getName(),
comparator.getClass().getName(),
-1,
null);
RssProtos.MergeContext.newBuilder()
.setKeyClass(keyClass.getName())
.setValueClass(valueClass.getName())
.setComparatorClass(comparator.getClass().getName())
.setMergedBlockSize(-1)
.setMergeClassLoader("")
.build());

// 3 report shuffle result
// task 0 attempt 0 generate three blocks
Expand Down Expand Up @@ -508,11 +513,13 @@ public void remoteMergeWriteReadTestMultiPartition(String classes) throws Except
ShuffleDataDistributionType.NORMAL,
0,
-1,
keyClass.getName(),
valueClass.getName(),
comparator.getClass().getName(),
-1,
null);
RssProtos.MergeContext.newBuilder()
.setKeyClass(keyClass.getName())
.setValueClass(valueClass.getName())
.setComparatorClass(comparator.getClass().getName())
.setMergedBlockSize(-1)
.setMergeClassLoader("")
.build());

// 3 report shuffle result
// this shuffle have three partition, which is hash by key index mode 3
Expand Down Expand Up @@ -714,11 +721,13 @@ public void remoteMergeWriteReadTestMultiPartitionWithCombine(String classes) th
ShuffleDataDistributionType.NORMAL,
0,
-1,
keyClass.getName(),
valueClass.getName(),
comparator.getClass().getName(),
-1,
null);
RssProtos.MergeContext.newBuilder()
.setKeyClass(keyClass.getName())
.setValueClass(valueClass.getName())
.setComparatorClass(comparator.getClass().getName())
.setMergedBlockSize(-1)
.setMergeClassLoader("")
.build());

// 3 report shuffle result
// this shuffle have three partition, which is hash by key index mode 3
Expand Down
Loading

0 comments on commit 87f9b6f

Please sign in to comment.