Skip to content

Commit

Permalink
PR feedback.
Browse files Browse the repository at this point in the history
Signed-off-by: Marc Handalian <handalm@amazon.com>
  • Loading branch information
mch2 committed Aug 10, 2023
1 parent 506bc13 commit 488404b
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1419,4 +1419,23 @@ public void testIndexWhileRecoveringReplica() throws Exception {
.get();
assertNoFailures(response);
}

public void testRestartPrimary_NoReplicas() throws Exception {
    // Single data node, zero replicas: after the primary's node restarts, the
    // shard must recover its data and the previously indexed doc must survive.
    final String primary = internalCluster().startDataOnlyNode();
    createIndex(INDEX_NAME);
    ensureYellow(INDEX_NAME);

    // JUnit convention is assertEquals(expected, actual); `primary` is the
    // expected node name, so it goes first for a readable failure message.
    assertEquals(primary, getNodeContainingPrimaryShard().getName());

    client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
    // Randomly exercise both durability paths before the restart: a commit
    // (flush) and a refresh-only (uncommitted segments) state.
    if (randomBoolean()) {
        flush(INDEX_NAME);
    } else {
        refresh(INDEX_NAME);
    }

    internalCluster().restartNode(primary);
    ensureYellow(INDEX_NAME);
    assertDocCounts(1, primary);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import org.junit.After;
import org.junit.Before;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.indices.replication.SegmentReplicationIT;
Expand Down Expand Up @@ -68,23 +67,4 @@ public void teardown() {
public void testPressureServiceStats() throws Exception {
super.testPressureServiceStats();
}

public void testRestartPrimary_NoReplicas() throws Exception {
    // Single data node, zero replicas: after the primary's node restarts, the
    // shard must recover its data and the previously indexed doc must survive.
    final String primary = internalCluster().startDataOnlyNode();
    createIndex(INDEX_NAME);
    ensureYellow(INDEX_NAME);

    // JUnit convention is assertEquals(expected, actual); `primary` is the
    // expected node name, so it goes first for a readable failure message.
    assertEquals(primary, getNodeContainingPrimaryShard().getName());

    client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
    // Randomly exercise both durability paths before the restart: a commit
    // (flush) and a refresh-only (uncommitted segments) state.
    if (randomBoolean()) {
        flush(INDEX_NAME);
    } else {
        refresh(INDEX_NAME);
    }

    internalCluster().restartNode(primary);
    ensureYellow(INDEX_NAME);
    assertDocCounts(1, primary);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,8 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
This is a workaround solution which decreases the chances of conflict on replica nodes when same file is copied
from two different primaries during failover. Increasing counter helps in avoiding this conflict as counter is
used to generate new segment file names. The ideal solution is to identify the counter from previous primary.
This is not required for remote store implementations given on failover the replica re-syncs with the store
during promotion.
*/
if (engineConfig.getIndexSettings().isRemoteStoreEnabled() == false) {
latestSegmentInfos.counter = latestSegmentInfos.counter + SI_COUNTER_INCREMENT;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ public void getCheckpointMetadata(
// TODO: Need to figure out a way to pass this information for segment metadata via remote store.
try (final GatedCloseable<SegmentInfos> segmentInfosSnapshot = indexShard.getSegmentInfosSnapshot()) {
final Version version = segmentInfosSnapshot.get().getCommitLuceneVersion();
RemoteSegmentMetadata mdFile = remoteDirectory.init(); // During initial recovery flow, the remote store might not
// have metadata as primary hasn't uploaded anything yet.
RemoteSegmentMetadata mdFile = remoteDirectory.init();
// During initial recovery flow, the remote store might not
// have metadata as primary hasn't uploaded anything yet.
if (mdFile == null && indexShard.state().equals(IndexShardState.STARTED) == false) {
listener.onResponse(new CheckpointInfoResponse(checkpoint, Collections.emptyMap(), null));
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,33 +252,6 @@ public void testReplicaCommitsInfosBytesOnRecovery() throws Exception {
}
}

public void testPrimaryRestart() throws Exception {
    // A primary backed by a remote store must come back from a restart with
    // the same searchable segments and doc count it had before going down.
    final Path remoteStorePath = createTempDir();
    try (ReplicationGroup shards = createGroup(0, getIndexSettings(), indexMapping, new NRTReplicationEngineFactory(), remoteStorePath)) {
        shards.startAll();
        // Index enough docs that the primary has uploaded segments to the remote store.
        shards.indexDocs(10);
        IndexShard primaryShard = shards.getPrimary();
        if (randomBoolean()) {
            flushShard(primaryShard);
        } else {
            primaryShard.refresh("test");
        }
        assertDocCount(primaryShard, 10);
        // Snapshot the segment metadata now; a segrep diff after the restart
        // proves the reader's segments are byte-identical.
        final Map<String, StoreFileMetadata> preRestartMetadata = primaryShard.getSegmentMetadataMap();
        // Simulate the restart by re-initializing the primary against the same remote path.
        shards.reinitPrimaryShard(remoteStorePath);
        // The store is open here, but the shard has not yet run through recovery.
        primaryShard = shards.getPrimary();
        shards.startPrimary();
        assertDocCount(primaryShard, 10);
        final Store.RecoveryDiff segmentDiff = Store.segmentReplicationDiff(preRestartMetadata, primaryShard.getSegmentMetadataMap());
        assertTrue(segmentDiff.missing.isEmpty());
        assertTrue(segmentDiff.different.isEmpty());
    }
}

public void testPrimaryRestart_PrimaryHasExtraCommits() throws Exception {
final Path remotePath = createTempDir();
try (ReplicationGroup shards = createGroup(0, getIndexSettings(), indexMapping, new NRTReplicationEngineFactory(), remotePath)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -774,6 +775,33 @@ public void testNoDuplicateSeqNo() throws Exception {
}
}

// Verifies that a remote-store-backed primary survives a restart: the doc
// count is intact and the post-restart segment set matches the pre-restart
// one exactly (empty segment-replication diff).
public void testPrimaryRestart() throws Exception {
final Path remotePath = createTempDir();
try (ReplicationGroup shards = createGroup(0, getIndexSettings(), indexMapping, new NRTReplicationEngineFactory(), remotePath)) {
shards.startAll();
// ensure primary has uploaded something
shards.indexDocs(10);
IndexShard primary = shards.getPrimary();
// randomly exercise both the committed (flush) and refresh-only paths
if (randomBoolean()) {
flushShard(primary);
} else {
primary.refresh("test");
}
assertDocCount(primary, 10);
// get a metadata map - we'll use segrep diff to ensure segments on reader are identical after restart.
final Map<String, StoreFileMetadata> metadataBeforeRestart = primary.getSegmentMetadataMap();
// restart the primary
shards.reinitPrimaryShard(remotePath);
// the store is open at this point but the shard has not yet run through recovery
primary = shards.getPrimary();
shards.startPrimary();
assertDocCount(primary, 10);
// an empty diff means every pre-restart segment is present and unchanged
final Store.RecoveryDiff diff = Store.segmentReplicationDiff(metadataBeforeRestart, primary.getSegmentMetadataMap());
assertTrue(diff.missing.isEmpty());
assertTrue(diff.different.isEmpty());
}
}

/**
* Assert persisted and searchable doc counts. This method should not be used while docs are concurrently indexed because
* it asserts point in time seqNos are relative to the doc counts.
Expand Down

0 comments on commit 488404b

Please sign in to comment.