Skip to content

Commit

Permalink
[apache#1472] fix(server): Inaccurate flow control leads to Shuffle s…
Browse files Browse the repository at this point in the history
…erver OOM when enabling Netty
  • Loading branch information
rickyma committed Feb 5, 2024
1 parent 947bbf3 commit 4314b5b
Showing 1 changed file with 11 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import org.apache.uniffle.server.ShuffleServerMetrics;
import org.apache.uniffle.server.ShuffleTaskManager;

import static org.apache.uniffle.common.config.RssBaseConf.RSS_TEST_MODE_ENABLE;

public class ShuffleBufferManager {

private static final Logger LOG = LoggerFactory.getLogger(ShuffleBufferManager.class);
Expand Down Expand Up @@ -78,6 +80,7 @@ public class ShuffleBufferManager {
private long reservedOnHeapPreAllocatedBufferBytes;
private long reservedOffHeapPreAllocatedBufferBytes;
private long maxAvailableOffHeapBytes;
private boolean testMode;

protected long bufferSize = 0;
protected AtomicLong preAllocatedSize = new AtomicLong(0L);
Expand Down Expand Up @@ -155,6 +158,7 @@ public ShuffleBufferManager(ShuffleServerConf conf, ShuffleFlushManager shuffleF
Math.round(
capacity * conf.get(ShuffleServerConf.HUGE_PARTITION_MEMORY_USAGE_LIMITATION_RATIO));
this.nettyServerEnabled = conf.get(ShuffleServerConf.NETTY_SERVER_PORT) >= 0;
this.testMode = conf.getBoolean(RSS_TEST_MODE_ENABLE);
}

public void setShuffleTaskManager(ShuffleTaskManager taskManager) {
Expand Down Expand Up @@ -689,7 +693,13 @@ private void addPickedShuffle(String shuffleIdKey, Map<String, Set<Integer>> pic
shuffleIdSet.add(shuffleId);
}

private static long getPinnedDirectMemory() {
private long getPinnedDirectMemory() {
if (testMode) {
// In test mode, return the actual pinned direct memory immediately
long pinnedDirectMemory = NettyUtils.getNettyBufferAllocator().pinnedDirectMemory();
return pinnedDirectMemory == -1L ? 0L : pinnedDirectMemory;
}
// To optimize performance, return a periodically retrieved pinned direct memory
long pinnedDirectMemory =
(long) ShuffleServerMetrics.gaugePinnedDirectMemorySize.get() == -1L
? 0L
Expand Down

0 comments on commit 4314b5b

Please sign in to comment.