[Segment Replication] Refactor RemoteStoreReplicationSource (#8767)
* [Segment Replication] Refactor remote replication source

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Unit test updates

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Self review

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Self review

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Segregate shard level tests for node to node and remote store segment replication

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Fix failing unit tests

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Fix failing UT

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Fix failing UT

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Address review comments

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Fix more unit tests

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Improve RemoteStoreReplicationSourceTests, remove unnecessary mocks and use actual failures for failure/exception use cases

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Spotless check fix

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Address review comments

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Ignore files already in store while computing segment file diff with primary (see the sketch following this commit list)

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Spotless fix

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Fix failing UT

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Spotless fix

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Move read/writes from IndexInput/Output to RemoteSegmentMetadata

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Address review comment

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Update recovery flow to perform commits during recovery

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Remove unnecessary char

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Address review comments

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Update comment nit-pick

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Remove deletion logic causing read issues due to deleted segments_N

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Spotless fix

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Fix unit tests

Signed-off-by: Suraj Singh <surajrider@gmail.com>

---------

Signed-off-by: Suraj Singh <surajrider@gmail.com>
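
A plausible reading of the "Ignore files already in store" change above, as a self-contained Java sketch; the class and method names are illustrative stand-ins, not the PR's actual code (see the diff below for the real implementation):

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class SegmentDiffSketch {
    // Returns the files advertised by the primary that a replica still needs:
    // anything missing locally or present under a different checksum is fetched,
    // while files already in the store with a matching checksum are skipped.
    static List<String> filesToFetch(Map<String, String> primaryChecksums, Map<String, String> localChecksums) {
        return primaryChecksums.entrySet()
            .stream()
            .filter(e -> !e.getValue().equals(localChecksums.get(e.getKey())))
            .map(Map.Entry::getKey)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, String> primary = new HashMap<>();
        primary.put("_0.cfs", "chk-1");
        primary.put("_1.cfs", "chk-2");
        Map<String, String> local = new HashMap<>();
        local.put("_0.cfs", "chk-1"); // identical copy already in the store -> skipped
        System.out.println(filesToFetch(primary, local)); // prints [_1.cfs]
    }
}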

dreamer-89 authored Aug 2, 2023
1 parent 11c8426 commit 57eb105
Showing 24 changed files with 1,291 additions and 1,128 deletions.

@@ -8,7 +8,6 @@
 
 package org.opensearch.indices.replication;
 
-import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse;
 import org.opensearch.action.search.SearchResponse;
 import org.opensearch.cluster.ClusterState;
 import org.opensearch.cluster.metadata.IndexMetadata;
@@ -24,7 +23,6 @@
 import org.opensearch.core.index.shard.ShardId;
 import org.opensearch.index.IndexModule;
 import org.opensearch.index.IndexService;
-import org.opensearch.index.SegmentReplicationPerGroupStats;
 import org.opensearch.index.SegmentReplicationShardStats;
 import org.opensearch.index.engine.Engine;
 import org.opensearch.index.shard.IndexShard;
@@ -134,24 +132,6 @@ protected void waitForSearchableDocs(long docCount, String... nodes) throws Exception {
     waitForSearchableDocs(docCount, Arrays.stream(nodes).collect(Collectors.toList()));
 }
 
-protected void waitForSegmentReplication(String node) throws Exception {
-    assertBusy(() -> {
-        SegmentReplicationStatsResponse segmentReplicationStatsResponse = client(node).admin()
-            .indices()
-            .prepareSegmentReplicationStats(INDEX_NAME)
-            .setDetailed(true)
-            .execute()
-            .actionGet();
-        final SegmentReplicationPerGroupStats perGroupStats = segmentReplicationStatsResponse.getReplicationStats()
-            .get(INDEX_NAME)
-            .get(0);
-        assertEquals(
-            perGroupStats.getReplicaStats().stream().findFirst().get().getCurrentReplicationState().getStage(),
-            SegmentReplicationState.Stage.DONE
-        );
-    }, 1, TimeUnit.MINUTES);
-}
-
 protected void verifyStoreContent() throws Exception {
     assertBusy(() -> {
         final ClusterState clusterState = getClusterState();

@@ -272,12 +272,12 @@ public void testDownloadStatsCorrectnessSinglePrimarySingleReplica() throws Exception {
 assertTrue(
     replicaStats.directoryFileTransferTrackerStats.transferredBytesStarted > 0
         && primaryStats.uploadBytesStarted
-            - zeroStatePrimaryStats.uploadBytesStarted == replicaStats.directoryFileTransferTrackerStats.transferredBytesStarted
+            - zeroStatePrimaryStats.uploadBytesStarted >= replicaStats.directoryFileTransferTrackerStats.transferredBytesStarted
 );
 assertTrue(
     replicaStats.directoryFileTransferTrackerStats.transferredBytesSucceeded > 0
         && primaryStats.uploadBytesSucceeded
-            - zeroStatePrimaryStats.uploadBytesSucceeded == replicaStats.directoryFileTransferTrackerStats.transferredBytesSucceeded
+            - zeroStatePrimaryStats.uploadBytesSucceeded >= replicaStats.directoryFileTransferTrackerStats.transferredBytesSucceeded
 );
 // Assert zero failures
 assertEquals(0, primaryStats.uploadBytesFailed);
@@ -369,8 +369,8 @@ public void testDownloadStatsCorrectnessSinglePrimaryMultipleReplicaShards() throws Exception {
 assertEquals(0, uploadsFailed);
 assertEquals(0, uploadBytesFailed);
 for (int j = 0; j < response.getSuccessfulShards() - 1; j++) {
-    assertEquals(uploadBytesStarted - zeroStatePrimaryStats.uploadBytesStarted, (long) downloadBytesStarted.get(j));
-    assertEquals(uploadBytesSucceeded - zeroStatePrimaryStats.uploadBytesSucceeded, (long) downloadBytesSucceeded.get(j));
+    assertTrue(uploadBytesStarted - zeroStatePrimaryStats.uploadBytesStarted > downloadBytesStarted.get(j));
+    assertTrue(uploadBytesSucceeded - zeroStatePrimaryStats.uploadBytesSucceeded > downloadBytesSucceeded.get(j));
     assertEquals(0, (long) downloadBytesFailed.get(j));
 }
 }, 60, TimeUnit.SECONDS);
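
The two hunks above relax exact-equality assertions into bounds: the primary's cumulative upload bytes, net of the zero-state baseline, now only bound what each replica transfers, presumably because the primary may upload bytes a given replica never needs to download. A tiny illustrative check of that invariant, not the test's actual code:

public class UploadDownloadInvariant {
    static void check(long uploadBytesSinceZeroState, long replicaTransferredBytes) {
        if (replicaTransferredBytes <= 0) {
            throw new AssertionError("replica reported no transferred bytes");
        }
        if (uploadBytesSinceZeroState < replicaTransferredBytes) {
            throw new AssertionError("replica transferred more bytes than the primary uploaded");
        }
    }

    public static void main(String[] args) {
        check(2048, 1536); // 2 KiB uploaded since baseline >= 1.5 KiB downloaded -> passes
    }
}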

@@ -57,6 +57,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
     .put(super.nodeSettings(nodeOrdinal))
     .put(ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.getKey(), 0) // We have tests that check by-timestamp order
     .put(FeatureFlags.REMOTE_STORE, "true")
+    .put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true")
     .put(remoteStoreClusterSettings("remote-store-repo-name"))
     .build();
 }

54 changes: 10 additions & 44 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -199,9 +199,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.EnumSet;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
@@ -1988,7 +1986,7 @@ private long recoverLocallyUpToGlobalCheckpoint() {
 final Optional<SequenceNumbers.CommitInfo> safeCommit;
 final long globalCheckpoint;
 try {
-    final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
+    final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(TRANSLOG_UUID_KEY);
     globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
     safeCommit = store.findSafeIndexCommit(globalCheckpoint);
 } catch (org.apache.lucene.index.IndexNotFoundException e) {
@@ -2088,7 +2086,7 @@ private long recoverLocallyUptoLastCommit() {
 try {
     seqNo = Long.parseLong(store.readLastCommittedSegmentsInfo().getUserData().get(MAX_SEQ_NO));
 } catch (org.apache.lucene.index.IndexNotFoundException e) {
-    logger.error("skip local recovery as no index commit found", e);
+    logger.error("skip local recovery as no index commit found");
     return UNASSIGNED_SEQ_NO;
 } catch (Exception e) {
     logger.error("skip local recovery as failed to find the safe commit", e);
@@ -2242,7 +2240,7 @@ private void loadGlobalCheckpointToReplicationTracker() throws IOException {
 // we have to set it before we open an engine and recover from the translog because
 // acquiring a snapshot from the translog causes a sync which causes the global checkpoint to be pulled in,
 // and an engine can be forced to close in ctor which also causes the global checkpoint to be pulled in.
-final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
+final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(TRANSLOG_UUID_KEY);
 final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
 replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "read from translog checkpoint");
 }
@@ -2326,7 +2324,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier, boolean syncFromRemote) throws IOException {
 assert currentEngineReference.get() == null : "engine is running";
 verifyNotClosed();
 if (indexSettings.isRemoteStoreEnabled() && syncFromRemote) {
-    syncSegmentsFromRemoteSegmentStore(false, true, true);
+    syncSegmentsFromRemoteSegmentStore(false, true);
 }
 if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) {
     if (syncFromRemote) {
@@ -4555,7 +4553,7 @@ public void close() throws IOException {
 };
 IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
 if (indexSettings.isRemoteStoreEnabled()) {
-    syncSegmentsFromRemoteSegmentStore(false, true, true);
+    syncSegmentsFromRemoteSegmentStore(false, true);
 }
 if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) {
     syncRemoteTranslogAndUpdateGlobalCheckpoint();
@@ -4616,13 +4614,11 @@ public void syncTranslogFilesFromRemoteTranslog() throws IOException {
  * Downloads segments from remote segment store.
  * @param overrideLocal flag to override local segment files with those in remote store
  * @param refreshLevelSegmentSync last refresh checkpoint is used if true, commit checkpoint otherwise
- * @param shouldCommit if the shard requires committing the changes after sync from remote.
  * @throws IOException if exception occurs while reading segments from remote store
  */
-public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean refreshLevelSegmentSync, boolean shouldCommit)
-    throws IOException {
+public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean refreshLevelSegmentSync) throws IOException {
 assert indexSettings.isRemoteStoreEnabled();
-logger.info("Downloading segments from remote segment store");
+logger.trace("Downloading segments from remote segment store");
 RemoteSegmentStoreDirectory remoteDirectory = getRemoteDirectory();
 // We need to call RemoteSegmentStoreDirectory.init() in order to get latest metadata of the files that
 // are uploaded to the remote segment store.
@@ -4647,7 +4643,6 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean refreshLevelSegmentSync) throws IOException {
 } else {
     storeDirectory = store.directory();
 }
-Set<String> localSegmentFiles = Sets.newHashSet(storeDirectory.listAll());
 copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, overrideLocal);
 
 if (refreshLevelSegmentSync && remoteSegmentMetadata != null) {
@@ -4661,37 +4656,8 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean refreshLevelSegmentSync) throws IOException {
         indexInput,
         remoteSegmentMetadata.getGeneration()
     );
-    // Replicas never need a local commit
-    if (shouldCommit) {
-        if (this.shardRouting.primary()) {
-            long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
-            // Following code block makes sure to use SegmentInfosSnapshot in the remote store if generation differs
-            // with local filesystem. If local filesystem already has segments_N+2 and infosSnapshot has generation N,
-            // after commit, there would be 2 files that would be created segments_N+1 and segments_N+2. With the
-            // policy of preserving only the latest commit, we will delete segments_N+1 which in fact is the part of the
-            // latest commit.
-            Optional<String> localMaxSegmentInfos = localSegmentFiles.stream()
-                .filter(file -> file.startsWith(IndexFileNames.SEGMENTS))
-                .max(Comparator.comparingLong(SegmentInfos::generationFromSegmentsFileName));
-            if (localMaxSegmentInfos.isPresent()
-                && infosSnapshot.getGeneration() < SegmentInfos.generationFromSegmentsFileName(localMaxSegmentInfos.get())
-                    - 1) {
-                // If remote translog is not enabled, local translog will be created with different UUID.
-                // This fails in Store.trimUnsafeCommits() as translog UUID of checkpoint and SegmentInfos needs
-                // to be same. Following code block make sure to have the same UUID.
-                if (indexSettings.isRemoteTranslogStoreEnabled() == false) {
-                    SegmentInfos localSegmentInfos = store.readLastCommittedSegmentsInfo();
-                    Map<String, String> userData = new HashMap<>(infosSnapshot.getUserData());
-                    userData.put(TRANSLOG_UUID_KEY, localSegmentInfos.userData.get(TRANSLOG_UUID_KEY));
-                    infosSnapshot.setUserData(userData, false);
-                }
-                storeDirectory.deleteFile(localMaxSegmentInfos.get());
-            }
-            store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
-        }
-    } else {
-        finalizeReplication(infosSnapshot);
-    }
+    long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
+    store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
     }
 }
 } catch (IOException e) {
@@ -4716,7 +4682,7 @@ public void syncSegmentsFromGivenRemoteSegmentStore(
 long primaryTerm,
 long commitGeneration
 ) throws IOException {
-logger.info("Downloading segments from given remote segment store");
+logger.trace("Downloading segments from given remote segment store");
 RemoteSegmentStoreDirectory remoteDirectory = null;
 if (remoteStore != null) {
     remoteDirectory = getRemoteDirectory();
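
With the shouldCommit flag gone, syncSegmentsFromRemoteSegmentStore always commits the downloaded SegmentInfos at the processed local checkpoint; replica finalization instead happens through the segment replication flow that this PR refactors, and recovery performs its own commits (per the "Update recovery flow to perform commits during recovery" commit). A condensed, hypothetical stub of the resulting control flow; every type below is a stand-in, not the OpenSearch class:

import java.util.Map;

public class SyncFlowSketch {
    static class SegmentInfosStub {
        Map<String, String> userData = Map.of("local_checkpoint", "41");
        Map<String, String> getUserData() { return userData; }
    }

    interface StoreStub {
        void commitSegmentInfos(SegmentInfosStub infos, long maxSeqNo, long processedCheckpoint);
    }

    // Mirrors the shape of syncSegmentsFromRemoteSegmentStore(overrideLocal, refreshLevelSegmentSync):
    // download segment files, rebuild SegmentInfos from remote metadata, then always commit.
    static void syncSegmentsFromRemoteSegmentStore(boolean refreshLevelSegmentSync, SegmentInfosStub infosSnapshot, StoreStub store) {
        // ... copySegmentFiles(...) would run here ...
        if (refreshLevelSegmentSync) {
            long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get("local_checkpoint"));
            // No shouldCommit branch any more: this path always commits.
            store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
        }
    }

    public static void main(String[] args) {
        syncSegmentsFromRemoteSegmentStore(true, new SegmentInfosStub(),
            (infos, maxSeqNo, cp) -> System.out.println("committed at checkpoint " + cp));
    }
}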

@@ -220,7 +220,7 @@ private synchronized boolean syncSegments() {
 public void onResponse(Void unused) {
     try {
         // Start metadata file upload
-        uploadMetadata(localSegmentsPostRefresh, segmentInfos);
+        uploadMetadata(localSegmentsPostRefresh, segmentInfos, checkpoint);
         clearStaleFilesFromLocalSegmentChecksumMap(localSegmentsPostRefresh);
         onSuccessfulSegmentsSync(
             refreshTimeMs,
@@ -327,7 +327,8 @@ private boolean isRefreshAfterCommit() throws IOException {
     && !remoteDirectory.containsFile(lastCommittedLocalSegmentFileName, getChecksumOfLocalFile(lastCommittedLocalSegmentFileName)));
 }
 
-void uploadMetadata(Collection<String> localSegmentsPostRefresh, SegmentInfos segmentInfos) throws IOException {
+void uploadMetadata(Collection<String> localSegmentsPostRefresh, SegmentInfos segmentInfos, ReplicationCheckpoint replicationCheckpoint)
+    throws IOException {
     final long maxSeqNo = ((InternalEngine) indexShard.getEngine()).currentOngoingRefreshCheckpoint();
     SegmentInfos segmentInfosSnapshot = segmentInfos.clone();
     Map<String, String> userData = segmentInfosSnapshot.getUserData();
@@ -344,8 +345,8 @@ void uploadMetadata(Collection<String> localSegmentsPostRefresh, SegmentInfos segmentInfos, ReplicationCheckpoint replicationCheckpoint)
     localSegmentsPostRefresh,
     segmentInfosSnapshot,
     storeDirectory,
-    indexShard.getOperationPrimaryTerm(),
-    translogFileGeneration
+    translogFileGeneration,
+    replicationCheckpoint
 );
 }
 }
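
In the refresh listener above, the ReplicationCheckpoint is now computed once per refresh cycle and the same instance is threaded into uploadMetadata, so the uploaded metadata and the checkpoint later published to replicas cannot drift apart. A simplified sketch of that threading, with stand-in types (the record below is not OpenSearch's ReplicationCheckpoint):

public class CheckpointThreadingSketch {
    // Stand-in for org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint.
    record Checkpoint(long primaryTerm, long segmentsGen, long segmentInfosVersion) {}

    static void uploadMetadata(Checkpoint checkpoint) {
        // The directory derives the primary term from the checkpoint rather than
        // taking it as a separate long parameter (see the uploadMetadata diff below).
        System.out.println("metadata for term " + checkpoint.primaryTerm() + ", gen " + checkpoint.segmentsGen());
    }

    static void onRefresh() {
        Checkpoint checkpoint = new Checkpoint(3, 42, 7); // computed once per refresh
        uploadMetadata(checkpoint); // same object is later published to replicas
    }

    public static void main(String[] args) {
        onRefresh();
    }
}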

@@ -530,7 +530,7 @@ private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardRecoveryException {
 remoteStore.incRef();
 try {
     // Download segments from remote segment store
-    indexShard.syncSegmentsFromRemoteSegmentStore(true, true, true);
+    indexShard.syncSegmentsFromRemoteSegmentStore(true, true);
 
     if (store.directory().listAll().length == 0) {
         store.createEmpty(indexShard.indexSettings().getIndexVersionCreated().luceneVersion);

@@ -44,6 +44,7 @@
 import org.opensearch.index.store.lockmanager.RemoteStoreLockManager;
 import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
 import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler;
+import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
 import org.opensearch.threadpool.ThreadPool;
 
 import java.io.FileNotFoundException;
@@ -603,19 +604,20 @@ public boolean containsFile(String localFilename, String checksum) {
 * @param segmentFiles segment files that are part of the shard at the time of the latest refresh
 * @param segmentInfosSnapshot SegmentInfos bytes to store as part of metadata file
 * @param storeDirectory instance of local directory to temporarily create metadata file before upload
-* @param primaryTerm primary term to be used in the name of metadata file
 * @param translogGeneration translog generation
+* @param replicationCheckpoint ReplicationCheckpoint of primary shard
 * @throws IOException in case of I/O error while uploading the metadata file
 */
 public void uploadMetadata(
     Collection<String> segmentFiles,
     SegmentInfos segmentInfosSnapshot,
     Directory storeDirectory,
-    long primaryTerm,
-    long translogGeneration
+    long translogGeneration,
+    ReplicationCheckpoint replicationCheckpoint
 ) throws IOException {
     synchronized (this) {
         String metadataFilename = MetadataFilenameUtils.getMetadataFilename(
-            primaryTerm,
+            replicationCheckpoint.getPrimaryTerm(),
             segmentInfosSnapshot.getGeneration(),
             translogGeneration,
             metadataUploadCounter.incrementAndGet(),
@@ -646,8 +648,7 @@ public void uploadMetadata(
         new RemoteSegmentMetadata(
             RemoteSegmentMetadata.fromMapOfStrings(uploadedSegments),
             segmentInfoSnapshotByteArray,
-            primaryTerm,
-            segmentInfosSnapshot.getGeneration()
+            replicationCheckpoint
         )
     );
 }
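
RemoteSegmentMetadata is likewise built from the whole ReplicationCheckpoint rather than a bare (primaryTerm, generation) pair, in line with the "Move read/writes from IndexInput/Output to RemoteSegmentMetadata" commit. A hedged sketch of the data-shape change; the record names and accessors below are assumptions, not the actual class:

public class RemoteMetadataShapeSketch {
    record Checkpoint(long primaryTerm, long segmentsGen) {}

    // Before: RemoteSegmentMetadata(files, infosBytes, primaryTerm, generation)
    // After:  RemoteSegmentMetadata(files, infosBytes, replicationCheckpoint)
    record RemoteSegmentMetadataSketch(byte[] segmentInfosBytes, Checkpoint checkpoint) {
        long primaryTerm() { return checkpoint.primaryTerm(); } // derived, no duplicate field
        long generation() { return checkpoint.segmentsGen(); }
    }

    public static void main(String[] args) {
        RemoteSegmentMetadataSketch md = new RemoteSegmentMetadataSketch(new byte[0], new Checkpoint(3, 42));
        System.out.println(md.primaryTerm() + " / " + md.generation());
    }
}
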
10 changes: 6 additions & 4 deletions server/src/main/java/org/opensearch/index/store/Store.java
@@ -845,22 +845,24 @@ private void cleanupFiles(Collection<String> filesToConsiderForCleanup, String reason
 * @param tmpToFileName Map of temporary replication file to actual file name
 * @param infosBytes bytes[] of SegmentInfos supposed to be sent over by primary excluding segment_N file
 * @param segmentsGen segment generation number
-* @param consumer consumer for generated SegmentInfos
+* @param finalizeConsumer consumer for action on passed in SegmentInfos
+* @param renameConsumer consumer for action on temporary copied over files
 * @throws IOException Exception while reading store and building segment infos
 */
 public void buildInfosFromBytes(
     Map<String, String> tmpToFileName,
     byte[] infosBytes,
     long segmentsGen,
-    CheckedConsumer<SegmentInfos, IOException> consumer
+    CheckedConsumer<SegmentInfos, IOException> finalizeConsumer,
+    CheckedConsumer<Map<String, String>, IOException> renameConsumer
 ) throws IOException {
     metadataLock.writeLock().lock();
     try {
         final List<String> values = new ArrayList<>(tmpToFileName.values());
         incRefFileDeleter(values);
         try {
-            renameTempFilesSafe(tmpToFileName);
-            consumer.accept(buildSegmentInfos(infosBytes, segmentsGen));
+            renameConsumer.accept(tmpToFileName);
+            finalizeConsumer.accept(buildSegmentInfos(infosBytes, segmentsGen));
         } finally {
             decRefFileDeleter(values);
         }
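
buildInfosFromBytes now takes two callbacks: renameConsumer acts on the temporary-to-final file-name map, and finalizeConsumer acts on the SegmentInfos rebuilt from the transferred bytes. A self-contained sketch of that contract; the real method builds a SegmentInfos from infosBytes, while this stub passes the raw bytes through:

import java.io.IOException;
import java.util.Map;

public class TwoConsumerSketch {
    @FunctionalInterface
    interface CheckedConsumer<T, E extends Exception> { void accept(T t) throws E; }

    static void buildInfosFromBytes(
        Map<String, String> tmpToFileName,
        byte[] infosBytes,
        long segmentsGen,
        CheckedConsumer<byte[], IOException> finalizeConsumer,
        CheckedConsumer<Map<String, String>, IOException> renameConsumer
    ) throws IOException {
        renameConsumer.accept(tmpToFileName);  // e.g. fsync and atomically rename temp files
        finalizeConsumer.accept(infosBytes);   // e.g. commit or finalize replication
    }

    public static void main(String[] args) throws IOException {
        buildInfosFromBytes(
            Map.of("recovery.tmp._0.si", "_0.si"),
            new byte[0],
            5,
            infos -> System.out.println("finalize with " + infos.length + " bytes"),
            renames -> System.out.println("rename " + renames)
        );
    }
}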

(Diff truncated: the remaining changed files are not shown.)