From acdabb8ac781a04139f69aa8b8cdd2bbc2614631 Mon Sep 17 00:00:00 2001 From: vikasvb90 Date: Wed, 9 Oct 2024 21:07:09 +0530 Subject: [PATCH] Remote Store refresh/commit fixes --- .../repositories/azure/AzureBlobStore.java | 2 +- .../gcs/GoogleCloudStorageBlobStore.java | 2 +- .../repositories/hdfs/HdfsBlobContainer.java | 2 +- .../repositories/s3/S3BlobContainer.java | 4 ++-- .../shardsplit/InPlaceShardSplitIT.java | 24 ++++++++++++++----- .../routing/RoutingChangesObserver.java | 21 ++++++++++++++++ .../allocation/IndexMetadataUpdater.java | 10 ++------ .../RoutingNodesChangedObserver.java | 6 +++++ .../InPlaceShardSplitAllocationDecider.java | 2 +- .../common/blobstore/BlobContainer.java | 3 ++- .../common/blobstore/BlobMetadata.java | 5 ++++ .../blobstore/EncryptedBlobMetadata.java | 5 ++++ .../common/blobstore/fs/FsBlobContainer.java | 2 +- .../blobstore/support/PlainBlobMetadata.java | 10 +++++++- .../index/engine/InternalEngine.java | 2 -- .../shard/RemoteStoreRefreshListener.java | 20 ++++++++++++---- .../index/store/RemoteDirectory.java | 6 ++++- .../store/RemoteSegmentStoreDirectory.java | 6 +++-- ...nPlaceShardSplitRecoverySourceHandler.java | 6 ++--- .../RemoteClusterStateServiceTests.java | 10 ++++---- .../index/remote/RemoteStoreUtilsTests.java | 10 ++++---- .../index/store/RemoteDirectoryTests.java | 12 +++++----- .../TranslogTransferManagerTests.java | 16 ++++++------- .../MockEventuallyConsistentRepository.java | 2 +- .../AbstractThirdPartyRepositoryTestCase.java | 2 +- 25 files changed, 129 insertions(+), 61 deletions(-) diff --git a/plugins/repository-azure/src/main/java/org/opensearch/repositories/azure/AzureBlobStore.java b/plugins/repository-azure/src/main/java/org/opensearch/repositories/azure/AzureBlobStore.java index e76a6bdd16764..9be5aa8004227 100644 --- a/plugins/repository-azure/src/main/java/org/opensearch/repositories/azure/AzureBlobStore.java +++ b/plugins/repository-azure/src/main/java/org/opensearch/repositories/azure/AzureBlobStore.java @@ -311,7 +311,7 @@ public Map listBlobsByPrefix(String keyPath, String prefix final BlobItemProperties properties = blobItem.getProperties(); logger.trace(() -> new ParameterizedMessage("blob name [{}], size [{}]", name, properties.getContentLength())); - blobsBuilder.put(name, new PlainBlobMetadata(name, properties.getContentLength())); + blobsBuilder.put(name, new PlainBlobMetadata(name, properties.getContentLength(), properties.getLastModified().toInstant().toEpochMilli())); } }); diff --git a/plugins/repository-gcs/src/main/java/org/opensearch/repositories/gcs/GoogleCloudStorageBlobStore.java b/plugins/repository-gcs/src/main/java/org/opensearch/repositories/gcs/GoogleCloudStorageBlobStore.java index f5c20003ea7b6..100b62248e327 100644 --- a/plugins/repository-gcs/src/main/java/org/opensearch/repositories/gcs/GoogleCloudStorageBlobStore.java +++ b/plugins/repository-gcs/src/main/java/org/opensearch/repositories/gcs/GoogleCloudStorageBlobStore.java @@ -172,7 +172,7 @@ Map listBlobsByPrefix(String path, String prefix) throws I assert blob.getName().startsWith(path); if (blob.isDirectory() == false) { final String suffixName = blob.getName().substring(path.length()); - mapBuilder.put(suffixName, new PlainBlobMetadata(suffixName, blob.getSize())); + mapBuilder.put(suffixName, new PlainBlobMetadata(suffixName, blob.getSize(), blob.getUpdateTime())); } }) ); diff --git a/plugins/repository-hdfs/src/main/java/org/opensearch/repositories/hdfs/HdfsBlobContainer.java 
b/plugins/repository-hdfs/src/main/java/org/opensearch/repositories/hdfs/HdfsBlobContainer.java index 669190f4e2490..cbae9a0995bf1 100644 --- a/plugins/repository-hdfs/src/main/java/org/opensearch/repositories/hdfs/HdfsBlobContainer.java +++ b/plugins/repository-hdfs/src/main/java/org/opensearch/repositories/hdfs/HdfsBlobContainer.java @@ -203,7 +203,7 @@ public Map listBlobsByPrefix(@Nullable final String prefix Map map = new LinkedHashMap<>(); for (FileStatus file : files) { if (file.isFile()) { - map.put(file.getPath().getName(), new PlainBlobMetadata(file.getPath().getName(), file.getLen())); + map.put(file.getPath().getName(), new PlainBlobMetadata(file.getPath().getName(), file.getLen(), file.getModificationTime())); } } return Collections.unmodifiableMap(map); diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java index e42b584fc3bdb..128556db5afbd 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java @@ -523,7 +523,7 @@ public List listBlobsByPrefixInSortedOrder(String blobNamePrefix, try (AmazonS3Reference clientReference = blobStore.clientReference()) { List blobs = executeListing(clientReference, listObjectsRequest(prefix, limit), limit).stream() .flatMap(listing -> listing.contents().stream()) - .map(s3Object -> new PlainBlobMetadata(s3Object.key().substring(keyPath.length()), s3Object.size())) + .map(s3Object -> new PlainBlobMetadata(s3Object.key().substring(keyPath.length()), s3Object.size(), s3Object.lastModified().toEpochMilli())) .collect(Collectors.toList()); return blobs.subList(0, Math.min(limit, blobs.size())); } catch (final Exception e) { @@ -538,7 +538,7 @@ public Map listBlobsByPrefix(@Nullable String blobNamePref try (AmazonS3Reference clientReference = blobStore.clientReference()) { return executeListing(clientReference, listObjectsRequest(prefix, blobStore)).stream() .flatMap(listing -> listing.contents().stream()) - .map(s3Object -> new PlainBlobMetadata(s3Object.key().substring(keyPath.length()), s3Object.size())) + .map(s3Object -> new PlainBlobMetadata(s3Object.key().substring(keyPath.length()), s3Object.size(), s3Object.lastModified().toEpochMilli())) .collect(Collectors.toMap(PlainBlobMetadata::name, Function.identity())); } catch (final SdkException e) { throw new IOException("Exception when listing blobs by prefix [" + prefix + "]", e); diff --git a/server/src/internalClusterTest/java/org/opensearch/shardsplit/InPlaceShardSplitIT.java b/server/src/internalClusterTest/java/org/opensearch/shardsplit/InPlaceShardSplitIT.java index b7b136a40af92..d582a3f79eac9 100644 --- a/server/src/internalClusterTest/java/org/opensearch/shardsplit/InPlaceShardSplitIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/shardsplit/InPlaceShardSplitIT.java @@ -8,16 +8,21 @@ package org.opensearch.shardsplit; import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.action.admin.indices.split.InPlaceShardSplitRequest; import org.opensearch.action.admin.indices.split.InPlaceShardSplitResponse; import org.opensearch.action.admin.indices.stats.ShardStats; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.health.ClusterHealthStatus; import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.Priority; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.rest.RestStatus; import org.opensearch.index.mapper.MapperService; import org.opensearch.search.SearchHits; import org.opensearch.test.BackgroundIndexer; import org.opensearch.test.OpenSearchIntegTestCase; @@ -26,7 +31,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; import static org.opensearch.index.query.QueryBuilders.matchAllQuery; import static org.opensearch.index.query.QueryBuilders.matchQuery; import static org.opensearch.test.OpenSearchIntegTestCase.ClusterScope; @@ -62,6 +67,11 @@ private void waitForSplit(int numberOfSplits, Set childShardIds, int pa assertEquals(numberOfSplits, startedChildShards); }, maxWaitTimeMs, TimeUnit.MILLISECONDS); + assertClusterHealth(); + logger.info("Shard split completed"); + } + + private void assertClusterHealth() { ClusterHealthResponse clusterHealthResponse = client().admin() .cluster() .prepareHealth() @@ -70,12 +80,14 @@ private void waitForSplit(int numberOfSplits, Set childShardIds, int pa .setTimeout(ACCEPTABLE_RELOCATION_TIME) .execute() .actionGet(); + assertThat(clusterHealthResponse, notNullValue()); assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); - System.out.println("Shard split completed"); + assertThat(clusterHealthResponse.status(), equalTo(RestStatus.OK)); + assertThat(clusterHealthResponse.getStatus(), equalTo(ClusterHealthStatus.GREEN)); } private void verifyAfterSplit(long totalIndexedDocs, Set ids, int parentShardId, Set childShardIds) throws InterruptedException { - ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); + ClusterState clusterState = internalCluster().clusterManagerClient().admin().cluster().prepareState().get().getState(); IndexMetadata indexMetadata = clusterState.metadata().index("test"); assertTrue(indexMetadata.isParentShard(parentShardId)); assertEquals(childShardIds, new HashSet<>(indexMetadata.getChildShardIds(parentShardId))); @@ -115,7 +127,7 @@ public void testShardSplit() throws Exception { .put("index.number_of_replicas", 0)).get(); ensureGreen(); int numDocs = scaledRandomIntBetween(200, 2500); - try (BackgroundIndexer indexer = new BackgroundIndexer("test", MapperService.SINGLE_MAPPING_NAME, client(), numDocs)) { + try (BackgroundIndexer indexer = new BackgroundIndexer("test", MapperService.SINGLE_MAPPING_NAME, client(), numDocs, 4)) { logger.info("--> waiting for {} docs to be indexed ...", numDocs); waitForDocs(numDocs, indexer); logger.info("--> {} docs indexed", numDocs); @@ -141,7 +153,7 @@ public void testSplittingShardHavingNonEmptyCommit() throws Exception { .put("index.number_of_replicas", 0)).get(); ensureGreen(); int numDocs = scaledRandomIntBetween(200, 2500); - try (BackgroundIndexer indexer = new BackgroundIndexer("test", MapperService.SINGLE_MAPPING_NAME, client(), numDocs)) { + try (BackgroundIndexer indexer = new BackgroundIndexer("test", MapperService.SINGLE_MAPPING_NAME, client(), numDocs, 4)) { indexer.setIgnoreIndexingFailures(false); logger.info("--> waiting for {} docs to be indexed ...", numDocs); waitForDocs(numDocs, indexer); @@ -173,7 +185,7 @@ public void testSplittingShardWithNoTranslogReplay() throws Exception { .put("index.number_of_replicas", 0)).get();
ensureGreen(); int numDocs = scaledRandomIntBetween(200, 2500); - try (BackgroundIndexer indexer = new BackgroundIndexer("test", MapperService.SINGLE_MAPPING_NAME, client(), numDocs)) { + try (BackgroundIndexer indexer = new BackgroundIndexer("test", MapperService.SINGLE_MAPPING_NAME, client(), numDocs, 4)) { indexer.setIgnoreIndexingFailures(false); logger.info("--> waiting for {} docs to be indexed ...", numDocs); waitForDocs(numDocs, indexer); diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingChangesObserver.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingChangesObserver.java index 765eed9f54801..2ab52b9bf5040 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingChangesObserver.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingChangesObserver.java @@ -95,6 +95,11 @@ public interface RoutingChangesObserver { */ void splitFailed(ShardRouting splitSource, IndexMetadata indexMetadata); + /** + * Called to determine if shard split has failed in current cluster update. + */ + boolean isSplitOfShardFailed(ShardRouting parentShard); + /** * Called when started replica is promoted to primary. */ @@ -142,6 +147,11 @@ public void shardFailed(ShardRouting activeShard, UnassignedInfo unassignedInfo) } + @Override + public boolean isSplitOfShardFailed(ShardRouting parentShard) { + return false; + } + @Override public void relocationCompleted(ShardRouting removedRelocationSource) { @@ -214,6 +224,17 @@ public void splitStarted(ShardRouting startedShard, List childSpli } } + @Override + public boolean isSplitOfShardFailed(ShardRouting parentShard) { + for (RoutingChangesObserver routingChangesObserver : routingChangesObservers) { + if (routingChangesObserver.isSplitOfShardFailed(parentShard)) { + return true; + } + } + + return false; + } + @Override public void unassignedInfoUpdated(ShardRouting unassignedShard, UnassignedInfo newUnassignedInfo) { for (RoutingChangesObserver routingChangesObserver : routingChangesObservers) { diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/IndexMetadataUpdater.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/IndexMetadataUpdater.java index 37f54d27ae4bb..9d1d9ed2567b5 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/IndexMetadataUpdater.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/IndexMetadataUpdater.java @@ -118,14 +118,8 @@ public void shardFailed(ShardRouting failedShard, UnassignedInfo unassignedInfo) @Override public boolean isSplitOfShardFailed(ShardRouting parentShard) { - if (failedShard.active() && failedShard.primary() && failedShard.isSplitTarget() == false) { - Updates updates = changes(failedShard.shardId()); - if (updates.firstFailedPrimary == null) { - // more than one primary can be failed (because of batching, primary can be failed, replica promoted and then failed...) 
- updates.firstFailedPrimary = failedShard; - } - increasePrimaryTerm(failedShard.shardId()); - } + Updates updates = changes(parentShard.shardId()); + return updates.splitFailed; } @Override diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/RoutingNodesChangedObserver.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/RoutingNodesChangedObserver.java index c9d67dd7755e5..367d66a550174 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/RoutingNodesChangedObserver.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/RoutingNodesChangedObserver.java @@ -120,6 +120,12 @@ public void splitFailed(ShardRouting splitSource, IndexMetadata indexMetadata) { setChanged(); } + @Override + public boolean isSplitOfShardFailed(ShardRouting parentShard) { + // Nothing to do here since splitFailed would have already marked changes in case of split failure. + return false; + } + @Override public void replicaPromoted(ShardRouting replicaShard) { assert replicaShard.started() && replicaShard.primary() == false : "expected started replica shard " + replicaShard; diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/InPlaceShardSplitAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/InPlaceShardSplitAllocationDecider.java index 1d826aef9117d..d7299378d606a 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/InPlaceShardSplitAllocationDecider.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/InPlaceShardSplitAllocationDecider.java @@ -25,7 +25,7 @@ public static Decision canRemainDecision(ShardRouting shardRouting, RoutingNode // If shardRouting is a started parent shard and fact that it exists is sufficient to conclude // that it needs to be split. if (allocation.metadata().getIndexSafe(shardRouting.index()).isParentShard(shardRouting.shardId().id()) - && shardRouting.started()) { + && shardRouting.started() && allocation.changes().isSplitOfShardFailed(shardRouting) == false) { return Decision.SPLIT; } return Decision.ALWAYS; diff --git a/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java b/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java index 514e6f9355d09..db6b7c50e75c3 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java +++ b/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java @@ -207,7 +207,8 @@ default long readBlobPreferredLength() { */ enum BlobNameSortOrder { - LEXICOGRAPHIC(Comparator.comparing(BlobMetadata::name)); + LEXICOGRAPHIC(Comparator.comparing(BlobMetadata::name)), + CHRONOLOGICAL(Comparator.comparing(BlobMetadata::lastModified)),; final Comparator comparator; diff --git a/server/src/main/java/org/opensearch/common/blobstore/BlobMetadata.java b/server/src/main/java/org/opensearch/common/blobstore/BlobMetadata.java index 37c70365b6a11..b594d322ac820 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/BlobMetadata.java +++ b/server/src/main/java/org/opensearch/common/blobstore/BlobMetadata.java @@ -48,4 +48,9 @@ public interface BlobMetadata { * Gets the size of the blob in bytes. */ long length(); + + /** + * Gets last modified time of blob in epoch millis. 
+ */ + long lastModified(); } diff --git a/server/src/main/java/org/opensearch/common/blobstore/EncryptedBlobMetadata.java b/server/src/main/java/org/opensearch/common/blobstore/EncryptedBlobMetadata.java index 8917bba806d08..4abf1596184ea 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/EncryptedBlobMetadata.java +++ b/server/src/main/java/org/opensearch/common/blobstore/EncryptedBlobMetadata.java @@ -36,6 +36,11 @@ public String name() { return delegate.name(); } + @Override + public long lastModified() { + return delegate.lastModified(); + } + @Override public long length() { U cryptoContext; diff --git a/server/src/main/java/org/opensearch/common/blobstore/fs/FsBlobContainer.java b/server/src/main/java/org/opensearch/common/blobstore/fs/FsBlobContainer.java index db826b121c1b4..8dbe63dcecff4 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/fs/FsBlobContainer.java +++ b/server/src/main/java/org/opensearch/common/blobstore/fs/FsBlobContainer.java @@ -123,7 +123,7 @@ public Map listBlobsByPrefix(String blobNamePrefix) throws continue; } if (attrs.isRegularFile()) { - builder.put(file.getFileName().toString(), new PlainBlobMetadata(file.getFileName().toString(), attrs.size())); + builder.put(file.getFileName().toString(), new PlainBlobMetadata(file.getFileName().toString(), attrs.size(), attrs.lastModifiedTime().toMillis())); } } } diff --git a/server/src/main/java/org/opensearch/common/blobstore/support/PlainBlobMetadata.java b/server/src/main/java/org/opensearch/common/blobstore/support/PlainBlobMetadata.java index 8d2aee199ea61..69aaa6365c05b 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/support/PlainBlobMetadata.java +++ b/server/src/main/java/org/opensearch/common/blobstore/support/PlainBlobMetadata.java @@ -45,9 +45,12 @@ public class PlainBlobMetadata implements BlobMetadata { private final long length; - public PlainBlobMetadata(String name, long length) { + private final long lastModified; + + public PlainBlobMetadata(String name, long length, long lastModified) { this.name = name; this.length = length; + this.lastModified = lastModified; } @Override @@ -60,6 +63,11 @@ public long length() { return this.length; } + @Override + public long lastModified() { + return this.lastModified; + } + @Override public String toString() { return "name [" + name + "], length [" + length + "]"; diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 75b79d16d21b0..4706e3610a25d 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -1899,8 +1899,6 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException { shouldPeriodicallyFlush ); - logger.info("Refresing in flush"); - Thread.sleep(2000); // we need to refresh in order to clear older version values refresh("version_table_flush", SearcherScope.INTERNAL, true); diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 6bdc21b17ebee..d699a2dc8cba3 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -48,6 +48,7 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import 
java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY; @@ -89,6 +90,7 @@ public final class RemoteStoreRefreshListener extends ReleasableRetryableRefresh private final RemoteSegmentTransferTracker segmentTracker; private final Map localSegmentChecksumMap; private volatile long primaryTerm; + private volatile long commitGen = -1; private volatile Iterator backoffDelayIterator; private final SegmentReplicationCheckpointPublisher checkpointPublisher; private final AtomicBoolean staleCommitDeletionDelayed = new AtomicBoolean(); @@ -153,7 +155,7 @@ protected void runAfterRefreshExactlyOnce(boolean didRefresh) { protected boolean performAfterRefreshWithPermit(boolean didRefresh) { boolean successful; if (shouldSync(didRefresh, false)) { - successful = syncSegments(); + successful = syncSegments(false); } else { successful = true; } @@ -198,7 +200,7 @@ private boolean shouldSync(boolean didRefresh, boolean skipPrimaryTermCheck) { /* @return false if retry is needed */ - private boolean syncSegments() { + private boolean syncSegments(boolean firstSyncAfterCommit) { if (isReadyForUpload() == false) { // Following check is required to enable retry and make sure that we do not lose this refresh event // When primary shard is restored from remote store, the recovery happens first followed by changing @@ -223,8 +225,19 @@ private boolean syncSegments() { remoteDirectory.deleteStaleSegmentsAsync(indexShard.getRecoverySettings().getMinRemoteSegmentMetadataFiles()); } - try (GatedCloseable segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) { + try (GatedCloseable segmentInfosGatedCloseable = ( + firstSyncAfterCommit ? + new GatedCloseable<>(indexShard.store().readLastCommittedSegmentsInfo(), ()->{}) : + indexShard.getSegmentInfosSnapshot() + ) + ) { SegmentInfos segmentInfos = segmentInfosGatedCloseable.get(); + + // A commit may have happened since the last tracked commit generation. If so, first upload the last + // committed SegmentInfos so that the commit point reaches the remote store before the refreshed + // (uncommitted) SegmentInfos, and advance commitGen only when that upload succeeds so that a failed + // commit upload is retried on the next refresh. + if (firstSyncAfterCommit == false && segmentInfos.getGeneration() != commitGen) { + if (syncSegments(true)) { + commitGen = segmentInfos.getGeneration(); + } + } final ReplicationCheckpoint checkpoint = indexShard.computeReplicationCheckpoint(segmentInfos); if (checkpoint.getPrimaryTerm() != indexShard.getOperationPrimaryTerm()) { throw new IllegalStateException( @@ -240,7 +253,6 @@ private boolean syncSegments() { // move.
long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint(); Collection localSegmentsPostRefresh = segmentInfos.files(true); - // Create a map of file name to size and update the refresh segment tracker updateLocalSizeMapAndTracker(localSegmentsPostRefresh); CountDownLatch latch = new CountDownLatch(1); diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java index 1233686c89bda..73d23f808210c 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java @@ -111,6 +111,10 @@ public Collection listFilesByPrefix(String filenamePrefix) throws IOExce } public List listFilesByPrefixInLexicographicOrder(String filenamePrefix, int limit) throws IOException { + return listFilesByPrefixInOrder(filenamePrefix, limit, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC); + } + + public List listFilesByPrefixInOrder(String filenamePrefix, int limit, BlobContainer.BlobNameSortOrder order) throws IOException { List sortedBlobList = new ArrayList<>(); AtomicReference exception = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); @@ -130,7 +134,7 @@ public void onFailure(Exception e) { blobContainer.listBlobsByPrefixInSortedOrder( filenamePrefix, limit, - BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC, + order, actionListener ); latch.await(); diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index c4835ec40779a..22e0f9ac6809b 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -25,6 +25,7 @@ import org.apache.lucene.util.Version; import org.opensearch.common.UUIDs; import org.opensearch.common.annotation.PublicApi; +import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.collect.Tuple; import org.opensearch.common.io.VersionedCodecStreamWrapper; import org.opensearch.common.logging.Loggers; @@ -562,9 +563,10 @@ Boolean isLockAcquired(String metadataFile) throws IOException { // Visible for testing public String getMetadataFileForCommit(long primaryTerm, long generation) throws IOException { - List metadataFiles = remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder( + List metadataFiles = remoteMetadataDirectory.listFilesByPrefixInOrder( MetadataFilenameUtils.getMetadataFilePrefixForCommit(primaryTerm, generation), - 1 + 1, + BlobContainer.BlobNameSortOrder.CHRONOLOGICAL ); if (metadataFiles.isEmpty()) { diff --git a/server/src/main/java/org/opensearch/indices/recovery/inplacesplit/InPlaceShardSplitRecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/inplacesplit/InPlaceShardSplitRecoverySourceHandler.java index 41ae6cfdd984f..b990e7a64613f 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/inplacesplit/InPlaceShardSplitRecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/inplacesplit/InPlaceShardSplitRecoverySourceHandler.java @@ -27,6 +27,7 @@ import org.opensearch.index.seqno.RetentionLeases; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.Store; 
import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; @@ -44,6 +45,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -125,7 +127,6 @@ private Consumer consumerForCleanupOnFailure(Consumer onFa protected void innerRecoveryToTarget(ActionListener listener, Consumer onFailure) throws IOException { // onFailure = consumerForCleanupOnFailure(onFailure); // Clean up shard directories if previous shard closures failed. - logger.info("Starting split"); cleanupChildShardDirectories(); List delayedStaleCommitDeleteOps = sourceShard.delayStaleCommitDeletions(); @@ -205,9 +206,8 @@ protected void innerRecoveryToTarget(ActionListener listener, } private void ensureMetadataHasAllSegmentsFromCommit(IndexCommit indexCommit, RemoteSegmentMetadata metadata) throws IOException { - Collection files = indexCommit.getFileNames(); List missingFiles = new ArrayList<>(); - for (String file : files) { + for (String file : indexCommit.getFileNames()) { if (metadata.getMetadata().containsKey(file) == false) { missingFiles.add(file); } diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 65477051cdb30..f44406d9cd48d 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -674,7 +674,7 @@ public void testReadLatestMetadataManifestFailedManifestFileRemoveAfterFetchInRe final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); BlobContainer blobContainer = mockBlobStoreObjects(); - BlobMetadata blobMetadata = new PlainBlobMetadata("manifestFileName", 1); + BlobMetadata blobMetadata = new PlainBlobMetadata("manifestFileName", 1, System.currentTimeMillis()); when(blobContainer.listBlobsByPrefixInSortedOrder("manifest" + DELIMITER, 1, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC)) .thenReturn(Arrays.asList(blobMetadata)); when(blobContainer.readBlob("manifestFileName")).thenThrow(FileNotFoundException.class); @@ -1054,14 +1054,14 @@ public void testDeleteStaleClusterUUIDs() throws IOException { Integer.MAX_VALUE, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC ) - ).thenReturn(List.of(new PlainBlobMetadata("mainfest2", 1L))); + ).thenReturn(List.of(new PlainBlobMetadata("mainfest2", 1L, System.currentTimeMillis()))); when( manifest3Container.listBlobsByPrefixInSortedOrder( MANIFEST_FILE_PREFIX + DELIMITER, Integer.MAX_VALUE, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC ) - ).thenReturn(List.of(new PlainBlobMetadata("mainfest3", 1L))); + ).thenReturn(List.of(new PlainBlobMetadata("mainfest3", 1L, System.currentTimeMillis()))); remoteClusterStateService.start(); remoteClusterStateService.deleteStaleClusterUUIDs(clusterState, clusterMetadataManifest); try { @@ -1421,7 +1421,7 @@ private void mockBlobContainer( String manifestFileName = codecVersion >= ClusterMetadataManifest.CODEC_V1 ? 
"manifest__manifestFileName__abcd__abcd__abcd__1" : "manifestFileName"; - BlobMetadata blobMetadata = new PlainBlobMetadata(manifestFileName, 1); + BlobMetadata blobMetadata = new PlainBlobMetadata(manifestFileName, 1, System.currentTimeMillis()); when(blobContainer.listBlobsByPrefixInSortedOrder("manifest" + DELIMITER, 1, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC)) .thenReturn(Arrays.asList(blobMetadata)); @@ -1461,7 +1461,7 @@ private void mockBlobContainerForGlobalMetadata( Metadata metadata ) throws IOException { String mockManifestFileName = "manifest__1__2__C__456__1"; - BlobMetadata blobMetadata = new PlainBlobMetadata(mockManifestFileName, 1); + BlobMetadata blobMetadata = new PlainBlobMetadata(mockManifestFileName, 1, System.currentTimeMillis()); when( blobContainer.listBlobsByPrefixInSortedOrder( "manifest" + RemoteClusterStateService.DELIMITER, diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteStoreUtilsTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteStoreUtilsTests.java index d3c7d754d6b61..2996538c21fad 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteStoreUtilsTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteStoreUtilsTests.java @@ -157,18 +157,18 @@ public void testVerifyMultipleWriters_Translog() throws InterruptedException { TranslogTransferMetadata tm2 = new TranslogTransferMetadata(1, 1, 1, 2, "node--1"); String mdFilename2 = tm2.getFileName(); List bmList = new LinkedList<>(); - bmList.add(new PlainBlobMetadata(mdFilename, 1)); - bmList.add(new PlainBlobMetadata(mdFilename2, 1)); - bmList.add(new PlainBlobMetadata(getOldTranslogMetadataFilename(1, 1, 1), 1)); + bmList.add(new PlainBlobMetadata(mdFilename, 1, System.currentTimeMillis())); + bmList.add(new PlainBlobMetadata(mdFilename2, 1, System.currentTimeMillis())); + bmList.add(new PlainBlobMetadata(getOldTranslogMetadataFilename(1, 1, 1), 1, System.currentTimeMillis())); RemoteStoreUtils.verifyNoMultipleWriters( bmList.stream().map(BlobMetadata::name).collect(Collectors.toList()), TranslogTransferMetadata::getNodeIdByPrimaryTermAndGen ); bmList = new LinkedList<>(); - bmList.add(new PlainBlobMetadata(mdFilename, 1)); + bmList.add(new PlainBlobMetadata(mdFilename, 1, System.currentTimeMillis())); TranslogTransferMetadata tm3 = new TranslogTransferMetadata(1, 1, 1, 2, "node--2"); - bmList.add(new PlainBlobMetadata(tm3.getFileName(), 1)); + bmList.add(new PlainBlobMetadata(tm3.getFileName(), 1, System.currentTimeMillis())); List finalBmList = bmList; assertThrows( IllegalStateException.class, diff --git a/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java index 9e38e1749d434..d1742a83b973c 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java @@ -150,7 +150,7 @@ public void onFailure(Exception e) { public void testListAll() throws IOException { Map fileNames = Stream.of("abc", "xyz", "pqr", "lmn", "jkl") - .collect(Collectors.toMap(filename -> filename, filename -> new PlainBlobMetadata(filename, 100))); + .collect(Collectors.toMap(filename -> filename, filename -> new PlainBlobMetadata(filename, 100, System.currentTimeMillis()))); when(blobContainer.listBlobs()).thenReturn(fileNames); @@ -167,7 +167,7 @@ public void testListAllException() throws IOException { public void testListFilesByPrefix() throws IOException { Map fileNames 
= Stream.of("abc", "abd", "abe", "abf", "abg") - .collect(Collectors.toMap(filename -> filename, filename -> new PlainBlobMetadata(filename, 100))); + .collect(Collectors.toMap(filename -> filename, filename -> new PlainBlobMetadata(filename, 100, System.currentTimeMillis()))); when(blobContainer.listBlobsByPrefix("ab")).thenReturn(fileNames); @@ -206,7 +206,7 @@ public void testOpenInput() throws IOException { InputStream mockInputStream = mock(InputStream.class); when(blobContainer.readBlob("segment_1")).thenReturn(mockInputStream); - BlobMetadata blobMetadata = new PlainBlobMetadata("segment_1", 100); + BlobMetadata blobMetadata = new PlainBlobMetadata("segment_1", 100, System.currentTimeMillis()); when(blobContainer.listBlobsByPrefixInSortedOrder("segment_1", 1, LEXICOGRAPHIC)).thenReturn(List.of(blobMetadata)); @@ -220,7 +220,7 @@ public void testOpenInputWithLength() throws IOException { InputStream mockInputStream = mock(InputStream.class); when(blobContainer.readBlob("segment_1")).thenReturn(mockInputStream); - BlobMetadata blobMetadata = new PlainBlobMetadata("segment_1", 100); + BlobMetadata blobMetadata = new PlainBlobMetadata("segment_1", 100, System.currentTimeMillis()); when(blobContainer.listBlobsByPrefixInSortedOrder("segment_1", 1, LEXICOGRAPHIC)).thenReturn(List.of(blobMetadata)); @@ -245,7 +245,7 @@ public void testOpenInputNoSuchFileException() throws IOException { } public void testFileLength() throws IOException { - BlobMetadata blobMetadata = new PlainBlobMetadata("segment_1", 100); + BlobMetadata blobMetadata = new PlainBlobMetadata("segment_1", 100, System.currentTimeMillis()); when(blobContainer.listBlobsByPrefixInSortedOrder("segment_1", 1, LEXICOGRAPHIC)).thenReturn(List.of(blobMetadata)); assertEquals(100, remoteDirectory.fileLength("segment_1")); @@ -260,7 +260,7 @@ public void testFileLengthIOException() throws IOException { public void testListFilesByPrefixInLexicographicOrder() throws IOException { doAnswer(invocation -> { LatchedActionListener> latchedActionListener = invocation.getArgument(3); - latchedActionListener.onResponse(List.of(new PlainBlobMetadata("metadata_1", 1))); + latchedActionListener.onResponse(List.of(new PlainBlobMetadata("metadata_1", 1, System.currentTimeMillis()))); return null; }).when(blobContainer).listBlobsByPrefixInSortedOrder(eq("metadata"), eq(1), eq(LEXICOGRAPHIC), any(ActionListener.class)); diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java index e34bc078896f9..cf337af134bec 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java @@ -366,8 +366,8 @@ public void testReadMetadataFile() throws IOException { doAnswer(invocation -> { LatchedActionListener> latchedActionListener = invocation.getArgument(3); List bmList = new LinkedList<>(); - bmList.add(new PlainBlobMetadata(mdFilename1, 1)); - bmList.add(new PlainBlobMetadata(mdFilename2, 1)); + bmList.add(new PlainBlobMetadata(mdFilename1, 1, System.currentTimeMillis())); + bmList.add(new PlainBlobMetadata(mdFilename2, 1, System.currentTimeMillis())); latchedActionListener.onResponse(bmList); return null; }).when(transferService) @@ -400,7 +400,7 @@ public void testReadMetadataReadException() throws IOException { doAnswer(invocation -> { LatchedActionListener> 
latchedActionListener = invocation.getArgument(3); List bmList = new LinkedList<>(); - bmList.add(new PlainBlobMetadata(mdFilename, 1)); + bmList.add(new PlainBlobMetadata(mdFilename, 1, System.currentTimeMillis())); latchedActionListener.onResponse(bmList); return null; }).when(transferService) @@ -528,9 +528,9 @@ public void testDeleteStaleTranslogMetadata() { doAnswer(invocation -> { ActionListener> actionListener = invocation.getArgument(4); List bmList = new LinkedList<>(); - bmList.add(new PlainBlobMetadata(tm1, 1)); - bmList.add(new PlainBlobMetadata(tm2, 1)); - bmList.add(new PlainBlobMetadata(tm3, 1)); + bmList.add(new PlainBlobMetadata(tm1, 1, System.currentTimeMillis())); + bmList.add(new PlainBlobMetadata(tm2, 1, System.currentTimeMillis())); + bmList.add(new PlainBlobMetadata(tm3, 1, System.currentTimeMillis())); actionListener.onResponse(bmList); return null; }).when(transferService) @@ -628,8 +628,8 @@ public void testMetadataConflict() throws InterruptedException { doAnswer(invocation -> { LatchedActionListener> latchedActionListener = invocation.getArgument(3); List bmList = new LinkedList<>(); - bmList.add(new PlainBlobMetadata(mdFilename, 1)); - bmList.add(new PlainBlobMetadata(mdFilename2, 1)); + bmList.add(new PlainBlobMetadata(mdFilename, 1, System.currentTimeMillis())); + bmList.add(new PlainBlobMetadata(mdFilename2, 1, System.currentTimeMillis())); latchedActionListener.onResponse(bmList); return null; }).when(transferService) diff --git a/server/src/test/java/org/opensearch/snapshots/mockstore/MockEventuallyConsistentRepository.java b/server/src/test/java/org/opensearch/snapshots/mockstore/MockEventuallyConsistentRepository.java index f9388c9e4b86e..91b3fe792f320 100644 --- a/server/src/test/java/org/opensearch/snapshots/mockstore/MockEventuallyConsistentRepository.java +++ b/server/src/test/java/org/opensearch/snapshots/mockstore/MockEventuallyConsistentRepository.java @@ -300,7 +300,7 @@ public Map listBlobs() { .collect( Collectors.toMap( action -> action.path.substring(thisPath.length()), - action -> new PlainBlobMetadata(action.path.substring(thisPath.length()), action.data.length) + action -> new PlainBlobMetadata(action.path.substring(thisPath.length()), action.data.length, System.currentTimeMillis()) ) ) ); diff --git a/test/framework/src/main/java/org/opensearch/repositories/AbstractThirdPartyRepositoryTestCase.java b/test/framework/src/main/java/org/opensearch/repositories/AbstractThirdPartyRepositoryTestCase.java index 5c28d9f83a4ee..8218b12976917 100644 --- a/test/framework/src/main/java/org/opensearch/repositories/AbstractThirdPartyRepositoryTestCase.java +++ b/test/framework/src/main/java/org/opensearch/repositories/AbstractThirdPartyRepositoryTestCase.java @@ -160,7 +160,7 @@ public void testListChildren() throws Exception { assertBlobsByPrefix( repo.basePath().add("foo"), "nest", - Collections.singletonMap("nested-blob", new PlainBlobMetadata("nested-blob", testBlobLen)) + Collections.singletonMap("nested-blob", new PlainBlobMetadata("nested-blob", testBlobLen, System.currentTimeMillis())) ); assertChildren(repo.basePath().add("foo").add("nested"), Collections.emptyList()); if (randomBoolean()) {