Skip to content

Commit

Permalink
Avoid duplicate indexing in case of SegRep enabled indices' translog …
Browse files Browse the repository at this point in the history
…replay (opensearch-project#8578)

Signed-off-by: Gaurav Bafna <gbbafna@amazon.com>
  • Loading branch information
gbbafna committed Aug 4, 2023
1 parent ec790f5 commit 9314625
Show file tree
Hide file tree
Showing 12 changed files with 323 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,16 @@ public class IndexPrimaryRelocationIT extends OpenSearchIntegTestCase {

private static final int RELOCATION_COUNT = 15;

public void setup() {}

public Settings indexSettings() {
return Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0).build();
}

public void testPrimaryRelocationWhileIndexing() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(randomIntBetween(2, 3));
client().admin()
.indices()
.prepareCreate("test")
.setSettings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0))
.setMapping("field", "type=text")
.get();
setup();
client().admin().indices().prepareCreate("test").setSettings(indexSettings()).setMapping("field", "type=text").get();
ensureGreen("test");
AtomicInteger numAutoGenDocs = new AtomicInteger();
final AtomicBoolean finished = new AtomicBoolean(false);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.indices.recovery.IndexPrimaryRelocationIT;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;

import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST)
public class RemoteIndexPrimaryRelocationIT extends IndexPrimaryRelocationIT {

protected static final String REPOSITORY_NAME = "test-remote-store-repo";

protected Path absolutePath;

public void setup() {
absolutePath = randomRepoPath().toAbsolutePath();
assertAcked(
clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath))
);
}

protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(remoteStoreClusterSettings(REPOSITORY_NAME, REPOSITORY_NAME, false))
.build();
}

@Override
protected boolean addMockInternalEngine() {
return false;
}

public Settings indexSettings() {
return Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.build();
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder()
.put(super.featureFlagSettings())
.put(FeatureFlags.REMOTE_STORE, "true")
.put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true")
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.comparesEqualTo;
import static org.hamcrest.Matchers.comparesEqualTo;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,22 @@
import com.carrotsearch.randomizedtesting.RandomizedTest;
import org.junit.Before;
import org.opensearch.action.admin.indices.close.CloseIndexResponse;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.test.BackgroundIndexer;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.Locale;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -116,4 +121,63 @@ public void testPromoteReplicaToPrimary() throws Exception {
refresh(indexName);
assertHitCount(client().prepareSearch(indexName).setSize(0).get(), numOfDocs);
}

public void testFailoverWhileIndexing() throws Exception {
internalCluster().startNode();
internalCluster().startNode();
final String indexName = randomAlphaOfLength(5).toLowerCase(Locale.ROOT);
shard_count = scaledRandomIntBetween(1, 5);
createIndex(indexName);
ensureGreen(indexName);
int docCount = scaledRandomIntBetween(20, 50);
final int indexDocAfterFailover = scaledRandomIntBetween(20, 50);
AtomicInteger numAutoGenDocs = new AtomicInteger();
CountDownLatch latch = new CountDownLatch(1);
final AtomicBoolean finished = new AtomicBoolean(false);
Thread indexingThread = new Thread(() -> {
int docsAfterFailover = 0;
while (finished.get() == false && numAutoGenDocs.get() < docCount) {
IndexResponse indexResponse = internalCluster().clusterManagerClient()
.prepareIndex(indexName)
.setSource("field", numAutoGenDocs.get())
.get();

if (indexResponse.status() == RestStatus.CREATED || indexResponse.status() == RestStatus.ACCEPTED) {
numAutoGenDocs.incrementAndGet();
if (numAutoGenDocs.get() == docCount / 2) {
if (random().nextInt(3) == 0) {
refresh(indexName);
} else if (random().nextInt(2) == 0) {
flush(indexName);
}
// Node is killed on this
latch.countDown();
} else if (numAutoGenDocs.get() > docCount / 2) {
docsAfterFailover++;
if (docsAfterFailover == indexDocAfterFailover) {
finished.set(true);
}
}
}
}
logger.debug("Done indexing");
});
indexingThread.start();
latch.await();

ClusterState state = client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState();
final int numShards = state.metadata().index(indexName).getNumberOfShards();
final ShardRouting primaryShard = state.routingTable().index(indexName).shard(randomIntBetween(0, numShards - 1)).primaryShard();
final DiscoveryNode randomNode = state.nodes().resolveNode(primaryShard.currentNodeId());

// stop the random data node, all remaining shards are promoted to primaries
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(randomNode.getName()));
ensureYellowAndNoInitializingShards(indexName);
indexingThread.join();
refresh(indexName);
assertHitCount(
client(internalCluster().getClusterManagerName()).prepareSearch(indexName).setSize(0).setTrackTotalHits(true).get(),
numAutoGenDocs.get()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,7 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op)
final OpVsLuceneDocStatus status;
VersionValue versionValue = getVersionFromMap(op.uid().bytes());
assert incrementVersionLookup();
boolean segRepEnabled = engineConfig.getIndexSettings().isSegRepEnabled();
if (versionValue != null) {
status = compareOpToVersionMapOnSeqNo(op.id(), op.seqNo(), op.primaryTerm(), versionValue);
} else {
Expand All @@ -813,10 +814,8 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op)
} else if (op.seqNo() > docAndSeqNo.seqNo) {
status = OpVsLuceneDocStatus.OP_NEWER;
} else if (op.seqNo() == docAndSeqNo.seqNo) {
assert localCheckpointTracker.hasProcessed(op.seqNo()) : "local checkpoint tracker is not updated seq_no="
+ op.seqNo()
+ " id="
+ op.id();
assert localCheckpointTracker.hasProcessed(op.seqNo()) || segRepEnabled
: "local checkpoint tracker is not updated seq_no=" + op.seqNo() + " id=" + op.id();
status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
} else {
status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
Expand Down Expand Up @@ -1018,6 +1017,7 @@ public IndexResult index(Index index) throws IOException {
plan.currentNotFoundOrDeleted
);
}

}
if (index.origin().isFromTranslog() == false) {
final Translog.Location location;
Expand Down Expand Up @@ -1096,10 +1096,18 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO
assert maxSeqNoOfUpdatesOrDeletes < index.seqNo() : index.seqNo() + ">=" + maxSeqNoOfUpdatesOrDeletes;
plan = IndexingStrategy.optimizedAppendOnly(index.version(), 0);
} else {
boolean segRepEnabled = engineConfig.getIndexSettings().isSegRepEnabled();
versionMap.enforceSafeAccess();
final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index);
if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
plan = IndexingStrategy.processAsStaleOp(index.version());
if (segRepEnabled) {
// For segrep based indices, we can't completely rely on localCheckpointTracker
// as the preserved checkpoint may not have all the operations present in lucene
// we don't need to index it again as stale op as it would create multiple documents for same seq no
plan = IndexingStrategy.processButSkipLucene(false, index.version());
} else {
plan = IndexingStrategy.processAsStaleOp(index.version());
}
} else {
plan = IndexingStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, index.version(), 0);
}
Expand Down Expand Up @@ -1533,9 +1541,17 @@ protected final DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws
// See testRecoveryWithOutOfOrderDelete for an example of peer recovery
plan = DeletionStrategy.processButSkipLucene(false, delete.version());
} else {
boolean segRepEnabled = engineConfig.getIndexSettings().isSegRepEnabled();
final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete);
if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
plan = DeletionStrategy.processAsStaleOp(delete.version());
if (segRepEnabled) {
// For segrep based indices, we can't completely rely on localCheckpointTracker
// as the preserved checkpoint may not have all the operations present in lucene
// we don't need to index it again as stale op as it would create multiple documents for same seq no
plan = DeletionStrategy.processButSkipLucene(false, delete.version());
} else {
plan = DeletionStrategy.processAsStaleOp(delete.version());
}
} else {
plan = DeletionStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, delete.version(), 0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
Expand All @@ -50,8 +50,6 @@
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.Constants;
import org.junit.Assert;
import org.opensearch.common.io.PathUtils;
import org.opensearch.core.Assertions;
import org.opensearch.OpenSearchException;
import org.opensearch.Version;
import org.opensearch.action.ActionListener;
Expand All @@ -77,11 +75,12 @@
import org.opensearch.common.Randomness;
import org.opensearch.common.Strings;
import org.opensearch.common.UUIDs;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lease.Releasables;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
Expand All @@ -90,8 +89,9 @@
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lease.Releasables;
import org.opensearch.core.Assertions;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentBuilder;
Expand All @@ -106,8 +106,8 @@
import org.opensearch.index.engine.EngineTestCase;
import org.opensearch.index.engine.InternalEngine;
import org.opensearch.index.engine.InternalEngineFactory;
import org.opensearch.index.engine.NRTReplicationEngineFactory;
import org.opensearch.index.engine.NRTReplicationEngine;
import org.opensearch.index.engine.NRTReplicationEngineFactory;
import org.opensearch.index.engine.ReadOnlyEngine;
import org.opensearch.index.fielddata.FieldDataStats;
import org.opensearch.index.fielddata.IndexFieldData;
Expand Down Expand Up @@ -167,6 +167,7 @@
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
Expand All @@ -191,7 +192,7 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.Collection;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.hamcrest.Matchers.containsInAnyOrder;
Expand All @@ -216,8 +217,8 @@
import static org.mockito.Mockito.mock;
import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting;
import static org.opensearch.common.lucene.Lucene.cleanLuceneIndex;
import static org.opensearch.core.xcontent.ToXContent.EMPTY_PARAMS;
import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.opensearch.core.xcontent.ToXContent.EMPTY_PARAMS;
import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
import static org.opensearch.test.hamcrest.RegexMatcher.matches;

Expand Down Expand Up @@ -2824,13 +2825,14 @@ public void testCommitLevelRestoreShardFromRemoteStore() throws IOException {
}

public void testRestoreShardFromRemoteStore(boolean performFlush) throws IOException {
String remoteStorePath = createTempDir().toString();
IndexShard target = newStartedShard(
true,
Settings.builder()
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, "temp-fs")
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, "temp-fs")
.put(IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, remoteStorePath + "__test")
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, remoteStorePath + "__test")
.build(),
new InternalEngineFactory()
);
Expand Down Expand Up @@ -2895,7 +2897,6 @@ public void testRestoreShardFromRemoteStore(boolean performFlush) throws IOExcep
final PlainActionFuture<Boolean> future = PlainActionFuture.newFuture();
target.restoreFromRemoteStore(future);
target.remoteStore().decRef();

assertTrue(future.actionGet());
assertDocs(target, "1", "2");

Expand Down
Loading

0 comments on commit 9314625

Please sign in to comment.