Skip to content

Commit

Permalink
PR Feedback.
Browse files Browse the repository at this point in the history
Signed-off-by: Marc Handalian <handalm@amazon.com>
  • Loading branch information
mch2 committed Aug 2, 2023
1 parent ad3d7bb commit 804c203
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,6 @@ public void updateShardState(
}

replicationTracker.activatePrimaryMode(getLocalCheckpoint());

if (indexSettings.isSegRepEnabled()) {
// force publish a checkpoint once in primary mode so that replicas not caught up to previous primary
// are brought up to date.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -385,44 +386,53 @@ public void testGetSegmentInfosSnapshotPreservesFilesUntilRelease() throws Excep
final Store nrtEngineStore = createStore(REMOTE_STORE_INDEX_SETTINGS, newDirectory());
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, settings)
) {
List<Engine.Operation> operations = generateHistoryOnReplica(between(5, 10), randomBoolean(), randomBoolean(), randomBoolean());
// only index 2 docs here, this will create segments _0 and _1 and after forcemerge into _2.
final int docCount = 2;
List<Engine.Operation> operations = generateHistoryOnReplica(docCount, randomBoolean(), randomBoolean(), randomBoolean());
for (Engine.Operation op : operations) {
applyOperation(engine, op);
applyOperation(nrtEngine, op);
// refresh to create a lot of segments.
engine.refresh("test");
}
assertEquals(2, engine.segmentsStats(false, false).getCount());
// wipe the nrt directory initially so we can sync with primary.
Lucene.cleanLuceneIndex(nrtEngineStore.directory());
assertFalse(
Arrays.stream(nrtEngineStore.directory().listAll())
.anyMatch(file -> file.equals("write.lock") == false && file.equals("extra0") == false)
);
for (String file : engine.getLatestSegmentInfos().files(true)) {
nrtEngineStore.directory().copyFrom(store.directory(), file, file, IOContext.DEFAULT);
}
nrtEngine.updateSegments(engine.getLatestSegmentInfos());
assertEquals(engine.getLatestSegmentInfos(), nrtEngine.getLatestSegmentInfos());
final GatedCloseable<SegmentInfos> snapshot = nrtEngine.getSegmentInfosSnapshot();
final Collection<String> replica_snapshotFiles = snapshot.get().files(false);
final Collection<String> replicaSnapshotFiles = snapshot.get().files(false);
List<String> replicaFiles = List.of(nrtEngine.store.directory().listAll());

// merge primary down to 1 segment
engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID());
final Collection<String> files = engine.getLatestSegmentInfos().files(false);
// we expect a 3rd segment to be created after merge.
assertEquals(3, engine.segmentsStats(false, false).getCount());
final Collection<String> latestPrimaryFiles = engine.getLatestSegmentInfos().files(false);

// copy new segments in and load reader.
for (String file : files) {
for (String file : latestPrimaryFiles) {
if (replicaFiles.contains(file) == false) {
nrtEngineStore.directory().copyFrom(store.directory(), file, file, IOContext.DEFAULT);
}
}
nrtEngine.updateSegments(engine.getLatestSegmentInfos());

replicaFiles = List.of(nrtEngine.store.directory().listAll());
assertTrue(replicaFiles.containsAll(replica_snapshotFiles));
assertTrue(replicaFiles.containsAll(replicaSnapshotFiles));

// close snapshot, files should be cleaned up
snapshot.close();

replicaFiles = List.of(nrtEngine.store.directory().listAll());
assertFalse(replicaFiles.containsAll(replica_snapshotFiles));
assertFalse(replicaFiles.containsAll(replicaSnapshotFiles));

// Ensure we still have all the active files. Note - we exclude the infos file here if we aren't committing
// the nrt reader will still reference segments_n-1 after being loaded until a local commit occurs.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public void getSegmentFiles(
}
};
when(sourceFactory.get(any())).thenReturn(source);
startReplicationAndAssertCancellation(replica, targetService);
startReplicationAndAssertCancellation(replica, primary, targetService);

shards.removeReplica(replica);
closeShards(replica);
Expand Down Expand Up @@ -137,7 +137,7 @@ public void getSegmentFiles(
}
};
when(sourceFactory.get(any())).thenReturn(source);
startReplicationAndAssertCancellation(replica, targetService);
startReplicationAndAssertCancellation(replica, primary, targetService);

shards.removeReplica(replica);
closeShards(replica);
Expand Down Expand Up @@ -189,7 +189,7 @@ public void cancel() {
}
};
when(sourceFactory.get(any())).thenReturn(source);
startReplicationAndAssertCancellation(replica, targetService);
startReplicationAndAssertCancellation(replica, primary, targetService);

shards.removeReplica(replica);
closeShards(replica);
Expand Down Expand Up @@ -227,7 +227,7 @@ public void getSegmentFiles(
) {}
};
when(sourceFactory.get(any())).thenReturn(source);
startReplicationAndAssertCancellation(replica, targetService);
startReplicationAndAssertCancellation(replica, primary, targetService);

shards.removeReplica(replica);
closeShards(replica);
Expand Down Expand Up @@ -275,7 +275,7 @@ public void getSegmentFiles(
}
};
when(sourceFactory.get(any())).thenReturn(source);
startReplicationAndAssertCancellation(nextPrimary, targetService);
startReplicationAndAssertCancellation(nextPrimary, oldPrimary, targetService);
// wait for replica to finish being promoted, and assert doc counts.
final CountDownLatch latch = new CountDownLatch(1);
nextPrimary.acquirePrimaryOperationPermit(new ActionListener<>() {
Expand Down Expand Up @@ -422,7 +422,11 @@ public void testTemporaryFilesNotCleanup() throws Exception {
runnablePostGetFiles
);
when(sourceFactory.get(any())).thenReturn(segmentReplicationSource);
targetService.startReplication(replica, getTargetListener(primaryShard, replica, primaryMetadata, countDownLatch));
targetService.startReplication(
replica,
primaryShard.getLatestReplicationCheckpoint(),
getTargetListener(primaryShard, replica, primaryMetadata, countDownLatch)
);
countDownLatch.await(30, TimeUnit.SECONDS);
assertEquals("Replication failed", 0, countDownLatch.getCount());
shards.assertAllEqual(numDocs);
Expand Down

0 comments on commit 804c203

Please sign in to comment.