Skip to content

Commit

Permalink
Fix test bug with PIT where snapshotted segments are queried instead …
Browse files Browse the repository at this point in the history
…of current store state.

Signed-off-by: Marc Handalian <handalm@amazon.com>
  • Loading branch information
mch2 committed Mar 16, 2023
1 parent eb32b8b commit 8822b81
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,22 @@
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.StandardDirectoryReader;
import org.apache.lucene.tests.util.TestUtil;
import org.apache.lucene.util.BytesRef;
import org.opensearch.action.ActionFuture;
import org.opensearch.action.admin.indices.flush.FlushRequest;
import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.opensearch.action.admin.indices.segments.PitSegmentsAction;
import org.opensearch.action.admin.indices.segments.PitSegmentsRequest;
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.action.search.CreatePitAction;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.CreatePitResponse;
import org.opensearch.action.search.DeletePitAction;
import org.opensearch.action.search.DeletePitRequest;
import org.opensearch.action.search.ListPitInfo;
import org.opensearch.action.search.PitTestsUtil;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchType;
Expand All @@ -41,17 +46,23 @@
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.IndexModule;
import org.opensearch.index.SegmentReplicationPerGroupStats;
import org.opensearch.index.SegmentReplicationPressureService;
import org.opensearch.index.SegmentReplicationShardStats;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.engine.NRTReplicationReaderManager;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.search.SearchService;
import org.opensearch.search.builder.PointInTimeBuilder;
import org.opensearch.search.internal.PitReaderContext;
import org.opensearch.search.sort.SortOrder;
import org.opensearch.node.NodeClosedException;
import org.opensearch.test.BackgroundIndexer;
Expand All @@ -69,6 +80,7 @@

import static java.util.Arrays.asList;
import static org.opensearch.action.search.PitTestsUtil.assertSegments;
import static org.opensearch.action.search.SearchContextId.decode;
import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.opensearch.index.query.QueryBuilders.matchAllQuery;
import static org.opensearch.index.query.QueryBuilders.matchQuery;
Expand Down Expand Up @@ -984,8 +996,17 @@ public void testPitCreatedOnReplica() throws Exception {
FlushRequest flushRequest = Requests.flushRequest(INDEX_NAME);
client().admin().indices().flush(flushRequest).get();
final IndexShard replicaShard = getIndexShard(replica, INDEX_NAME);
final SegmentInfos segmentInfos = replicaShard.getLatestSegmentInfosAndCheckpoint().v1().get();
final Collection<String> snapshottedSegments = segmentInfos.files(true);

// fetch the segments snapshotted when the reader context was created.
Collection<String> snapshottedSegments;
SearchService searchService = internalCluster().getInstance(SearchService.class, replica);
NamedWriteableRegistry registry = internalCluster().getInstance(NamedWriteableRegistry.class, replica);
final PitReaderContext pitReaderContext = searchService.getPitReaderContext(decode(registry, pitResponse.getId()).shards().get(replicaShard.routingEntry().shardId()).getSearchContextId());
try(final Engine.Searcher searcher = pitReaderContext.acquireSearcher("test")) {
final StandardDirectoryReader standardDirectoryReader = NRTReplicationReaderManager.unwrapStandardReader((OpenSearchDirectoryReader) searcher.getDirectoryReader());
final SegmentInfos infos = standardDirectoryReader.getSegmentInfos();
snapshottedSegments = infos.files(true);
};

flush(INDEX_NAME);
for (int i = 101; i < 200; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.function.BiFunction;
import java.util.function.Supplier;

import static org.opensearch.index.seqno.SequenceNumbers.MAX_SEQ_NO;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public SegmentInfos getSegmentInfos() {
return currentInfos;
}

private StandardDirectoryReader unwrapStandardReader(OpenSearchDirectoryReader reader) {
public static StandardDirectoryReader unwrapStandardReader(OpenSearchDirectoryReader reader) {
final DirectoryReader delegate = reader.getDelegate();
if (delegate instanceof SoftDeletesDirectoryReaderWrapper) {
return (StandardDirectoryReader) ((SoftDeletesDirectoryReaderWrapper) delegate).getDelegate();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,7 @@ public synchronized void decRef(Collection<String> fileNames) {
}
}

public synchronized Integer getRefCount(String fileName) {
return refCounts.get(fileName);
}

public synchronized boolean skipDelete(String fileName) {
return refCounts.containsKey(fileName);
public synchronized boolean canDelete(String fileName) {
return refCounts.containsKey(fileName) == false;
}
}
2 changes: 1 addition & 1 deletion server/src/main/java/org/opensearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -817,7 +817,7 @@ private void cleanupFiles(String reason, MetadataSnapshot localSnapshot, @Nullab
if (Store.isAutogenerated(existingFile)
|| localSnapshot.contains(existingFile)
|| (additionalFiles != null && additionalFiles.contains(existingFile))
|| replicaFileDeleter != null && replicaFileDeleter.skipDelete(existingFile)) {
|| replicaFileDeleter != null && replicaFileDeleter.canDelete(existingFile) == false) {
// don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete
// checksum)
continue;
Expand Down

0 comments on commit 8822b81

Please sign in to comment.