Skip to content

Commit

Permalink
Fix test bug with PIT where snapshotted segments are queried instead …
Browse files Browse the repository at this point in the history
…of current store state.

Signed-off-by: Marc Handalian <handalm@amazon.com>
  • Loading branch information
mch2 authored and dreamer-89 committed Mar 20, 2023
1 parent ec5d6ea commit 766c76f
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.StandardDirectoryReader;
import org.apache.lucene.tests.util.TestUtil;
import org.apache.lucene.util.BytesRef;
import org.opensearch.action.ActionFuture;
Expand All @@ -41,17 +42,23 @@
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.IndexModule;
import org.opensearch.index.SegmentReplicationPerGroupStats;
import org.opensearch.index.SegmentReplicationPressureService;
import org.opensearch.index.SegmentReplicationShardStats;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.engine.NRTReplicationReaderManager;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.search.SearchService;
import org.opensearch.search.builder.PointInTimeBuilder;
import org.opensearch.search.internal.PitReaderContext;
import org.opensearch.search.sort.SortOrder;
import org.opensearch.node.NodeClosedException;
import org.opensearch.test.BackgroundIndexer;
Expand All @@ -69,6 +76,7 @@

import static java.util.Arrays.asList;
import static org.opensearch.action.search.PitTestsUtil.assertSegments;
import static org.opensearch.action.search.SearchContextId.decode;
import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.opensearch.index.query.QueryBuilders.matchAllQuery;
import static org.opensearch.index.query.QueryBuilders.matchQuery;
Expand Down Expand Up @@ -982,8 +990,22 @@ public void testPitCreatedOnReplica() throws Exception {
FlushRequest flushRequest = Requests.flushRequest(INDEX_NAME);
client().admin().indices().flush(flushRequest).get();
final IndexShard replicaShard = getIndexShard(replica, INDEX_NAME);
final SegmentInfos segmentInfos = replicaShard.getLatestSegmentInfosAndCheckpoint().v1().get();
final Collection<String> snapshottedSegments = segmentInfos.files(true);

// fetch the segments snapshotted when the reader context was created.
Collection<String> snapshottedSegments;
SearchService searchService = internalCluster().getInstance(SearchService.class, replica);
NamedWriteableRegistry registry = internalCluster().getInstance(NamedWriteableRegistry.class, replica);
final PitReaderContext pitReaderContext = searchService.getPitReaderContext(
decode(registry, pitResponse.getId()).shards().get(replicaShard.routingEntry().shardId()).getSearchContextId()
);
try (final Engine.Searcher searcher = pitReaderContext.acquireSearcher("test")) {
final StandardDirectoryReader standardDirectoryReader = NRTReplicationReaderManager.unwrapStandardReader(
(OpenSearchDirectoryReader) searcher.getDirectoryReader()
);
final SegmentInfos infos = standardDirectoryReader.getSegmentInfos();
snapshottedSegments = infos.files(true);
}
;

flush(INDEX_NAME);
for (int i = 101; i < 200; i++) {
Expand Down Expand Up @@ -1040,6 +1062,6 @@ public void testPitCreatedOnReplica() throws Exception {
client().execute(DeletePitAction.INSTANCE, deletePITRequest).actionGet();

currentFiles = List.of(replicaShard.store().directory().listAll());
assertFalse("Files should be preserved", currentFiles.containsAll(snapshottedSegments));
assertFalse("Files should be cleaned up", currentFiles.containsAll(snapshottedSegments));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ public class NRTReplicationReaderManager extends OpenSearchReaderManager {
) {
super(reader);
currentInfos = unwrapStandardReader(reader).getSegmentInfos();
this.onReaderClosed = onReaderClosed;
this.onNewReader = onNewReader;
this.onReaderClosed = onReaderClosed;
}

@Override
Expand Down Expand Up @@ -108,7 +108,7 @@ public SegmentInfos getSegmentInfos() {
return currentInfos;
}

private StandardDirectoryReader unwrapStandardReader(OpenSearchDirectoryReader reader) {
public static StandardDirectoryReader unwrapStandardReader(OpenSearchDirectoryReader reader) {
final DirectoryReader delegate = reader.getDelegate();
if (delegate instanceof SoftDeletesDirectoryReaderWrapper) {
return (StandardDirectoryReader) ((SoftDeletesDirectoryReaderWrapper) delegate).getDelegate();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
*
* @opensearch.internal
*/
final class ReplicaFileDeleter {
final class ReplicaFileTracker {

private final Map<String, Integer> refCounts = new HashMap<>();

Expand All @@ -45,11 +45,7 @@ public synchronized void decRef(Collection<String> fileNames) {
}
}

public synchronized Integer getRefCount(String fileName) {
return refCounts.get(fileName);
}

public synchronized boolean skipDelete(String fileName) {
return refCounts.containsKey(fileName);
public synchronized boolean canDelete(String fileName) {
return refCounts.containsKey(fileName) == false;
}
}
13 changes: 7 additions & 6 deletions server/src/main/java/org/opensearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
private final ShardLock shardLock;
private final OnClose onClose;

private final ReplicaFileDeleter replicaFileDeleter;
private final ReplicaFileTracker replicaFileTracker;

private final AbstractRefCounted refCounter = new AbstractRefCounted("store") {
@Override
Expand All @@ -205,9 +205,9 @@ public Store(ShardId shardId, IndexSettings indexSettings, Directory directory,
this.shardLock = shardLock;
this.onClose = onClose;
if (indexSettings.isSegRepEnabled()) {
this.replicaFileDeleter = new ReplicaFileDeleter();
this.replicaFileTracker = new ReplicaFileTracker();
} else {
this.replicaFileDeleter = null;
this.replicaFileTracker = null;
}

assert onClose != null;
Expand Down Expand Up @@ -817,7 +817,8 @@ private void cleanupFiles(String reason, MetadataSnapshot localSnapshot, @Nullab
if (Store.isAutogenerated(existingFile)
|| localSnapshot.contains(existingFile)
|| (additionalFiles != null && additionalFiles.contains(existingFile))
|| replicaFileDeleter != null && replicaFileDeleter.skipDelete(existingFile)) {
// also ensure we are not deleting a file referenced by an active reader.
|| replicaFileTracker != null && replicaFileTracker.canDelete(existingFile) == false) {
// don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete
// checksum)
continue;
Expand Down Expand Up @@ -1920,13 +1921,13 @@ private static IndexWriterConfig newIndexWriterConfig() {

public void incRefFileDeleter(Collection<String> files) {
if (this.indexSettings.isSegRepEnabled()) {
this.replicaFileDeleter.incRef(files);
this.replicaFileTracker.incRef(files);
}
}

public void decrefFileDeleter(Collection<String> files) {
if (this.indexSettings.isSegRepEnabled()) {
this.replicaFileDeleter.decRef(files);
this.replicaFileTracker.decRef(files);
}
}
}

0 comments on commit 766c76f

Please sign in to comment.