Skip to content

Commit

Permalink
[apache#1501] fix(server): storage selection cache accidentally delet…
Browse files Browse the repository at this point in the history
…ed when clearing stage level data. (apache#1505)

### What changes were proposed in this pull request?

avoid storage selection cache accidentally deleted

### Why are the changes needed?

Fix: (apache#1501) 

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit tests
  • Loading branch information
dingshun3016 authored Feb 4, 2024
1 parent 18fa146 commit 576a925
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -309,12 +309,12 @@ private void cleanupStorageSelectionCache(PurgeEvent event) {
Function<String, Boolean> deleteConditionFunc = null;
String prefixKey = null;
if (event instanceof AppPurgeEvent) {
prefixKey = UnionKey.buildKey(event.getAppId());
prefixKey = UnionKey.buildKey(event.getAppId(), "");
deleteConditionFunc =
partitionUnionKey -> UnionKey.startsWith(partitionUnionKey, event.getAppId());
} else if (event instanceof ShufflePurgeEvent) {
int shuffleId = event.getShuffleIds().get(0);
prefixKey = UnionKey.buildKey(event.getAppId(), shuffleId);
prefixKey = UnionKey.buildKey(event.getAppId(), shuffleId, "");
deleteConditionFunc =
partitionUnionKey -> UnionKey.startsWith(partitionUnionKey, event.getAppId(), shuffleId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
Expand Down Expand Up @@ -454,7 +455,7 @@ public void clearTest() throws Exception {
@Test
public void clearLocalTest(@TempDir File tempDir) throws Exception {
final String appId1 = "clearLocalTest_appId1";
final String appId2 = "clearLocalTest_appId2";
final String appId2 = "clearLocalTest_appId12";
ShuffleServerConf serverConf = new ShuffleServerConf();
serverConf.set(
ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(tempDir.getAbsolutePath()));
Expand All @@ -470,14 +471,18 @@ public void clearLocalTest(@TempDir File tempDir) throws Exception {
manager.addToFlushQueue(event2);
ShuffleDataFlushEvent event3 = createShuffleDataFlushEvent(appId2, 2, 0, 1, null);
manager.addToFlushQueue(event3);
ShuffleDataFlushEvent event5 = createShuffleDataFlushEvent(appId2, 11, 0, 1, null);
manager.addToFlushQueue(event5);
assertEquals(storageManager.selectStorage(event1), storageManager.selectStorage(event2));
final AbstractStorage storage = (AbstractStorage) storageManager.selectStorage(event1);
waitForFlush(manager, appId1, 1, 5);
waitForFlush(manager, appId2, 1, 5);
waitForFlush(manager, appId2, 2, 5);
waitForFlush(manager, appId2, 11, 5);
assertEquals(5, manager.getCommittedBlockIds(appId1, 1).getLongCardinality());
assertEquals(5, manager.getCommittedBlockIds(appId2, 1).getLongCardinality());
assertEquals(5, manager.getCommittedBlockIds(appId2, 2).getLongCardinality());
assertEquals(5, manager.getCommittedBlockIds(appId2, 11).getLongCardinality());
assertEquals(2, storage.getHandlerSize());
File file = new File(tempDir, appId1);
assertTrue(file.exists());
Expand All @@ -490,13 +495,23 @@ public void clearLocalTest(@TempDir File tempDir) throws Exception {
new AppPurgeEvent(appId1, StringUtils.EMPTY, Lists.newArrayList(1)));
manager.removeResources(appId1);
assertFalse(file.exists());

ShuffleDataReadEvent shuffleReadEvent = new ShuffleDataReadEvent(appId2, 1, 0, 0);
assertNotNull(storageManager.selectStorage(shuffleReadEvent));

assertEquals(0, manager.getCommittedBlockIds(appId1, 1).getLongCardinality());
assertEquals(5, manager.getCommittedBlockIds(appId2, 1).getLongCardinality());
assertEquals(5, manager.getCommittedBlockIds(appId2, 2).getLongCardinality());
assertEquals(1, storage.getHandlerSize());
manager.removeResources(appId2);
storageManager.removeResources(
new ShufflePurgeEvent(appId2, StringUtils.EMPTY, Lists.newArrayList(1)));

ShuffleDataReadEvent shuffle1ReadEvent = new ShuffleDataReadEvent(appId2, 1, 0, 0);
ShuffleDataReadEvent shuffle11ReadEvent = new ShuffleDataReadEvent(appId2, 11, 0, 0);
assertNull(storageManager.selectStorage(shuffle1ReadEvent));
assertNotNull(storageManager.selectStorage(shuffle11ReadEvent));

storageManager.removeResources(
new ShufflePurgeEvent(appId2, StringUtils.EMPTY, Lists.newArrayList(2)));
storageManager.removeResources(
Expand Down

0 comments on commit 576a925

Please sign in to comment.