[Segment Replication] Refactor file cleanup logic and fix PIT/Scroll with remote store. (opensearch-project#9111)

* Remove divergent commit logic with segment replication.

This change removes the divergent commit paths for node-to-node and remote-store segment replication.
All replicas with segment replication enabled now perform local commits and ignore any incoming segments_N file.
The recovery sync with the remote store is also changed to exclude segments_N, so that only the fetched SegmentInfos bytes are committed before an engine is opened.
The deletion logic for segment replication is also updated so that a file is deleted automatically once its reference count drops to 0.
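
An illustrative sketch of the replica-side rule above, not part of this commit: the class and method names below are made up, and in OpenSearch the filtering happens inside the replication machinery rather than in a standalone helper.

import java.util.List;
import java.util.stream.Collectors;

public class ReplicaCheckpointFilterSketch {
    // A replica uses the data files referenced by the primary's checkpoint but ignores
    // the primary's segments_N; the commit point is always written locally by the
    // replica's own engine.
    static List<String> filesToUse(List<String> checkpointFiles) {
        return checkpointFiles.stream()
            .filter(name -> name.startsWith("segments_") == false)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> incoming = List.of("segments_9", "_7.cfs", "_7.cfe", "_7.si");
        System.out.println(filesToUse(incoming)); // [_7.cfs, _7.cfe, _7.si]
    }
}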

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Add more NRTReplicationEngineTests.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Ensure old commit files are wiped on remote store sync before we commit a new SegmentInfos.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Add more shard level tests.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Add test ensuring commits are cleaned up on replicas.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Self review.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Use refresh-level sync before recovery.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* PR feedback.

Signed-off-by: Marc Handalian <handalm@amazon.com>

---------

Signed-off-by: Marc Handalian <handalm@amazon.com>
Signed-off-by: Shivansh Arora <hishiv@amazon.com>
mch2 authored and shiv0408 committed Apr 25, 2024
1 parent 5276e21 commit b8cde12
Showing 18 changed files with 593 additions and 346 deletions.
@@ -282,7 +282,6 @@ public void testIndexReopenClose() throws Exception {
}

public void testScrollWithConcurrentIndexAndSearch() throws Exception {
assumeFalse("Skipping the test with Remote store as its flaky.", segmentReplicationWithRemoteEnabled());
final String primary = internalCluster().startDataOnlyNode();
final String replica = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
@@ -657,7 +656,6 @@ public void testDeleteOperations() throws Exception {
* from xlog.
*/
public void testReplicationPostDeleteAndForceMerge() throws Exception {
assumeFalse("Skipping the test with Remote store as its flaky.", segmentReplicationWithRemoteEnabled());
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
final String replica = internalCluster().startDataOnlyNode();
@@ -966,7 +964,6 @@ private void assertAllocationIdsInReplicaShardStats(Set<String> expected, Set<Se
* @throws Exception when issue is encountered
*/
public void testScrollCreatedOnReplica() throws Exception {
assumeFalse("Skipping the test with Remote store as its flaky.", segmentReplicationWithRemoteEnabled());
// create the cluster with one primary node containing primary shard and replica node containing replica shard
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
@@ -1059,8 +1056,9 @@ public void testScrollCreatedOnReplica() throws Exception {
* @throws Exception when issue is encountered
*/
public void testScrollWithOngoingSegmentReplication() throws Exception {
// this test stubs transport calls specific to node-node replication.
assumeFalse(
"Skipping the test as its not compatible with segment replication with remote store yet.",
"Skipping the test as its not compatible with segment replication with remote store.",
segmentReplicationWithRemoteEnabled()
);

@@ -1188,10 +1186,6 @@ public void testScrollWithOngoingSegmentReplication() throws Exception {
}

public void testPitCreatedOnReplica() throws Exception {
assumeFalse(
"Skipping the test as it is flaky with remote store. Tracking issue https://github.com/opensearch-project/OpenSearch/issues/8850",
segmentReplicationWithRemoteEnabled()
);
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
@@ -1425,4 +1419,23 @@ public void testIndexWhileRecoveringReplica() throws Exception {
.get();
assertNoFailures(response);
}

public void testRestartPrimary_NoReplicas() throws Exception {
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureYellow(INDEX_NAME);

assertEquals(getNodeContainingPrimaryShard().getName(), primary);

client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
if (randomBoolean()) {
flush(INDEX_NAME);
} else {
refresh(INDEX_NAME);
}

internalCluster().restartNode(primary);
ensureYellow(INDEX_NAME);
assertDocCounts(1, primary);
}
}
@@ -87,12 +87,8 @@ private void testPeerRecovery(int numberOfIterations, boolean invokeFlush) throw
.filter(rs -> rs.getRecoverySource().getType() == RecoverySource.Type.PEER)
.findFirst();
assertFalse(recoverySource.isEmpty());
if (numberOfIterations == 1 && invokeFlush) {
// segments_N file is copied to new replica
assertEquals(1, recoverySource.get().getIndex().recoveredFileCount());
} else {
assertEquals(0, recoverySource.get().getIndex().recoveredFileCount());
}
// segments_N file is copied to new replica
assertEquals(1, recoverySource.get().getIndex().recoveredFileCount());

IndexResponse response = indexSingleDoc(INDEX_NAME);
assertEquals(indexStats.get(MAX_SEQ_NO_TOTAL) + 1, response.getSeqNo());
@@ -57,9 +57,9 @@ public class NRTReplicationEngine extends Engine {
private final CompletionStatsCache completionStatsCache;
private final LocalCheckpointTracker localCheckpointTracker;
private final WriteOnlyTranslogManager translogManager;
private final boolean shouldCommit;
protected final ReplicaFileTracker replicaFileTracker;

private volatile long lastReceivedGen = SequenceNumbers.NO_OPS_PERFORMED;
private volatile long lastReceivedPrimaryGen = SequenceNumbers.NO_OPS_PERFORMED;

private static final int SI_COUNTER_INCREMENT = 10;

@@ -69,7 +69,12 @@ public NRTReplicationEngine(EngineConfig engineConfig) {
NRTReplicationReaderManager readerManager = null;
WriteOnlyTranslogManager translogManagerRef = null;
try {
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
this.replicaFileTracker = new ReplicaFileTracker(store::deleteQuiet);
this.lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
// always protect latest commit on disk.
replicaFileTracker.incRef(this.lastCommittedSegmentInfos.files(true));
// cleanup anything not referenced by the latest infos.
cleanUnreferencedFiles();
readerManager = buildReaderManager();
final SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(
this.lastCommittedSegmentInfos.getUserData().entrySet()
@@ -85,7 +90,7 @@ public NRTReplicationEngine(EngineConfig engineConfig) {
for (ReferenceManager.RefreshListener listener : engineConfig.getInternalRefreshListener()) {
this.readerManager.addListener(listener);
}
final Map<String, String> userData = store.readLastCommittedSegmentsInfo().getUserData();
final Map<String, String> userData = this.lastCommittedSegmentInfos.getUserData();
final String translogUUID = Objects.requireNonNull(userData.get(Translog.TRANSLOG_UUID_KEY));
translogManagerRef = new WriteOnlyTranslogManager(
engineConfig.getTranslogConfig(),
@@ -116,18 +121,21 @@ public void onAfterTranslogSync() {
engineConfig.getPrimaryModeSupplier()
);
this.translogManager = translogManagerRef;
this.shouldCommit = engineConfig.getIndexSettings().isRemoteStoreEnabled() == false;
} catch (IOException e) {
IOUtils.closeWhileHandlingException(store::decRef, readerManager, translogManagerRef);
throw new EngineCreationFailureException(shardId, "failed to create engine", e);
}
}

public void cleanUnreferencedFiles() throws IOException {
replicaFileTracker.deleteUnreferencedFiles(store.directory().listAll());
}

private NRTReplicationReaderManager buildReaderManager() throws IOException {
return new NRTReplicationReaderManager(
OpenSearchDirectoryReader.wrap(getDirectoryReader(), shardId),
store::incRefFileDeleter,
store::decRefFileDeleter
replicaFileTracker::incRef,
replicaFileTracker::decRef
);
}

@@ -143,34 +151,36 @@ public synchronized void updateSegments(final SegmentInfos infos) throws IOExcep
final long maxSeqNo = Long.parseLong(infos.userData.get(MAX_SEQ_NO));
final long incomingGeneration = infos.getGeneration();
readerManager.updateSegments(infos);

// Commit and roll the translog when we receive a different generation than what was last received.
// lower/higher gens are possible from a new primary that was just elected.
if (incomingGeneration != lastReceivedGen) {
// Ensure that we commit and clear the local translog if a new commit has been made on the primary.
// We do not compare against the last local commit gen here because it is possible to receive
// a lower gen from a newly elected primary shard that is behind this shard's last commit gen.
// In that case we still commit into the next local generation.
if (incomingGeneration != this.lastReceivedPrimaryGen) {
commitSegmentInfos();
translogManager.getDeletionPolicy().setLocalCheckpointOfSafeCommit(maxSeqNo);
translogManager.rollTranslogGeneration();
}
lastReceivedGen = incomingGeneration;
this.lastReceivedPrimaryGen = incomingGeneration;
localCheckpointTracker.fastForwardProcessedSeqNo(maxSeqNo);
}
}

/**
* Persist the latest live SegmentInfos.
*
* This method creates a commit point from the latest SegmentInfos. It is intended to be used when this shard is about to be promoted as the new primary.
*
* TODO: If this method is invoked while the engine is currently updating segments on its reader, wait for that update to complete so the updated segments are used.
*
* This method creates a commit point from the latest SegmentInfos.
*
* @throws IOException - When there is an IO error committing the SegmentInfos.
*/
private void commitSegmentInfos(SegmentInfos infos) throws IOException {
if (shouldCommit) {
store.commitSegmentInfos(infos, localCheckpointTracker.getMaxSeqNo(), localCheckpointTracker.getProcessedCheckpoint());
}
// get a reference to the previous commit files so they can be decref'd once a new commit is made.
final Collection<String> previousCommitFiles = getLastCommittedSegmentInfos().files(true);
store.commitSegmentInfos(infos, localCheckpointTracker.getMaxSeqNo(), localCheckpointTracker.getProcessedCheckpoint());
this.lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
// incref the latest on-disk commit.
replicaFileTracker.incRef(this.lastCommittedSegmentInfos.files(true));
// decref the prev commit.
replicaFileTracker.decRef(previousCommitFiles);
translogManager.syncTranslog();
}

@@ -379,21 +389,19 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread()
: "Either the write lock must be held or the engine must be currently be failing itself";
try {
// if remote store is enabled, all segments durably persisted
if (shouldCommit) {
final SegmentInfos latestSegmentInfos = getLatestSegmentInfos();
/*
This is a workaround solution which decreases the chances of conflict on replica nodes when same file is copied
from two different primaries during failover. Increasing counter helps in avoiding this conflict as counter is
used to generate new segment file names. The ideal solution is to identify the counter from previous primary.
*/
final SegmentInfos latestSegmentInfos = getLatestSegmentInfos();
/*
This is a workaround solution which decreases the chances of conflict on replica nodes when same file is copied
from two different primaries during failover. Increasing counter helps in avoiding this conflict as counter is
used to generate new segment file names. The ideal solution is to identify the counter from previous primary.
This is not required for remote store implementations given on failover the replica re-syncs with the store
during promotion.
*/
if (engineConfig.getIndexSettings().isRemoteStoreEnabled() == false) {
latestSegmentInfos.counter = latestSegmentInfos.counter + SI_COUNTER_INCREMENT;
latestSegmentInfos.changed();
commitSegmentInfos(latestSegmentInfos);
} else {
store.directory().sync(List.of(store.directory().listAll()));
store.directory().syncMetaData();
}
commitSegmentInfos(latestSegmentInfos);
IOUtils.close(readerManager, translogManager, store::decRef);
} catch (Exception e) {
logger.warn("failed to close engine", e);
@@ -453,8 +461,8 @@ public synchronized GatedCloseable<SegmentInfos> getSegmentInfosSnapshot() {
// incref all files
try {
final Collection<String> files = latestSegmentInfos.files(false);
store.incRefFileDeleter(files);
return new GatedCloseable<>(latestSegmentInfos, () -> store.decRefFileDeleter(files));
replicaFileTracker.incRef(files);
return new GatedCloseable<>(latestSegmentInfos, () -> { replicaFileTracker.decRef(files); });
} catch (IOException e) {
throw new EngineException(shardId, e.getMessage(), e);
}
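
An illustrative sketch of the commit trigger in the updateSegments() hunk above, not part of this commit; the field stands in for SequenceNumbers.NO_OPS_PERFORMED and the names are made up.

public class CommitOnGenerationChangeSketch {
    private long lastReceivedPrimaryGen = -1; // stands in for SequenceNumbers.NO_OPS_PERFORMED

    // Returns true when the replica should commit locally and roll its translog:
    // whenever the primary's commit generation differs from the last one seen, in either
    // direction, because a newly elected primary can be behind this replica's commit gen.
    synchronized boolean onCheckpoint(long incomingGeneration) {
        final boolean shouldCommit = incomingGeneration != lastReceivedPrimaryGen;
        lastReceivedPrimaryGen = incomingGeneration;
        return shouldCommit;
    }

    public static void main(String[] args) {
        CommitOnGenerationChangeSketch sketch = new CommitOnGenerationChangeSketch();
        System.out.println(sketch.onCheckpoint(5)); // true  - first generation seen
        System.out.println(sketch.onCheckpoint(5)); // false - same primary commit
        System.out.println(sketch.onCheckpoint(3)); // true  - a newly elected, older primary
    }
}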
@@ -0,0 +1,92 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.engine;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;

/**
* This class is heavily influenced by Lucene's ReplicaFileDeleter class used to keep track of
* segment files that should be preserved on replicas between replication events.
*
* https://github.com/apache/lucene/blob/main/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaFileDeleter.java
*
* @opensearch.internal
*/
final class ReplicaFileTracker {

public static final Logger logger = LogManager.getLogger(ReplicaFileTracker.class);
private final Map<String, Integer> refCounts = new HashMap<>();
private final BiConsumer<String, String> fileDeleter;
private final Set<String> EXCLUDE_FILES = Set.of("write.lock");

public ReplicaFileTracker(BiConsumer<String, String> fileDeleter) {
this.fileDeleter = fileDeleter;
}

public synchronized void incRef(Collection<String> fileNames) {
for (String fileName : fileNames) {
refCounts.merge(fileName, 1, Integer::sum);
}
}

public synchronized int refCount(String file) {
return Optional.ofNullable(refCounts.get(file)).orElse(0);
}

public synchronized void decRef(Collection<String> fileNames) {
Set<String> toDelete = new HashSet<>();
for (String fileName : fileNames) {
Integer curCount = refCounts.get(fileName);
assert curCount != null : "fileName=" + fileName;
assert curCount > 0;
if (curCount == 1) {
refCounts.remove(fileName);
toDelete.add(fileName);
} else {
refCounts.put(fileName, curCount - 1);
}
}
if (toDelete.isEmpty() == false) {
delete(toDelete);
}
}

public void deleteUnreferencedFiles(String... toDelete) {
for (String file : toDelete) {
if (canDelete(file)) {
delete(file);
}
}
}

private synchronized void delete(Collection<String> toDelete) {
for (String fileName : toDelete) {
delete(fileName);
}
}

private synchronized void delete(String fileName) {
assert canDelete(fileName);
fileDeleter.accept("delete unreferenced", fileName);
}

private synchronized boolean canDelete(String fileName) {
return EXCLUDE_FILES.contains(fileName) == false && refCounts.containsKey(fileName) == false;
}

}
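
A hypothetical driver for ReplicaFileTracker showing the intended ref-counting contract, not part of this commit; because the class is package-private such code would have to live in org.opensearch.index.engine, and the println stands in for the store::deleteQuiet deleter the engine actually passes in.

import java.util.Set;

public class ReplicaFileTrackerExample {
    public static void main(String[] args) {
        ReplicaFileTracker tracker = new ReplicaFileTracker(
            (reason, file) -> System.out.println(reason + ": " + file)
        );

        Set<String> previousCommit = Set.of("segments_4", "_0.cfe", "_0.cfs", "_0.si");
        Set<String> newCommit = Set.of("segments_5", "_0.cfe", "_0.cfs", "_0.si", "_1.cfs");

        tracker.incRef(previousCommit); // protect the commit found on disk at engine open
        tracker.incRef(newCommit);      // protect the commit that was just written
        tracker.decRef(previousCommit); // prints "delete unreferenced: segments_4";
                                        // the shared _0.* files keep a reference and survive
    }
}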
35 changes: 21 additions & 14 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -56,7 +56,6 @@
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.util.ThreadInterruptedException;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.cluster.metadata.DataStream;
import org.opensearch.core.Assertions;
import org.opensearch.ExceptionsHelper;
@@ -4657,7 +4656,13 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re
RemoteSegmentMetadata remoteSegmentMetadata = remoteDirectory.init();

Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegments = remoteDirectory
.getSegmentsUploadedToRemoteStore();
.getSegmentsUploadedToRemoteStore()
.entrySet()
.stream()
// if this is a refresh level sync, ignore any segments_n uploaded to the store, we will commit the received infos bytes
// locally.
.filter(entry -> refreshLevelSegmentSync && entry.getKey().startsWith(IndexFileNames.SEGMENTS) == false)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
store.incRef();
remoteStore.incRef();
try {
@@ -4678,19 +4683,21 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re
copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, overrideLocal);

if (refreshLevelSegmentSync && remoteSegmentMetadata != null) {
try (
ChecksumIndexInput indexInput = new BufferedChecksumIndexInput(
new ByteArrayIndexInput("Snapshot of SegmentInfos", remoteSegmentMetadata.getSegmentInfosBytes())
);
) {
SegmentInfos infosSnapshot = SegmentInfos.readCommit(
store.directory(),
indexInput,
remoteSegmentMetadata.getGeneration()
);
long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
final SegmentInfos infosSnapshot = store.buildSegmentInfos(
remoteSegmentMetadata.getSegmentInfosBytes(),
remoteSegmentMetadata.getGeneration()
);
long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
// delete any other commits, we want to start the engine only from a new commit made with the downloaded infos bytes.
// Extra segments will be wiped on engine open.
for (String file : List.of(store.directory().listAll())) {
if (file.startsWith(IndexFileNames.SEGMENTS)) {
store.deleteQuiet(file);
}
}
assert Arrays.stream(store.directory().listAll()).filter(f -> f.startsWith(IndexFileNames.SEGMENTS)).findAny().isEmpty()
: "There should not be any segments file in the dir";
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
}
} catch (IOException e) {
throw new IndexShardRecoveryException(shardId, "Exception while copying segment files from remote segment store", e);
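
An illustrative sketch of the refresh-level sync path in the syncSegmentsFromRemoteSegmentStore() hunk above, not part of this commit. It assumes refreshLevelSegmentSync is true and that IndexFileNames.SEGMENTS is the Lucene "segments" prefix; the helper names are made up.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class RefreshLevelSyncSketch {
    // Step 1: drop uploaded segments_N entries so only segment data files are copied down.
    static Map<String, String> excludeCommitFiles(Map<String, String> uploadedSegments) {
        return uploadedSegments.entrySet()
            .stream()
            .filter(entry -> entry.getKey().startsWith("segments") == false)
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    // Step 2: wipe any local segments_N so the engine opens from a single fresh commit
    // built from the downloaded SegmentInfos bytes.
    static List<String> localCommitFilesToDelete(List<String> localFiles) {
        List<String> toDelete = new ArrayList<>();
        for (String file : localFiles) {
            if (file.startsWith("segments")) {
                toDelete.add(file);
            }
        }
        return toDelete;
    }

    public static void main(String[] args) {
        Map<String, String> uploaded = Map.of("segments_7", "meta", "_3.cfs", "meta", "_3.si", "meta");
        System.out.println(excludeCommitFiles(uploaded)); // {_3.cfs=meta, _3.si=meta} (order may vary)
        System.out.println(localCommitFilesToDelete(List.of("segments_5", "_3.cfs"))); // [segments_5]
    }
}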