Skip to content

Commit

Permalink
Segment Replication - PIT/Scroll compatibility.
Browse files Browse the repository at this point in the history
This change makes updates to make PIT/Scroll queries compatibile with Segment Replication.
It does this by refcounting files when a new reader is created, and discarding those files after
a reader is closed.

Signed-off-by: Marc Handalian <handalm@amazon.com>
  • Loading branch information
mch2 authored and dreamer-89 committed Mar 20, 2023
1 parent 3b7402f commit 853928c
Show file tree
Hide file tree
Showing 6 changed files with 328 additions and 6 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Return success on DeletePits when no PITs exist. ([#6544](https://github.com/opensearch-project/OpenSearch/pull/6544))
- Add node repurpose command for search nodes ([#6517](https://github.com/opensearch-project/OpenSearch/pull/6517))
- [Segment Replication] Apply backpressure when replicas fall behind ([#6563](https://github.com/opensearch-project/OpenSearch/pull/6563))
- [Segment Replication] Add point in time and scroll query compatibility. ([#6644](https://github.com/opensearch-project/OpenSearch/pull/6644))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.18.0 to 2.20.0 ([#6490](https://github.com/opensearch-project/OpenSearch/pull/6490))
Expand Down Expand Up @@ -111,4 +112,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Security

[Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.5...2.x
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.5...2.x
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,18 @@
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.tests.util.TestUtil;
import org.apache.lucene.util.BytesRef;
import org.opensearch.action.ActionFuture;
import org.opensearch.action.admin.indices.flush.FlushRequest;
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.action.search.CreatePitAction;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.CreatePitResponse;
import org.opensearch.action.search.DeletePitAction;
import org.opensearch.action.search.DeletePitRequest;
import org.opensearch.action.search.PitTestsUtil;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchType;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.Requests;
Expand All @@ -30,6 +42,7 @@
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.IndexModule;
import org.opensearch.index.SegmentReplicationPerGroupStats;
import org.opensearch.index.SegmentReplicationPressureService;
Expand All @@ -38,6 +51,8 @@
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.search.builder.PointInTimeBuilder;
import org.opensearch.search.sort.SortOrder;
import org.opensearch.node.NodeClosedException;
import org.opensearch.test.BackgroundIndexer;
import org.opensearch.test.InternalTestCluster;
Expand All @@ -46,14 +61,19 @@
import org.opensearch.transport.TransportService;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;

import static java.util.Arrays.asList;
import static org.opensearch.action.search.PitTestsUtil.assertSegments;
import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.opensearch.index.query.QueryBuilders.matchAllQuery;
import static org.opensearch.index.query.QueryBuilders.matchQuery;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAllSuccessful;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchHits;

Expand Down Expand Up @@ -833,4 +853,193 @@ public void testPressureServiceStats() throws Exception {
});
}
}

public void testScrollCreatedOnReplica() throws Exception {
final String primary = internalCluster().startNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startNode();
ensureGreen(INDEX_NAME);

for (int i = 0; i < 100; i++) {
client().prepareIndex(INDEX_NAME)
.setId(String.valueOf(i))
.setSource(jsonBuilder().startObject().field("field", i).endObject())
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
refresh(INDEX_NAME);
}
assertBusy(
() -> assertEquals(
getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(),
getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion()
)
);
final IndexShard replicaShard = getIndexShard(replica, INDEX_NAME);
final SegmentInfos segmentInfos = replicaShard.getLatestSegmentInfosAndCheckpoint().v1().get();
final Collection<String> snapshottedSegments = segmentInfos.files(true);
SearchResponse searchResponse = client(replica).prepareSearch()
.setQuery(matchAllQuery())
.setIndices(INDEX_NAME)
.setRequestCache(false)
.setPreference("_only_local")
.setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
.addSort("field", SortOrder.ASC)
.setSize(10)
.setScroll(TimeValue.timeValueDays(1))
.get();

flush(INDEX_NAME);

for (int i = 3; i < 50; i++) {
client().prepareDelete(INDEX_NAME, String.valueOf(i)).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
refresh(INDEX_NAME);
if (randomBoolean()) {
client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(true).get();
flush(INDEX_NAME);
}
}
assertBusy(() -> {
assertEquals(
getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(),
getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion()
);
});

client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(true).get();
assertBusy(() -> {
assertEquals(
getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(),
getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion()
);
});
// Test stats
long numHits = 0;
do {
numHits += searchResponse.getHits().getHits().length;
searchResponse = client(replica).prepareSearchScroll(searchResponse.getScrollId()).setScroll(TimeValue.timeValueDays(1)).get();
assertAllSuccessful(searchResponse);
} while (searchResponse.getHits().getHits().length > 0);

List<String> currentFiles = List.of(replicaShard.store().directory().listAll());
assertTrue("Files should be preserved", currentFiles.containsAll(snapshottedSegments));

client(replica).prepareClearScroll().addScrollId(searchResponse.getScrollId()).get();

currentFiles = List.of(replicaShard.store().directory().listAll());
assertFalse("Files should be preserved", currentFiles.containsAll(snapshottedSegments));

}

public void testPitCreatedOnReplica() throws Exception {
final String primary = internalCluster().startNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startNode();
ensureGreen(INDEX_NAME);
client().prepareIndex(INDEX_NAME)
.setId("1")
.setSource("foo", randomInt())
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
refresh(INDEX_NAME);

client().prepareIndex(INDEX_NAME)
.setId("2")
.setSource("foo", randomInt())
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
for (int i = 3; i < 100; i++) {
client().prepareIndex(INDEX_NAME)
.setId(String.valueOf(i))
.setSource("foo", randomInt())
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
refresh(INDEX_NAME);
}
// wait until replication finishes, then make the pit request.
assertBusy(
() -> assertEquals(
getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(),
getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion()
)
);
CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), false);
request.setPreference("_only_local");
request.setIndices(new String[] { INDEX_NAME });
ActionFuture<CreatePitResponse> execute = client(replica).execute(CreatePitAction.INSTANCE, request);
CreatePitResponse pitResponse = execute.get();
SearchResponse searchResponse = client(replica).prepareSearch(INDEX_NAME)
.setSize(10)
.setPreference("_only_local")
.setRequestCache(false)
.addSort("foo", SortOrder.ASC)
.searchAfter(new Object[] { 30 })
.setPointInTime(new PointInTimeBuilder(pitResponse.getId()).setKeepAlive(TimeValue.timeValueDays(1)))
.get();
assertEquals(1, searchResponse.getSuccessfulShards());
assertEquals(1, searchResponse.getTotalShards());
FlushRequest flushRequest = Requests.flushRequest(INDEX_NAME);
client().admin().indices().flush(flushRequest).get();
final IndexShard replicaShard = getIndexShard(replica, INDEX_NAME);
final SegmentInfos segmentInfos = replicaShard.getLatestSegmentInfosAndCheckpoint().v1().get();
final Collection<String> snapshottedSegments = segmentInfos.files(true);

flush(INDEX_NAME);
for (int i = 101; i < 200; i++) {
client().prepareIndex(INDEX_NAME)
.setId(String.valueOf(i))
.setSource("foo", randomInt())
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
refresh(INDEX_NAME);
if (randomBoolean()) {
client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(true).get();
flush(INDEX_NAME);
}
}
assertBusy(() -> {
assertEquals(
getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(),
getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion()
);
});

client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(true).get();
assertBusy(() -> {
assertEquals(
getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(),
getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion()
);
});
// Test stats
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.indices(INDEX_NAME);
indicesStatsRequest.all();
IndicesStatsResponse indicesStatsResponse = client().admin().indices().stats(indicesStatsRequest).get();
long pitCurrent = indicesStatsResponse.getIndex(INDEX_NAME).getTotal().search.getTotal().getPitCurrent();
long openContexts = indicesStatsResponse.getIndex(INDEX_NAME).getTotal().search.getOpenContexts();
assertEquals(1, pitCurrent);
assertEquals(1, openContexts);
SearchResponse resp = client(replica).prepareSearch(INDEX_NAME)
.setSize(10)
.setPreference("_only_local")
.addSort("foo", SortOrder.ASC)
.searchAfter(new Object[] { 30 })
.setPointInTime(new PointInTimeBuilder(pitResponse.getId()).setKeepAlive(TimeValue.timeValueDays(1)))
.setRequestCache(false)
.get();
PitTestsUtil.assertUsingGetAllPits(client(replica), pitResponse.getId(), pitResponse.getCreationTime());
assertSegments(false, INDEX_NAME, 1, client(replica), pitResponse.getId());

List<String> currentFiles = List.of(replicaShard.store().directory().listAll());
assertTrue("Files should be preserved", currentFiles.containsAll(snapshottedSegments));

// delete the PIT
DeletePitRequest deletePITRequest = new DeletePitRequest(pitResponse.getId());
client().execute(DeletePitAction.INSTANCE, deletePITRequest).actionGet();

currentFiles = List.of(replicaShard.store().directory().listAll());
assertFalse("Files should be preserved", currentFiles.containsAll(snapshottedSegments));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public NRTReplicationEngine(EngineConfig engineConfig) {
WriteOnlyTranslogManager translogManagerRef = null;
try {
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
readerManager = new NRTReplicationReaderManager(OpenSearchDirectoryReader.wrap(getDirectoryReader(), shardId));
readerManager = buildReaderManager();
final SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(
this.lastCommittedSegmentInfos.getUserData().entrySet()
);
Expand Down Expand Up @@ -119,6 +119,23 @@ public void onAfterTranslogSync() {
}
}

private NRTReplicationReaderManager buildReaderManager() throws IOException {
return new NRTReplicationReaderManager(
OpenSearchDirectoryReader.wrap(getDirectoryReader(), shardId),
store::incRefFileDeleter,
(files) -> {
store.decrefFileDeleter(files);
try {
store.cleanupAndPreserveLatestCommitPoint("On reader closed", getLatestSegmentInfos());
} catch (IOException e) {
// Log but do not rethrow - we can try cleaning up again after next replication cycle.
// If that were to fail, the shard will as well.
logger.error("Unable to clean store after reader closed", e);
}
}
);
}

@Override
public TranslogManager translogManager() {
return translogManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;

/**
* This is an extension of {@link OpenSearchReaderManager} for use with {@link NRTReplicationEngine}.
Expand All @@ -35,17 +37,28 @@ public class NRTReplicationReaderManager extends OpenSearchReaderManager {

private final static Logger logger = LogManager.getLogger(NRTReplicationReaderManager.class);
private volatile SegmentInfos currentInfos;
private Consumer<Collection<String>> onReaderClosed;
private Consumer<Collection<String>> onNewReader;

/**
* Creates and returns a new SegmentReplicationReaderManager from the given
* already-opened {@link OpenSearchDirectoryReader}, stealing
* the incoming reference.
*
* @param reader the SegmentReplicationReaderManager to use for future reopens
* @param reader - The SegmentReplicationReaderManager to use for future reopens.
* @param onNewReader - Called when a new reader is created.
* @param onReaderClosed - Called when a reader is closed.
*/
NRTReplicationReaderManager(OpenSearchDirectoryReader reader) {
NRTReplicationReaderManager(
OpenSearchDirectoryReader reader,
Consumer<Collection<String>> onNewReader,
Consumer<Collection<String>> onReaderClosed
) throws IOException {
super(reader);
currentInfos = unwrapStandardReader(reader).getSegmentInfos();
this.onReaderClosed = onReaderClosed;
this.onNewReader = onNewReader;
onNewReader.accept(currentInfos.files(true));
}

@Override
Expand All @@ -60,6 +73,7 @@ protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader re
for (LeafReaderContext ctx : standardDirectoryReader.leaves()) {
subs.add(ctx.reader());
}
final Collection<String> files = currentInfos.files(true);
DirectoryReader innerReader = StandardDirectoryReader.open(referenceToRefresh.directory(), currentInfos, subs, null);
final DirectoryReader softDeletesDirectoryReaderWrapper = new SoftDeletesDirectoryReaderWrapper(
innerReader,
Expand All @@ -68,7 +82,13 @@ protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader re
logger.trace(
() -> new ParameterizedMessage("updated to SegmentInfosVersion=" + currentInfos.getVersion() + " reader=" + innerReader)
);
return OpenSearchDirectoryReader.wrap(softDeletesDirectoryReaderWrapper, referenceToRefresh.shardId());
final OpenSearchDirectoryReader reader = OpenSearchDirectoryReader.wrap(
softDeletesDirectoryReaderWrapper,
referenceToRefresh.shardId()
);
onNewReader.accept(files);
OpenSearchDirectoryReader.addReaderCloseListener(reader, key -> onReaderClosed.accept(files));
return reader;
}

/**
Expand Down
Loading

0 comments on commit 853928c

Please sign in to comment.