From 4314b5b4efe7a7776ba9794342cb91cba7279a45 Mon Sep 17 00:00:00 2001 From: rickyma Date: Mon, 5 Feb 2024 22:55:57 +0800 Subject: [PATCH] [#1472] fix(server): Inaccurate flow control leads to Shuffle server OOM when enabling Netty --- .../uniffle/server/buffer/ShuffleBufferManager.java | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java index f907a4b105..26fb7dca2e 100644 --- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java +++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java @@ -50,6 +50,8 @@ import org.apache.uniffle.server.ShuffleServerMetrics; import org.apache.uniffle.server.ShuffleTaskManager; +import static org.apache.uniffle.common.config.RssBaseConf.RSS_TEST_MODE_ENABLE; + public class ShuffleBufferManager { private static final Logger LOG = LoggerFactory.getLogger(ShuffleBufferManager.class); @@ -78,6 +80,7 @@ public class ShuffleBufferManager { private long reservedOnHeapPreAllocatedBufferBytes; private long reservedOffHeapPreAllocatedBufferBytes; private long maxAvailableOffHeapBytes; + private boolean testMode; protected long bufferSize = 0; protected AtomicLong preAllocatedSize = new AtomicLong(0L); @@ -155,6 +158,7 @@ public ShuffleBufferManager(ShuffleServerConf conf, ShuffleFlushManager shuffleF Math.round( capacity * conf.get(ShuffleServerConf.HUGE_PARTITION_MEMORY_USAGE_LIMITATION_RATIO)); this.nettyServerEnabled = conf.get(ShuffleServerConf.NETTY_SERVER_PORT) >= 0; + this.testMode = conf.getBoolean(RSS_TEST_MODE_ENABLE); } public void setShuffleTaskManager(ShuffleTaskManager taskManager) { @@ -689,7 +693,13 @@ private void addPickedShuffle(String shuffleIdKey, Map> pic shuffleIdSet.add(shuffleId); } - private static long getPinnedDirectMemory() { + private long getPinnedDirectMemory() { + if (testMode) { + // In test mode, return the actual pinned direct memory immediately + long pinnedDirectMemory = NettyUtils.getNettyBufferAllocator().pinnedDirectMemory(); + return pinnedDirectMemory == -1L ? 0L : pinnedDirectMemory; + } + // To optimize performance, return a periodically retrieved pinned direct memory long pinnedDirectMemory = (long) ShuffleServerMetrics.gaugePinnedDirectMemorySize.get() == -1L ? 0L