diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index a048b858ca139..106643198cc3b 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -18,6 +18,7 @@ import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.concurrent.ReleasableLock; import org.opensearch.core.internal.io.IOUtils; import org.opensearch.index.seqno.LocalCheckpointTracker; import org.opensearch.index.seqno.SeqNoStats; @@ -90,6 +91,7 @@ public synchronized void updateSegments(final SegmentInfos infos, long seqNo) th // generation. We can still refresh with incoming SegmentInfos that are not part of a commit point. if (infos.getGeneration() > lastCommittedSegmentInfos.getGeneration()) { this.lastCommittedSegmentInfos = infos; + rollTranslogGeneration(); } localCheckpointTracker.fastForwardProcessedSeqNo(seqNo); } @@ -121,10 +123,15 @@ public boolean isThrottled() { @Override public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws EngineException { - ensureOpen(); - try { + try (ReleasableLock lock = readLock.acquire()) { + ensureOpen(); translog.trimOperations(belowTerm, aboveSeqNo); - } catch (IOException e) { + } catch (Exception e) { + try { + failEngine("translog operations trimming failed", e); + } catch (Exception inner) { + e.addSuppressed(inner); + } throw new EngineException(shardId, "failed to trim translog operations", e); } } @@ -286,10 +293,15 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException { @Override public void trimUnreferencedTranslogFiles() throws EngineException { - ensureOpen(); - try { + try (ReleasableLock lock = readLock.acquire()) { + ensureOpen(); translog.trimUnreferencedReaders(); - } catch (IOException e) { + } catch (Exception e) { + try { + failEngine("translog trimming failed", e); + } catch (Exception inner) { + e.addSuppressed(inner); + } throw new EngineException(shardId, "failed to trim translog", e); } } @@ -301,11 +313,16 @@ public boolean shouldRollTranslogGeneration() { @Override public void rollTranslogGeneration() throws EngineException { - ensureOpen(); - try { + try (ReleasableLock ignored = readLock.acquire()) { + ensureOpen(); translog.rollGeneration(); translog.trimUnreferencedReaders(); - } catch (IOException e) { + } catch (Exception e) { + try { + failEngine("translog trimming failed", e); + } catch (Exception inner) { + e.addSuppressed(inner); + } throw new EngineException(shardId, "failed to roll translog", e); } } @@ -350,11 +367,8 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) { } catch (Exception e) { logger.warn("failed to close engine", e); } finally { - try { - logger.debug("engine closed [{}]", reason); - } finally { - closedLatch.countDown(); - } + logger.debug("engine closed [{}]", reason); + closedLatch.countDown(); } } } diff --git a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java index 133eb1199bbcb..6aa00bb9312dd 100644 --- a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java @@ -129,9 +129,32 @@ public void testUpdateSegments() throws Exception { engine.flush(); nrtEngine.syncTranslog(); // to advance persisted checkpoint + Set seqNos = operations.stream().map(Engine.Operation::seqNo).collect(Collectors.toSet()); + + try (Translog.Snapshot snapshot = nrtEngine.getTranslog().newSnapshot()) { + assertThat(snapshot.totalOperations(), equalTo(operations.size())); + assertThat( + TestTranslog.drainSnapshot(snapshot, false).stream().map(Translog.Operation::seqNo).collect(Collectors.toSet()), + equalTo(seqNos) + ); + } + nrtEngine.updateSegments(engine.getLastCommittedSegmentInfos(), engine.getProcessedLocalCheckpoint()); assertMatchingSegmentsAndCheckpoints(nrtEngine); + assertEquals( + nrtEngine.getTranslog().getGeneration().translogFileGeneration, + engine.getTranslog().getGeneration().translogFileGeneration + ); + + try (Translog.Snapshot snapshot = nrtEngine.getTranslog().newSnapshot()) { + assertThat(snapshot.totalOperations(), equalTo(operations.size())); + assertThat( + TestTranslog.drainSnapshot(snapshot, false).stream().map(Translog.Operation::seqNo).collect(Collectors.toSet()), + equalTo(seqNos) + ); + } + // Ensure the same hit count between engines. int expectedDocCount; try (final Engine.Searcher test = engine.acquireSearcher("test")) {