[apache#1888] feat(server): Reject requireBuffer/sendShuffleData for an application if one of the partitions exceeds the limit (apache#1889)

### What changes were proposed in this pull request?

Reject `requireBuffer` and `sendShuffleData` requests for an application if one of its partitions exceeds the limit.
Introduce a config that caps the maximum partition size; when the cap is exceeded, the client receives an exception whose message reports the current partition size and the configured maximum.
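Conceptually, the server-side check behaves like the sketch below. This is a minimal illustration, not the actual implementation: the class name, method shape, and message format are hypothetical, while `ExceedHugePartitionHardLimitException` is the real exception type added by this PR.

```java
import org.apache.uniffle.common.exception.ExceedHugePartitionHardLimitException;

// Hypothetical sketch of the per-partition hard-limit check described above.
public class HugePartitionHardLimitCheck {

  // Value of rss.server.huge-partition.size.hard.limit (defaults to Long.MAX_VALUE).
  private final long hardLimit;

  public HugePartitionHardLimitCheck(long hardLimit) {
    this.hardLimit = hardLimit;
  }

  // Invoked while handling requireBuffer/sendShuffleData for each partition.
  public void validate(int partitionId, long currentPartitionSize, long requireSize) {
    long newSize = currentPartitionSize + requireSize;
    if (newSize > hardLimit) {
      // The message carries both sizes so the client can report them to the user.
      throw new ExceedHugePartitionHardLimitException(
          "Partition " + partitionId + " size " + newSize
              + " exceeds the configured hard limit " + hardLimit);
    }
  }
}
```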

### Why are the changes needed?

Fix: apache#1888

### Does this PR introduce _any_ user-facing change?

Yes. This PR introduces a new config key `rss.server.huge-partition.size.hard.limit` with a default value of `Long.MAX_VALUE`, keeping behavior consistent with the previous code.

### How was this patch tested?

Manually tested in our environment:
- Configure `rss.server.huge-partition.size.hard.limit` to a small size and wait for the expected exception.
maobaolong authored Aug 5, 2024
1 parent 7729993 commit 69e4cde
Showing 20 changed files with 397 additions and 78 deletions.
@@ -19,14 +19,32 @@

import java.util.Arrays;

+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang3.tuple.Pair;

public class ShufflePartitionedData {

+  private static final ShufflePartitionedBlock[] EMPTY_BLOCK_LIST =
+      new ShufflePartitionedBlock[] {};
  private int partitionId;
-  private ShufflePartitionedBlock[] blockList;
+  private final ShufflePartitionedBlock[] blockList;
+  private final long totalBlockSize;

+  public ShufflePartitionedData(int partitionId, Pair<Long, ShufflePartitionedBlock[]> pair) {
+    this.partitionId = partitionId;
+    this.blockList = pair.getRight() == null ? EMPTY_BLOCK_LIST : pair.getRight();
+    totalBlockSize = pair.getLeft();
+  }
+
+  @VisibleForTesting
  public ShufflePartitionedData(int partitionId, ShufflePartitionedBlock[] blockList) {
    this.partitionId = partitionId;
-    this.blockList = blockList;
+    this.blockList = blockList == null ? EMPTY_BLOCK_LIST : blockList;
+    long size = 0L;
+    for (ShufflePartitionedBlock block : this.blockList) {
+      size += block.getSize();
+    }
+    totalBlockSize = size;
  }

  @Override
@@ -47,20 +65,10 @@ public void setPartitionId(int partitionId) {
  }

  public ShufflePartitionedBlock[] getBlockList() {
-    if (blockList == null) {
-      return new ShufflePartitionedBlock[] {};
-    }
    return blockList;
  }

  public long getTotalBlockSize() {
-    if (blockList == null) {
-      return 0L;
-    }
-    long size = 0;
-    for (ShufflePartitionedBlock block : blockList) {
-      size += block.getSize();
-    }
-    return size;
+    return totalBlockSize;
  }
}
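A small usage sketch of the refactored class above (illustrative only; it assumes the classes live under `org.apache.uniffle.common` and uses only the constructors and getters visible in this diff):

```java
import org.apache.commons.lang3.tuple.Pair;
import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.ShufflePartitionedData;

public class ShufflePartitionedDataExample {
  public static void main(String[] args) {
    ShufflePartitionedBlock[] blocks = new ShufflePartitionedBlock[] {};
    // The caller passes the already-known total size alongside the blocks, so
    // getTotalBlockSize() is now an O(1) field read instead of a per-call loop.
    ShufflePartitionedData data = new ShufflePartitionedData(0, Pair.of(0L, blocks));
    System.out.println(data.getTotalBlockSize()); // 0
  }
}
```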
@@ -0,0 +1,32 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.uniffle.common.exception;

public class ExceedHugePartitionHardLimitException extends NotRetryException {
  public ExceedHugePartitionHardLimitException(String message) {
    super(message);
  }

  public ExceedHugePartitionHardLimitException(Throwable e) {
    super(e);
  }

  public ExceedHugePartitionHardLimitException(String message, Throwable e) {
    super(message, e);
  }
}
@@ -17,7 +17,7 @@

package org.apache.uniffle.common.exception;

-public class NoRegisterException extends RssException {
+public class NoRegisterException extends NotRetryException {
  public NoRegisterException(String message) {
    super(message);
  }
@@ -23,6 +23,10 @@ public NotRetryException(String message) {
    super(message);
  }

+  public NotRetryException(Throwable e) {
+    super(e);
+  }
+
  public NotRetryException(String message, Throwable e) {
    super(message, e);
  }
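With `NoRegisterException` and the new `ExceedHugePartitionHardLimitException` both under `NotRetryException`, a caller can fail fast with a single catch clause. A minimal sketch of that pattern (the retry helper itself is hypothetical, not code from this commit; it assumes `RssException` is unchecked, as elsewhere in the codebase):

```java
import org.apache.uniffle.common.exception.NotRetryException;
import org.apache.uniffle.common.exception.RssException;

public final class RetrySketch {

  interface Action<T> {
    T run();
  }

  static <T> T withRetry(Action<T> action, int retryMax) {
    for (int attempt = 0; ; attempt++) {
      try {
        return action.run();
      } catch (NotRetryException e) {
        throw e; // fail fast: covers NO_REGISTER and the new hard-limit rejection
      } catch (RssException e) {
        if (attempt >= retryMax) {
          throw e; // retriable error, but retries are exhausted
        }
      }
    }
  }
}
```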
@@ -36,6 +36,7 @@ public enum StatusCode {
  INVALID_REQUEST(9),
  NO_BUFFER_FOR_HUGE_PARTITION(10),
  STAGE_RETRY_IGNORE(11),
+  EXCEED_HUGE_PARTITION_HARD_LIMIT(12),
  APP_NOT_FOUND(13),
  INTERNAL_NOT_RETRY_ERROR(14),
  UNKNOWN(-1);
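Clients map the numeric wire value back to this enum via `StatusCode.fromCode`, as in the gRPC client changes further down. A one-line illustration (the import path is assumed for the sketch):

```java
import org.apache.uniffle.common.rpc.StatusCode; // package assumed for illustration

public class StatusCodeExample {
  public static void main(String[] args) {
    // 12 is the wire value of the status code added above.
    System.out.println(StatusCode.fromCode(12)); // EXCEED_HUGE_PARTITION_HARD_LIMIT
  }
}
```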
@@ -48,9 +48,9 @@ public void testToString() {
            + Arrays.toString(data1.getBlockList())
            + "}",
        data1.toString());
-    ShufflePartitionedData data2 = new ShufflePartitionedData(0, null);
-    assertEquals("ShufflePartitionedData{partitionId=0, blockList=null}", data2.toString());
+    ShufflePartitionedData data2 = new ShufflePartitionedData(0, (ShufflePartitionedBlock[]) null);
+    assertEquals("ShufflePartitionedData{partitionId=0, blockList=[]}", data2.toString());
    data2.setPartitionId(1);
-    assertEquals("ShufflePartitionedData{partitionId=1, blockList=null}", data2.toString());
+    assertEquals("ShufflePartitionedData{partitionId=1, blockList=[]}", data2.toString());
  }
}
7 changes: 6 additions & 1 deletion docs/server_guide.md
@@ -125,7 +125,7 @@ This document will introduce how to deploy Uniffle shuffle servers.
| rss.server.health.checker.script.execute.timeout | 5000 | Timeout for `HealthScriptChecker` execute health script.(ms) |

### Huge Partition Optimization
-A huge partition is a common problem for Spark/MR and so on, caused by data skew. And it can cause the shuffle server to become unstable. To solve this, we introduce some mechanisms to limit the writing of huge partitions to avoid affecting regular partitions, more details can be found in [ISSUE-378](https://github.com/apache/incubator-uniffle/issues/378). The basic rules for limiting large partitions are memory usage limits and flushing individual buffers directly to persistent storage.
+A huge partition, caused by data skew, is a common problem for engines such as Spark and MR, and it can make the shuffle server unstable. To solve this, we introduce some mechanisms that limit writes from huge partitions so they do not affect regular partitions, plus a hard-limit config that rejects extremely huge partitions; more details can be found in [ISSUE-378](https://github.com/apache/incubator-uniffle/issues/378). The basic rules for limiting large partitions are memory usage limits and flushing individual buffers directly to persistent storage.

#### Memory usage limit
To do this, we introduce the extra configs
@@ -144,6 +144,11 @@ For HADOOP FS, the conf value of `rss.server.single.buffer.flush.threshold` shou

Finally, to improve the speed of writing to HDFS for a single partition, the value of `rss.server.max.concurrency.of.per-partition.write` and `rss.server.flush.hdfs.threadPool.size` could be increased to 50 or 100.
+
+#### Hard limit
+Once a huge partition reaches the hard-limit size, set via the configuration `rss.server.huge-partition.size.hard.limit`, the server rejects the `sendShuffleData` request and the client does not retry. This lets the client fail fast, and the user can then adjust the SQL or job to avoid hitting the partition hard limit.
+
+For example, if the hard limit is set to 50g, the server will reject the request once the partition size exceeds 50g, causing the job to eventually fail.
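That example would appear in the shuffle server conf as an entry like the one below (illustrative; assumes the usual whitespace-separated `key value` conf format, with a default of `Long.MAX_VALUE` when unset):

```
rss.server.huge-partition.size.hard.limit 50g
```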

### Netty
In version 0.8.0, we introduced Netty. Enabling Netty on ShuffleServer can significantly reduce GC time in high-throughput scenarios. We can enable Netty through the parameters `rss.server.netty.port` and `rss.rpc.server.type`. Note: After setting the parameter `rss.rpc.server.type` to `GRPC_NETTY`, ShuffleServer will be tagged with `GRPC_NETTY`, that is, the node can only be assigned to clients with `spark.rss.client.type=GRPC_NETTY`.

@@ -531,7 +531,13 @@ private void sendDataAndRequireBufferTest(boolean isNettyMode) throws IOExceptio
    // trigger NoBufferForHugePartitionException and get FAILED_REQUIRE_ID
    long requireId =
        shuffleServerClient.requirePreAllocation(
-            appId, shuffleId, Lists.newArrayList(partitionId), hugePartitionDataLength, 3, 100);
+            appId,
+            shuffleId,
+            Lists.newArrayList(partitionId),
+            Lists.newArrayList(hugePartitionDataLength),
+            hugePartitionDataLength,
+            3,
+            100);
    assertEquals(FAILED_REQUIRE_ID, requireId);

    // Add NoBufferForHugePartitionException check
@@ -132,7 +132,10 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
  protected static final int BACK_OFF_BASE = 2000;
  static final List<StatusCode> NOT_RETRY_STATUS_CODES =
      Lists.newArrayList(
-          StatusCode.NO_REGISTER, StatusCode.APP_NOT_FOUND, StatusCode.INTERNAL_NOT_RETRY_ERROR);
+          StatusCode.NO_REGISTER,
+          StatusCode.APP_NOT_FOUND,
+          StatusCode.INTERNAL_NOT_RETRY_ERROR,
+          StatusCode.EXCEED_HUGE_PARTITION_HARD_LIMIT);

  @VisibleForTesting
  public ShuffleServerGrpcClient(String host, int port) {
@@ -262,21 +265,29 @@ private AppHeartBeatResponse doSendHeartBeat(String appId, long timeout) {
  public long requirePreAllocation(
      String appId, int requireSize, int retryMax, long retryIntervalMax) throws Exception {
    return requirePreAllocation(
-        appId, 0, Collections.emptyList(), requireSize, retryMax, retryIntervalMax);
+        appId,
+        0,
+        Collections.emptyList(),
+        Collections.emptyList(),
+        requireSize,
+        retryMax,
+        retryIntervalMax);
  }

  @VisibleForTesting
  public long requirePreAllocation(
      String appId,
      int shuffleId,
      List<Integer> partitionIds,
+      List<Integer> partitionRequireSizes,
      int requireSize,
      int retryMax,
      long retryIntervalMax) {
    return requirePreAllocation(
        appId,
        shuffleId,
        partitionIds,
+        partitionRequireSizes,
        requireSize,
        retryMax,
        retryIntervalMax,
@@ -287,6 +298,7 @@ public long requirePreAllocation(
      String appId,
      int shuffleId,
      List<Integer> partitionIds,
+      List<Integer> partitionRequireSizes,
      int requireSize,
      int retryMax,
      long retryIntervalMax,
@@ -295,6 +307,7 @@
        RequireBufferRequest.newBuilder()
            .setShuffleId(shuffleId)
            .addAllPartitionIds(partitionIds)
+            .addAllPartitionRequireSizes(partitionRequireSizes)
            .setAppId(appId)
            .setRequireSize(requireSize)
            .build();
@@ -373,7 +386,9 @@ public long requirePreAllocation(
            System.currentTimeMillis() - start);
      }
      result = rpcResponse.getRequireBufferId();
-    } else if (rpcResponse.getStatus() == RssProtos.StatusCode.NO_REGISTER) {
+    } else if (NOT_RETRY_STATUS_CODES.contains(
+        StatusCode.fromCode(rpcResponse.getStatus().getNumber()))) {
+      failedStatusCodeRef.set(StatusCode.fromCode(rpcResponse.getStatus().getNumber()));
      String msg =
          "Can't require "
              + requireSize
@@ -518,9 +533,11 @@ public RssSendShuffleDataResponse sendShuffleData(RssSendShuffleDataRequest requ
      int blockNum = 0;
      int shuffleId = stb.getKey();
      List<Integer> partitionIds = new ArrayList<>();
+      List<Integer> partitionRequireSizes = new ArrayList<>();

      for (Map.Entry<Integer, List<ShuffleBlockInfo>> ptb : stb.getValue().entrySet()) {
        List<ShuffleBlock> shuffleBlocks = Lists.newArrayList();
+        int partitionRequireSize = 0;
        for (ShuffleBlockInfo sbi : ptb.getValue()) {
          shuffleBlocks.add(
              ShuffleBlock.newBuilder()
@@ -531,15 +548,17 @@ public RssSendShuffleDataResponse sendShuffleData(RssSendShuffleDataRequest requ
                  .setUncompressLength(sbi.getUncompressLength())
                  .setData(UnsafeByteOperations.unsafeWrap(sbi.getData().nioBuffer()))
                  .build());
-          size += sbi.getSize();
+          partitionRequireSize += sbi.getSize();
          blockNum++;
        }
+        size += partitionRequireSize;
        shuffleData.add(
            ShuffleData.newBuilder()
                .setPartitionId(ptb.getKey())
                .addAllBlock(shuffleBlocks)
                .build());
        partitionIds.add(ptb.getKey());
+        partitionRequireSizes.add(partitionRequireSize);
      }

      final int allocateSize = size;
@@ -552,6 +571,7 @@
            appId,
            shuffleId,
            partitionIds,
+            partitionRequireSizes,
            allocateSize,
            request.getRetryMax() / maxRetryAttempts,
            request.getRetryIntervalMax(),
@@ -149,12 +149,16 @@ public RssSendShuffleDataResponse sendShuffleData(RssSendShuffleDataRequest requ
      int size = 0;
      int blockNum = 0;
      List<Integer> partitionIds = new ArrayList<>();
+      List<Integer> partitionRequireSizes = new ArrayList<>();
      for (Map.Entry<Integer, List<ShuffleBlockInfo>> ptb : stb.getValue().entrySet()) {
+        int partitionRequireSize = 0;
        for (ShuffleBlockInfo sbi : ptb.getValue()) {
-          size += sbi.getSize();
+          partitionRequireSize += sbi.getSize();
          blockNum++;
        }
+        size += partitionRequireSize;
        partitionIds.add(ptb.getKey());
+        partitionRequireSizes.add(partitionRequireSize);
      }

      SendShuffleDataRequest sendShuffleDataRequest =
@@ -177,6 +181,7 @@
            request.getAppId(),
            shuffleId,
            partitionIds,
+            partitionRequireSizes,
            allocateSize,
            request.getRetryMax(),
            request.getRetryIntervalMax(),
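Both client implementations above share the same bookkeeping pattern: sum block sizes per partition, keep the subtotals index-aligned with `partitionIds`, and derive the overall allocation size from the subtotals. A standalone sketch with plain collections (no Uniffle types):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PartitionSizeBookkeeping {
  public static void main(String[] args) {
    // partition id -> sizes of the blocks headed for that partition
    Map<Integer, List<Integer>> blocksPerPartition = new TreeMap<>();
    blocksPerPartition.put(0, Arrays.asList(10, 20));
    blocksPerPartition.put(1, Arrays.asList(5));

    int size = 0;
    List<Integer> partitionIds = new ArrayList<>();
    List<Integer> partitionRequireSizes = new ArrayList<>();
    for (Map.Entry<Integer, List<Integer>> ptb : blocksPerPartition.entrySet()) {
      int partitionRequireSize = 0;
      for (int blockSize : ptb.getValue()) {
        partitionRequireSize += blockSize;
      }
      size += partitionRequireSize;
      partitionIds.add(ptb.getKey());
      partitionRequireSizes.add(partitionRequireSize);
    }
    System.out.println(partitionIds + " " + partitionRequireSizes + " total=" + size);
    // [0, 1] [30, 5] total=35
  }
}
```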
2 changes: 2 additions & 0 deletions proto/src/main/proto/Rss.proto
@@ -55,6 +55,7 @@ message RequireBufferRequest {
  string appId = 2;
  int32 shuffleId = 3;
  repeated int32 partitionIds = 4;
+  repeated int32 partitionRequireSizes = 5;
}

message RequireBufferResponse {
@@ -312,6 +313,7 @@ enum StatusCode {
  INVALID_REQUEST = 9;
  NO_BUFFER_FOR_HUGE_PARTITION = 10;
  STAGE_RETRY_IGNORE = 11;
+  EXCEED_HUGE_PARTITION_HARD_LIMIT = 12;
  APP_NOT_FOUND = 13;
  INTERNAL_NOT_RETRY_ERROR = 14;
  // add more status
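Tying the proto change back to the client code above: a `RequireBufferRequest` now carries one subtotal per entry in `partitionIds`. A minimal construction sketch (values are illustrative, and the generated classes are assumed to live in `org.apache.uniffle.proto.RssProtos`):

```java
import java.util.Arrays;
import org.apache.uniffle.proto.RssProtos.RequireBufferRequest;

public class RequireBufferRequestExample {
  public static void main(String[] args) {
    RequireBufferRequest request =
        RequireBufferRequest.newBuilder()
            .setAppId("app-1")
            .setShuffleId(0)
            .addAllPartitionIds(Arrays.asList(0, 1))
            // index-aligned with partitionIds, as in the client changes above
            .addAllPartitionRequireSizes(Arrays.asList(30, 5))
            .setRequireSize(35)
            .build();
    System.out.println(request);
  }
}
```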
