diff --git a/libs/core/src/main/java/org/opensearch/core/index/Index.java b/libs/core/src/main/java/org/opensearch/core/index/Index.java
index fc5c5152a5500..50c3ccd2d5382 100644
--- a/libs/core/src/main/java/org/opensearch/core/index/Index.java
+++ b/libs/core/src/main/java/org/opensearch/core/index/Index.java
@@ -32,6 +32,8 @@
 
 package org.opensearch.core.index;
 
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
 import org.opensearch.core.ParseField;
 import org.opensearch.core.common.io.stream.StreamInput;
 import org.opensearch.core.common.io.stream.StreamOutput;
@@ -134,6 +136,16 @@ public static Index fromXContent(final XContentParser parser) throws IOException
         return INDEX_PARSER.parse(parser, null).build();
     }
 
+    public void writeTo(IndexOutput out) throws IOException {
+        out.writeString(name);
+        out.writeString(uuid);
+    }
+
+    public Index(IndexInput in) throws IOException {
+        this.name = in.readString();
+        this.uuid = in.readString();
+    }
+
     /**
      * Builder for Index objects. Used by ObjectParser instances only.
      *
diff --git a/libs/core/src/main/java/org/opensearch/core/index/shard/ShardId.java b/libs/core/src/main/java/org/opensearch/core/index/shard/ShardId.java
index b01121c3f30d4..9fb8c1d66b649 100644
--- a/libs/core/src/main/java/org/opensearch/core/index/shard/ShardId.java
+++ b/libs/core/src/main/java/org/opensearch/core/index/shard/ShardId.java
@@ -32,6 +32,8 @@
 
 package org.opensearch.core.index.shard;
 
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
 import org.opensearch.core.common.Strings;
 import org.opensearch.core.common.io.stream.StreamInput;
 import org.opensearch.core.common.io.stream.StreamOutput;
@@ -147,4 +149,15 @@ public int compareTo(ShardId o) {
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
         return builder.value(toString());
     }
+
+    public void writeTo(IndexOutput out) throws IOException {
+        index.writeTo(out);
+        out.writeVInt(shardId);
+    }
+
+    public ShardId(IndexInput in) throws IOException {
+        index = new Index(in);
+        shardId = in.readVInt();
+        hashCode = computeHashCode();
+    }
 }
diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
index 5deaf5f9a483a..9f9f9674eb6aa 100644
--- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -2319,7 +2319,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier, boolean syncFromRemote) throws IOException {
         assert currentEngineReference.get() == null : "engine is running";
         verifyNotClosed();
         if (indexSettings.isRemoteStoreEnabled() && syncFromRemote) {
-            syncSegmentsFromRemoteSegmentStore(false, true, true);
+            syncSegmentsFromRemoteSegmentStore(false, true);
         }
         if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) {
             if (syncFromRemote) {
@@ -4547,7 +4547,7 @@ public void close() throws IOException {
             };
             IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
             if (indexSettings.isRemoteStoreEnabled()) {
-                syncSegmentsFromRemoteSegmentStore(false, true, true);
+                syncSegmentsFromRemoteSegmentStore(false, true);
             }
             if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) {
                 syncRemoteTranslogAndUpdateGlobalCheckpoint();
@@ -4604,14 +4604,53 @@ public void syncTranslogFilesFromRemoteTranslog() throws IOException {
         RemoteFsTranslog.download(repository, shardId, getThreadPool(), shardPath().resolveTranslog(), logger);
     }
 
+    /**
+     * Downloads the specified segment files from the remote segment store.
+     * Used for segment replication when the remote store is the replication source.
+     * @param filesToFetch files to download from the remote segment store
+     * @return the files that were successfully downloaded
+     * @throws IOException if an exception occurs while downloading segments from the remote store
+     */
+    public List<StoreFileMetadata> syncSegmentsFromRemoteSegmentStore(List<StoreFileMetadata> filesToFetch) throws IOException {
+        assert indexSettings.isSegRepEnabled() && indexSettings.isRemoteStoreEnabled();
+        logger.trace("Downloading segment files from remote store {}", filesToFetch);
+        RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = getRemoteDirectory();
+        RemoteSegmentMetadata remoteSegmentMetadata = remoteSegmentStoreDirectory.init();
+        List<StoreFileMetadata> downloadedSegments = new ArrayList<>();
+        if (remoteSegmentMetadata != null) {
+            try {
+                store.incRef();
+                remoteStore.incRef();
+                final Directory storeDirectory = store.directory();
+                logger.trace("storeDirectory {}", storeDirectory.getClass());
+                String segmentNFile = null;
+                for (StoreFileMetadata fileMetadata : filesToFetch) {
+                    String file = fileMetadata.name();
+                    logger.trace("Copying file {} from remote segment store", file);
+                    storeDirectory.copyFrom(remoteSegmentStoreDirectory, file, file, IOContext.DEFAULT);
+                    downloadedSegments.add(fileMetadata);
+                    if (file.startsWith(IndexFileNames.SEGMENTS)) {
+                        assert segmentNFile == null : "There should be only one SegmentInfosSnapshot file";
+                        segmentNFile = file;
+                    }
+                }
+                storeDirectory.sync(downloadedSegments.stream().map(metadata -> metadata.name()).collect(Collectors.toList()));
+            } finally {
+                store.decRef();
+                remoteStore.decRef();
+                logger.trace("Downloaded segments from remote store {}", downloadedSegments);
+            }
+        }
+        return downloadedSegments;
+    }
+
     /**
      * Downloads segments from remote segment store.
      * @param overrideLocal flag to override local segment files with those in remote store
      * @param refreshLevelSegmentSync last refresh checkpoint is used if true, commit checkpoint otherwise
-     * @param shouldCommit if the shard requires committing the changes after sync from remote.
      * @throws IOException if exception occurs while reading segments from remote store
      */
-    public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean refreshLevelSegmentSync, boolean shouldCommit)
+    public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean refreshLevelSegmentSync)
         throws IOException {
         assert indexSettings.isRemoteStoreEnabled();
         logger.info("Downloading segments from remote segment store");
@@ -4653,34 +4692,30 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean refreshLevelSegmentSync, boolean shouldCommit)
                         indexInput,
                         remoteSegmentMetadata.getGeneration()
                     );
-                    if (shouldCommit) {
-                        long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
-                        // Following code block makes sure to use SegmentInfosSnapshot in the remote store if generation differs
-                        // with local filesystem. If local filesystem already has segments_N+2 and infosSnapshot has generation N,
-                        // after commit, there would be 2 files that would be created segments_N+1 and segments_N+2. With the
-                        // policy of preserving only the latest commit, we will delete segments_N+1 which in fact is the part of the latest
-                        // commit.
-                        Optional<String> localMaxSegmentInfos = localSegmentFiles.stream()
-                            .filter(file -> file.startsWith(IndexFileNames.SEGMENTS))
-                            .max(Comparator.comparingLong(SegmentInfos::generationFromSegmentsFileName));
-                        if (localMaxSegmentInfos.isPresent()
-                            && infosSnapshot.getGeneration() < SegmentInfos.generationFromSegmentsFileName(localMaxSegmentInfos.get())
-                                - 1) {
-                            // If remote translog is not enabled, local translog will be created with different UUID.
-                            // This fails in Store.trimUnsafeCommits() as translog UUID of checkpoint and SegmentInfos needs
-                            // to be same. Following code block make sure to have the same UUID.
-                            if (indexSettings.isRemoteTranslogStoreEnabled() == false) {
-                                SegmentInfos localSegmentInfos = store.readLastCommittedSegmentsInfo();
-                                Map<String, String> userData = new HashMap<>(infosSnapshot.getUserData());
-                                userData.put(TRANSLOG_UUID_KEY, localSegmentInfos.userData.get(TRANSLOG_UUID_KEY));
-                                infosSnapshot.setUserData(userData, false);
-                            }
-                            storeDirectory.deleteFile(localMaxSegmentInfos.get());
+                    long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
+                    // Following code block makes sure to use SegmentInfosSnapshot in the remote store if generation differs
+                    // with local filesystem. If local filesystem already has segments_N+2 and infosSnapshot has generation N,
+                    // after commit, there would be 2 files that would be created segments_N+1 and segments_N+2. With the
+                    // policy of preserving only the latest commit, we will delete segments_N+1 which in fact is the part of the latest
+                    // commit.
+                    Optional<String> localMaxSegmentInfos = localSegmentFiles.stream()
+                        .filter(file -> file.startsWith(IndexFileNames.SEGMENTS))
+                        .max(Comparator.comparingLong(SegmentInfos::generationFromSegmentsFileName));
+                    if (localMaxSegmentInfos.isPresent()
+                        && infosSnapshot.getGeneration() < SegmentInfos.generationFromSegmentsFileName(localMaxSegmentInfos.get()) - 1) {
+                        // If remote translog is not enabled, local translog will be created with different UUID.
+                        // This fails in Store.trimUnsafeCommits() as translog UUID of checkpoint and SegmentInfos needs
+                        // to be same. Following code block make sure to have the same UUID.
+                        if (indexSettings.isRemoteTranslogStoreEnabled() == false) {
+                            SegmentInfos localSegmentInfos = store.readLastCommittedSegmentsInfo();
+                            Map<String, String> userData = new HashMap<>(infosSnapshot.getUserData());
+                            userData.put(TRANSLOG_UUID_KEY, localSegmentInfos.userData.get(TRANSLOG_UUID_KEY));
+                            infosSnapshot.setUserData(userData, false);
                         }
-                        store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
-                    } else {
-                        finalizeReplication(infosSnapshot);
+                        storeDirectory.deleteFile(localMaxSegmentInfos.get());
                     }
+                    store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
                 }
             }
         } catch (IOException e) {
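The comment block restored into the commit path above reasons in terms of segments_N generations: the newest local segments_N file is deleted before committing the remote snapshot only when it has run more than one generation ahead of the snapshot; otherwise committing the snapshot is enough. A minimal, self-contained sketch of that comparison, using Lucene's SegmentInfos.generationFromSegmentsFileName; the concrete generation values here are invented for illustration and are not taken from the change:

```java
import org.apache.lucene.index.SegmentInfos;

public class SegmentsGenerationCheck {
    public static void main(String[] args) {
        // The generation is encoded (base 36) in the file-name suffix: segments_7 -> 7.
        long localGen = SegmentInfos.generationFromSegmentsFileName("segments_7");
        long remoteGen = 5; // e.g. infosSnapshot.getGeneration() read from the remote snapshot

        // Mirrors the condition in syncSegmentsFromRemoteSegmentStore: the newest local
        // segments_N is deleted before committing only when it is at least two
        // generations ahead of the remote snapshot.
        boolean deleteLocalSegmentsFile = remoteGen < localGen - 1;
        System.out.println(deleteLocalSegmentsFile); // true for 5 vs 7
    }
}
```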
diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java
index 8f520c0e007bb..50ae733791b30 100644
--- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java
+++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java
@@ -254,7 +254,7 @@ private synchronized void syncSegments(boolean isRetry) {
                     public void onResponse(Void unused) {
                         try {
                             // Start metadata file upload
-                            uploadMetadata(localSegmentsPostRefresh, segmentInfos);
+                            uploadMetadata(localSegmentsPostRefresh, segmentInfos, checkpoint);
                             clearStaleFilesFromLocalSegmentChecksumMap(localSegmentsPostRefresh);
                             onSuccessfulSegmentsSync(
                                 refreshTimeMs,
@@ -382,7 +382,7 @@ private boolean isRefreshAfterCommit() throws IOException {
             && !remoteDirectory.containsFile(lastCommittedLocalSegmentFileName, getChecksumOfLocalFile(lastCommittedLocalSegmentFileName)));
     }
 
-    void uploadMetadata(Collection<String> localSegmentsPostRefresh, SegmentInfos segmentInfos) throws IOException {
+    void uploadMetadata(Collection<String> localSegmentsPostRefresh, SegmentInfos segmentInfos, ReplicationCheckpoint replicationCheckpoint) throws IOException {
         final long maxSeqNo = ((InternalEngine) indexShard.getEngine()).currentOngoingRefreshCheckpoint();
         SegmentInfos segmentInfosSnapshot = segmentInfos.clone();
         Map<String, String> userData = segmentInfosSnapshot.getUserData();
@@ -399,8 +399,8 @@ void uploadMetadata(Collection<String> localSegmentsPostRefresh, SegmentInfos segmentInfos) throws IOException {
                 localSegmentsPostRefresh,
                 segmentInfosSnapshot,
                 storeDirectory,
-                indexShard.getOperationPrimaryTerm(),
-                translogFileGeneration
+                translogFileGeneration,
+                replicationCheckpoint
             );
         }
     }
diff --git a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java
index d4e779c83644f..289854be7c1e0 100644
--- a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java
+++ b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java
@@ -530,7 +530,7 @@ private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardRecoveryException {
         remoteStore.incRef();
         try {
             // Download segments from remote segment store
-            indexShard.syncSegmentsFromRemoteSegmentStore(true, true, true);
+            indexShard.syncSegmentsFromRemoteSegmentStore(true, true);
             if (store.directory().listAll().length == 0) {
                 store.createEmpty(indexShard.indexSettings().getIndexVersionCreated().luceneVersion);
diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java
index d3e8d961337cc..ae60eb9284527 100644
--- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java
+++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java
@@ -41,6 +41,7 @@
 import org.opensearch.index.store.lockmanager.RemoteStoreLockManager;
 import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
 import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler;
+import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
 import org.opensearch.threadpool.ThreadPool;
 
 import java.io.FileNotFoundException;
@@ -569,12 +570,12 @@ public void uploadMetadata(
         Collection<String> segmentFiles,
         SegmentInfos segmentInfosSnapshot,
         Directory storeDirectory,
-        long primaryTerm,
-        long translogGeneration
+        long translogGeneration,
+        ReplicationCheckpoint replicationCheckpoint
     ) throws IOException {
         synchronized (this) {
             String metadataFilename = MetadataFilenameUtils.getMetadataFilename(
-                primaryTerm,
+                replicationCheckpoint.getPrimaryTerm(),
                 segmentInfosSnapshot.getGeneration(),
                 translogGeneration,
                 metadataUploadCounter.incrementAndGet(),
@@ -602,8 +603,7 @@ public void uploadMetadata(
                 new RemoteSegmentMetadata(
                     RemoteSegmentMetadata.fromMapOfStrings(uploadedSegments),
                     segmentInfoSnapshotByteArray,
-                    primaryTerm,
-                    segmentInfosSnapshot.getGeneration()
+                    replicationCheckpoint
                 )
             );
         }
diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java
index 8967100d4faf0..d7837f85e79ac 100644
--- a/server/src/main/java/org/opensearch/index/store/Store.java
+++ b/server/src/main/java/org/opensearch/index/store/Store.java
@@ -845,22 +845,24 @@ private void cleanupFiles(Collection<String> filesToConsiderForCleanup, String r
      * @param tmpToFileName Map of temporary replication file to actual file name
      * @param infosBytes bytes[] of SegmentInfos supposed to be sent over by primary excluding segment_N file
      * @param segmentsGen segment generation number
-     * @param consumer consumer for generated SegmentInfos
+     * @param finalizeConsumer consumer for generated SegmentInfos
+     * @param renameConsumer consumer that renames the temporary replication files to their final names
      * @throws IOException Exception while reading store and building segment infos
      */
     public void buildInfosFromBytes(
         Map<String, String> tmpToFileName,
         byte[] infosBytes,
         long segmentsGen,
-        CheckedConsumer<SegmentInfos, IOException> consumer
+        CheckedConsumer<SegmentInfos, IOException> finalizeConsumer,
+        CheckedConsumer<Map<String, String>, IOException> renameConsumer
     ) throws IOException {
         metadataLock.writeLock().lock();
         try {
             final List<String> values = new ArrayList<>(tmpToFileName.values());
             incRefFileDeleter(values);
             try {
-                renameTempFilesSafe(tmpToFileName);
-                consumer.accept(buildSegmentInfos(infosBytes, segmentsGen));
+                renameConsumer.accept(tmpToFileName);
+                finalizeConsumer.accept(buildSegmentInfos(infosBytes, segmentsGen));
             } finally {
                 decRefFileDeleter(values);
             }
diff --git a/server/src/main/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadata.java b/server/src/main/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadata.java
index 9a479346ff711..a94429eb2ecc6 100644
--- a/server/src/main/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadata.java
+++ b/server/src/main/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadata.java
@@ -15,6 +15,7 @@ import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.IndexOutput;
 import org.opensearch.index.store.RemoteSegmentStoreDirectory;
+import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
 
 /**
  * Metadata object for Remote Segment
  */
@@ -38,19 +39,29 @@ public class RemoteSegmentMetadata {
 
     private final byte[] segmentInfosBytes;
 
-    private final long primaryTerm;
-    private final long generation;
+// private final long primaryTerm;
+// private final long generation;
+//
+// private final long version;
+//
+// private final long length;
+//
+// private final String codec;
+
+    public ReplicationCheckpoint getReplicationCheckpoint() {
+        return replicationCheckpoint;
+    }
+
+    private final ReplicationCheckpoint replicationCheckpoint;
 
     public RemoteSegmentMetadata(
         Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> metadata,
         byte[] segmentInfosBytes,
-        long primaryTerm,
-        long generation
+        ReplicationCheckpoint replicationCheckpoint
     ) {
         this.metadata = metadata;
         this.segmentInfosBytes = segmentInfosBytes;
-        this.generation = generation;
-        this.primaryTerm = primaryTerm;
+        this.replicationCheckpoint = replicationCheckpoint;
     }
 
     /**
@@ -66,11 +77,11 @@ public byte[] getSegmentInfosBytes() {
     }
 
     public long getGeneration() {
-        return generation;
+        return replicationCheckpoint.getSegmentsGen();
     }
 
     public long getPrimaryTerm() {
-        return primaryTerm;
+        return replicationCheckpoint.getPrimaryTerm();
     }
 
     /**
@@ -99,19 +110,17 @@ public static Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> fromMapOfStrings(Map<String, String> segmentMetadata) {
 
     public void write(IndexOutput out) throws IOException {
         out.writeMapOfStrings(toMapOfStrings());
-        out.writeLong(generation);
-        out.writeLong(primaryTerm);
+        replicationCheckpoint.writeTo(out);
         out.writeLong(segmentInfosBytes.length);
         out.writeBytes(segmentInfosBytes, segmentInfosBytes.length);
     }
 
     public static RemoteSegmentMetadata read(IndexInput indexInput) throws IOException {
         Map<String, String> metadata = indexInput.readMapOfStrings();
-        long generation = indexInput.readLong();
-        long primaryTerm = indexInput.readLong();
+        ReplicationCheckpoint replicationCheckpoint = new ReplicationCheckpoint(indexInput);
         int byteArraySize = (int) indexInput.readLong();
         byte[] segmentInfosBytes = new byte[byteArraySize];
         indexInput.readBytes(segmentInfosBytes, 0, byteArraySize);
-        return new RemoteSegmentMetadata(RemoteSegmentMetadata.fromMapOfStrings(metadata), segmentInfosBytes, primaryTerm, generation);
+        return new RemoteSegmentMetadata(RemoteSegmentMetadata.fromMapOfStrings(metadata), segmentInfosBytes, replicationCheckpoint);
     }
 }
diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java
index a289c8f8a04b7..88dd2da7bf5d9 100644
--- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java
+++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java
@@ -245,7 +245,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExistingRequest) {
                 indexShard.prepareForIndexRecovery();
                 final boolean hasRemoteSegmentStore = indexShard.indexSettings().isRemoteStoreEnabled();
                 if (hasRemoteSegmentStore) {
-                    indexShard.syncSegmentsFromRemoteSegmentStore(false, false, true);
+                    indexShard.syncSegmentsFromRemoteSegmentStore(false, false);
                 }
                 final boolean hasRemoteTranslog = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled();
                 final boolean hasNoTranslog = indexShard.indexSettings().isRemoteSnapshot();
diff --git a/server/src/main/java/org/opensearch/indices/replication/GetSegmentFilesResponse.java b/server/src/main/java/org/opensearch/indices/replication/GetSegmentFilesResponse.java
index 89d50a17464a6..222b1cfaa8574 100644
--- a/server/src/main/java/org/opensearch/indices/replication/GetSegmentFilesResponse.java
+++ b/server/src/main/java/org/opensearch/indices/replication/GetSegmentFilesResponse.java
@@ -23,6 +23,10 @@
  */
 public class GetSegmentFilesResponse extends TransportResponse {
 
+    public List<StoreFileMetadata> getFiles() {
+        return files;
+    }
+
     List<StoreFileMetadata> files;
 
     public GetSegmentFilesResponse(List<StoreFileMetadata> files) {
diff --git a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java
index c5be7635782af..8e71fd2295e35 100644
--- a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java
+++ b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java
@@ -33,7 +33,7 @@
  */
 public class RemoteStoreReplicationSource implements SegmentReplicationSource {
 
-    private static final Logger logger = LogManager.getLogger(PrimaryShardReplicationSource.class);
+    private static final Logger logger = LogManager.getLogger(RemoteStoreReplicationSource.class);
 
     private final IndexShard indexShard;
 
@@ -77,8 +77,8 @@ public void getCheckpointMetadata(
                     )
                 )
             );
-            // TODO: GET current checkpoint from remote store.
-            listener.onResponse(new CheckpointInfoResponse(checkpoint, metadataMap, null));
+            logger.trace("Sending checkpoint {} from the remote store metadata file", mdFile.getReplicationCheckpoint());
+            listener.onResponse(new CheckpointInfoResponse(mdFile.getReplicationCheckpoint(), metadataMap, mdFile.getSegmentInfosBytes()));
         } catch (Exception e) {
             listener.onFailure(e);
         }
@@ -93,8 +93,9 @@ public void getSegmentFiles(
         ActionListener<GetSegmentFilesResponse> listener
     ) {
         try {
-            indexShard.syncSegmentsFromRemoteSegmentStore(false, true, false);
-            listener.onResponse(new GetSegmentFilesResponse(Collections.emptyList()));
+            List<StoreFileMetadata> downloadedFiles = indexShard.syncSegmentsFromRemoteSegmentStore(filesToFetch);
+            assert downloadedFiles.size() == filesToFetch.size() && downloadedFiles.containsAll(filesToFetch);
+            listener.onResponse(new GetSegmentFilesResponse(downloadedFiles));
         } catch (Exception e) {
             listener.onFailure(e);
         }
@@ -102,6 +103,6 @@ public void getSegmentFiles(
 
     @Override
     public String getDescription() {
-        return "remote store";
+        return "RemoteStoreReplicationSource";
     }
 }
diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java
index 2e0f5a8c0ad1f..cac2509882f16 100644
--- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java
+++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java
@@ -38,8 +38,11 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * Represents the target of a replication event.
@@ -148,7 +151,7 @@ public void startReplication(ActionListener<Void> listener) {
         final StepListener<CheckpointInfoResponse> checkpointInfoListener = new StepListener<>();
         final StepListener<GetSegmentFilesResponse> getFilesListener = new StepListener<>();
 
-        logger.trace(new ParameterizedMessage("Starting Replication Target: {}", description()));
+        logger.info(new ParameterizedMessage("Starting Replication Target: {}", description()));
         // Get list of files to copy from this checkpoint.
         state.setStage(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO);
         cancellableThreads.checkForCancel();
@@ -158,11 +161,13 @@ public void startReplication(ActionListener<Void> listener) {
             final List<StoreFileMetadata> filesToFetch = getFiles(checkpointInfo);
             state.setStage(SegmentReplicationState.Stage.GET_FILES);
             cancellableThreads.checkForCancel();
+            logger.trace("Before getFiles {}", Arrays.toString(indexShard.store().directory().listAll()));
             source.getSegmentFiles(getId(), checkpointInfo.getCheckpoint(), filesToFetch, indexShard, getFilesListener);
         }, listener::onFailure);
 
         getFilesListener.whenComplete(response -> {
-            finalizeReplication(checkpointInfoListener.result());
+            logger.trace("After getFiles {}", Arrays.toString(indexShard.store().directory().listAll()));
+            finalizeReplication(checkpointInfoListener.result(), getFilesListener.result());
             listener.onResponse(null);
         }, listener::onFailure);
     }
@@ -193,23 +198,26 @@ private List<StoreFileMetadata> getFiles(CheckpointInfoResponse checkpointInfo)
         return diff.missing;
     }
 
-    private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse) throws OpenSearchCorruptionException {
-        // TODO: Refactor the logic so that finalize doesn't have to be invoked for remote store as source
-        if (source instanceof RemoteStoreReplicationSource) {
-            state.setStage(SegmentReplicationState.Stage.FINALIZE_REPLICATION);
-            return;
-        }
+    private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse, GetSegmentFilesResponse getSegmentFilesResponse) throws OpenSearchCorruptionException {
         cancellableThreads.checkForCancel();
         state.setStage(SegmentReplicationState.Stage.FINALIZE_REPLICATION);
         Store store = null;
         try {
             store = store();
             store.incRef();
+            Map<String, String> tempFileNames = null;
+            if (this.indexShard.indexSettings().isRemoteStoreEnabled() == true) {
+                tempFileNames = getSegmentFilesResponse.getFiles() != null ? getSegmentFilesResponse.getFiles().stream().collect(Collectors.toMap(StoreFileMetadata::name, StoreFileMetadata::name)) : Collections.emptyMap();
+            } else {
+                tempFileNames = multiFileWriter.getTempFileNames();
+            }
+            logger.trace("tempFileNames {} segments generation {}", tempFileNames, checkpointInfoResponse.getCheckpoint().getSegmentsGen());
             store.buildInfosFromBytes(
-                multiFileWriter.getTempFileNames(),
+                tempFileNames,
                 checkpointInfoResponse.getInfosBytes(),
                 checkpointInfoResponse.getCheckpoint().getSegmentsGen(),
-                indexShard::finalizeReplication
+                indexShard::finalizeReplication,
+                this.indexShard.indexSettings().isRemoteStoreEnabled() == true ? (files) -> {} : (files) -> indexShard.store().renameTempFilesSafe(files)
             );
         } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
             // this is a fatal exception at this stage.
@@ -247,23 +255,13 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse)
         }
     }
 
-    /**
-     * This method formats our byte[] containing the primary's SegmentInfos into lucene's {@link ChecksumIndexInput} that can be
-     * passed to SegmentInfos.readCommit
-     */
-    private ChecksumIndexInput toIndexInput(byte[] input) {
-        return new BufferedChecksumIndexInput(
-            new ByteBuffersIndexInput(new ByteBuffersDataInput(Arrays.asList(ByteBuffer.wrap(input))), "SegmentInfos")
-        );
-    }
-
     /**
      * Trigger a cancellation, this method will not close the target a subsequent call to #fail is required from target service.
      */
     @Override
     public void cancel(String reason) {
         if (finished.get() == false) {
-            logger.trace(new ParameterizedMessage("Cancelling replication for target {}", description()));
+            logger.info(new ParameterizedMessage("Cancelling replication for target {}", description()));
             cancellableThreads.cancel(reason);
             source.cancel();
         }
diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java
index da01023ace47c..4743d6efb7eb6 100644
--- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java
+++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java
@@ -8,6 +8,8 @@
 
 package org.opensearch.indices.replication.checkpoint;
 
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
 import org.opensearch.Version;
 import org.opensearch.common.Nullable;
 import org.opensearch.core.common.io.stream.StreamInput;
@@ -182,4 +184,22 @@ public String toString() {
             + codec
             + '}';
     }
+
+    public void writeTo(IndexOutput out) throws IOException {
+        shardId.writeTo(out);
+        out.writeLong(primaryTerm);
+        out.writeLong(segmentsGen);
+        out.writeLong(segmentInfosVersion);
+        out.writeLong(length);
+        out.writeString(codec);
+    }
+
+    public ReplicationCheckpoint(IndexInput in) throws IOException {
+        shardId = new ShardId(in);
+        primaryTerm = in.readLong();
+        segmentsGen = in.readLong();
+        segmentInfosVersion = in.readLong();
+        length = in.readLong();
+        codec = in.readString();
+    }
 }
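The IndexOutput/IndexInput overloads added to Index, ShardId, and ReplicationCheckpoint above are what RemoteSegmentMetadata.write and RemoteSegmentMetadata.read use to embed the checkpoint in the remote metadata file. A rough round-trip sketch of the new serialization, not part of the change itself; it assumes Lucene's in-memory ByteBuffers* store classes and the ReplicationCheckpoint.empty(...) factory that the tests below also use:

```java
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.store.ByteBuffersDataOutput;
import org.apache.lucene.store.ByteBuffersIndexInput;
import org.apache.lucene.store.ByteBuffersIndexOutput;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;

public class ReplicationCheckpointRoundTrip {
    public static void main(String[] args) throws Exception {
        // Illustrative shard id and an "empty" checkpoint; values are placeholders.
        ShardId shardId = new ShardId(new Index("test-index", "uuid"), 0);
        ReplicationCheckpoint written = ReplicationCheckpoint.empty(shardId, Codec.getDefault().getName());

        // Write the checkpoint the same way RemoteSegmentMetadata.write(...) embeds it.
        ByteBuffersDataOutput buffer = new ByteBuffersDataOutput();
        try (IndexOutput out = new ByteBuffersIndexOutput(buffer, "checkpoint", "checkpoint")) {
            written.writeTo(out);
        }

        // Read it back through the new IndexInput constructor.
        try (IndexInput in = new ByteBuffersIndexInput(buffer.toDataInput(), "checkpoint")) {
            ReplicationCheckpoint read = new ReplicationCheckpoint(in);
            assert read.getPrimaryTerm() == written.getPrimaryTerm();
            assert read.getSegmentsGen() == written.getSegmentsGen();
        }
    }
}
```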
diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithRemoteIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithRemoteIndexShardTests.java
index cae15a2c53f3f..01e21f17ee3f4 100644
--- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithRemoteIndexShardTests.java
+++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithRemoteIndexShardTests.java
@@ -36,7 +36,7 @@ public void testReplicaSyncingFromRemoteStore() throws IOException {
         assertDocs(primaryShard, "1", "2");
         flushShard(primaryShard);
 
-        replicaShard.syncSegmentsFromRemoteSegmentStore(true, true, false);
+        replicaShard.syncSegmentsFromRemoteSegmentStore(true, true);
         assertDocs(replicaShard, "1", "2");
         closeShards(primaryShard, replicaShard);
     }
diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java
index 72d8c234d71c8..c89e4529a1ab0 100644
--- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java
+++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java
@@ -8,6 +8,7 @@
 
 package org.opensearch.index.store;
 
+import org.apache.lucene.codecs.Codec;
 import org.apache.lucene.codecs.CodecUtil;
 import org.apache.lucene.index.CorruptIndexException;
 import org.apache.lucene.index.IndexFormatTooNewException;
@@ -42,6 +43,7 @@
 import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager;
 import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
 import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler;
+import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
 import org.opensearch.threadpool.ThreadPool;
 
 import java.io.IOException;
@@ -644,7 +646,7 @@ public void testUploadMetadataEmpty() throws IOException {
         Collection<String> segmentFiles = List.of("s1", "s2", "s3");
         assertThrows(
             NoSuchFileException.class,
-            () -> remoteSegmentStoreDirectory.uploadMetadata(segmentFiles, segmentInfos, storeDirectory, 12L, 34L)
+            () -> remoteSegmentStoreDirectory.uploadMetadata(segmentFiles, segmentInfos, storeDirectory, 12L, ReplicationCheckpoint.empty(indexShard.shardId(), Codec.getDefault().getName()))
         );
     }
 
@@ -663,7 +665,7 @@ public void testUploadMetadataNonEmpty() throws IOException {
         );
 
         Collection<String> segmentFiles = List.of("_0.si", "_0.cfe", "_0.cfs", "segments_1");
-        remoteSegmentStoreDirectory.uploadMetadata(segmentFiles, segmentInfos, storeDirectory, 12L, 34L);
+        remoteSegmentStoreDirectory.uploadMetadata(segmentFiles, segmentInfos, storeDirectory, 12L, indexShard.getLatestReplicationCheckpoint());
 
         verify(remoteMetadataDirectory).copyFrom(
             eq(storeDirectory),
diff --git a/server/src/test/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandlerTests.java b/server/src/test/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandlerTests.java
index dc7940529f024..5387750d14de1 100644
--- a/server/src/test/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandlerTests.java
+++ b/server/src/test/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandlerTests.java
@@ -105,8 +105,7 @@ public void testWriteContent() throws IOException {
         RemoteSegmentMetadata remoteSegmentMetadata = new RemoteSegmentMetadata(
             RemoteSegmentMetadata.fromMapOfStrings(expectedOutput),
             segmentInfosBytes,
-            1234,
-            1234
+            indexShard.getLatestReplicationCheckpoint()
         );
         remoteSegmentMetadataHandler.writeContent(indexOutput, remoteSegmentMetadata);
         indexOutput.close();
diff --git a/server/src/test/java/org/opensearch/indices/replication/RemoteStoreReplicationSourceTests.java b/server/src/test/java/org/opensearch/indices/replication/RemoteStoreReplicationSourceTests.java
index deae4eaca19e7..05ec83c530db1 100644
--- a/server/src/test/java/org/opensearch/indices/replication/RemoteStoreReplicationSourceTests.java
+++ b/server/src/test/java/org/opensearch/indices/replication/RemoteStoreReplicationSourceTests.java
@@ -188,7 +188,7 @@ public void testGetSegmentFilesFailure() throws IOException {
         );
         Mockito.doThrow(new RuntimeException("testing"))
             .when(mockShard)
-            .syncSegmentsFromRemoteSegmentStore(Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyBoolean());
+            .syncSegmentsFromRemoteSegmentStore(Mockito.anyBoolean(), Mockito.anyBoolean());
         assertThrows(ExecutionException.class, () -> {
             final PlainActionFuture<GetSegmentFilesResponse> res = PlainActionFuture.newFuture();
             replicationSource.getSegmentFiles(REPLICATION_ID, checkpoint, Collections.emptyList(), mockShard, res);