From b8cde1247b4d631bb1e8054ef705a102482b04a4 Mon Sep 17 00:00:00 2001
From: Marc Handalian
Date: Thu, 10 Aug 2023 16:11:29 -0700
Subject: [PATCH] [Segment Replication] Refactor file cleanup logic and fix PIT/Scroll with remote store. (#9111)

* Remove divergent commit logic with segment replication.

This change removes divergent commit paths for segrep node-node and remote store.
All replicas with segrep enabled will perform local commits and ignore any incoming
segments_n file. This changes the recovery sync with remote store to also exclude the
segments_n so that only the fetched infos bytes are committed before an engine is opened.
This change also updates deletion logic with segment replication to automatically delete
when a file is decref'd to 0.

Signed-off-by: Marc Handalian

* Add more NRTReplicationEngineTests.

Signed-off-by: Marc Handalian

* Ensure old commit files are wiped on remote store sync before we commit a new segmentInfos.

Signed-off-by: Marc Handalian

* Add more shard level tests.

Signed-off-by: Marc Handalian

* Add test ensuring commits are cleaned up on replicas.

Signed-off-by: Marc Handalian

* Self review.

Signed-off-by: Marc Handalian

* Use refresh level sync before recovery

Signed-off-by: Marc Handalian

* PR feedback.

Signed-off-by: Marc Handalian

---------

Signed-off-by: Marc Handalian
Signed-off-by: Shivansh Arora
---
 .../replication/SegmentReplicationIT.java          |  29 ++-
 .../opensearch/remotestore/RemoteStoreIT.java      |   8 +-
 .../index/engine/NRTReplicationEngine.java         |  74 +++---
 .../index/engine/ReplicaFileTracker.java           |  92 ++++++++
 .../opensearch/index/shard/IndexShard.java         |  35 +--
 .../store/RemoteSegmentStoreDirectory.java         |   2 +-
 .../index/store/ReplicaFileTracker.java            |  51 ----
 .../org/opensearch/index/store/Store.java          | 101 +-------
 .../recovery/PeerRecoveryTargetService.java        |   2 +-
 .../RemoteStoreReplicationSource.java              |   9 +-
 .../replication/SegmentReplicationTarget.java      |  27 +--
 .../engine/NRTReplicationEngineTests.java          | 217 ++++++++++++++----
 .../index/shard/RemoteIndexShardTests.java         | 150 ++++++++++++
 .../SegmentReplicationIndexShardTests.java         |  28 +++
 ...licationWithNodeToNodeIndexShardTests.java      |  48 ++--
 .../opensearch/index/store/StoreTests.java         |  46 ----
 ...enSearchIndexLevelReplicationTestCase.java      |   4 +-
 .../index/shard/IndexShardTestCase.java            |  16 +-
 18 files changed, 593 insertions(+), 346 deletions(-)
 create mode 100644 server/src/main/java/org/opensearch/index/engine/ReplicaFileTracker.java
 delete mode 100644 server/src/main/java/org/opensearch/index/store/ReplicaFileTracker.java

diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
index 66469241b83d8..b14c59869ccbc 100644
--- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
@@ -282,7 +282,6 @@ public void testIndexReopenClose() throws Exception {
     }

     public void testScrollWithConcurrentIndexAndSearch() throws Exception {
-        assumeFalse("Skipping the test with Remote store as its flaky.", segmentReplicationWithRemoteEnabled());
         final String primary = internalCluster().startDataOnlyNode();
         final String replica = internalCluster().startDataOnlyNode();
         createIndex(INDEX_NAME);
@@ -657,7 +656,6 @@ public void testDeleteOperations() throws Exception {
      * from xlog.
*/ public void testReplicationPostDeleteAndForceMerge() throws Exception { - assumeFalse("Skipping the test with Remote store as its flaky.", segmentReplicationWithRemoteEnabled()); final String primary = internalCluster().startDataOnlyNode(); createIndex(INDEX_NAME); final String replica = internalCluster().startDataOnlyNode(); @@ -966,7 +964,6 @@ private void assertAllocationIdsInReplicaShardStats(Set expected, Set rs.getRecoverySource().getType() == RecoverySource.Type.PEER) .findFirst(); assertFalse(recoverySource.isEmpty()); - if (numberOfIterations == 1 && invokeFlush) { - // segments_N file is copied to new replica - assertEquals(1, recoverySource.get().getIndex().recoveredFileCount()); - } else { - assertEquals(0, recoverySource.get().getIndex().recoveredFileCount()); - } + // segments_N file is copied to new replica + assertEquals(1, recoverySource.get().getIndex().recoveredFileCount()); IndexResponse response = indexSingleDoc(INDEX_NAME); assertEquals(indexStats.get(MAX_SEQ_NO_TOTAL) + 1, response.getSeqNo()); diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index 6b09b8d86dc6c..d545d9c1f6f45 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -57,9 +57,9 @@ public class NRTReplicationEngine extends Engine { private final CompletionStatsCache completionStatsCache; private final LocalCheckpointTracker localCheckpointTracker; private final WriteOnlyTranslogManager translogManager; - private final boolean shouldCommit; + protected final ReplicaFileTracker replicaFileTracker; - private volatile long lastReceivedGen = SequenceNumbers.NO_OPS_PERFORMED; + private volatile long lastReceivedPrimaryGen = SequenceNumbers.NO_OPS_PERFORMED; private static final int SI_COUNTER_INCREMENT = 10; @@ -69,7 +69,12 @@ public NRTReplicationEngine(EngineConfig engineConfig) { NRTReplicationReaderManager readerManager = null; WriteOnlyTranslogManager translogManagerRef = null; try { - lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo(); + this.replicaFileTracker = new ReplicaFileTracker(store::deleteQuiet); + this.lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo(); + // always protect latest commit on disk. + replicaFileTracker.incRef(this.lastCommittedSegmentInfos.files(true)); + // cleanup anything not referenced by the latest infos. 
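// (only the restored commit point's files were incRef'd above, so anything else left on disk by a
// previous engine instance, e.g. segments from an older conflicting commit, is deleted here before
// the reader is opened)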
+ cleanUnreferencedFiles(); readerManager = buildReaderManager(); final SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit( this.lastCommittedSegmentInfos.getUserData().entrySet() @@ -85,7 +90,7 @@ public NRTReplicationEngine(EngineConfig engineConfig) { for (ReferenceManager.RefreshListener listener : engineConfig.getInternalRefreshListener()) { this.readerManager.addListener(listener); } - final Map userData = store.readLastCommittedSegmentsInfo().getUserData(); + final Map userData = this.lastCommittedSegmentInfos.getUserData(); final String translogUUID = Objects.requireNonNull(userData.get(Translog.TRANSLOG_UUID_KEY)); translogManagerRef = new WriteOnlyTranslogManager( engineConfig.getTranslogConfig(), @@ -116,18 +121,21 @@ public void onAfterTranslogSync() { engineConfig.getPrimaryModeSupplier() ); this.translogManager = translogManagerRef; - this.shouldCommit = engineConfig.getIndexSettings().isRemoteStoreEnabled() == false; } catch (IOException e) { IOUtils.closeWhileHandlingException(store::decRef, readerManager, translogManagerRef); throw new EngineCreationFailureException(shardId, "failed to create engine", e); } } + public void cleanUnreferencedFiles() throws IOException { + replicaFileTracker.deleteUnreferencedFiles(store.directory().listAll()); + } + private NRTReplicationReaderManager buildReaderManager() throws IOException { return new NRTReplicationReaderManager( OpenSearchDirectoryReader.wrap(getDirectoryReader(), shardId), - store::incRefFileDeleter, - store::decRefFileDeleter + replicaFileTracker::incRef, + replicaFileTracker::decRef ); } @@ -143,15 +151,16 @@ public synchronized void updateSegments(final SegmentInfos infos) throws IOExcep final long maxSeqNo = Long.parseLong(infos.userData.get(MAX_SEQ_NO)); final long incomingGeneration = infos.getGeneration(); readerManager.updateSegments(infos); - - // Commit and roll the translog when we receive a different generation than what was last received. - // lower/higher gens are possible from a new primary that was just elected. - if (incomingGeneration != lastReceivedGen) { + // Ensure that we commit and clear the local translog if a new commit has been made on the primary. + // We do not compare against the last local commit gen here because it is possible to receive + // a lower gen from a newly elected primary shard that is behind this shard's last commit gen. + // In that case we still commit into the next local generation. + if (incomingGeneration != this.lastReceivedPrimaryGen) { commitSegmentInfos(); translogManager.getDeletionPolicy().setLocalCheckpointOfSafeCommit(maxSeqNo); translogManager.rollTranslogGeneration(); } - lastReceivedGen = incomingGeneration; + this.lastReceivedPrimaryGen = incomingGeneration; localCheckpointTracker.fastForwardProcessedSeqNo(maxSeqNo); } } @@ -159,18 +168,19 @@ public synchronized void updateSegments(final SegmentInfos infos) throws IOExcep /** * Persist the latest live SegmentInfos. * - * This method creates a commit point from the latest SegmentInfos. It is intended to be used when this shard is about to be promoted as the new primary. - * - * TODO: If this method is invoked while the engine is currently updating segments on its reader, wait for that update to complete so the updated segments are used. - * + * This method creates a commit point from the latest SegmentInfos. * * @throws IOException - When there is an IO error committing the SegmentInfos. 
*/ private void commitSegmentInfos(SegmentInfos infos) throws IOException { - if (shouldCommit) { - store.commitSegmentInfos(infos, localCheckpointTracker.getMaxSeqNo(), localCheckpointTracker.getProcessedCheckpoint()); - } + // get a reference to the previous commit files so they can be decref'd once a new commit is made. + final Collection previousCommitFiles = getLastCommittedSegmentInfos().files(true); + store.commitSegmentInfos(infos, localCheckpointTracker.getMaxSeqNo(), localCheckpointTracker.getProcessedCheckpoint()); this.lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo(); + // incref the latest on-disk commit. + replicaFileTracker.incRef(this.lastCommittedSegmentInfos.files(true)); + // decref the prev commit. + replicaFileTracker.decRef(previousCommitFiles); translogManager.syncTranslog(); } @@ -379,21 +389,19 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) { assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread() : "Either the write lock must be held or the engine must be currently be failing itself"; try { - // if remote store is enabled, all segments durably persisted - if (shouldCommit) { - final SegmentInfos latestSegmentInfos = getLatestSegmentInfos(); - /* - This is a workaround solution which decreases the chances of conflict on replica nodes when same file is copied - from two different primaries during failover. Increasing counter helps in avoiding this conflict as counter is - used to generate new segment file names. The ideal solution is to identify the counter from previous primary. - */ + final SegmentInfos latestSegmentInfos = getLatestSegmentInfos(); + /* + This is a workaround solution which decreases the chances of conflict on replica nodes when same file is copied + from two different primaries during failover. Increasing counter helps in avoiding this conflict as counter is + used to generate new segment file names. The ideal solution is to identify the counter from previous primary. + This is not required for remote store implementations given on failover the replica re-syncs with the store + during promotion. 
+ */ + if (engineConfig.getIndexSettings().isRemoteStoreEnabled() == false) { latestSegmentInfos.counter = latestSegmentInfos.counter + SI_COUNTER_INCREMENT; latestSegmentInfos.changed(); - commitSegmentInfos(latestSegmentInfos); - } else { - store.directory().sync(List.of(store.directory().listAll())); - store.directory().syncMetaData(); } + commitSegmentInfos(latestSegmentInfos); IOUtils.close(readerManager, translogManager, store::decRef); } catch (Exception e) { logger.warn("failed to close engine", e); @@ -453,8 +461,8 @@ public synchronized GatedCloseable getSegmentInfosSnapshot() { // incref all files try { final Collection files = latestSegmentInfos.files(false); - store.incRefFileDeleter(files); - return new GatedCloseable<>(latestSegmentInfos, () -> store.decRefFileDeleter(files)); + replicaFileTracker.incRef(files); + return new GatedCloseable<>(latestSegmentInfos, () -> { replicaFileTracker.decRef(files); }); } catch (IOException e) { throw new EngineException(shardId, e.getMessage(), e); } diff --git a/server/src/main/java/org/opensearch/index/engine/ReplicaFileTracker.java b/server/src/main/java/org/opensearch/index/engine/ReplicaFileTracker.java new file mode 100644 index 0000000000000..2e8bd6409c2f6 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/ReplicaFileTracker.java @@ -0,0 +1,92 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.function.BiConsumer; + +/** + * This class is heavily influenced by Lucene's ReplicaFileDeleter class used to keep track of + * segment files that should be preserved on replicas between replication events. + * + * https://github.com/apache/lucene/blob/main/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaFileDeleter.java + * + * @opensearch.internal + */ +final class ReplicaFileTracker { + + public static final Logger logger = LogManager.getLogger(ReplicaFileTracker.class); + private final Map refCounts = new HashMap<>(); + private final BiConsumer fileDeleter; + private final Set EXCLUDE_FILES = Set.of("write.lock"); + + public ReplicaFileTracker(BiConsumer fileDeleter) { + this.fileDeleter = fileDeleter; + } + + public synchronized void incRef(Collection fileNames) { + for (String fileName : fileNames) { + refCounts.merge(fileName, 1, Integer::sum); + } + } + + public synchronized int refCount(String file) { + return Optional.ofNullable(refCounts.get(file)).orElse(0); + } + + public synchronized void decRef(Collection fileNames) { + Set toDelete = new HashSet<>(); + for (String fileName : fileNames) { + Integer curCount = refCounts.get(fileName); + assert curCount != null : "fileName=" + fileName; + assert curCount > 0; + if (curCount == 1) { + refCounts.remove(fileName); + toDelete.add(fileName); + } else { + refCounts.put(fileName, curCount - 1); + } + } + if (toDelete.isEmpty() == false) { + delete(toDelete); + } + } + + public void deleteUnreferencedFiles(String... 
toDelete) { + for (String file : toDelete) { + if (canDelete(file)) { + delete(file); + } + } + } + + private synchronized void delete(Collection toDelete) { + for (String fileName : toDelete) { + delete(fileName); + } + } + + private synchronized void delete(String fileName) { + assert canDelete(fileName); + fileDeleter.accept("delete unreferenced", fileName); + } + + private synchronized boolean canDelete(String fileName) { + return EXCLUDE_FILES.contains(fileName) == false && refCounts.containsKey(fileName) == false; + } + +} diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 7b5f96892c005..16559eeabfd9b 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -56,7 +56,6 @@ import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.apache.lucene.util.ThreadInterruptedException; -import org.opensearch.common.lucene.store.ByteArrayIndexInput; import org.opensearch.cluster.metadata.DataStream; import org.opensearch.core.Assertions; import org.opensearch.ExceptionsHelper; @@ -4657,7 +4656,13 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re RemoteSegmentMetadata remoteSegmentMetadata = remoteDirectory.init(); Map uploadedSegments = remoteDirectory - .getSegmentsUploadedToRemoteStore(); + .getSegmentsUploadedToRemoteStore() + .entrySet() + .stream() + // if this is a refresh level sync, ignore any segments_n uploaded to the store, we will commit the received infos bytes + // locally. + .filter(entry -> refreshLevelSegmentSync && entry.getKey().startsWith(IndexFileNames.SEGMENTS) == false) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); store.incRef(); remoteStore.incRef(); try { @@ -4678,19 +4683,21 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, overrideLocal); if (refreshLevelSegmentSync && remoteSegmentMetadata != null) { - try ( - ChecksumIndexInput indexInput = new BufferedChecksumIndexInput( - new ByteArrayIndexInput("Snapshot of SegmentInfos", remoteSegmentMetadata.getSegmentInfosBytes()) - ); - ) { - SegmentInfos infosSnapshot = SegmentInfos.readCommit( - store.directory(), - indexInput, - remoteSegmentMetadata.getGeneration() - ); - long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY)); - store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint); + final SegmentInfos infosSnapshot = store.buildSegmentInfos( + remoteSegmentMetadata.getSegmentInfosBytes(), + remoteSegmentMetadata.getGeneration() + ); + long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY)); + // delete any other commits, we want to start the engine only from a new commit made with the downloaded infos bytes. + // Extra segments will be wiped on engine open. 
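// Illustrative aside, not part of this patch: the "wiped on engine open" behaviour mentioned above is
// driven by the package-private ReplicaFileTracker added earlier in this change. A minimal sketch of
// its refcount-to-zero semantics, assuming it is exercised from the same org.opensearch.index.engine
// package and using a simple recording lambda in place of Store::deleteQuiet:

package org.opensearch.index.engine;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

class ReplicaFileTrackerSketch {
    public static void main(String[] args) {
        final Set<String> deleted = new HashSet<>();
        // the deleter stands in for store::deleteQuiet; it only records what would be removed.
        final ReplicaFileTracker tracker = new ReplicaFileTracker((reason, file) -> deleted.add(file));
        tracker.incRef(List.of("_0.si", "_0.cfs", "segments_3")); // latest on-disk commit is protected
        tracker.incRef(List.of("_0.si", "_0.cfs"));               // the open reader adds a second ref
        tracker.decRef(List.of("_0.si", "_0.cfs", "segments_3")); // segments_3 hits zero and is deleted
        assert deleted.equals(Set.of("segments_3"));
        assert tracker.refCount("_0.si") == 1;                    // still held by the reader
        tracker.deleteUnreferencedFiles("_9.cfs", "write.lock");  // untracked files go, write.lock never does
        assert deleted.contains("_9.cfs") && deleted.contains("write.lock") == false;
    }
}

// Unlike the removed org.opensearch.index.store.ReplicaFileTracker, which only tracked counts and left
// deletion to Store cleanup, this tracker deletes eagerly as soon as a count reaches zero.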
+ for (String file : List.of(store.directory().listAll())) { + if (file.startsWith(IndexFileNames.SEGMENTS)) { + store.deleteQuiet(file); + } } + assert Arrays.stream(store.directory().listAll()).filter(f -> f.startsWith(IndexFileNames.SEGMENTS)).findAny().isEmpty() + : "There should not be any segments file in the dir"; + store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint); } } catch (IOException e) { throw new IndexShardRecoveryException(shardId, "Exception while copying segment files from remote segment store", e); diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index 2e5ac10fefdda..9d5adc241ea0e 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -197,7 +197,7 @@ public RemoteSegmentMetadata readLatestMetadataFile() throws IOException { if (metadataFiles.isEmpty() == false) { String latestMetadataFile = metadataFiles.get(0); - logger.info("Reading latest Metadata file {}", latestMetadataFile); + logger.trace("Reading latest Metadata file {}", latestMetadataFile); remoteSegmentMetadata = readMetadataFile(latestMetadataFile); } else { logger.info("No metadata file found, this can happen for new index with no data uploaded to remote segment store"); diff --git a/server/src/main/java/org/opensearch/index/store/ReplicaFileTracker.java b/server/src/main/java/org/opensearch/index/store/ReplicaFileTracker.java deleted file mode 100644 index 0ec282619337c..0000000000000 --- a/server/src/main/java/org/opensearch/index/store/ReplicaFileTracker.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.index.store; - -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; - -/** - * This class is a version of Lucene's ReplicaFileDeleter class used to keep track of - * segment files that should be preserved on replicas between replication events. - * The difference is this component does not actually perform any deletions, it only handles refcounts. - * Our deletions are made through Store.java. 
- * - * https://github.com/apache/lucene/blob/main/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaFileDeleter.java - * - * @opensearch.internal - */ -final class ReplicaFileTracker { - - private final Map refCounts = new HashMap<>(); - - public synchronized void incRef(Collection fileNames) { - for (String fileName : fileNames) { - refCounts.merge(fileName, 1, Integer::sum); - } - } - - public synchronized void decRef(Collection fileNames) { - for (String fileName : fileNames) { - Integer curCount = refCounts.get(fileName); - assert curCount != null : "fileName=" + fileName; - assert curCount > 0; - if (curCount == 1) { - refCounts.remove(fileName); - } else { - refCounts.put(fileName, curCount - 1); - } - } - } - - public synchronized boolean canDelete(String fileName) { - return refCounts.containsKey(fileName) == false; - } -} diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index 921deae41946a..e94b89efb6cf6 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -65,7 +65,6 @@ import org.apache.lucene.util.BytesRefBuilder; import org.apache.lucene.util.Version; import org.opensearch.ExceptionsHelper; -import org.opensearch.common.CheckedConsumer; import org.opensearch.common.UUIDs; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.common.io.stream.BytesStreamOutput; @@ -105,7 +104,6 @@ import java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -124,7 +122,6 @@ import static java.util.Collections.unmodifiableMap; import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY; import static org.opensearch.index.store.Store.MetadataSnapshot.loadMetadata; -import static org.opensearch.indices.replication.SegmentReplicationTarget.REPLICATION_PREFIX; /** * A Store provides plain access to files written by an opensearch index shard. Each shard @@ -185,7 +182,6 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref // used to ref count files when a new Reader is opened for PIT/Scroll queries // prevents segment files deletion until the PIT/Scroll expires or is discarded - private final ReplicaFileTracker replicaFileTracker; private final AbstractRefCounted refCounter = new AbstractRefCounted("store") { @Override @@ -207,8 +203,6 @@ public Store(ShardId shardId, IndexSettings indexSettings, Directory directory, this.directory = new StoreDirectory(sizeCachingDir, Loggers.getLogger("index.store.deletes", shardId)); this.shardLock = shardLock; this.onClose = onClose; - this.replicaFileTracker = indexSettings.isSegRepEnabled() ? new ReplicaFileTracker() : null; - assert onClose != null; assert shardLock != null; assert shardLock.getShardId().equals(shardId); @@ -788,90 +782,17 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) thr } } - /** - * Segment Replication method - * This method deletes files in store that are not referenced by latest on-disk commit point - * - * @param reason the reason for this cleanup operation logged for each deleted file - * @param fileToConsiderForCleanUp Files to consider for clean up. - * - * @throws IOException Exception on locking. 
- */ - public void cleanupAndPreserveLatestCommitPoint(Collection fileToConsiderForCleanUp, String reason) throws IOException { - assert indexSettings.isSegRepEnabled(); - // fetch a snapshot from the latest on disk Segments_N file. This can be behind - // the passed in local in memory snapshot, so we want to ensure files it references are not removed. - metadataLock.writeLock().lock(); - try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) { - cleanupFiles(fileToConsiderForCleanUp, reason, this.readLastCommittedSegmentsInfo().files(true)); - } finally { - metadataLock.writeLock().unlock(); - } - } - - private void cleanupFiles(Collection filesToConsiderForCleanup, String reason, Collection lastCommittedSegmentInfos) { - assert metadataLock.isWriteLockedByCurrentThread(); - for (String existingFile : filesToConsiderForCleanup) { - if (Store.isAutogenerated(existingFile) || lastCommittedSegmentInfos != null && lastCommittedSegmentInfos.contains(existingFile) - // also ensure we are not deleting a file referenced by an active reader. - || replicaFileTracker != null && replicaFileTracker.canDelete(existingFile) == false - // Prevent temporary replication files as it should be cleaned up MultiFileWriter - || existingFile.startsWith(REPLICATION_PREFIX)) { - // don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete - // checksum) - continue; - } - try { - directory.deleteFile(reason, existingFile); - } catch (IOException ex) { - if (existingFile.startsWith(IndexFileNames.SEGMENTS) || existingFile.startsWith(CORRUPTED_MARKER_NAME_PREFIX)) { - // TODO do we need to also fail this if we can't delete the pending commit file? - // if one of those files can't be deleted we better fail the cleanup otherwise we might leave an old commit - // point around? - throw new IllegalStateException("Can't delete " + existingFile + " - cleanup failed", ex); - } - logger.debug(() -> new ParameterizedMessage("failed to delete file [{}]", existingFile), ex); - // ignore, we don't really care, will get deleted later on - } - } - } - /** * Segment replication method * * This method takes the segment info bytes to build SegmentInfos. It inc'refs files pointed by passed in SegmentInfos * bytes to ensure they are not deleted. 
* - * @param tmpToFileName Map of temporary replication file to actual file name * @param infosBytes bytes[] of SegmentInfos supposed to be sent over by primary excluding segment_N file * @param segmentsGen segment generation number - * @param finalizeConsumer consumer for action on passed in SegmentInfos - * @param renameConsumer consumer for action on temporary copied over files * @throws IOException Exception while reading store and building segment infos */ - public void buildInfosFromBytes( - Map tmpToFileName, - byte[] infosBytes, - long segmentsGen, - CheckedConsumer finalizeConsumer, - CheckedConsumer, IOException> renameConsumer - ) throws IOException { - metadataLock.writeLock().lock(); - try { - final List values = new ArrayList<>(tmpToFileName.values()); - incRefFileDeleter(values); - try { - renameConsumer.accept(tmpToFileName); - finalizeConsumer.accept(buildSegmentInfos(infosBytes, segmentsGen)); - } finally { - decRefFileDeleter(values); - } - } finally { - metadataLock.writeLock().unlock(); - } - } - - private SegmentInfos buildSegmentInfos(byte[] infosBytes, long segmentsGen) throws IOException { + public SegmentInfos buildSegmentInfos(byte[] infosBytes, long segmentsGen) throws IOException { try (final ChecksumIndexInput input = toIndexInput(infosBytes)) { return SegmentInfos.readCommit(directory, input, segmentsGen); } @@ -959,7 +880,6 @@ public void commitSegmentInfos(SegmentInfos latestSegmentInfos, long maxSeqNo, l latestSegmentInfos.commit(directory()); directory.sync(latestSegmentInfos.files(true)); directory.syncMetaData(); - cleanupAndPreserveLatestCommitPoint(List.of(this.directory.listAll()), "After commit"); } finally { metadataLock.writeLock().unlock(); } @@ -2017,23 +1937,4 @@ private static IndexWriterConfig newIndexWriterConfig() { // we also don't specify a codec here and merges should use the engines for this index .setMergePolicy(NoMergePolicy.INSTANCE); } - - public void incRefFileDeleter(Collection files) { - if (this.indexSettings.isSegRepEnabled()) { - this.replicaFileTracker.incRef(files); - } - } - - public void decRefFileDeleter(Collection files) { - if (this.indexSettings.isSegRepEnabled()) { - this.replicaFileTracker.decRef(files); - try { - this.cleanupAndPreserveLatestCommitPoint(files, "On reader close"); - } catch (IOException e) { - // Log but do not rethrow - we can try cleaning up again after next replication cycle. - // If that were to fail, the shard will as well. 
- logger.error("Unable to clean store after reader closed", e); - } - } - } } diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java index d3e62282d6b5c..d216721d702cc 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java @@ -245,7 +245,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi indexShard.prepareForIndexRecovery(); final boolean hasRemoteSegmentStore = indexShard.indexSettings().isRemoteStoreEnabled(); if (hasRemoteSegmentStore) { - indexShard.syncSegmentsFromRemoteSegmentStore(false, false); + indexShard.syncSegmentsFromRemoteSegmentStore(false, true); } final boolean hasRemoteTranslog = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled(); final boolean hasNoTranslog = indexShard.indexSettings().isRemoteSnapshot(); diff --git a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java index 2b22af070023a..c463e6abd3df2 100644 --- a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java +++ b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java @@ -11,9 +11,11 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.store.Directory; +import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.store.IOContext; import org.apache.lucene.util.Version; +import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.core.action.ActionListener; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardState; @@ -57,10 +59,11 @@ public void getCheckpointMetadata( ) { Map metadataMap; // TODO: Need to figure out a way to pass this information for segment metadata via remote store. - final Version version = indexShard.getSegmentInfosSnapshot().get().getCommitLuceneVersion(); - try { + try (final GatedCloseable segmentInfosSnapshot = indexShard.getSegmentInfosSnapshot()) { + final Version version = segmentInfosSnapshot.get().getCommitLuceneVersion(); RemoteSegmentMetadata mdFile = remoteDirectory.init(); - // During initial recovery flow, the remote store might not have metadata as primary hasn't uploaded anything yet. + // During initial recovery flow, the remote store might not + // have metadata as primary hasn't uploaded anything yet. 
if (mdFile == null && indexShard.state().equals(IndexShardState.STARTED) == false) { listener.onResponse(new CheckpointInfoResponse(checkpoint, Collections.emptyMap(), null)); return; diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index 91c09e86f2602..376dae40cda90 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -12,6 +12,7 @@ import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.IndexFormatTooNewException; import org.apache.lucene.index.IndexFormatTooOldException; +import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.store.AlreadyClosedException; import org.opensearch.OpenSearchCorruptionException; import org.opensearch.OpenSearchException; @@ -34,8 +35,6 @@ import java.io.IOException; import java.util.List; import java.util.Locale; -import java.util.Map; -import java.util.stream.Collectors; /** * Represents the target of a replication event. @@ -170,7 +169,7 @@ public void startReplication(ActionListener listener) { }, listener::onFailure); getFilesListener.whenComplete(response -> { - finalizeReplication(checkpointInfoListener.result(), getFilesListener.result()); + finalizeReplication(checkpointInfoListener.result()); listener.onResponse(null); }, listener::onFailure); } @@ -201,8 +200,7 @@ private List getFiles(CheckpointInfoResponse checkpointInfo) return diff.missing; } - private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse, GetSegmentFilesResponse getSegmentFilesResponse) - throws OpenSearchCorruptionException { + private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse) throws OpenSearchCorruptionException { cancellableThreads.checkForCancel(); state.setStage(SegmentReplicationState.Stage.FINALIZE_REPLICATION); // Handle empty SegmentInfos bytes for recovering replicas @@ -213,23 +211,12 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse, try { store = store(); store.incRef(); - Map tempFileNames; - if (this.indexShard.indexSettings().isRemoteStoreEnabled() == true) { - tempFileNames = getSegmentFilesResponse.getFiles() - .stream() - .collect(Collectors.toMap(StoreFileMetadata::name, StoreFileMetadata::name)); - } else { - tempFileNames = multiFileWriter.getTempFileNames(); - } - store.buildInfosFromBytes( - tempFileNames, + multiFileWriter.renameAllTempFiles(); + final SegmentInfos infos = store.buildSegmentInfos( checkpointInfoResponse.getInfosBytes(), - checkpointInfoResponse.getCheckpoint().getSegmentsGen(), - indexShard::finalizeReplication, - this.indexShard.indexSettings().isRemoteStoreEnabled() == true - ? (files) -> {} - : (files) -> indexShard.store().renameTempFilesSafe(files) + checkpointInfoResponse.getCheckpoint().getSegmentsGen() ); + indexShard.finalizeReplication(infos); } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) { // this is a fatal exception at this stage. 
// this means we transferred files from the remote that have not be checksummed and they are diff --git a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java index 4c87df48f583f..0aa96e38ebf3e 100644 --- a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java @@ -13,6 +13,7 @@ import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.store.IOContext; +import org.apache.lucene.util.Version; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.UUIDs; import org.opensearch.common.concurrent.GatedCloseable; @@ -51,14 +52,6 @@ public class NRTReplicationEngineTests extends EngineTestCase { Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build() ); - private static final IndexSettings REMOTE_STORE_INDEX_SETTINGS = IndexSettingsModule.newIndexSettings( - "index", - Settings.builder() - .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) - .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, "true") - .build() - ); - public void testCreateEngine() throws IOException { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); try ( @@ -144,29 +137,6 @@ public void testUpdateSegments_replicaReceivesSISWithHigherGen() throws IOExcept } } - public void testUpdateSegments_replicaReceivesSISWithHigherGen_remoteStoreEnabled() throws IOException { - final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); - - try ( - final Store nrtEngineStore = createStore(REMOTE_STORE_INDEX_SETTINGS, newDirectory()); - final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, REMOTE_STORE_INDEX_SETTINGS) - ) { - // assume we start at the same gen. - assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration()); - assertEquals(nrtEngine.getLatestSegmentInfos().getGeneration(), nrtEngine.getLastCommittedSegmentInfos().getGeneration()); - assertEquals(engine.getLatestSegmentInfos().getGeneration(), nrtEngine.getLatestSegmentInfos().getGeneration()); - - // flush the primary engine - we don't need any segments, just force a new commit point. - engine.flush(true, true); - assertEquals(3, engine.getLatestSegmentInfos().getGeneration()); - - // When remote store is enabled, we don't commit on replicas since all segments are durably persisted in the store - nrtEngine.updateSegments(engine.getLatestSegmentInfos()); - assertEquals(2, nrtEngine.getLastCommittedSegmentInfos().getGeneration()); - assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration()); - } - } - public void testUpdateSegments_replicaReceivesSISWithLowerGen() throws IOException { // if the replica is already at segments_N that is received, it will commit segments_N+1. 
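// A newly elected primary can be behind this replica's last local commit, so the incoming generation
// may be lower; the engine still rolls forward and commits into the next local generation rather than
// rewinding.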
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); @@ -175,8 +145,12 @@ public void testUpdateSegments_replicaReceivesSISWithLowerGen() throws IOExcepti final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory()); final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore) ) { + assertEquals(5, nrtEngine.getLatestSegmentInfos().getVersion()); nrtEngine.getLatestSegmentInfos().changed(); nrtEngine.getLatestSegmentInfos().changed(); + assertEquals(7, nrtEngine.getLatestSegmentInfos().getVersion()); + assertEquals(2, nrtEngine.getLastCommittedSegmentInfos().getGeneration()); + // commit the infos to push us to segments_3. nrtEngine.commitSegmentInfos(); assertEquals(3, nrtEngine.getLastCommittedSegmentInfos().getGeneration()); @@ -185,6 +159,7 @@ public void testUpdateSegments_replicaReceivesSISWithLowerGen() throws IOExcepti // update the replica with segments_2 from the primary. final SegmentInfos primaryInfos = engine.getLatestSegmentInfos(); assertEquals(2, primaryInfos.getGeneration()); + assertEquals(5, primaryInfos.getVersion()); nrtEngine.updateSegments(primaryInfos); assertEquals(4, nrtEngine.getLastCommittedSegmentInfos().getGeneration()); assertEquals(4, nrtEngine.getLatestSegmentInfos().getGeneration()); @@ -375,16 +350,9 @@ private NRTReplicationEngine buildNrtReplicaEngine(AtomicLong globalCheckpoint, public void testGetSegmentInfosSnapshotPreservesFilesUntilRelease() throws Exception { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); - // TODO: Remove this divergent commit logic and copy Segments_N from primary with node-node. - // randomly toggle commit / no commit. - IndexSettings settings = REMOTE_STORE_INDEX_SETTINGS; - final boolean shouldCommit = randomBoolean(); - if (shouldCommit) { - settings = INDEX_SETTINGS; - } try ( - final Store nrtEngineStore = createStore(REMOTE_STORE_INDEX_SETTINGS, newDirectory()); - final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, settings) + final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory()); + final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, INDEX_SETTINGS) ) { // only index 2 docs here, this will create segments _0 and _1 and after forcemerge into _2. final int docCount = 2; @@ -436,7 +404,174 @@ public void testGetSegmentInfosSnapshotPreservesFilesUntilRelease() throws Excep // Ensure we still have all the active files. Note - we exclude the infos file here if we aren't committing // the nrt reader will still reference segments_n-1 after being loaded until a local commit occurs. - assertTrue(replicaFiles.containsAll(nrtEngine.getLatestSegmentInfos().files(shouldCommit))); + assertTrue(replicaFiles.containsAll(nrtEngine.getLatestSegmentInfos().files(false))); + } + } + + public void testRemoveExtraSegmentsOnStartup() throws Exception { + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + List operations = generateHistoryOnReplica(2, randomBoolean(), randomBoolean(), randomBoolean()); + for (Engine.Operation op : operations) { + applyOperation(engine, op); + // refresh to create a lot of segments. 
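// (refreshing after every operation writes each op into its own small segment; these files become the
// stale, unreferenced segments the new engine is expected to delete when it opens over an empty commit)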
+ engine.refresh("test"); + } + try (final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());) { + nrtEngineStore.createEmpty(Version.LATEST); + final Collection extraSegments = engine.getLatestSegmentInfos().files(false); + for (String file : extraSegments) { + nrtEngineStore.directory().copyFrom(store.directory(), file, file, IOContext.DEFAULT); + } + List replicaFiles = List.of(nrtEngineStore.directory().listAll()); + for (String file : extraSegments) { + assertTrue(replicaFiles.contains(file)); + } + assertTrue(storeContainsAll(nrtEngineStore, extraSegments)); + try (NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, INDEX_SETTINGS)) { + replicaFiles = List.of(nrtEngineStore.directory().listAll()); + for (String file : extraSegments) { + assertFalse(replicaFiles.contains(file)); + } + } + } + } + + public void testPreserveLatestCommit() throws Exception { + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + + try ( + final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory()); + final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, INDEX_SETTINGS) + ) { + final int docCount = 4; + List operations = generateHistoryOnReplica(docCount, randomBoolean(), randomBoolean(), randomBoolean()); + indexOperations(nrtEngine, operations.subList(0, 2)); + // wipe the nrt directory initially so we can sync with primary. + cleanAndCopySegmentsFromPrimary(nrtEngine); + SegmentInfos primaryInfos; + + final SegmentInfos lastCommittedSegmentInfos = nrtEngine.getLastCommittedSegmentInfos(); + final Collection lastCommittedFiles = lastCommittedSegmentInfos.files(true); + assertTrue(storeContainsAll(nrtEngineStore, lastCommittedFiles)); + + // ensure segments and commit file are incref'd: + assertEquals( + "Segments_N is incref'd once", + 1, + nrtEngine.replicaFileTracker.refCount(lastCommittedSegmentInfos.getSegmentsFileName()) + ); + // segments are incref'd twice because they are loaded on the reader. + assertRefCount(nrtEngine, lastCommittedSegmentInfos.files(false), 2); + + // get and close a snapshot - this will decref files when closed. + final GatedCloseable segmentInfosSnapshot = nrtEngine.getSegmentInfosSnapshot(); + segmentInfosSnapshot.close(); + assertTrue(storeContainsAll(nrtEngineStore, lastCommittedFiles)); + + // index more docs and refresh the reader - this will incref/decref files again + indexOperations(nrtEngine, operations.subList(2, 4)); + primaryInfos = engine.getLatestSegmentInfos(); + copySegments(primaryInfos.files(false), nrtEngine); + nrtEngine.updateSegments(primaryInfos); + + // get the additional segments that are only on the reader - not part of a commit. + final Collection readerOnlySegments = primaryInfos.files(false); + readerOnlySegments.removeAll(lastCommittedFiles); + assertRefCount(nrtEngine, readerOnlySegments, 1); + + assertTrue(storeContainsAll(nrtEngineStore, lastCommittedFiles)); + assertEquals( + "Segments_N is incref'd once", + 1, + nrtEngine.replicaFileTracker.refCount(lastCommittedSegmentInfos.getSegmentsFileName()) + ); + assertRefCount(nrtEngine, lastCommittedSegmentInfos.files(false), 2); + + // flush the primary + engine.flush(true, true); + copySegments(engine.getLatestSegmentInfos().files(false), nrtEngine); + nrtEngine.updateSegments(engine.getLatestSegmentInfos()); + // after flush our segment_n is removed. 
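// (receiving the primary's new commit generation triggers a local commit: the new segments_N is
// incRef'd and the previous commit's files are decRef'd, so the old segments_N drops to zero and is
// deleted by the file tracker)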
+ assertEquals( + "Segments_N is removed", + 0, + nrtEngine.replicaFileTracker.refCount(lastCommittedSegmentInfos.getSegmentsFileName()) + ); + assertFalse(storeContainsAll(nrtEngineStore, lastCommittedFiles)); + // close the engine - ensure we preserved the last commit + final SegmentInfos infosBeforeClose = nrtEngine.getLatestSegmentInfos(); + nrtEngine.close(); + assertTrue(storeContainsAll(nrtEngineStore, infosBeforeClose.files(false))); + assertEquals(store.readLastCommittedSegmentsInfo().files(false), infosBeforeClose.files(false)); + } + } + + private void assertRefCount(NRTReplicationEngine nrtEngine, Collection files, int count) { + for (String file : files) { + // refCount for our segments is 2 because they are still active on the reader + assertEquals(count, nrtEngine.replicaFileTracker.refCount(file)); + } + } + + private boolean storeContainsAll(Store nrtEngineStore, Collection lastCommittedFiles) throws IOException { + return List.of(nrtEngineStore.directory().listAll()).containsAll(lastCommittedFiles); + } + + private void cleanAndCopySegmentsFromPrimary(NRTReplicationEngine nrtEngine) throws IOException { + Lucene.cleanLuceneIndex(nrtEngine.store.directory()); + assertFalse( + Arrays.stream(nrtEngine.store.directory().listAll()) + .anyMatch(file -> file.equals("write.lock") == false && file.equals("extra0") == false) + ); + SegmentInfos primaryInfos = engine.getLatestSegmentInfos(); + copySegments(primaryInfos.files(false), nrtEngine); + nrtEngine.updateSegments(primaryInfos); + } + + private void indexOperations(NRTReplicationEngine nrtEngine, List operations) throws IOException { + for (Engine.Operation op : operations) { + applyOperation(engine, op); + applyOperation(nrtEngine, op); + engine.refresh("test"); + } + } + + public void testDecrefToZeroRemovesFile() throws IOException { + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + + try ( + final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory()); + final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, INDEX_SETTINGS) + ) { + Lucene.cleanLuceneIndex(nrtEngineStore.directory()); + copySegments(engine.getLatestSegmentInfos().files(true), nrtEngine); + nrtEngine.updateSegments(engine.getLatestSegmentInfos()); + final SegmentInfos lastCommittedSegmentInfos = nrtEngine.getLastCommittedSegmentInfos(); + assertEquals( + "Segments_N is incref'd to 1", + 1, + nrtEngine.replicaFileTracker.refCount(lastCommittedSegmentInfos.getSegmentsFileName()) + ); + // create a new commit and update infos + engine.flush(true, true); + nrtEngine.updateSegments(engine.getLatestSegmentInfos()); + assertEquals( + "Segments_N is removed", + 0, + nrtEngine.replicaFileTracker.refCount(lastCommittedSegmentInfos.getSegmentsFileName()) + ); + assertFalse(List.of(nrtEngineStore.directory().listAll()).contains(lastCommittedSegmentInfos.getSegmentsFileName())); + } + } + + private void copySegments(Collection latestPrimaryFiles, Engine nrtEngine) throws IOException { + final Store store = nrtEngine.store; + final List replicaFiles = List.of(store.directory().listAll()); + // copy new segments in and load reader. 
+ for (String file : latestPrimaryFiles) { + if (replicaFiles.contains(file) == false) { + store.directory().copyFrom(this.store.directory(), file, file, IOContext.DEFAULT); + } } } } diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java index a01169480de0b..c22fbdd8850ff 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java @@ -8,19 +8,32 @@ package org.opensearch.index.shard; +import org.apache.lucene.index.IndexFileNames; +import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.util.Version; +import org.hamcrest.MatcherAssert; import org.junit.Before; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.engine.DocIdSeqNoAndSource; +import org.opensearch.index.engine.Engine; import org.opensearch.index.engine.InternalEngine; import org.opensearch.index.engine.NRTReplicationEngineFactory; +import org.opensearch.index.store.Store; +import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.replication.common.ReplicationType; import java.io.IOException; +import java.nio.file.Path; +import java.util.Arrays; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.stream.Collectors; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.opensearch.index.engine.EngineTestCase.assertAtMostOneLuceneDocumentPerSequenceNumber; @@ -172,4 +185,141 @@ public void testNoDuplicateSeqNo() throws Exception { } } } + + public void testReplicaCommitsInfosBytesOnRecovery() throws Exception { + final Path remotePath = createTempDir(); + try (ReplicationGroup shards = createGroup(0, getIndexSettings(), indexMapping, new NRTReplicationEngineFactory(), remotePath)) { + shards.startAll(); + // ensure primary has uploaded something + shards.indexDocs(10); + shards.refresh("test"); + + final IndexShard primary = shards.getPrimary(); + final Engine primaryEngine = getEngine(primary); + assertNotNull(primaryEngine); + final SegmentInfos latestCommit = SegmentInfos.readLatestCommit(primary.store().directory()); + assertEquals("On-disk commit references no segments", Set.of("segments_3"), latestCommit.files(true)); + assertEquals( + "Latest remote commit On-disk commit references no segments", + Set.of("segments_3"), + primary.remoteStore().readLastCommittedSegmentsInfo().files(true) + ); + MatcherAssert.assertThat( + "Segments are referenced in memory only", + primaryEngine.getSegmentInfosSnapshot().get().files(false), + containsInAnyOrder("_0.cfe", "_0.si", "_0.cfs") + ); + + final IndexShard replica = shards.addReplica(remotePath); + replica.store().createEmpty(Version.LATEST); + assertEquals( + "Replica starts at empty segment 2", + Set.of("segments_1"), + replica.store().readLastCommittedSegmentsInfo().files(true) + ); + // commit replica infos so it has a conflicting commit with remote. 
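// (committing twice advances the replica's empty commit from segments_1 to segments_3, colliding with
// the segments_3 generation the primary has already uploaded to the remote store)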
+ final SegmentInfos segmentCommitInfos = replica.store().readLastCommittedSegmentsInfo(); + segmentCommitInfos.commit(replica.store().directory()); + segmentCommitInfos.commit(replica.store().directory()); + assertEquals( + "Replica starts recovery at empty segment 3", + Set.of("segments_3"), + replica.store().readLastCommittedSegmentsInfo().files(true) + ); + + shards.recoverReplica(replica); + + final Engine replicaEngine = getEngine(replica); + assertNotNull(replicaEngine); + final SegmentInfos latestReplicaCommit = SegmentInfos.readLatestCommit(replica.store().directory()); + logger.info(List.of(replica.store().directory().listAll())); + MatcherAssert.assertThat( + "Replica commits infos bytes referencing latest refresh point", + latestReplicaCommit.files(true), + containsInAnyOrder("_0.cfe", "_0.si", "_0.cfs", "segments_5") + ); + MatcherAssert.assertThat( + "Segments are referenced in memory", + replicaEngine.getSegmentInfosSnapshot().get().files(false), + containsInAnyOrder("_0.cfe", "_0.si", "_0.cfs") + ); + final Store.RecoveryDiff recoveryDiff = Store.segmentReplicationDiff( + primary.getSegmentMetadataMap(), + replica.getSegmentMetadataMap() + ); + assertTrue(recoveryDiff.missing.isEmpty()); + assertTrue(recoveryDiff.different.isEmpty()); + } + } + + public void testPrimaryRestart_PrimaryHasExtraCommits() throws Exception { + final Path remotePath = createTempDir(); + try (ReplicationGroup shards = createGroup(0, getIndexSettings(), indexMapping, new NRTReplicationEngineFactory(), remotePath)) { + shards.startAll(); + // ensure primary has uploaded something + shards.indexDocs(10); + IndexShard primary = shards.getPrimary(); + if (randomBoolean()) { + flushShard(primary); + } else { + primary.refresh("test"); + } + assertDocCount(primary, 10); + // get a metadata map - we'll use segrep diff to ensure segments on reader are identical after restart. 
+ final Map metadataBeforeRestart = primary.getSegmentMetadataMap(); + // restart the primary + shards.reinitPrimaryShard(remotePath); + // the store is open at this point but the shard has not yet run through recovery + primary = shards.getPrimary(); + SegmentInfos latestPrimaryCommit = SegmentInfos.readLatestCommit(primary.store().directory()); + latestPrimaryCommit.commit(primary.store().directory()); + latestPrimaryCommit = SegmentInfos.readLatestCommit(primary.store().directory()); + latestPrimaryCommit.commit(primary.store().directory()); + shards.startPrimary(); + assertDocCount(primary, 10); + final Store.RecoveryDiff diff = Store.segmentReplicationDiff(metadataBeforeRestart, primary.getSegmentMetadataMap()); + assertTrue(diff.missing.isEmpty()); + assertTrue(diff.different.isEmpty()); + } + } + + public void testRepicaCleansUpOldCommitsWhenReceivingNew() throws Exception { + final Path remotePath = createTempDir(); + try (ReplicationGroup shards = createGroup(1, getIndexSettings(), indexMapping, new NRTReplicationEngineFactory(), remotePath)) { + shards.startAll(); + final IndexShard primary = shards.getPrimary(); + final IndexShard replica = shards.getReplicas().get(0); + shards.indexDocs(1); + flushShard(primary); + replicateSegments(primary, shards.getReplicas()); + assertDocCount(primary, 1); + assertDocCount(replica, 1); + assertEquals("segments_4", replica.store().readLastCommittedSegmentsInfo().getSegmentsFileName()); + assertSingleSegmentFile(replica, "segments_4"); + + shards.indexDocs(1); + primary.refresh("test"); + replicateSegments(primary, shards.getReplicas()); + assertDocCount(replica, 2); + assertSingleSegmentFile(replica, "segments_4"); + + shards.indexDocs(1); + flushShard(primary); + replicateSegments(primary, shards.getReplicas()); + assertDocCount(replica, 3); + assertSingleSegmentFile(replica, "segments_5"); + + final Store.RecoveryDiff diff = Store.segmentReplicationDiff(primary.getSegmentMetadataMap(), replica.getSegmentMetadataMap()); + assertTrue(diff.missing.isEmpty()); + assertTrue(diff.different.isEmpty()); + } + } + + private void assertSingleSegmentFile(IndexShard shard, String fileName) throws IOException { + final Set segmentsFileNames = Arrays.stream(shard.store().directory().listAll()) + .filter(file -> file.startsWith(IndexFileNames.SEGMENTS)) + .collect(Collectors.toSet()); + assertEquals("Expected a single segment file", 1, segmentsFileNames.size()); + assertEquals(segmentsFileNames.stream().findFirst().get(), fileName); + } } diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 070bcae1b4a4b..88fcae17bf091 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -62,6 +62,7 @@ import org.opensearch.transport.TransportService; import java.io.IOException; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -773,6 +774,33 @@ public void testNoDuplicateSeqNo() throws Exception { } } + public void testPrimaryRestart() throws Exception { + final Path remotePath = createTempDir(); + try (ReplicationGroup shards = createGroup(0, getIndexSettings(), indexMapping, new NRTReplicationEngineFactory(), remotePath)) { + shards.startAll(); + // ensure primary has uploaded something + shards.indexDocs(10); + IndexShard 
primary = shards.getPrimary(); + if (randomBoolean()) { + flushShard(primary); + } else { + primary.refresh("test"); + } + assertDocCount(primary, 10); + // get a metadata map - we'll use segrep diff to ensure segments on reader are identical after restart. + final Map metadataBeforeRestart = primary.getSegmentMetadataMap(); + // restart the primary + shards.reinitPrimaryShard(remotePath); + // the store is open at this point but the shard has not yet run through recovery + primary = shards.getPrimary(); + shards.startPrimary(); + assertDocCount(primary, 10); + final Store.RecoveryDiff diff = Store.segmentReplicationDiff(metadataBeforeRestart, primary.getSegmentMetadataMap()); + assertTrue(diff.missing.isEmpty()); + assertTrue(diff.different.isEmpty()); + } + } + /** * Assert persisted and searchable doc counts. This method should not be used while docs are concurrently indexed because * it asserts point in time seqNos are relative to the doc counts. diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java index 1df7c72cbc8a8..0359a9b926cd9 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java @@ -432,45 +432,57 @@ public void testTemporaryFilesNotCleanup() throws Exception { } } - // Todo: Move this test to SegmentReplicationIndexShardTests so that it runs for both node-node & remote store public void testReplicaReceivesLowerGeneration() throws Exception { // when a replica gets incoming segments that are lower than what it currently has on disk. - - // start 3 nodes Gens: P [2], R [2], R[2] - // index some docs and flush twice, push to only 1 replica. - // State Gens: P [4], R-1 [3], R-2 [2] - // Promote R-2 as the new primary and demote the old primary. - // State Gens: R[4], R-1 [3], P [4] - *commit on close of NRTEngine, xlog replayed and commit made. - // index docs on new primary and flush - // replicate to all. - // Expected result: State Gens: P[4], R-1 [4], R-2 [4] + // this can happen when a replica is promoted that is further behind the other replicas. try (ReplicationGroup shards = createGroup(2, getIndexSettings(), new NRTReplicationEngineFactory())) { shards.startAll(); final IndexShard primary = shards.getPrimary(); - final IndexShard replica_1 = shards.getReplicas().get(0); + final IndexShard behindReplicaBeforeRestart = shards.getReplicas().get(0); final IndexShard replica_2 = shards.getReplicas().get(1); int numDocs = randomIntBetween(10, 100); + int totalDocs = numDocs; shards.indexDocs(numDocs); - flushShard(primary, false); - replicateSegments(primary, List.of(replica_1)); + flushShard(primary, true); + replicateSegments(primary, List.of(behindReplicaBeforeRestart)); numDocs = randomIntBetween(numDocs + 1, numDocs + 10); + totalDocs += numDocs; shards.indexDocs(numDocs); - flushShard(primary, false); - replicateSegments(primary, List.of(replica_1)); + flushShard(primary, true); + flushShard(primary, true); + flushShard(primary, true); + replicateSegments(primary, List.of(behindReplicaBeforeRestart)); + + // close behindReplicaBeforeRestart - we will re-open it after replica_2 is promoted as new primary. 
+            assertEqualCommittedSegments(primary, behindReplicaBeforeRestart);
-            assertEqualCommittedSegments(primary, replica_1);
+            assertDocCount(behindReplicaBeforeRestart, totalDocs);
+            assertDocCount(replica_2, 0);
             shards.promoteReplicaToPrimary(replica_2).get();
-            primary.close("demoted", false, false);
+            primary.close("demoted", randomBoolean(), false);
             primary.store().close();
             IndexShard oldPrimary = shards.addReplicaWithExistingPath(primary.shardPath(), primary.routingEntry().currentNodeId());
             shards.recoverReplica(oldPrimary);
+            behindReplicaBeforeRestart.close("restart", false, false);
+            behindReplicaBeforeRestart.store().close();
+            shards.removeReplica(behindReplicaBeforeRestart);
+            final IndexShard behindReplicaAfterRestart = shards.addReplicaWithExistingPath(
+                behindReplicaBeforeRestart.shardPath(),
+                behindReplicaBeforeRestart.routingEntry().currentNodeId()
+            );
+            shards.recoverReplica(behindReplicaAfterRestart);
+            numDocs = randomIntBetween(numDocs + 1, numDocs + 10);
+            totalDocs += numDocs;
             shards.indexDocs(numDocs);
             flushShard(replica_2, false);
             replicateSegments(replica_2, shards.getReplicas());
-            assertEqualCommittedSegments(replica_2, oldPrimary, replica_1);
+            assertEqualCommittedSegments(replica_2, oldPrimary, behindReplicaAfterRestart);
+            assertDocCount(replica_2, totalDocs);
+            assertDocCount(oldPrimary, totalDocs);
+            assertDocCount(behindReplicaAfterRestart, totalDocs);
         }
     }
diff --git a/server/src/test/java/org/opensearch/index/store/StoreTests.java b/server/src/test/java/org/opensearch/index/store/StoreTests.java
index 9043dcce1b779..a4812a4a771bf 100644
--- a/server/src/test/java/org/opensearch/index/store/StoreTests.java
+++ b/server/src/test/java/org/opensearch/index/store/StoreTests.java
@@ -100,7 +100,6 @@ import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
@@ -109,8 +108,6 @@ import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 import static java.util.Collections.unmodifiableMap;
 import static org.hamcrest.Matchers.anyOf;
@@ -1169,49 +1166,6 @@ public void testGetMetadataWithSegmentInfos() throws IOException {
         store.close();
     }
 
-    public void testCleanupAndPreserveLatestCommitPoint() throws IOException {
-        final ShardId shardId = new ShardId("index", "_na_", 1);
-        Store store = new Store(
-            shardId,
-            SEGMENT_REPLICATION_INDEX_SETTINGS,
-            StoreTests.newDirectory(random()),
-            new DummyShardLock(shardId)
-        );
-        commitRandomDocs(store);
-
-        Store.MetadataSnapshot commitMetadata = store.getMetadata();
-
-        // index more docs but only IW.flush, this will create additional files we'll clean up.
-        final IndexWriter writer = indexRandomDocs(store);
-        writer.flush();
-        writer.close();
-
-        final List<String> additionalSegments = new ArrayList<>();
-        for (String file : store.directory().listAll()) {
-            if (commitMetadata.contains(file) == false) {
-                additionalSegments.add(file);
-            }
-        }
-        assertFalse(additionalSegments.isEmpty());
-
-        Collection<String> filesToConsiderForCleanUp = Stream.of(store.readLastCommittedSegmentsInfo().files(true), additionalSegments)
-            .flatMap(Collection::stream)
-            .collect(Collectors.toList());
-
-        // clean up everything not in the latest commit point.
-        store.cleanupAndPreserveLatestCommitPoint(filesToConsiderForCleanUp, "test");
-
-        // we want to ensure commitMetadata files are preserved after calling cleanup
-        for (String existingFile : store.directory().listAll()) {
-            if (!IndexWriter.WRITE_LOCK_NAME.equals(existingFile)) {
-                assertTrue(commitMetadata.contains(existingFile));
-                assertFalse(additionalSegments.contains(existingFile));
-            }
-        }
-        deleteContent(store.directory());
-        IOUtils.close(store);
-    }
-
     public void testGetSegmentMetadataMap() throws IOException {
         final ShardId shardId = new ShardId("index", "_na_", 1);
         Store store = new Store(
diff --git a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java
index a0d5be240d552..53d2d4b8b40ab 100644
--- a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java
+++ b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java
@@ -622,8 +622,8 @@ public synchronized IndexShard getPrimary() {
         return primary;
     }
 
-    public synchronized void reinitPrimaryShard() throws IOException {
-        primary = reinitShard(primary);
+    public synchronized void reinitPrimaryShard(Path remotePath) throws IOException {
+        primary = reinitShard(primary, remotePath);
         computeReplicationTargets();
     }
 
diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
index ebe43fc846899..a880aa25a4b4e 100644
--- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
+++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
@@ -793,12 +793,18 @@ protected BlobContainer getBlobContainer(Path f) throws IOException {
         return new FsBlobContainer(fsBlobStore, blobPath, f);
     }
 
+    protected IndexShard reinitShard(IndexShard current, IndexingOperationListener... listeners) throws IOException {
+        return reinitShard(current, (Path) null, listeners);
+    }
+
     /**
      * Takes an existing shard, closes it and starts a new initializing shard at the same location
      *
+     * @param current The current shard to reinit
+     * @param remotePath Remote path to recover from if remote storage is used
      * @param listeners new listeners to use for the newly created shard
      */
-    protected IndexShard reinitShard(IndexShard current, IndexingOperationListener... listeners) throws IOException {
+    protected IndexShard reinitShard(IndexShard current, Path remotePath, IndexingOperationListener... listeners) throws IOException {
         final ShardRouting shardRouting = current.routingEntry();
         return reinitShard(
             current,
@@ -806,6 +812,7 @@ protected IndexShard reinitShard(IndexShard current, IndexingOperationListener..
                 shardRouting,
                 shardRouting.primary() ? RecoverySource.ExistingStoreRecoverySource.INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE
             ),
+            remotePath,
             listeners
         );
     }
@@ -817,13 +824,18 @@ protected IndexShard reinitShard(IndexShard current, IndexingOperationListener..
     * @param listeners new listeners to use for the newly created shard
     */
    protected IndexShard reinitShard(IndexShard current, ShardRouting routing, IndexingOperationListener... listeners) throws IOException {
+        return reinitShard(current, routing, null, listeners);
+    }
+
+    protected IndexShard reinitShard(IndexShard current, ShardRouting routing, Path remotePath, IndexingOperationListener... listeners)
+        throws IOException {
         return reinitShard(
             current,
             routing,
             current.indexSettings.getIndexMetadata(),
             current.engineFactory,
             current.engineConfigFactory,
-            null,
+            remotePath,
             listeners
         );
     }