From 1a3f9e5a07c52c2a70dc591ad0da1c394e47dd46 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Sun, 26 Apr 2020 16:43:27 -0400 Subject: [PATCH] Return true for can_match on idle search shards (#55428) With this change, we will always return true for can_match requests on idle search shards; otherwise, some shards will never get refreshed if all search requests perform the can_match phase (i.e., total shards > pre_filter_shard_size). Relates #27500 Relates #50043 --- .../search/CanMatchPreFilterSearchPhase.java | 2 +- .../elasticsearch/index/shard/IndexShard.java | 50 +++++++++++++++---- .../elasticsearch/search/SearchService.java | 28 ++++++----- .../action/search/TransportSearchIT.java | 34 +++++++++++++ .../index/shard/SearchIdleIT.java | 3 ++ 5 files changed, 94 insertions(+), 23 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java index 70eeb2a811e5b..337861d0dc583 100644 --- a/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java @@ -160,7 +160,7 @@ private static final class CanMatchSearchPhaseResults extends SearchPhaseResults @Override void consumeResult(CanMatchResponse result) { - consumeResult(result.getShardIndex(), result.canMatch(), result.minAndMax()); + consumeResult(result.getShardIndex(), result.canMatch(), result.estimatedMinAndMax()); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 969b9a7e63d47..7516654abf18d 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -157,6 +157,7 @@ import java.nio.channels.ClosedByInterruptException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; import java.util.HashSet; @@ -271,6 +272,7 @@ Runnable getGlobalCheckpointSyncer() { private final AtomicLong lastSearcherAccess = new AtomicLong(); private final AtomicReference pendingRefreshLocation = new AtomicReference<>(); + private final RefreshPendingLocationListener refreshPendingLocationListener; private volatile boolean useRetentionLeasesInPeerRecovery; public IndexShard( @@ -369,6 +371,7 @@ public boolean shouldCache(Query query) { lastSearcherAccess.set(threadPool.relativeTimeInMillis()); persistMetadata(path, indexSettings, shardRouting, null, logger); this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases(); + this.refreshPendingLocationListener = new RefreshPendingLocationListener(); } public ThreadPool getThreadPool() { @@ -2751,7 +2754,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { similarityService.similarity(mapperService), codecService, shardEventListener, indexCache != null ? indexCache.query() : null, cachingPolicy, translogConfig, IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), - Collections.singletonList(refreshListeners), + Arrays.asList(refreshListeners, refreshPendingLocationListener), Collections.singletonList(new RefreshMetricUpdater(refreshMetric)), indexSort, circuitBreakerService, globalCheckpointSupplier, replicationTracker::getRetentionLeases, () -> getOperationPrimaryTerm(), tombstoneDocSupplier()); @@ -3251,7 +3254,7 @@ && isSearchIdle() /** * Returns true if this shards is search idle */ - final boolean isSearchIdle() { + public final boolean isSearchIdle() { return (threadPool.relativeTimeInMillis() - lastSearcherAccess.get()) >= indexSettings.getSearchIdleAfter().getMillis(); } @@ -3262,15 +3265,44 @@ final long getLastSearcherAccess() { return lastSearcherAccess.get(); } + /** + * Returns true if this shard has some scheduled refresh that is pending because of search-idle. + */ + public final boolean hasRefreshPending() { + return pendingRefreshLocation.get() != null; + } + private void setRefreshPending(Engine engine) { - Translog.Location lastWriteLocation = engine.getTranslogLastWriteLocation(); - Translog.Location location; - do { - location = this.pendingRefreshLocation.get(); - if (location != null && lastWriteLocation.compareTo(location) <= 0) { - break; + final Translog.Location lastWriteLocation = engine.getTranslogLastWriteLocation(); + pendingRefreshLocation.updateAndGet(curr -> { + if (curr == null || curr.compareTo(lastWriteLocation) <= 0) { + return lastWriteLocation; + } else { + return curr; } - } while (pendingRefreshLocation.compareAndSet(location, lastWriteLocation) == false); + }); + } + + private class RefreshPendingLocationListener implements ReferenceManager.RefreshListener { + Translog.Location lastWriteLocation; + + @Override + public void beforeRefresh() { + lastWriteLocation = getEngine().getTranslogLastWriteLocation(); + } + + @Override + public void afterRefresh(boolean didRefresh) { + if (didRefresh) { + pendingRefreshLocation.updateAndGet(pendingLocation -> { + if (pendingLocation == null || pendingLocation.compareTo(lastWriteLocation) <= 0) { + return null; + } else { + return pendingLocation; + } + }); + } + } } /** diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 3c437ac213261..03c4cd7e277d5 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -1115,6 +1115,7 @@ public CanMatchResponse canMatch(ShardSearchRequest request) throws IOException IndexShard indexShard = indexService.getShard(request.shardId().getId()); // we don't want to use the reader wrapper since it could run costly operations // and we can afford false positives. + final boolean hasRefreshPending = indexShard.hasRefreshPending(); try (Engine.Searcher searcher = indexShard.acquireCanMatchSearcher()) { final boolean aliasFilterCanMatch = request.getAliasFilter() .getQueryBuilder() instanceof MatchNoneQueryBuilder == false; @@ -1123,14 +1124,15 @@ public CanMatchResponse canMatch(ShardSearchRequest request) throws IOException Rewriteable.rewrite(request.getRewriteable(), context, false); FieldSortBuilder sortBuilder = FieldSortBuilder.getPrimaryFieldSortOrNull(request.source()); MinAndMax minMax = sortBuilder != null ? FieldSortBuilder.getMinMaxOrNull(context, sortBuilder) : null; + final boolean canMatch; if (canRewriteToMatchNone(request.source())) { QueryBuilder queryBuilder = request.source().query(); - return new CanMatchResponse( - aliasFilterCanMatch && queryBuilder instanceof MatchNoneQueryBuilder == false, minMax - ); + canMatch = aliasFilterCanMatch && queryBuilder instanceof MatchNoneQueryBuilder == false; + } else { + // null query means match_all + canMatch = aliasFilterCanMatch; } - // null query means match_all - return new CanMatchResponse(aliasFilterCanMatch, minMax); + return new CanMatchResponse(canMatch || hasRefreshPending, minMax); } } @@ -1207,28 +1209,28 @@ private static PipelineTree requestToPipelineTree(SearchRequest request) { public static final class CanMatchResponse extends SearchPhaseResult { private final boolean canMatch; - private final MinAndMax minAndMax; + private final MinAndMax estimatedMinAndMax; public CanMatchResponse(StreamInput in) throws IOException { super(in); this.canMatch = in.readBoolean(); if (in.getVersion().onOrAfter(Version.V_7_6_0)) { - minAndMax = in.readOptionalWriteable(MinAndMax::new); + estimatedMinAndMax = in.readOptionalWriteable(MinAndMax::new); } else { - minAndMax = null; + estimatedMinAndMax = null; } } - public CanMatchResponse(boolean canMatch, MinAndMax minAndMax) { + public CanMatchResponse(boolean canMatch, MinAndMax estimatedMinAndMax) { this.canMatch = canMatch; - this.minAndMax = minAndMax; + this.estimatedMinAndMax = estimatedMinAndMax; } @Override public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(canMatch); if (out.getVersion().onOrAfter(Version.V_7_6_0)) { - out.writeOptionalWriteable(minAndMax); + out.writeOptionalWriteable(estimatedMinAndMax); } } @@ -1236,8 +1238,8 @@ public boolean canMatch() { return canMatch; } - public MinAndMax minAndMax() { - return minAndMax; + public MinAndMax estimatedMinAndMax() { + return estimatedMinAndMax; } } diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchIT.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchIT.java index 64c9a1a43955b..8b707092141e0 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchIT.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchIT.java @@ -21,12 +21,18 @@ import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.query.RangeQueryBuilder; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.test.ESIntegTestCase; import java.util.Collections; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; public class TransportSearchIT extends ESIntegTestCase { @@ -69,4 +75,32 @@ public void testShardCountLimit() throws Exception { TransportSearchAction.SHARD_COUNT_LIMIT_SETTING.getKey(), null))); } } + + public void testSearchIdle() throws Exception { + int numOfReplicas = randomIntBetween(0, 1); + internalCluster().ensureAtLeastNumDataNodes(numOfReplicas + 1); + final Settings.Builder settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 5)) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numOfReplicas) + .put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.timeValueMillis(randomIntBetween(50, 500))); + assertAcked(prepareCreate("test").setSettings(settings).addMapping("_doc", "created_date", "type=date,format=yyyy-MM-dd")); + ensureGreen("test"); + assertBusy(() -> { + for (String node : internalCluster().nodesInclude("test")) { + final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node); + for (IndexShard indexShard : indicesService.indexServiceSafe(resolveIndex("test"))) { + assertTrue(indexShard.isSearchIdle()); + } + } + }); + client().prepareIndex("test", "_doc").setId("1").setSource("created_date", "2020-01-01").get(); + client().prepareIndex("test", "_doc").setId("2").setSource("created_date", "2020-01-02").get(); + client().prepareIndex("test", "_doc").setId("3").setSource("created_date", "2020-01-03").get(); + assertBusy(() -> { + SearchResponse resp = client().prepareSearch("test") + .setQuery(new RangeQueryBuilder("created_date").gte("2020-01-02").lte("2020-01-03")) + .setPreFilterShardSize(randomIntBetween(1, 3)).get(); + assertThat(resp.getHits().getTotalHits().value, equalTo(2L)); + }); + } } diff --git a/server/src/test/java/org/elasticsearch/index/shard/SearchIdleIT.java b/server/src/test/java/org/elasticsearch/index/shard/SearchIdleIT.java index 96f10df0084e6..db7225519c5ef 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/SearchIdleIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/SearchIdleIT.java @@ -157,6 +157,7 @@ public void testPendingRefreshWithIntervalChange() throws Exception { assertHitCount(client().prepareSearch().get(), 1); client().prepareIndex("test", "test", "1").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get(); assertFalse(shard.scheduledRefresh()); + assertTrue(shard.hasRefreshPending()); // now disable background refresh and make sure the refresh happens CountDownLatch updateSettingsLatch = new CountDownLatch(1); @@ -168,11 +169,13 @@ public void testPendingRefreshWithIntervalChange() throws Exception { // wait for both to ensure we don't have in-flight operations updateSettingsLatch.await(); refreshLatch.await(); + assertFalse(shard.hasRefreshPending()); // We need to ensure a `scheduledRefresh` triggered by the internal refresh setting update is executed before we index a new doc; // otherwise, it will compete to call `Engine#maybeRefresh` with the `scheduledRefresh` that we are going to verify. ensureNoPendingScheduledRefresh(indexService.getThreadPool()); client().prepareIndex("test", "test", "2").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get(); assertTrue(shard.scheduledRefresh()); + assertFalse(shard.hasRefreshPending()); assertTrue(shard.isSearchIdle()); assertHitCount(client().prepareSearch().get(), 3); }