Skip to content

Commit

Permalink
[Snapshot Interop] Add Changes in Snapshot Status Flow for remote sto… (
Browse files Browse the repository at this point in the history
#7495) (#8612)

(cherry picked from commit a5752cb)

Signed-off-by: Bansi Kasundra <kasundra@amazon.com>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
opensearch-trigger-bot[bot] authored Jul 11, 2023
1 parent d8dc158 commit b40d956
Show file tree
Hide file tree
Showing 9 changed files with 283 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,20 @@ public URL url() {
}

/**
* This operation is not supported by URLBlobContainer
* Tests whether a blob with the given blob name exists in the container.
*
* @param blobName
* The name of the blob whose existence is to be determined.
* @return {@code true} if a blob exists in the {@link BlobContainer} with the given name, and {@code false} otherwise.
*/
@Override
public boolean blobExists(String blobName) {
assert false : "should never be called for a read-only url repo";
throw new UnsupportedOperationException("URL repository doesn't support this operation");
public boolean blobExists(String blobName) throws IOException {
try {
readBlob(blobName);
return true;
} catch (FileNotFoundException e) {
return false;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.snapshots.blobstore.IndexShardSnapshot;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.RepositoryData;
import org.opensearch.snapshots.mockstore.MockRepository;
Expand Down Expand Up @@ -850,18 +851,14 @@ private static BlobStoreIndexShardSnapshot readShardSnapshot(
RepositoryShardId repositoryShardId,
SnapshotId snapshotId
) {
return PlainActionFuture.get(
f -> repository.threadPool()
.generic()
.execute(
ActionRunnable.supply(
f,
() -> repository.loadShardSnapshot(
repository.shardContainer(repositoryShardId.index(), repositoryShardId.shardId()),
snapshotId
)
)
)
);
return PlainActionFuture.get(f -> repository.threadPool().generic().execute(ActionRunnable.supply(f, () -> {
IndexShardSnapshot indexShardSnapshot = repository.loadShardSnapshot(
repository.shardContainer(repositoryShardId.index(), repositoryShardId.shardId()),
snapshotId
);
assert indexShardSnapshot instanceof BlobStoreIndexShardSnapshot
: "indexShardSnapshot should be an instance of BlobStoreIndexShardSnapshot";
return (BlobStoreIndexShardSnapshot) indexShardSnapshot;
})));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.ByteSizeUnit;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.common.Strings;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
Expand Down Expand Up @@ -76,6 +77,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.getKey(), 0) // We have tests that check by-timestamp order
.put(FeatureFlags.REMOTE_STORE, "true")
.build();
}

Expand Down Expand Up @@ -110,6 +112,61 @@ public void testStatusApiConsistency() {
assertEquals(snStatus.getStats().getTime(), snapshotInfo.endTime() - snapshotInfo.startTime());
}

public void testStatusAPICallForShallowCopySnapshot() throws Exception {
disableRepoConsistencyCheck("Remote store repository is being used for the test");
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNode();

final String snapshotRepoName = "snapshot-repo-name";
createRepository(snapshotRepoName, "fs", snapshotRepoSettingsForShallowCopy());

final Path remoteStoreRepoPath = randomRepoPath();
final String remoteStoreRepoName = "remote-store-repo-name";
createRepository(remoteStoreRepoName, "fs", remoteStoreRepoPath);

final String indexName = "index-1";
createIndex(indexName);
ensureGreen();
logger.info("--> indexing some data");
for (int i = 0; i < 100; i++) {
index(indexName, "_doc", Integer.toString(i), "foo", "bar" + i);
}
refresh();

final String remoteStoreEnabledIndexName = "remote-index-1";
final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings(remoteStoreRepoName);
createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings);
ensureGreen();

logger.info("--> indexing some data");
for (int i = 0; i < 100; i++) {
index(remoteStoreEnabledIndexName, "_doc", Integer.toString(i), "foo", "bar" + i);
}
refresh();

final String snapshot = "snapshot";
createFullSnapshot(snapshotRepoName, snapshot);
assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length == 1);

final SnapshotStatus snapshotStatus = getSnapshotStatus(snapshotRepoName, snapshot);
assertThat(snapshotStatus.getState(), is(SnapshotsInProgress.State.SUCCESS));

final SnapshotIndexShardStatus snapshotShardState = stateFirstShard(snapshotStatus, indexName);
assertThat(snapshotShardState.getStage(), is(SnapshotIndexShardStage.DONE));
assertThat(snapshotShardState.getStats().getTotalFileCount(), greaterThan(0));
assertThat(snapshotShardState.getStats().getTotalSize(), greaterThan(0L));
assertThat(snapshotShardState.getStats().getIncrementalFileCount(), greaterThan(0));
assertThat(snapshotShardState.getStats().getIncrementalSize(), greaterThan(0L));

// Validating that the incremental file count and incremental file size is zero for shallow copy
final SnapshotIndexShardStatus shallowSnapshotShardState = stateFirstShard(snapshotStatus, remoteStoreEnabledIndexName);
assertThat(shallowSnapshotShardState.getStage(), is(SnapshotIndexShardStage.DONE));
assertThat(shallowSnapshotShardState.getStats().getTotalFileCount(), greaterThan(0));
assertThat(shallowSnapshotShardState.getStats().getTotalSize(), greaterThan(0L));
assertThat(shallowSnapshotShardState.getStats().getIncrementalFileCount(), is(0));
assertThat(shallowSnapshotShardState.getStats().getIncrementalSize(), is(0L));
}

public void testStatusAPICallInProgressSnapshot() throws Exception {
createRepository("test-repo", "mock", Settings.builder().put("location", randomRepoPath()).put("block_on_data", true));

Expand Down Expand Up @@ -188,6 +245,63 @@ public void testExceptionOnMissingShardLevelSnapBlob() throws IOException {
);
}

public void testStatusAPIStatsForBackToBackShallowSnapshot() throws Exception {
disableRepoConsistencyCheck("Remote store repository is being used for the test");
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNode();

final String snapshotRepoName = "snapshot-repo-name";
createRepository(snapshotRepoName, "fs", snapshotRepoSettingsForShallowCopy());

final Path remoteStoreRepoPath = randomRepoPath();
final String remoteStoreRepoName = "remote-store-repo-name";
createRepository(remoteStoreRepoName, "fs", remoteStoreRepoPath);

final String indexName = "index-1";
createIndex(indexName);
ensureGreen();
logger.info("--> indexing some data");
for (int i = 0; i < 100; i++) {
index(indexName, "_doc", Integer.toString(i), "foo", "bar" + i);
}
refresh();

final String remoteStoreEnabledIndexName = "remote-index-1";
final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings(remoteStoreRepoName);
createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings);
ensureGreen();

logger.info("--> indexing some data");
for (int i = 0; i < 100; i++) {
index(remoteStoreEnabledIndexName, "_doc", Integer.toString(i), "foo", "bar" + i);
}
refresh();

createFullSnapshot(snapshotRepoName, "test-snap-1");
assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length == 1);

SnapshotStatus snapshotStatus = getSnapshotStatus(snapshotRepoName, "test-snap-1");
assertThat(snapshotStatus.getState(), is(SnapshotsInProgress.State.SUCCESS));

SnapshotIndexShardStatus shallowSnapshotShardState = stateFirstShard(snapshotStatus, remoteStoreEnabledIndexName);
assertThat(shallowSnapshotShardState.getStage(), is(SnapshotIndexShardStage.DONE));
final int totalFileCount = shallowSnapshotShardState.getStats().getTotalFileCount();
final long totalSize = shallowSnapshotShardState.getStats().getTotalSize();
final int incrementalFileCount = shallowSnapshotShardState.getStats().getIncrementalFileCount();
final long incrementalSize = shallowSnapshotShardState.getStats().getIncrementalSize();

createFullSnapshot(snapshotRepoName, "test-snap-2");
assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, remoteStoreRepoName).length == 2);

snapshotStatus = getSnapshotStatus(snapshotRepoName, "test-snap-2");
assertThat(snapshotStatus.getState(), is(SnapshotsInProgress.State.SUCCESS));
shallowSnapshotShardState = stateFirstShard(snapshotStatus, remoteStoreEnabledIndexName);
assertThat(shallowSnapshotShardState.getStats().getTotalFileCount(), equalTo(totalFileCount));
assertThat(shallowSnapshotShardState.getStats().getTotalSize(), equalTo(totalSize));
assertThat(shallowSnapshotShardState.getStats().getIncrementalFileCount(), equalTo(incrementalFileCount));
assertThat(shallowSnapshotShardState.getStats().getIncrementalSize(), equalTo(incrementalSize));
}

public void testGetSnapshotsWithoutIndices() throws Exception {
createRepository("test-repo", "fs");

Expand Down Expand Up @@ -326,6 +440,63 @@ public void testSnapshotStatusOnFailedSnapshot() throws Exception {
assertEquals(SnapshotsInProgress.State.FAILED, snapshotsStatusResponse.getSnapshots().get(0).getState());
}

public void testStatusAPICallInProgressShallowSnapshot() throws Exception {
disableRepoConsistencyCheck("Remote store repository is being used for the test");
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNode();

final String snapshotRepoName = "snapshot-repo-name";
createRepository(snapshotRepoName, "mock", snapshotRepoSettingsForShallowCopy().put("block_on_data", true));

final Path remoteStoreRepoPath = randomRepoPath();
final String remoteStoreRepoName = "remote-store-repo-name";
createRepository(remoteStoreRepoName, "mock", remoteStoreRepoPath);

final String indexName = "index-1";
createIndex(indexName);
ensureGreen();
logger.info("--> indexing some data");
for (int i = 0; i < 100; i++) {
index(indexName, "_doc", Integer.toString(i), "foo", "bar" + i);
}
refresh();

final String remoteStoreEnabledIndexName = "remote-index-1";
final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings(remoteStoreRepoName);
createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings);
ensureGreen();

logger.info("--> indexing some data");
for (int i = 0; i < 100; i++) {
index(remoteStoreEnabledIndexName, "_doc", Integer.toString(i), "foo", "bar" + i);
}
refresh();

logger.info("--> snapshot");
ActionFuture<CreateSnapshotResponse> createSnapshotResponseActionFuture = startFullSnapshot(snapshotRepoName, "test-snap");

logger.info("--> wait for data nodes to get blocked");
waitForBlockOnAnyDataNode(snapshotRepoName, TimeValue.timeValueMinutes(1));
awaitNumberOfSnapshotsInProgress(1);
assertEquals(
SnapshotsInProgress.State.STARTED,
client().admin()
.cluster()
.prepareSnapshotStatus(snapshotRepoName)
.setSnapshots("test-snap")
.get()
.getSnapshots()
.get(0)
.getState()
);

logger.info("--> unblock all data nodes");
unblockAllDataNodes(snapshotRepoName);

logger.info("--> wait for snapshot to finish");
createSnapshotResponseActionFuture.actionGet();
}

public void testGetSnapshotsRequest() throws Exception {
final String repositoryName = "test-repo";
final String indexName = "test-idx";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.common.xcontent.XContentParserUtils;
import org.opensearch.index.snapshots.IndexShardSnapshotStatus;
import org.opensearch.index.store.StoreFileMetadata;

import java.io.IOException;
Expand All @@ -57,7 +58,7 @@
*
* @opensearch.internal
*/
public class BlobStoreIndexShardSnapshot implements ToXContentFragment {
public class BlobStoreIndexShardSnapshot implements ToXContentFragment, IndexShardSnapshot {

/**
* Information about snapshotted file
Expand Down Expand Up @@ -592,4 +593,17 @@ public static BlobStoreIndexShardSnapshot fromXContent(XContentParser parser) th
incrementalSize
);
}

@Override
public IndexShardSnapshotStatus getIndexShardSnapshotStatus() {
return IndexShardSnapshotStatus.newDone(
startTime,
time,
incrementalFileCount,
totalFileCount(),
incrementalSize,
totalSize(),
null
); // Not adding a real generation here as it doesn't matter to callers
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.snapshots.blobstore;

import org.opensearch.index.snapshots.IndexShardSnapshotStatus;

/**
* Base interface for shard snapshot status
*
* @opensearch.internal
*/
@FunctionalInterface
public interface IndexShardSnapshot {
IndexShardSnapshotStatus getIndexShardSnapshotStatus();
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.snapshots.IndexShardSnapshotStatus;

import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -23,7 +24,7 @@
*
* @opensearch.internal
*/
public class RemoteStoreShardShallowCopySnapshot implements ToXContentFragment {
public class RemoteStoreShardShallowCopySnapshot implements ToXContentFragment, IndexShardSnapshot {

private final String snapshot;
private final String version;
Expand Down Expand Up @@ -433,4 +434,17 @@ public RemoteStoreShardShallowCopySnapshot asClone(String targetSnapshotName, lo
fileNames
);
}

@Override
public IndexShardSnapshotStatus getIndexShardSnapshotStatus() {
return IndexShardSnapshotStatus.newDone(
startTime,
time,
incrementalFileCount(),
totalFileCount,
incrementalSize(),
totalSize,
null
); // Not adding a real generation here as it doesn't matter to callers
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
import org.opensearch.index.snapshots.blobstore.IndexShardSnapshot;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.utils.TransferManager;
import org.opensearch.plugins.IndexStorePlugin;
Expand Down Expand Up @@ -89,7 +90,10 @@ private Future<RemoteSnapshotDirectory> createRemoteSnapshotDirectoryFromSnapsho
// index restore is invoked
return threadPool.executor(ThreadPool.Names.SNAPSHOT).submit(() -> {
final BlobContainer blobContainer = blobStoreRepository.blobStore().blobContainer(blobPath);
final BlobStoreIndexShardSnapshot snapshot = blobStoreRepository.loadShardSnapshot(blobContainer, snapshotId);
final IndexShardSnapshot indexShardSnapshot = blobStoreRepository.loadShardSnapshot(blobContainer, snapshotId);
assert indexShardSnapshot instanceof BlobStoreIndexShardSnapshot
: "indexShardSnapshot should be an instance of BlobStoreIndexShardSnapshot";
final BlobStoreIndexShardSnapshot snapshot = (BlobStoreIndexShardSnapshot) indexShardSnapshot;
TransferManager transferManager = new TransferManager(blobContainer, remoteStoreFileCache);
return new RemoteSnapshotDirectory(snapshot, localStoreDir, transferManager);
});
Expand Down
Loading

0 comments on commit b40d956

Please sign in to comment.