diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java
index 400d76c3955bb..d49aa3efa1852 100644
--- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java
+++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java
@@ -514,7 +514,8 @@ private void recoverFromTranslogInternal(TranslogRecoveryRunner translogRecovery
         flush(false, true);
         translog.trimUnreferencedReaders();
     }
-    // Package private for testing purposes only
+
+    // Package private for testing purposes only
     boolean hasSnapshottedCommits() {
         return combinedDeletionPolicy.hasSnapshottedCommits();
     }
@@ -543,7 +544,6 @@ public long getWritingBytes() {
         return indexWriter.getFlushingBytes() + versionMap.getRefreshingBytes();
     }
 
-
     private ExternalReaderManager createReaderManager(RefreshWarmerListener externalRefreshListener) throws EngineException {
         boolean success = false;
         OpenSearchReaderManager internalReaderManager = null;
diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java
index f2ea501c634b9..b8c06ccd9e7e6 100644
--- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java
+++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java
@@ -50,25 +50,26 @@ public NRTReplicationEngine(EngineConfig engineConfig) {
         try {
             lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
             readerManager = new NRTReplicationReaderManager(OpenSearchDirectoryReader.wrap(getDirectoryReader(), shardId));
-            final SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(this.lastCommittedSegmentInfos.getUserData().entrySet());
+            final SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(
+                this.lastCommittedSegmentInfos.getUserData().entrySet()
+            );
             this.localCheckpointTracker = new LocalCheckpointTracker(commitInfo.maxSeqNo, commitInfo.localCheckpoint);
             this.completionStatsCache = new CompletionStatsCache(() -> acquireSearcher("completion_stats"));
             this.readerManager.addListener(completionStatsCache);
         } catch (IOException e) {
-            IOUtils.closeWhileHandlingException(store::decRef, translog);
+            IOUtils.closeWhileHandlingException(store::decRef, translog);
             throw new EngineCreationFailureException(shardId, "failed to create engine", e);
         }
     }
 
-    public synchronized void updateSegments(final SegmentInfos infos, long seqNo)
-        throws IOException {
+    public synchronized void updateSegments(final SegmentInfos infos, long seqNo) throws IOException {
         try {
             store.incRef();
             // Update the current infos reference on the Engine's reader.
             readerManager.updateSegments(infos);
             // only update the persistedSeqNo and "lastCommitted" infos reference if the incoming segments have a higher
-            // generation. We can still refresh with incoming SegmentInfos that are not part of a commit point.
+            // generation. We can still refresh with incoming SegmentInfos that are not part of a commit point.
             if (infos.getGeneration() > lastCommittedSegmentInfos.getGeneration()) {
                 this.lastCommittedSegmentInfos = infos;
                 localCheckpointTracker.fastForwardPersistedSeqNo(seqNo);
@@ -106,12 +107,7 @@ public boolean isThrottled() {
 
     @Override
     public IndexResult index(Index index) throws IOException {
-        IndexResult indexResult = new IndexResult(
-            index.version(),
-            index.primaryTerm(),
-            index.seqNo(),
-            false
-        );
+        IndexResult indexResult = new IndexResult(index.version(), index.primaryTerm(), index.seqNo(), false);
         addIndexOperationToTranslog(index, indexResult);
         indexResult.setTook(System.nanoTime() - index.startTime());
         indexResult.freeze();
@@ -121,12 +117,7 @@ public IndexResult index(Index index) throws IOException {
 
     @Override
     public DeleteResult delete(Delete delete) throws IOException {
-        DeleteResult deleteResult = new DeleteResult(
-            delete.version(),
-            delete.primaryTerm(),
-            delete.seqNo(),
-            true
-        );
+        DeleteResult deleteResult = new DeleteResult(delete.version(), delete.primaryTerm(), delete.seqNo(), true);
         addDeleteOperationToTranslog(delete, deleteResult);
         deleteResult.setTook(System.nanoTime() - delete.startTime());
         deleteResult.freeze();
@@ -161,7 +152,13 @@ public Closeable acquireHistoryRetentionLock() {
     }
 
     @Override
-    public Translog.Snapshot newChangesSnapshot(String source, long fromSeqNo, long toSeqNo, boolean requiredFullRange, boolean accurateCount) throws IOException {
+    public Translog.Snapshot newChangesSnapshot(
+        String source,
+        long fromSeqNo,
+        long toSeqNo,
+        boolean requiredFullRange,
+        boolean accurateCount
+    ) throws IOException {
         throw new UnsupportedOperationException("Not implemented");
     }
 
@@ -170,7 +167,6 @@ public int countNumberOfHistoryOperations(String source, long fromSeqNo, long to
         return 0;
     }
 
-
     @Override
     public boolean hasCompleteOperationHistory(String reason, long startingSeqNo) {
         return false;
@@ -229,9 +225,15 @@ public boolean shouldPeriodicallyFlush() {
     @Override
     public void flush(boolean force, boolean waitIfOngoing) throws EngineException {}
 
-    @Override
-    public void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade, boolean upgradeOnlyAncientSegments, String forceMergeUUID) throws EngineException, IOException {}
+    @Override
+    public void forceMerge(
+        boolean flush,
+        int maxNumSegments,
+        boolean onlyExpungeDeletes,
+        boolean upgrade,
+        boolean upgradeOnlyAncientSegments,
+        String forceMergeUUID
+    ) throws EngineException, IOException {}
 
     @Override
     public GatedCloseable acquireLastIndexCommit(boolean flushFirst) throws EngineException {
@@ -323,9 +325,6 @@ protected SegmentInfos getLatestSegmentInfos() {
 
     private DirectoryReader getDirectoryReader() throws IOException {
         // for segment replication: replicas should create the reader from store, we don't want an open IW on replicas.
-        return new SoftDeletesDirectoryReaderWrapper(
-            DirectoryReader.open(store.directory()),
-            Lucene.SOFT_DELETES_FIELD
-        );
+        return new SoftDeletesDirectoryReaderWrapper(DirectoryReader.open(store.directory()), Lucene.SOFT_DELETES_FIELD);
     }
 }
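Note (illustration, not part of the patch): the updateSegments hunk above keeps the rule that every incoming SegmentInfos refreshes the replica's reader, while only a higher-generation (committed) SegmentInfos advances lastCommittedSegmentInfos and fast-forwards the persisted sequence number. A minimal stand-alone sketch of that guard, using simplified stand-in fields (lastCommitted, persistedSeqNo, refreshReader) rather than the engine's real members:

// Sketch only; assumes lucene-core on the classpath for SegmentInfos.
import org.apache.lucene.index.SegmentInfos;

class SegmentUpdateSketch {
    private SegmentInfos lastCommitted;   // stand-in for lastCommittedSegmentInfos
    private long persistedSeqNo;          // stand-in for the tracker's persisted checkpoint

    synchronized void updateSegments(SegmentInfos incoming, long seqNo, Runnable refreshReader) {
        refreshReader.run(); // always refresh, even for SegmentInfos that are not part of a commit point
        if (lastCommitted == null || incoming.getGeneration() > lastCommitted.getGeneration()) {
            lastCommitted = incoming;                         // newer commit point: remember it
            persistedSeqNo = Math.max(persistedSeqNo, seqNo); // and fast-forward the persisted seqNo
        }
    }
}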
diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java
index a97f3b191d53c..e8bbc28cf077b 100644
--- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java
+++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java
@@ -18,7 +18,6 @@
 import org.apache.lucene.index.StandardDirectoryReader;
 import org.opensearch.common.lucene.Lucene;
 import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
-import org.opensearch.index.shard.ShardId;
 
 import java.io.IOException;
 import java.util.ArrayList;
diff --git a/server/src/main/java/org/opensearch/index/engine/TranslogAwareEngine.java b/server/src/main/java/org/opensearch/index/engine/TranslogAwareEngine.java
index 9f3c5d2c13b08..4de0185b0e217 100644
--- a/server/src/main/java/org/opensearch/index/engine/TranslogAwareEngine.java
+++ b/server/src/main/java/org/opensearch/index/engine/TranslogAwareEngine.java
@@ -47,11 +47,14 @@ protected TranslogAwareEngine(EngineConfig engineConfig) {
             customTranslogDeletionPolicy = engineConfig.getCustomTranslogDeletionPolicyFactory()
                 .create(engineConfig.getIndexSettings(), engineConfig.retentionLeasesSupplier());
         }
-        translogDeletionPolicy = Objects.requireNonNullElseGet(customTranslogDeletionPolicy, () -> new DefaultTranslogDeletionPolicy(
-            engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
-            engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis(),
-            engineConfig.getIndexSettings().getTranslogRetentionTotalFiles()
-        ));
+        translogDeletionPolicy = Objects.requireNonNullElseGet(
+            customTranslogDeletionPolicy,
+            () -> new DefaultTranslogDeletionPolicy(
+                engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
+                engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis(),
+                engineConfig.getIndexSettings().getTranslogRetentionTotalFiles()
+            )
+        );
         try {
             store.trimUnsafeCommits(engineConfig.getTranslogConfig().getTranslogPath());
             translog = openTranslog(engineConfig, translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier(), seqNo -> {
diff --git a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTest.java b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTest.java
index 3e74faa84249a..df9ca935658a0 100644
--- a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTest.java
+++ b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTest.java
@@ -10,11 +10,9 @@
 
 import org.apache.lucene.index.NoMergePolicy;
 import org.apache.lucene.index.SegmentInfos;
-import org.apache.lucene.tests.store.BaseDirectoryWrapper;
 import org.hamcrest.MatcherAssert;
 import org.opensearch.common.lucene.Lucene;
 import org.opensearch.common.lucene.search.Queries;
-import org.opensearch.core.internal.io.IOUtils;
 import org.opensearch.index.seqno.SequenceNumbers;
 import org.opensearch.index.store.Store;
 import org.opensearch.index.translog.Translog;
@@ -29,8 +27,10 @@ public class NRTReplicationEngineTest extends EngineTestCase {
 
     public void testCreateEngine() throws IOException {
         final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
-        try (final Store nrtEngineStore = createStore();
-            final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore);) {
+        try (
+            final Store nrtEngineStore = createStore();
+            final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore);
+        ) {
             final SegmentInfos latestSegmentInfos = nrtEngine.getLatestSegmentInfos();
             final SegmentInfos lastCommittedSegmentInfos = nrtEngine.getLastCommittedSegmentInfos();
             assertEquals(latestSegmentInfos.version, lastCommittedSegmentInfos.version);
@@ -44,9 +44,16 @@ public void testCreateEngine() throws IOException {
 
     public void testEngineWritesOpsToTranslog() throws Exception {
         final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
-        try (final Store nrtEngineStore = createStore();
-            final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore);) {
-            List<Engine.Operation> operations = generateHistoryOnReplica(between(1, 500), randomBoolean(), randomBoolean(), randomBoolean());
+        try (
+            final Store nrtEngineStore = createStore();
+            final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore);
+        ) {
+            List<Engine.Operation> operations = generateHistoryOnReplica(
+                between(1, 500),
+                randomBoolean(),
+                randomBoolean(),
+                randomBoolean()
+            );
             for (Engine.Operation op : operations) {
                 applyOperation(engine, op);
                 applyOperation(nrtEngine, op);
@@ -70,10 +77,15 @@ public void testEngineWritesOpsToTranslog() throws Exception {
 
     public void testUpdateSegments() throws Exception {
         final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
-        try (final Store nrtEngineStore = createStore();
-            final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore);) {
+        try (
+            final Store nrtEngineStore = createStore();
+            final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore);
+        ) {
             // add docs to the primary engine.
-            List<Engine.Operation> operations = generateHistoryOnReplica(between(1, 500), randomBoolean(), randomBoolean(), randomBoolean()).stream().filter(op -> op.operationType().equals(Engine.Operation.TYPE.INDEX)).collect(Collectors.toList());
+            List<Engine.Operation> operations = generateHistoryOnReplica(between(1, 500), randomBoolean(), randomBoolean(), randomBoolean())
+                .stream()
+                .filter(op -> op.operationType().equals(Engine.Operation.TYPE.INDEX))
+                .collect(Collectors.toList());
             for (Engine.Operation op : operations) {
                 applyOperation(engine, op);
                 applyOperation(nrtEngine, op);
@@ -116,7 +128,15 @@ private void assertSearcherHits(Engine engine, int hits) {
     private NRTReplicationEngine buildNrtReplicaEngine(AtomicLong globalCheckpoint, Store store) throws IOException {
         Lucene.cleanLuceneIndex(store.directory());
         final Path translogDir = createTempDir();
-        final EngineConfig replicaConfig = config(defaultSettings, store, translogDir, NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get);
+        final EngineConfig replicaConfig = config(
+            defaultSettings,
+            store,
+            translogDir,
+            NoMergePolicy.INSTANCE,
+            null,
+            null,
+            globalCheckpoint::get
+        );
         if (Lucene.indexExists(store.directory()) == false) {
             store.createEmpty(replicaConfig.getIndexSettings().getIndexVersionCreated().luceneVersion);
             final String translogUuid = Translog.createEmptyTranslog(
diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java
index b06c8dfae5828..50f303fda5a64 100644
--- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java
+++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java
@@ -4399,11 +4399,9 @@ public void testReadOnlyReplicaEngineConfig() throws IOException {
         assertTrue(replicaShard.getEngine().config().isReadOnlyReplica());
         assertEquals(replicaShard.getEngine().getClass(), NRTReplicationEngine.class);
 
-
         closeShards(primaryShard, replicaShard);
     }
 
-
    public void testCloseShardWhileEngineIsWarming() throws Exception {
         CountDownLatch warmerStarted = new CountDownLatch(1);
         CountDownLatch warmerBlocking = new CountDownLatch(1);
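Note (illustration, not part of the patch): in the TranslogAwareEngine hunk, Objects.requireNonNullElseGet keeps the custom translog deletion policy when one was created and only builds the default lazily otherwise. A minimal sketch of that selection pattern; RetentionPolicy and choose are illustrative names, not OpenSearch APIs:

import java.util.Objects;
import java.util.function.Supplier;

class PolicySelectionSketch {
    interface RetentionPolicy {}

    static RetentionPolicy choose(RetentionPolicy custom, Supplier<RetentionPolicy> defaultPolicy) {
        // The supplier runs only when 'custom' is null, so the default policy
        // is never constructed unnecessarily.
        return Objects.requireNonNullElseGet(custom, defaultPolicy);
    }
}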