Skip to content

Commit

Permalink
[apache#1472] fix(server): Inaccurate flow control leads to Shuffle server OOM when enabling Netty
Browse files Browse the repository at this point in the history
  • Loading branch information
rickyma committed Feb 5, 2024
1 parent 89d2ae8 commit 947bbf3
Show file tree
Hide file tree
Showing 15 changed files with 519 additions and 114 deletions.
12 changes: 12 additions & 0 deletions docs/server_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ MAX_DIRECT_MEMORY_SIZE=60g
```
rss.server.buffer.capacity 40g
rss.server.read.buffer.capacity 20g
rss.server.preAllocation.reserved.onHeap.size 1g
rss.server.preAllocation.reserved.offHeap.size 10g
```

#### Example of server conf
Expand All @@ -164,6 +166,16 @@ rss.server.preAllocation.expired 120000
rss.server.commit.timeout 600000
rss.server.app.expired.withoutHeartbeat 120000
# netty configs
rss.rpc.server.type GRPC_NETTY
rss.server.netty.port 17000
rss.server.preAllocation.reserved.onHeap.size 1g
rss.server.preAllocation.reserved.offHeap.size 10g
rss.server.netty.connect.timeout 50000
rss.server.netty.accept.thread 10
rss.server.netty.worker.thread 100
rss.server.netty.connect.backlog 128
# For huge partitions
rss.server.flush.localfile.threadPool.size 20
rss.server.flush.hadoop.threadPool.size 60
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,17 @@
import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcNettyClient;
import org.apache.uniffle.client.request.RssFinishShuffleRequest;
import org.apache.uniffle.client.request.RssRegisterShuffleRequest;
import org.apache.uniffle.client.request.RssSendCommitRequest;
import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.storage.util.StorageType;
Expand All @@ -55,6 +59,8 @@

public class DiskErrorToleranceTest extends ShuffleReadWriteBase {
private ShuffleServerGrpcClient shuffleServerClient;
private ShuffleServerGrpcNettyClient shuffleServerNettyClient;
private static ShuffleServerConf shuffleServerConfig;

private static File data1;
private static File data2;
Expand All @@ -76,21 +82,35 @@ public static void setupServers(@TempDir File serverTmpDir) throws Exception {
shuffleServerConf.setBoolean(ShuffleServerConf.HEALTH_CHECK_ENABLE, true);
createShuffleServer(shuffleServerConf);
startServers();
shuffleServerConfig = shuffleServerConf;
}

/**
 * Creates the two clients used by the tests before each test method: a plain gRPC client and a
 * Netty-capable gRPC client. The Netty port is read from the static shuffleServerConfig field.
 */
@BeforeEach
// NOTE(review): the next two lines are the same method header twice; the first looks like the
// pre-change side of the diff view and the second adds `throws Exception` — confirm.
public void createClient() {
public void createClient() throws Exception {
shuffleServerClient = new ShuffleServerGrpcClient(LOCALHOST, SHUFFLE_SERVER_PORT);
// The Netty client is configured through an RssConf whose client type is set to GRPC_NETTY.
RssConf rssConf = new RssConf();
rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY);
shuffleServerNettyClient =
new ShuffleServerGrpcNettyClient(
rssConf,
LOCALHOST,
SHUFFLE_SERVER_PORT,
shuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT));
}

/**
 * Closes both shuffle server clients after each test.
 *
 * <p>The Netty client is closed in a {@code finally} block so that it is still released when
 * closing the gRPC client throws. The null checks keep this method from failing with a
 * {@link NullPointerException} when client creation did not complete.
 */
@AfterEach
public void closeClient() {
  try {
    if (shuffleServerClient != null) {
      shuffleServerClient.close();
    }
  } finally {
    if (shuffleServerNettyClient != null) {
      shuffleServerNettyClient.close();
    }
  }
}

/** Runs the disk-error scenario twice: first with Netty mode enabled, then with it disabled. */
@Test
public void diskErrorTest() throws Exception {
  for (boolean nettyMode : new boolean[] {true, false}) {
    diskErrorTest(nettyMode);
  }
}

private void diskErrorTest(boolean isNettyMode) throws Exception {
String appId = "ap_disk_error_data";
Map<Long, byte[]> expectedData = Maps.newHashMap();
Set<Long> expectedBlock1 = Sets.newHashSet();
Expand All @@ -106,10 +126,14 @@ public void diskErrorTest() throws Exception {
Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleToBlocks = Maps.newHashMap();
shuffleToBlocks.put(0, partitionToBlocks);
RssSendShuffleDataRequest rs1 = new RssSendShuffleDataRequest(appId, 3, 1000, shuffleToBlocks);
shuffleServerClient.sendShuffleData(rs1);
RssSendCommitRequest rc1 = new RssSendCommitRequest(appId, 0);
shuffleServerClient.sendCommit(rc1);
RssFinishShuffleRequest rf1 = new RssFinishShuffleRequest(appId, 0);
if (isNettyMode) {
shuffleServerNettyClient.sendShuffleData(rs1);
} else {
shuffleServerClient.sendShuffleData(rs1);
}
shuffleServerClient.sendCommit(rc1);
shuffleServerClient.finishShuffle(rf1);
ShuffleReadClientImpl readClient =
ShuffleClientFactory.newReadBuilder()
Expand Down Expand Up @@ -148,7 +172,11 @@ public void diskErrorTest() throws Exception {
partitionToBlocks.put(0, blocks2);
shuffleToBlocks.put(0, partitionToBlocks);
rs1 = new RssSendShuffleDataRequest(appId, 3, 1000, shuffleToBlocks);
shuffleServerClient.sendShuffleData(rs1);
if (isNettyMode) {
shuffleServerNettyClient.sendShuffleData(rs1);
} else {
shuffleServerClient.sendShuffleData(rs1);
}
shuffleServerClient.sendCommit(rc1);
shuffleServerClient.finishShuffle(rf1);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,38 +34,57 @@
import org.apache.uniffle.client.factory.ShuffleServerClientFactory;
import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcNettyClient;
import org.apache.uniffle.client.request.RssFinishShuffleRequest;
import org.apache.uniffle.client.request.RssRegisterShuffleRequest;
import org.apache.uniffle.client.request.RssReportShuffleResultRequest;
import org.apache.uniffle.client.request.RssSendCommitRequest;
import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
import org.apache.uniffle.client.response.CompressedShuffleBlock;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.storage.util.StorageType;

import static org.junit.jupiter.api.Assertions.assertTrue;

public abstract class HybridStorageFaultToleranceBase extends ShuffleReadWriteBase {
private ShuffleServerGrpcClient shuffleServerClient;
private ShuffleServerGrpcNettyClient shuffleServerNettyClient;
private static String REMOTE_STORAGE = HDFS_URI + "rss/multi_storage_fault";

/**
 * Prepares the clients before each test method: calls cleanupCache() on the
 * ShuffleServerClientFactory singleton, then creates a plain gRPC client and a Netty-capable
 * gRPC client (the latter using NETTY_PORT).
 */
@BeforeEach
// NOTE(review): the next two lines are the same method header twice; the first looks like the
// pre-change side of the diff view and the second adds `throws Exception` — confirm.
public void createClient() {
public void createClient() throws Exception {
ShuffleServerClientFactory.getInstance().cleanupCache();
shuffleServerClient = new ShuffleServerGrpcClient(LOCALHOST, SHUFFLE_SERVER_PORT);
// The Netty client is configured through an RssConf whose client type is set to GRPC_NETTY.
RssConf rssConf = new RssConf();
rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY);
shuffleServerNettyClient =
new ShuffleServerGrpcNettyClient(
rssConf,
LOCALHOST,
SHUFFLE_SERVER_PORT,
NETTY_PORT);
}

/**
 * Closes both shuffle server clients after each test.
 *
 * <p>The Netty client is closed in a {@code finally} block so that it is still released when
 * closing the gRPC client throws. The null checks keep this method from failing with a
 * {@link NullPointerException} when client creation did not complete.
 */
@AfterEach
public void closeClient() {
  try {
    if (shuffleServerClient != null) {
      shuffleServerClient.close();
    }
  } finally {
    if (shuffleServerNettyClient != null) {
      shuffleServerNettyClient.close();
    }
  }
}

/**
 * Hook implemented by concrete subclasses. fallbackTest calls it after the shuffle blocks have
 * been created and before they are sent to the shuffle server.
 */
abstract void makeChaos();

/** Runs the fallback scenario twice: first with Netty mode enabled, then with it disabled. */
@Test
public void fallbackTest() throws Exception {
  for (boolean nettyMode : new boolean[] {true, false}) {
    fallbackTest(nettyMode);
  }
}

private void fallbackTest(boolean isNettyMode) throws Exception {
String appId = "fallback_test_" + this.getClass().getSimpleName();
Map<Long, byte[]> expectedData = Maps.newHashMap();
Map<Integer, List<Integer>> map = Maps.newHashMap();
Expand All @@ -75,7 +94,7 @@ public void fallbackTest() throws Exception {
final List<ShuffleBlockInfo> blocks =
createShuffleBlockList(0, 0, 0, 40, 2 * 1024 * 1024, blockBitmap, expectedData);
makeChaos();
sendSinglePartitionToShuffleServer(appId, 0, 0, 0, blocks);
sendSinglePartitionToShuffleServer(appId, 0, 0, 0, blocks, isNettyMode);
validateResult(appId, 0, 0, blockBitmap, Roaring64NavigableMap.bitmapOf(0), expectedData);
}

Expand All @@ -94,16 +113,25 @@ private void registerShuffle(String appId, Map<Integer, List<Integer>> registerM
}

private void sendSinglePartitionToShuffleServer(
String appId, int shuffle, int partition, long taskAttemptId, List<ShuffleBlockInfo> blocks) {
String appId,
int shuffle,
int partition,
long taskAttemptId,
List<ShuffleBlockInfo> blocks,
boolean isNettyMode) {
Map<Integer, List<ShuffleBlockInfo>> partitionToBlocks = Maps.newHashMap();
Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleToBlocks = Maps.newHashMap();
partitionToBlocks.put(partition, blocks);
shuffleToBlocks.put(shuffle, partitionToBlocks);
RssSendShuffleDataRequest rs = new RssSendShuffleDataRequest(appId, 3, 1000, shuffleToBlocks);
shuffleServerClient.sendShuffleData(rs);
RssSendCommitRequest rc = new RssSendCommitRequest(appId, shuffle);
shuffleServerClient.sendCommit(rc);
RssFinishShuffleRequest rf = new RssFinishShuffleRequest(appId, shuffle);
if (isNettyMode) {
shuffleServerNettyClient.sendShuffleData(rs);
} else {
shuffleServerClient.sendShuffleData(rs);
}
shuffleServerClient.sendCommit(rc);
shuffleServerClient.finishShuffle(rf);

Map<Integer, List<Long>> partitionToBlockIds = Maps.newHashMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,52 +141,6 @@ public static String generateBasePath(File tmpDir) {
return dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath();
}

/**
 * Requests the shuffle index of a partition from the shuffle server and splits it into data
 * segments with a {@code FixedSizeSegmentSplitter} built from {@code readBufferSize}.
 *
 * @param shuffleServerClient client used to request the index
 * @param appId application id of the shuffle
 * @param shuffleId shuffle id
 * @param partitionId partition id
 * @param partitionNumPerRange value forwarded to the index request
 * @param partitionNum value forwarded to the index request
 * @param readBufferSize size passed to the segment splitter
 * @return the segments produced by splitting the returned index
 */
public static List<ShuffleDataSegment> readShuffleIndexSegments(
    ShuffleServerGrpcClient shuffleServerClient,
    String appId,
    int shuffleId,
    int partitionId,
    int partitionNumPerRange,
    int partitionNum,
    int readBufferSize) {
  // Step 1: ask the server for the index of the requested partition.
  final RssGetShuffleIndexRequest indexRequest =
      new RssGetShuffleIndexRequest(
          appId, shuffleId, partitionId, partitionNumPerRange, partitionNum);
  final ShuffleIndexResult indexResult =
      shuffleServerClient.getShuffleIndex(indexRequest).getShuffleIndexResult();

  // Step 2: cut the index into segments.
  final FixedSizeSegmentSplitter splitter = new FixedSizeSegmentSplitter(readBufferSize);
  return splitter.split(indexResult);
}

/**
 * Reads the shuffle data that belongs to one segment of a previously split index.
 *
 * @param shuffleServerClient client used to request the data
 * @param appId application id of the shuffle
 * @param shuffleId shuffle id
 * @param partitionId partition id
 * @param partitionNumPerRange value forwarded to the data request
 * @param partitionNum value forwarded to the data request
 * @param segmentIndex position of the wanted segment in {@code sds}
 * @param sds segments to choose from
 * @return an empty {@code ShuffleDataResult} when {@code segmentIndex} is at or past the end of
 *     {@code sds}; otherwise a result holding the fetched data and the segment's buffer segments
 */
public static ShuffleDataResult readShuffleData(
    ShuffleServerGrpcClient shuffleServerClient,
    String appId,
    int shuffleId,
    int partitionId,
    int partitionNumPerRange,
    int partitionNum,
    int segmentIndex,
    List<ShuffleDataSegment> sds) {
  // Nothing to read once the index points past the last segment.
  if (sds.size() <= segmentIndex) {
    return new ShuffleDataResult();
  }

  final ShuffleDataSegment target = sds.get(segmentIndex);

  // The request addresses the segment by its offset and length.
  final RssGetShuffleDataRequest dataRequest =
      new RssGetShuffleDataRequest(
          appId,
          shuffleId,
          partitionId,
          partitionNumPerRange,
          partitionNum,
          target.getOffset(),
          target.getLength());

  return new ShuffleDataResult(
      shuffleServerClient.getShuffleData(dataRequest).getShuffleData(),
      target.getBufferSegments());
}

public static ShuffleDataResult readShuffleData(
ShuffleServerGrpcClient shuffleServerClient,
String appId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,16 @@ public static void setupServers() throws Exception {

/**
 * Method source for testConcurrentWrite2Hadoop. Each row supplies the client-specified
 * concurrency, the expected concurrency and, in the three-argument rows, the isNettyMode flag.
 */
private static Stream<Arguments> clientConcurrencyAndExpectedProvider() {
return Stream.of(
// NOTE(review): the two-argument line below looks like the pre-change side of the diff view;
// the four three-argument rows after it cover the same cases with Netty mode on and off.
Arguments.of(-1, MAX_CONCURRENCY), Arguments.of(MAX_CONCURRENCY + 1, MAX_CONCURRENCY + 1));
Arguments.of(-1, MAX_CONCURRENCY, true),
Arguments.of(MAX_CONCURRENCY + 1, MAX_CONCURRENCY + 1, true),
Arguments.of(-1, MAX_CONCURRENCY, false),
Arguments.of(MAX_CONCURRENCY + 1, MAX_CONCURRENCY + 1, false));
}

@ParameterizedTest
@MethodSource("clientConcurrencyAndExpectedProvider")
public void testConcurrentWrite2Hadoop(int clientSpecifiedConcurrency, int expectedConcurrency)
public void testConcurrentWrite2Hadoop(
int clientSpecifiedConcurrency, int expectedConcurrency, boolean isNettyMode)
throws Exception {
String appId = "testConcurrentWrite2Hadoop_" + new Random().nextInt();
String dataBasePath = HDFS_URI + "rss/test";
Expand Down Expand Up @@ -116,7 +120,11 @@ public void testConcurrentWrite2Hadoop(int clientSpecifiedConcurrency, int expec
shuffleToBlocks.put(0, partitionToBlocks);
RssSendShuffleDataRequest rssdr =
new RssSendShuffleDataRequest(appId, 3, 1000, shuffleToBlocks);
shuffleServerClient.sendShuffleData(rssdr);
if (isNettyMode) {
shuffleServerNettyClient.sendShuffleData(rssdr);
} else {
shuffleServerClient.sendShuffleData(rssdr);
}
});

RssSendCommitRequest rscr = new RssSendCommitRequest(appId, 0);
Expand Down
Loading

0 comments on commit 947bbf3

Please sign in to comment.