Skip to content

Commit

Permalink
Add more tests
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <sachinpkale@gmail.com>
  • Loading branch information
sachinpkale committed Sep 13, 2024
1 parent 1a26e5c commit afac97d
Show file tree
Hide file tree
Showing 2 changed files with 202 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,26 @@

package org.opensearch.remotestore;

import org.opensearch.action.LatchedActionListener;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.index.translog.transfer.TranslogTransferMetadata;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.junit.annotations.TestIssueLogging;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

import static org.opensearch.index.IndexSettings.INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING;
import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS;
import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.TRANSLOG;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA;
Expand All @@ -43,14 +44,6 @@ protected Settings nodeSettings(int nodeOrdinal) {
.build();
}

// Listener that deliberately does nothing on either outcome of the operation it is handed to.
ActionListener<Void> noOpActionListener = new ActionListener<>() {
    @Override
    public void onResponse(Void ignoredResponse) {
        // Intentionally empty: a successful completion needs no handling.
    }

    @Override
    public void onFailure(Exception ignoredFailure) {
        // Intentionally empty: failures are deliberately ignored by this listener.
    }
};

private void keepPinnedTimestampSchedulerUpdated() throws InterruptedException {
long currentTime = System.currentTimeMillis();
int maxRetry = 10;
Expand Down Expand Up @@ -88,15 +81,199 @@ public void testLiveIndexNoPinnedTimestamps() throws Exception {
String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings());
String shardDataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, DATA, translogPathFixedPrefix)
.buildAsString();
Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath);
Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1");
String shardMetadataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, METADATA, translogPathFixedPrefix)
.buildAsString();
Path translogMetadataPath = Path.of(translogRepoPath + "/" + shardMetadataPath);

assertBusy(() -> {
List<Path> metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList());
assertEquals(1, metadataFiles.size());

verifyTranslogDataFileCount(metadataFiles, translogDataPath);
});
}

/**
 * Indexes between 5 and 10 documents into a single-shard, zero-replica remote-store index whose
 * {@code INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING} is 10 (so the number of indexed documents never
 * exceeds the configured extra generations) and, with no pinned timestamps created, expects
 * {@code numDocs + 1} translog metadata files to remain in the remote translog repository.
 * The number of translog data files is cross-checked against the generations referenced by those
 * metadata files via {@code verifyTranslogDataFileCount}.
 *
 * @throws Exception if cluster setup, indexing, or listing the repository directories fails
 */
public void testLiveIndexNoPinnedTimestampsWithExtraGenSettingWithinLimit() throws Exception {
    prepareCluster(1, 1, Settings.EMPTY);
    Settings indexSettings = Settings.builder()
        .put(remoteStoreIndexSettings(0, 1))
        .put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 10)
        .build();
    createIndex(INDEX_NAME, indexSettings);
    ensureYellowAndNoInitializingShards(INDEX_NAME);
    ensureGreen(INDEX_NAME);

    // Zero lookback interval for pinned timestamps.
    RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);

    RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(
        RemoteStorePinnedTimestampService.class,
        primaryNodeName(INDEX_NAME)
    );

    // Re-schedule the async pinned-timestamp update task with a 1 second interval.
    remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));

    int numDocs = randomIntBetween(5, 10);
    for (int i = 0; i < numDocs; i++) {
        keepPinnedTimestampSchedulerUpdated();
        indexSingleDoc(INDEX_NAME, true);
    }

    String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings());
    String shardDataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, DATA, translogPathFixedPrefix)
        .buildAsString();
    // NOTE(review): the trailing "/1" is presumably the primary-term sub-directory of the translog data path - confirm.
    Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1");
    String shardMetadataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, METADATA, translogPathFixedPrefix)
        .buildAsString();
    Path translogMetadataPath = Path.of(translogRepoPath + "/" + shardMetadataPath);

    assertBusy(() -> {
        // Files.list holds an open directory handle; close it on every retry of this busy-assert.
        List<Path> metadataFiles;
        try (var metadataListing = Files.list(translogMetadataPath)) {
            metadataFiles = metadataListing.collect(Collectors.toList());
        }
        assertEquals(numDocs + 1, metadataFiles.size());

        verifyTranslogDataFileCount(metadataFiles, translogDataPath);
    });
}

/**
 * Indexes 5 documents into a single-shard, zero-replica remote-store index whose
 * {@code INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING} is 4 and, with no pinned timestamps created,
 * expects exactly 3 translog metadata files to remain in the remote translog repository.
 * The number of translog data files is cross-checked against the generations referenced by those
 * metadata files via {@code verifyTranslogDataFileCount}.
 *
 * @throws Exception if cluster setup, indexing, or listing the repository directories fails
 */
public void testLiveIndexNoPinnedTimestampsWithExtraGenSetting() throws Exception {
    prepareCluster(1, 1, Settings.EMPTY);
    Settings indexSettings = Settings.builder()
        .put(remoteStoreIndexSettings(0, 1))
        .put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 4)
        .build();
    createIndex(INDEX_NAME, indexSettings);
    ensureYellowAndNoInitializingShards(INDEX_NAME);
    ensureGreen(INDEX_NAME);

    // Zero lookback interval for pinned timestamps.
    RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);

    RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(
        RemoteStorePinnedTimestampService.class,
        primaryNodeName(INDEX_NAME)
    );

    // Re-schedule the async pinned-timestamp update task with a 1 second interval.
    remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));

    int numDocs = 5;
    for (int i = 0; i < numDocs; i++) {
        keepPinnedTimestampSchedulerUpdated();
        indexSingleDoc(INDEX_NAME, true);
    }

    String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings());
    String shardDataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, DATA, translogPathFixedPrefix)
        .buildAsString();
    // NOTE(review): the trailing "/1" is presumably the primary-term sub-directory of the translog data path - confirm.
    Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1");
    String shardMetadataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, METADATA, translogPathFixedPrefix)
        .buildAsString();
    Path translogMetadataPath = Path.of(translogRepoPath + "/" + shardMetadataPath);

    assertBusy(() -> {
        // Files.list holds an open directory handle; close it on every retry of this busy-assert.
        List<Path> metadataFiles;
        try (var metadataListing = Files.list(translogMetadataPath)) {
            metadataFiles = metadataListing.collect(Collectors.toList());
        }
        assertEquals(3, metadataFiles.size());

        verifyTranslogDataFileCount(metadataFiles, translogDataPath);
    });
}

/**
 * Indexes between 5 and 10 documents into a single-shard, zero-replica remote-store index with
 * {@code INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING} set to 0, expects a single translog metadata
 * file to remain while the index is live, then deletes the index and expects both the translog
 * metadata directory and the translog data directory to become empty within 30 seconds.
 *
 * @throws Exception if cluster setup, indexing, index deletion, or listing the repository directories fails
 */
public void testIndexDeletionNoPinnedTimestamps() throws Exception {
    prepareCluster(1, 1, Settings.EMPTY);
    Settings indexSettings = Settings.builder()
        .put(remoteStoreIndexSettings(0, 1))
        .put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 0)
        .build();
    createIndex(INDEX_NAME, indexSettings);
    ensureYellowAndNoInitializingShards(INDEX_NAME);
    ensureGreen(INDEX_NAME);

    // Zero lookback interval for pinned timestamps.
    RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);

    RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(
        RemoteStorePinnedTimestampService.class,
        primaryNodeName(INDEX_NAME)
    );

    // Re-schedule the async pinned-timestamp update task with a 1 second interval.
    remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));

    int numDocs = randomIntBetween(5, 10);
    for (int i = 0; i < numDocs; i++) {
        keepPinnedTimestampSchedulerUpdated();
        indexSingleDoc(INDEX_NAME, true);
    }

    String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings());
    String shardDataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, DATA, translogPathFixedPrefix)
        .buildAsString();
    // NOTE(review): the trailing "/1" is presumably the primary-term sub-directory of the translog data path - confirm.
    Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1");
    String shardMetadataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, METADATA, translogPathFixedPrefix)
        .buildAsString();
    Path translogMetadataPath = Path.of(translogRepoPath + "/" + shardMetadataPath);

    // While the index is live, a single metadata file is expected to remain.
    assertBusy(() -> {
        // Files.list holds an open directory handle; close it on every retry of this busy-assert.
        List<Path> metadataFiles;
        try (var metadataListing = Files.list(translogMetadataPath)) {
            metadataFiles = metadataListing.collect(Collectors.toList());
        }
        assertEquals(1, metadataFiles.size());

        verifyTranslogDataFileCount(metadataFiles, translogDataPath);
    });

    keepPinnedTimestampSchedulerUpdated();
    client().admin().indices().prepareDelete(INDEX_NAME).get();

    // After deletion, both remote translog directories are expected to be emptied.
    assertBusy(() -> {
        try (var metadataListing = Files.list(translogMetadataPath)) {
            assertEquals(0, metadataListing.collect(Collectors.toList()).size());
        }
        try (var dataListing = Files.list(translogDataPath)) {
            assertEquals(0, dataListing.collect(Collectors.toList()).size());
        }
    }, 30, TimeUnit.SECONDS);
}

// public void testLiveIndexPinnedTimestamps() throws Exception {
// prepareCluster(1, 1, Settings.EMPTY);
// Settings indexSettings = Settings.builder()
// .put(remoteStoreIndexSettings(0, 1))
// .put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 0)
// .build();
// createIndex(INDEX_NAME, indexSettings);
// ensureYellowAndNoInitializingShards(INDEX_NAME);
// ensureGreen(INDEX_NAME);
//
// RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);
//
// RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(
// RemoteStorePinnedTimestampService.class,
// primaryNodeName(INDEX_NAME)
// );
//
// remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));
//
// int numDocs = randomIntBetween(5, 10);
// for (int i = 0; i < numDocs; i++) {
// keepPinnedTimestampSchedulerUpdated();
// indexSingleDoc(INDEX_NAME, true);
// }
//
// String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings());
// String shardDataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, DATA, translogPathFixedPrefix)
// .buildAsString();
// Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1");
// String shardMetadataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, METADATA, translogPathFixedPrefix)
// .buildAsString();
// Path translogMetadataPath = Path.of(translogRepoPath + "/" + shardMetadataPath);
//
// assertBusy(() -> {
// List<Path> metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList());
// assertEquals(1, metadataFiles.size());
//
// verifyTranslogDataFileCount(metadataFiles, translogDataPath);
// });
// }

/**
 * Asserts that the translog data directory holds exactly two files for every distinct translog
 * generation referenced by the given metadata files.
 * <p>
 * Each metadata file name is parsed into a (min, max) generation pair and every generation in that
 * closed range is collected into a set, so generations covered by several metadata files count once.
 * NOTE(review): the factor of two presumably reflects one translog file plus one checkpoint file
 * per generation - confirm.
 *
 * @param metadataFiles    paths of the translog metadata files currently present
 * @param translogDataPath directory containing the translog data files
 * @throws IOException if the translog data directory cannot be listed
 */
private void verifyTranslogDataFileCount(List<Path> metadataFiles, Path translogDataPath) throws IOException {
    Set<Long> generations = new HashSet<>();
    for (Path metadataFile : metadataFiles) {
        String mdFile = metadataFile.getFileName().toString();
        Tuple<Long, Long> minMaxGen = TranslogTransferMetadata.getMinMaxTranslogGenerationFromFilename(mdFile);
        LongStream.rangeClosed(minMaxGen.v1(), minMaxGen.v2()).forEach(generations::add);
    }

    // Files.list holds an open directory handle until the stream is closed.
    final int dataFileCount;
    try (var dataListing = Files.list(translogDataPath)) {
        dataFileCount = dataListing.collect(Collectors.toList()).size();
    }
    assertEquals(generations.size() * 2, dataFileCount);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ protected void trimUnreferencedReaders(boolean indexDeleted, boolean trimLocal)

// This is to ensure that after the permits are acquired during primary relocation, there are no further modifications to the
// remote store.
if (startedPrimarySupplier.getAsBoolean() == false || pauseSync.get()) {
if ((indexDeleted == false && startedPrimarySupplier.getAsBoolean() == false) || pauseSync.get()) {
return;
}

Expand All @@ -146,26 +146,26 @@ protected void trimUnreferencedReaders(boolean indexDeleted, boolean trimLocal)
return;
}

// This code block ensures parity with RemoteFsTranslog. Without this, we will end up making list translog metadata
// call in each invocation of trimUnreferencedReaders
if (indexDeleted == false && (minRemoteGenReferenced - maxDeletedGenerationOnRemote) < indexSettings().getRemoteTranslogExtraKeep()) {
return;
}

// Since remote generation deletion is async, this ensures that only one generation deletion happens at a time.
// Remote generation deletion involves 2 async operations - 1) Delete translog generation files 2) Delete metadata files
// We try to acquire 2 permits and if we cannot, we return from here itself.
if (remoteGenerationDeletionPermits.tryAcquire(REMOTE_DELETION_PERMITS) == false) {
return;
}

// This code block ensures parity with RemoteFsTranslog. Without this, we will end up making list translog metadata
// call in each invocation of trimUnreferencedReaders
if (indexDeleted == false && (minRemoteGenReferenced - maxDeletedGenerationOnRemote) < indexSettings().getRemoteTranslogExtraKeep()) {
return;
}

ActionListener<List<BlobMetadata>> listMetadataFilesListener = new ActionListener<>() {
@Override
public void onResponse(List<BlobMetadata> blobMetadata) {
List<String> metadataFiles = blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList());

try {
if (metadataFiles.size() <= 1) {
if (indexDeleted == false && metadataFiles.size() <= 1) {
logger.debug("No stale translog metadata files found");
remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS);
return;
Expand Down Expand Up @@ -201,6 +201,7 @@ public void onResponse(List<BlobMetadata> blobMetadata) {
metadataFilesNotToBeDeleted.removeAll(metadataFilesToBeDeleted);

logger.debug(() -> "metadataFilesNotToBeDeleted = " + metadataFilesNotToBeDeleted);

Set<Long> generationsToBeDeleted = getGenerationsToBeDeleted(
metadataFilesNotToBeDeleted,
metadataFilesToBeDeleted
Expand Down

0 comments on commit afac97d

Please sign in to comment.