diff --git a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java index 229b309a12..772aae1d00 100644 --- a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java @@ -309,12 +309,12 @@ private void cleanupStorageSelectionCache(PurgeEvent event) { Function deleteConditionFunc = null; String prefixKey = null; if (event instanceof AppPurgeEvent) { - prefixKey = UnionKey.buildKey(event.getAppId()); + prefixKey = UnionKey.buildKey(event.getAppId(), ""); deleteConditionFunc = partitionUnionKey -> UnionKey.startsWith(partitionUnionKey, event.getAppId()); } else if (event instanceof ShufflePurgeEvent) { int shuffleId = event.getShuffleIds().get(0); - prefixKey = UnionKey.buildKey(event.getAppId(), shuffleId); + prefixKey = UnionKey.buildKey(event.getAppId(), shuffleId, ""); deleteConditionFunc = partitionUnionKey -> UnionKey.startsWith(partitionUnionKey, event.getAppId(), shuffleId); } diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java index f2b0b3048d..c170870737 100644 --- a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java +++ b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java @@ -72,6 +72,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -454,7 +455,7 @@ public void clearTest() throws Exception { @Test public void clearLocalTest(@TempDir File tempDir) throws Exception { final String appId1 = "clearLocalTest_appId1"; - final String appId2 = "clearLocalTest_appId2"; + final String appId2 = "clearLocalTest_appId12"; ShuffleServerConf serverConf = new ShuffleServerConf(); serverConf.set( ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(tempDir.getAbsolutePath())); @@ -470,14 +471,18 @@ public void clearLocalTest(@TempDir File tempDir) throws Exception { manager.addToFlushQueue(event2); ShuffleDataFlushEvent event3 = createShuffleDataFlushEvent(appId2, 2, 0, 1, null); manager.addToFlushQueue(event3); + ShuffleDataFlushEvent event5 = createShuffleDataFlushEvent(appId2, 11, 0, 1, null); + manager.addToFlushQueue(event5); assertEquals(storageManager.selectStorage(event1), storageManager.selectStorage(event2)); final AbstractStorage storage = (AbstractStorage) storageManager.selectStorage(event1); waitForFlush(manager, appId1, 1, 5); waitForFlush(manager, appId2, 1, 5); waitForFlush(manager, appId2, 2, 5); + waitForFlush(manager, appId2, 11, 5); assertEquals(5, manager.getCommittedBlockIds(appId1, 1).getLongCardinality()); assertEquals(5, manager.getCommittedBlockIds(appId2, 1).getLongCardinality()); assertEquals(5, manager.getCommittedBlockIds(appId2, 2).getLongCardinality()); + assertEquals(5, manager.getCommittedBlockIds(appId2, 11).getLongCardinality()); assertEquals(2, storage.getHandlerSize()); File file = new File(tempDir, appId1); assertTrue(file.exists()); @@ -490,6 +495,10 @@ public void clearLocalTest(@TempDir File tempDir) throws Exception { new AppPurgeEvent(appId1, StringUtils.EMPTY, Lists.newArrayList(1))); manager.removeResources(appId1); assertFalse(file.exists()); + + ShuffleDataReadEvent shuffleReadEvent = new ShuffleDataReadEvent(appId2, 1, 0, 0); + assertNotNull(storageManager.selectStorage(shuffleReadEvent)); + assertEquals(0, manager.getCommittedBlockIds(appId1, 1).getLongCardinality()); assertEquals(5, manager.getCommittedBlockIds(appId2, 1).getLongCardinality()); assertEquals(5, manager.getCommittedBlockIds(appId2, 2).getLongCardinality()); @@ -497,6 +506,12 @@ public void clearLocalTest(@TempDir File tempDir) throws Exception { manager.removeResources(appId2); storageManager.removeResources( new ShufflePurgeEvent(appId2, StringUtils.EMPTY, Lists.newArrayList(1))); + + ShuffleDataReadEvent shuffle1ReadEvent = new ShuffleDataReadEvent(appId2, 1, 0, 0); + ShuffleDataReadEvent shuffle11ReadEvent = new ShuffleDataReadEvent(appId2, 11, 0, 0); + assertNull(storageManager.selectStorage(shuffle1ReadEvent)); + assertNotNull(storageManager.selectStorage(shuffle11ReadEvent)); + storageManager.removeResources( new ShufflePurgeEvent(appId2, StringUtils.EMPTY, Lists.newArrayList(2))); storageManager.removeResources(