From 0ead7434ffe8e868a83f72540780afb59c3d8cdb Mon Sep 17 00:00:00 2001 From: rickyma Date: Mon, 5 Feb 2024 22:55:57 +0800 Subject: [PATCH] [#1472] fix(server): Inaccurate flow control leads to Shuffle server OOM when enabling Netty --- .../test/HybridStorageFaultToleranceBase.java | 6 +---- .../test/ShuffleServerFaultToleranceTest.java | 24 ++++++++++++++++++- ...ShuffleServerWithKerberizedHadoopTest.java | 1 - .../ShuffleServerWithMemLocalHadoopTest.java | 20 ++++++++++++++++ .../server/buffer/ShuffleBufferManager.java | 12 +++++++++- 5 files changed, 55 insertions(+), 8 deletions(-) diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageFaultToleranceBase.java b/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageFaultToleranceBase.java index 253b5be988..d14e3ff817 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageFaultToleranceBase.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageFaultToleranceBase.java @@ -63,11 +63,7 @@ public void createClient() throws Exception { RssConf rssConf = new RssConf(); rssConf.set(RssClientConf.RSS_CLIENT_TYPE, ClientType.GRPC_NETTY); shuffleServerNettyClient = - new ShuffleServerGrpcNettyClient( - rssConf, - LOCALHOST, - SHUFFLE_SERVER_PORT, - NETTY_PORT); + new ShuffleServerGrpcNettyClient(rssConf, LOCALHOST, SHUFFLE_SERVER_PORT, NETTY_PORT); } @AfterEach diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java index 20dd9aa3b8..48656af9fa 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java @@ -24,10 +24,13 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.mockito.MockedStatic; +import org.mockito.Mockito; import org.roaringbitmap.longlong.Roaring64NavigableMap; import org.apache.uniffle.client.TestUtils; @@ -47,6 +50,7 @@ import org.apache.uniffle.common.config.RssClientConf; import org.apache.uniffle.common.config.RssConf; import org.apache.uniffle.common.util.ByteBufUtils; +import org.apache.uniffle.common.util.NettyUtils; import org.apache.uniffle.coordinator.CoordinatorConf; import org.apache.uniffle.coordinator.CoordinatorServer; import org.apache.uniffle.server.MockedShuffleServer; @@ -62,17 +66,21 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.when; public class ShuffleServerFaultToleranceTest extends ShuffleReadWriteBase { private List shuffleServerClients; private List shuffleServerNettyClients; + private static MockedStatic nettyUtils; private String remoteStoragePath = HDFS_URI + "rss/test"; @BeforeEach public void setupServers(@TempDir File tmpDir) throws Exception { + nettyUtils = mockStatic(NettyUtils.class, Mockito.CALLS_REAL_METHODS); + nettyUtils.when(NettyUtils::getMaxDirectMemory).thenReturn(600L); CoordinatorConf coordinatorConf = getCoordinatorConf(); createCoordinatorServer(coordinatorConf); shuffleServers.add(createServer(0, tmpDir)); @@ -91,7 +99,9 @@ public void setupServers(@TempDir File tmpDir) throws Exception { rssConf, LOCALHOST, SHUFFLE_SERVER_PORT, - shuffleServer.getShuffleServerConf().getInteger(ShuffleServerConf.NETTY_SERVER_PORT))); + shuffleServer + .getShuffleServerConf() + .getInteger(ShuffleServerConf.NETTY_SERVER_PORT))); } } @@ -108,6 +118,11 @@ public void cleanEnv() throws Exception { cleanCluster(); } + @AfterAll + public static void tearDown() { + nettyUtils.close(); + } + @Test public void testReadFaultTolerance() throws Exception { testReadFaultTolerance(true); @@ -316,6 +331,13 @@ public static MockedShuffleServer createServer(int id, File tmpDir) throws Excep shuffleServerConf.setInteger("rss.rpc.server.port", SHUFFLE_SERVER_PORT + 20 + id); shuffleServerConf.setInteger("rss.jetty.http.port", 19081 + id * 100); shuffleServerConf.setString("rss.storage.basePath", basePath); + shuffleServerConf.set( + ShuffleServerConf.SERVER_PRE_ALLOCATION_RESERVED_OFF_HEAP_SIZE, + (long) + (NettyUtils.getMaxDirectMemory() + / 100 + * shuffleServerConf.getDouble( + ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE))); return new MockedShuffleServer(shuffleServerConf); } diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHadoopTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHadoopTest.java index 34298e8a72..06460c1e32 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHadoopTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHadoopTest.java @@ -128,7 +128,6 @@ public static void setup() throws Exception { ShuffleServerConf shuffleServerConf = getShuffleServerConf(); shuffleServer = new ShuffleServer(shuffleServerConf); shuffleServer.start(); - } @AfterAll diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHadoopTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHadoopTest.java index cde83f1e4a..1b8a6886d2 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHadoopTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHadoopTest.java @@ -24,11 +24,14 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.mockito.MockedStatic; +import org.mockito.Mockito; import org.roaringbitmap.longlong.Roaring64NavigableMap; import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient; @@ -44,6 +47,7 @@ import org.apache.uniffle.common.config.RssClientConf; import org.apache.uniffle.common.config.RssConf; import org.apache.uniffle.common.util.ByteBufUtils; +import org.apache.uniffle.common.util.NettyUtils; import org.apache.uniffle.coordinator.CoordinatorConf; import org.apache.uniffle.server.ShuffleServerConf; import org.apache.uniffle.server.buffer.ShuffleBuffer; @@ -59,6 +63,7 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.Mockito.mockStatic; public class ShuffleServerWithMemLocalHadoopTest extends ShuffleReadWriteBase { @@ -66,9 +71,12 @@ public class ShuffleServerWithMemLocalHadoopTest extends ShuffleReadWriteBase { private ShuffleServerGrpcNettyClient shuffleServerNettyClient; private static String REMOTE_STORAGE = HDFS_URI + "rss/test"; private static ShuffleServerConf shuffleServerConfig; + private static MockedStatic nettyUtils; @BeforeAll public static void setupServers(@TempDir File tmpDir) throws Exception { + nettyUtils = mockStatic(NettyUtils.class, Mockito.CALLS_REAL_METHODS); + nettyUtils.when(NettyUtils::getMaxDirectMemory).thenReturn(500L); CoordinatorConf coordinatorConf = getCoordinatorConf(); createCoordinatorServer(coordinatorConf); ShuffleServerConf shuffleServerConf = getShuffleServerConf(); @@ -83,6 +91,13 @@ public static void setupServers(@TempDir File tmpDir) throws Exception { shuffleServerConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 40.0); shuffleServerConf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 500L); shuffleServerConf.set(ShuffleServerConf.SERVER_TRIGGER_FLUSH_CHECK_INTERVAL, 500L); + shuffleServerConf.set( + ShuffleServerConf.SERVER_PRE_ALLOCATION_RESERVED_OFF_HEAP_SIZE, + (long) + (NettyUtils.getMaxDirectMemory() + / 100 + * shuffleServerConf.getDouble( + ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE))); createShuffleServer(shuffleServerConf); startServers(); shuffleServerConfig = shuffleServerConf; @@ -107,6 +122,11 @@ public void closeClient() { shuffleServerNettyClient.close(); } + @AfterAll + public static void tearDown() { + nettyUtils.close(); + } + @Test public void memoryLocalFileHadoopReadWithFilterAndSkipTest() throws Exception { runTest(true, true); diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java index f907a4b105..7c3bd3749f 100644 --- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java +++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java @@ -50,6 +50,8 @@ import org.apache.uniffle.server.ShuffleServerMetrics; import org.apache.uniffle.server.ShuffleTaskManager; +import static org.apache.uniffle.common.config.RssBaseConf.RSS_TEST_MODE_ENABLE; + public class ShuffleBufferManager { private static final Logger LOG = LoggerFactory.getLogger(ShuffleBufferManager.class); @@ -78,6 +80,7 @@ public class ShuffleBufferManager { private long reservedOnHeapPreAllocatedBufferBytes; private long reservedOffHeapPreAllocatedBufferBytes; private long maxAvailableOffHeapBytes; + private boolean testMode; protected long bufferSize = 0; protected AtomicLong preAllocatedSize = new AtomicLong(0L); @@ -122,6 +125,7 @@ public ShuffleBufferManager(ShuffleServerConf conf, ShuffleFlushManager shuffleF conf.getSizeAsBytes(ShuffleServerConf.SERVER_PRE_ALLOCATION_RESERVED_ON_HEAP_SIZE); this.reservedOffHeapPreAllocatedBufferBytes = conf.getSizeAsBytes(ShuffleServerConf.SERVER_PRE_ALLOCATION_RESERVED_OFF_HEAP_SIZE); + this.testMode = conf.getBoolean(RSS_TEST_MODE_ENABLE); if (heapSize < this.reservedOnHeapPreAllocatedBufferBytes) { throw new IllegalArgumentException( "The reserved on-heap memory size for pre-allocated buffer " @@ -689,7 +693,13 @@ private void addPickedShuffle(String shuffleIdKey, Map> pic shuffleIdSet.add(shuffleId); } - private static long getPinnedDirectMemory() { + private long getPinnedDirectMemory() { + if (testMode) { + // In test mode, return the actual pinned direct memory immediately + long pinnedDirectMemory = NettyUtils.getNettyBufferAllocator().pinnedDirectMemory(); + return pinnedDirectMemory == -1L ? 0L : pinnedDirectMemory; + } + // To optimize performance, return a periodically retrieved pinned direct memory long pinnedDirectMemory = (long) ShuffleServerMetrics.gaugePinnedDirectMemorySize.get() == -1L ? 0L