From b3b10914efa926625ef4d00ba570aae796a5aaa6 Mon Sep 17 00:00:00 2001 From: Jay Deng Date: Mon, 4 Sep 2023 17:21:39 -0400 Subject: [PATCH] [Backport 2.x] Adding concurrent search versions of query count and time metrics (#9622) (#9653) * Adding concurrent search versions of query count and time metrics (#9622) Signed-off-by: Jay Deng * Add average query concurrency metric for concurrent segment search (#9670) Signed-off-by: Jay Deng --------- Signed-off-by: Jay Deng Signed-off-by: Andriy Redko Co-authored-by: Andriy Redko --- CHANGELOG.md | 2 + .../test/cat.shards/10_basic.yml | 1 - .../search/stats/ConcurrentSearchStatsIT.java | 325 ++++++++++++++++++ .../index/search/stats/SearchStats.java | 74 ++++ .../index/search/stats/ShardSearchStats.java | 21 ++ .../rest/action/cat/RestIndicesAction.java | 40 +++ .../rest/action/cat/RestNodesAction.java | 25 ++ .../rest/action/cat/RestShardsAction.java | 26 ++ .../index/search/stats/SearchStatsTests.java | 11 +- 9 files changed, 521 insertions(+), 4 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/search/stats/ConcurrentSearchStatsIT.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 6942014957380..372775cf40cf8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Core crypto library to perform encryption and decryption of source content ([#8466](https://github.com/opensearch-project/OpenSearch/pull/8466)) - Expose DelimitedTermFrequencyTokenFilter to allow providing term frequencies along with terms ([#9479](https://github.com/opensearch-project/OpenSearch/pull/9479)) - APIs for performing async blob reads and async downloads from the repository using multiple streams ([#9592](https://github.com/opensearch-project/OpenSearch/issues/9592)) +- Add concurrent segment search related metrics to node and index stats ([#9622](https://github.com/opensearch-project/OpenSearch/issues/9622)) +- Add average concurrency metric for concurrent segment search ([#9670](https://github.com/opensearch-project/OpenSearch/issues/9670)) - Introduce cluster default remote translog buffer interval setting ([#9584](https://github.com/opensearch-project/OpenSearch/pull/9584)) - Added encryption-sdk lib to provide encryption and decryption capabilities ([#8466](https://github.com/opensearch-project/OpenSearch/pull/8466) [#9289](https://github.com/opensearch-project/OpenSearch/pull/9289)) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/cat.shards/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/cat.shards/10_basic.yml index 189215b6562a3..f80c9f9c0bc80 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/cat.shards/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/cat.shards/10_basic.yml @@ -1,4 +1,3 @@ ---- "Help": - skip: version: " - 2.3.99" diff --git a/server/src/internalClusterTest/java/org/opensearch/search/stats/ConcurrentSearchStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/search/stats/ConcurrentSearchStatsIT.java new file mode 100644 index 0000000000000..d2af7fc6effe7 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/search/stats/ConcurrentSearchStatsIT.java @@ -0,0 +1,325 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.stats; + +import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.opensearch.action.admin.indices.stats.IndexStats; +import org.opensearch.action.admin.indices.stats.IndicesStatsRequestBuilder; +import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.index.IndexModule; +import org.opensearch.index.IndexSettings; +import org.opensearch.indices.IndicesQueryCache; +import org.opensearch.indices.IndicesService; +import org.opensearch.plugins.Plugin; +import org.opensearch.script.MockScriptPlugin; +import org.opensearch.script.Script; +import org.opensearch.script.ScriptType; +import org.opensearch.test.InternalSettingsPlugin; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Locale; +import java.util.Map; +import java.util.function.Function; + +import static org.opensearch.index.query.QueryBuilders.scriptQuery; +import static org.opensearch.search.SearchBootstrapSettings.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_KEY; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.lessThan; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 1, numClientNodes = 0, supportsDedicatedMasters = false) +public class ConcurrentSearchStatsIT extends OpenSearchIntegTestCase { + + private final int SEGMENT_SLICE_COUNT = 4; + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(ScriptedDelayedPlugin.class, InternalSettingsPlugin.class); + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + // Filter/Query cache is cleaned periodically, default is 60s, so make sure it runs often. Thread.sleep for 60s is bad + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(IndicesService.INDICES_CACHE_CLEAN_INTERVAL_SETTING.getKey(), "1ms") + .put(IndicesQueryCache.INDICES_QUERIES_CACHE_ALL_SEGMENTS_SETTING.getKey(), true) + .put(CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_KEY, SEGMENT_SLICE_COUNT) + .build(); + } + + @Override + public Settings indexSettings() { + return Settings.builder() + .put(super.indexSettings()) + .put(IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING.getKey(), false) + .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) + .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 0) + .build(); + } + + @Override + protected Settings featureFlagSettings() { + return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, "true").build(); + } + + public void testConcurrentQueryCount() throws Exception { + String INDEX_1 = "test-" + randomAlphaOfLength(5).toLowerCase(Locale.ROOT); + String INDEX_2 = "test-" + randomAlphaOfLength(5).toLowerCase(Locale.ROOT); + int NUM_SHARDS = randomIntBetween(1, 5); + createIndex( + INDEX_1, + Settings.builder() + .put(indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, NUM_SHARDS) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .build() + ); + createIndex( + INDEX_2, + Settings.builder() + .put(indexSettings()) + .put("search.concurrent_segment_search.enabled", false) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, NUM_SHARDS) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .build() + ); + + ensureGreen(); + + indexRandom( + false, + true, + client().prepareIndex(INDEX_1).setId("1").setSource("foo", "bar"), + client().prepareIndex(INDEX_1).setId("2").setSource("foo", "baz"), + client().prepareIndex(INDEX_2).setId("1").setSource("foo", "bar"), + client().prepareIndex(INDEX_2).setId("2").setSource("foo", "baz") + ); + + refresh(); + + // Search with custom plugin to ensure that queryTime is significant + client().prepareSearch(INDEX_1, INDEX_2) + .setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedDelayedPlugin.SCRIPT_NAME, Collections.emptyMap()))) + .execute() + .actionGet(); + client().prepareSearch(INDEX_1) + .setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedDelayedPlugin.SCRIPT_NAME, Collections.emptyMap()))) + .execute() + .actionGet(); + client().prepareSearch(INDEX_2) + .setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedDelayedPlugin.SCRIPT_NAME, Collections.emptyMap()))) + .execute() + .actionGet(); + + IndicesStatsRequestBuilder builder = client().admin().indices().prepareStats(); + IndicesStatsResponse stats = builder.execute().actionGet(); + + assertEquals(4 * NUM_SHARDS, stats.getTotal().search.getTotal().getQueryCount()); + assertEquals(2 * NUM_SHARDS, stats.getTotal().search.getTotal().getConcurrentQueryCount()); + assertThat(stats.getTotal().search.getTotal().getQueryTimeInMillis(), greaterThan(0L)); + assertThat(stats.getTotal().search.getTotal().getConcurrentQueryTimeInMillis(), greaterThan(0L)); + assertThat( + stats.getTotal().search.getTotal().getConcurrentQueryTimeInMillis(), + lessThan(stats.getTotal().search.getTotal().getQueryTimeInMillis()) + ); + } + + /** + * Test average concurrency is correctly calculated across indices for the same node + */ + public void testAvgConcurrencyNodeLevel() throws InterruptedException { + int NUM_SHARDS = 1; + String INDEX_1 = "test-" + randomAlphaOfLength(5).toLowerCase(Locale.ROOT); + String INDEX_2 = "test-" + randomAlphaOfLength(5).toLowerCase(Locale.ROOT); + + // Create index test1 with 4 segments + createIndex( + INDEX_1, + Settings.builder() + .put(indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, NUM_SHARDS) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .build() + ); + ensureGreen(); + for (int i = 0; i < 4; i++) { + client().prepareIndex(INDEX_1).setId(Integer.toString(i)).setSource("field", "value" + i).get(); + refresh(); + } + + client().prepareSearch(INDEX_1).execute().actionGet(); + NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().execute().actionGet(); + + assertEquals(1, nodesStatsResponse.getNodes().size(), 0); + double expectedConcurrency = SEGMENT_SLICE_COUNT; + assertEquals( + SEGMENT_SLICE_COUNT, + nodesStatsResponse.getNodes().get(0).getIndices().getSearch().getTotal().getConcurrentAvgSliceCount(), + 0 + ); + + forceMerge(); + // Sleep to make sure force merge completes + Thread.sleep(1000); + client().prepareSearch(INDEX_1).execute().actionGet(); + + nodesStatsResponse = client().admin().cluster().prepareNodesStats().execute().actionGet(); + + assertEquals(1, nodesStatsResponse.getNodes().size(), 0); + expectedConcurrency = (SEGMENT_SLICE_COUNT + 1) / 2.0; + assertEquals( + expectedConcurrency, + nodesStatsResponse.getNodes().get(0).getIndices().getSearch().getTotal().getConcurrentAvgSliceCount(), + 0 + ); + + // Create second index test2 with 4 segments + createIndex( + INDEX_2, + Settings.builder() + .put(indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, NUM_SHARDS) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .build() + ); + ensureGreen(); + for (int i = 0; i < 4; i++) { + client().prepareIndex(INDEX_2).setId(Integer.toString(i)).setSource("field", "value" + i).get(); + refresh(); + } + + client().prepareSearch(INDEX_2).execute().actionGet(); + nodesStatsResponse = client().admin().cluster().prepareNodesStats().execute().actionGet(); + + assertEquals(1, nodesStatsResponse.getNodes().size(), 0); + expectedConcurrency = (SEGMENT_SLICE_COUNT + 1 + SEGMENT_SLICE_COUNT) / 3.0; + assertEquals( + expectedConcurrency, + nodesStatsResponse.getNodes().get(0).getIndices().getSearch().getTotal().getConcurrentAvgSliceCount(), + 0 + ); + + forceMerge(); + // Sleep to make sure force merge completes + Thread.sleep(1000); + client().prepareSearch(INDEX_2).execute().actionGet(); + nodesStatsResponse = client().admin().cluster().prepareNodesStats().execute().actionGet(); + + assertEquals(1, nodesStatsResponse.getNodes().size(), 0); + expectedConcurrency = (SEGMENT_SLICE_COUNT + 1 + SEGMENT_SLICE_COUNT + 1) / 4.0; + assertEquals( + expectedConcurrency, + nodesStatsResponse.getNodes().get(0).getIndices().getSearch().getTotal().getConcurrentAvgSliceCount(), + 0 + ); + + // Check that non-concurrent search requests do not affect the average concurrency + client().admin() + .indices() + .prepareUpdateSettings(INDEX_1) + .setSettings(Settings.builder().put("search.concurrent_segment_search.enabled", false)) + .execute() + .actionGet(); + client().admin() + .indices() + .prepareUpdateSettings(INDEX_2) + .setSettings(Settings.builder().put("search.concurrent_segment_search.enabled", false)) + .execute() + .actionGet(); + client().prepareSearch(INDEX_1).execute().actionGet(); + client().prepareSearch(INDEX_2).execute().actionGet(); + assertEquals(1, nodesStatsResponse.getNodes().size(), 0); + assertEquals( + expectedConcurrency, + nodesStatsResponse.getNodes().get(0).getIndices().getSearch().getTotal().getConcurrentAvgSliceCount(), + 0 + ); + } + + /** + * Test average concurrency is correctly calculated across shard for the same index + */ + public void testAvgConcurrencyIndexLevel() throws InterruptedException { + int NUM_SHARDS = 2; + String INDEX = "test-" + randomAlphaOfLength(5).toLowerCase(Locale.ROOT); + createIndex( + INDEX, + Settings.builder() + .put(indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, NUM_SHARDS) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .build() + ); + ensureGreen(); + // Create 4 segments on each shard + for (int i = 0; i < 4; i++) { + client().prepareIndex(INDEX).setId(Integer.toString(i)).setSource("field", "value" + i).setRouting("0").get(); + refresh(); + } + for (int i = 4; i < 8; i++) { + client().prepareIndex(INDEX).setId(Integer.toString(i)).setSource("field", "value" + i).setRouting("1").get(); + refresh(); + } + client().prepareSearch(INDEX).execute().actionGet(); + IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats().execute().actionGet(); + + IndexStats stats = indicesStatsResponse.getIndices().get(INDEX); + assertNotNull(stats); + double expectedConcurrency = (SEGMENT_SLICE_COUNT * NUM_SHARDS) / (double) NUM_SHARDS; + assertEquals(expectedConcurrency, stats.getTotal().getSearch().getTotal().getConcurrentAvgSliceCount(), 0); + + forceMerge(); + // Sleep to make sure force merge completes + Thread.sleep(1000); + client().prepareSearch(INDEX).execute().actionGet(); + + indicesStatsResponse = client().admin().indices().prepareStats().execute().actionGet(); + stats = indicesStatsResponse.getIndices().get(INDEX); + assertNotNull(stats); + expectedConcurrency = (SEGMENT_SLICE_COUNT * NUM_SHARDS + 1 * NUM_SHARDS) / (NUM_SHARDS * 2.0); + assertEquals(expectedConcurrency, stats.getTotal().getSearch().getTotal().getConcurrentAvgSliceCount(), 0); + + // Check that non-concurrent search requests do not affect the average concurrency + client().admin() + .indices() + .prepareUpdateSettings(INDEX) + .setSettings(Settings.builder().put("search.concurrent_segment_search.enabled", false)) + .execute() + .actionGet(); + + client().prepareSearch(INDEX).execute().actionGet(); + + indicesStatsResponse = client().admin().indices().prepareStats().execute().actionGet(); + stats = indicesStatsResponse.getIndices().get(INDEX); + assertNotNull(stats); + assertEquals(expectedConcurrency, stats.getTotal().getSearch().getTotal().getConcurrentAvgSliceCount(), 0); + } + + public static class ScriptedDelayedPlugin extends MockScriptPlugin { + static final String SCRIPT_NAME = "search_timeout"; + + @Override + public Map, Object>> pluginScripts() { + return Collections.singletonMap(SCRIPT_NAME, params -> { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return true; + }); + } + } +} diff --git a/server/src/main/java/org/opensearch/index/search/stats/SearchStats.java b/server/src/main/java/org/opensearch/index/search/stats/SearchStats.java index e41deb514fbef..76b216304b3b7 100644 --- a/server/src/main/java/org/opensearch/index/search/stats/SearchStats.java +++ b/server/src/main/java/org/opensearch/index/search/stats/SearchStats.java @@ -35,6 +35,7 @@ import org.opensearch.Version; import org.opensearch.common.Nullable; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.core.common.Strings; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; @@ -67,6 +68,11 @@ public static class Stats implements Writeable, ToXContentFragment { private long queryTimeInMillis; private long queryCurrent; + private long concurrentQueryCount; + private long concurrentQueryTimeInMillis; + private long concurrentQueryCurrent; + private long queryConcurrency; + private long fetchCount; private long fetchTimeInMillis; private long fetchCurrent; @@ -91,6 +97,10 @@ public Stats( long queryCount, long queryTimeInMillis, long queryCurrent, + long concurrentQueryCount, + long concurrentQueryTimeInMillis, + long concurrentQueryCurrent, + long queryConcurrency, long fetchCount, long fetchTimeInMillis, long fetchCurrent, @@ -108,6 +118,11 @@ public Stats( this.queryTimeInMillis = queryTimeInMillis; this.queryCurrent = queryCurrent; + this.concurrentQueryCount = concurrentQueryCount; + this.concurrentQueryTimeInMillis = concurrentQueryTimeInMillis; + this.concurrentQueryCurrent = concurrentQueryCurrent; + this.queryConcurrency = queryConcurrency; + this.fetchCount = fetchCount; this.fetchTimeInMillis = fetchTimeInMillis; this.fetchCurrent = fetchCurrent; @@ -147,6 +162,13 @@ private Stats(StreamInput in) throws IOException { pitTimeInMillis = in.readVLong(); pitCurrent = in.readVLong(); } + + if (in.getVersion().onOrAfter(Version.V_2_10_0)) { + concurrentQueryCount = in.readVLong(); + concurrentQueryTimeInMillis = in.readVLong(); + concurrentQueryCurrent = in.readVLong(); + queryConcurrency = in.readVLong(); + } } public void add(Stats stats) { @@ -154,6 +176,11 @@ public void add(Stats stats) { queryTimeInMillis += stats.queryTimeInMillis; queryCurrent += stats.queryCurrent; + concurrentQueryCount += stats.concurrentQueryCount; + concurrentQueryTimeInMillis += stats.concurrentQueryTimeInMillis; + concurrentQueryCurrent += stats.concurrentQueryCurrent; + queryConcurrency += stats.queryConcurrency; + fetchCount += stats.fetchCount; fetchTimeInMillis += stats.fetchTimeInMillis; fetchCurrent += stats.fetchCurrent; @@ -175,6 +202,9 @@ public void addForClosingShard(Stats stats) { queryCount += stats.queryCount; queryTimeInMillis += stats.queryTimeInMillis; + concurrentQueryCount += stats.concurrentQueryCount; + concurrentQueryTimeInMillis += stats.concurrentQueryTimeInMillis; + fetchCount += stats.fetchCount; fetchTimeInMillis += stats.fetchTimeInMillis; @@ -189,6 +219,7 @@ public void addForClosingShard(Stats stats) { pitCount += stats.pitCount; pitTimeInMillis += stats.pitTimeInMillis; pitCurrent += stats.pitCurrent; + queryConcurrency += stats.queryConcurrency; } public long getQueryCount() { @@ -207,6 +238,30 @@ public long getQueryCurrent() { return queryCurrent; } + public long getConcurrentQueryCount() { + return concurrentQueryCount; + } + + public TimeValue getConcurrentQueryTime() { + return new TimeValue(concurrentQueryTimeInMillis); + } + + public double getConcurrentAvgSliceCount() { + if (concurrentQueryCount == 0) { + return 0; + } else { + return queryConcurrency / (double) concurrentQueryCount; + } + } + + public long getConcurrentQueryTimeInMillis() { + return concurrentQueryTimeInMillis; + } + + public long getConcurrentQueryCurrent() { + return concurrentQueryCurrent; + } + public long getFetchCount() { return fetchCount; } @@ -298,6 +353,13 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVLong(pitTimeInMillis); out.writeVLong(pitCurrent); } + + if (out.getVersion().onOrAfter(Version.V_2_10_0)) { + out.writeVLong(concurrentQueryCount); + out.writeVLong(concurrentQueryTimeInMillis); + out.writeVLong(concurrentQueryCurrent); + out.writeVLong(queryConcurrency); + } } @Override @@ -306,6 +368,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.humanReadableField(Fields.QUERY_TIME_IN_MILLIS, Fields.QUERY_TIME, getQueryTime()); builder.field(Fields.QUERY_CURRENT, queryCurrent); + if (FeatureFlags.isEnabled(FeatureFlags.CONCURRENT_SEGMENT_SEARCH)) { + builder.field(Fields.CONCURRENT_QUERY_TOTAL, concurrentQueryCount); + builder.humanReadableField(Fields.CONCURRENT_QUERY_TIME_IN_MILLIS, Fields.CONCURRENT_QUERY_TIME, getConcurrentQueryTime()); + builder.field(Fields.CONCURRENT_QUERY_CURRENT, concurrentQueryCurrent); + builder.field(Fields.CONCURRENT_AVG_SLICE_COUNT, getConcurrentAvgSliceCount()); + } + builder.field(Fields.FETCH_TOTAL, fetchCount); builder.humanReadableField(Fields.FETCH_TIME_IN_MILLIS, Fields.FETCH_TIME, getFetchTime()); builder.field(Fields.FETCH_CURRENT, fetchCurrent); @@ -430,6 +499,11 @@ static final class Fields { static final String QUERY_TIME = "query_time"; static final String QUERY_TIME_IN_MILLIS = "query_time_in_millis"; static final String QUERY_CURRENT = "query_current"; + static final String CONCURRENT_QUERY_TOTAL = "concurrent_query_total"; + static final String CONCURRENT_QUERY_TIME = "concurrent_query_time"; + static final String CONCURRENT_QUERY_TIME_IN_MILLIS = "concurrent_query_time_in_millis"; + static final String CONCURRENT_QUERY_CURRENT = "concurrent_query_current"; + static final String CONCURRENT_AVG_SLICE_COUNT = "concurrent_avg_slice_count"; static final String FETCH_TOTAL = "fetch_total"; static final String FETCH_TIME = "fetch_time"; static final String FETCH_TIME_IN_MILLIS = "fetch_time_in_millis"; diff --git a/server/src/main/java/org/opensearch/index/search/stats/ShardSearchStats.java b/server/src/main/java/org/opensearch/index/search/stats/ShardSearchStats.java index 6f6ebd5545c7a..99e3f8465c5db 100644 --- a/server/src/main/java/org/opensearch/index/search/stats/ShardSearchStats.java +++ b/server/src/main/java/org/opensearch/index/search/stats/ShardSearchStats.java @@ -91,6 +91,9 @@ public void onPreQueryPhase(SearchContext searchContext) { statsHolder.suggestCurrent.inc(); } else { statsHolder.queryCurrent.inc(); + if (searchContext.shouldUseConcurrentSearch()) { + statsHolder.concurrentQueryCurrent.inc(); + } } }); } @@ -104,6 +107,10 @@ public void onFailedQueryPhase(SearchContext searchContext) { } else { statsHolder.queryCurrent.dec(); assert statsHolder.queryCurrent.count() >= 0; + if (searchContext.shouldUseConcurrentSearch()) { + statsHolder.concurrentQueryCurrent.dec(); + assert statsHolder.concurrentQueryCurrent.count() >= 0; + } } }); } @@ -119,6 +126,13 @@ public void onQueryPhase(SearchContext searchContext, long tookInNanos) { statsHolder.queryMetric.inc(tookInNanos); statsHolder.queryCurrent.dec(); assert statsHolder.queryCurrent.count() >= 0; + if (searchContext.shouldUseConcurrentSearch()) { + statsHolder.concurrentQueryMetric.inc(tookInNanos); + statsHolder.concurrentQueryCurrent.dec(); + assert statsHolder.concurrentQueryCurrent.count() >= 0; + assert searchContext.searcher().getSlices() != null; + statsHolder.queryConcurrencyMetric.inc(searchContext.searcher().getSlices().length); + } } }); } @@ -206,6 +220,8 @@ public void onFreePitContext(ReaderContext readerContext) { */ static final class StatsHolder { final MeanMetric queryMetric = new MeanMetric(); + final MeanMetric concurrentQueryMetric = new MeanMetric(); + final CounterMetric queryConcurrencyMetric = new CounterMetric(); final MeanMetric fetchMetric = new MeanMetric(); /* We store scroll statistics in microseconds because with nanoseconds we run the risk of overflowing the total stats if there are * many scrolls. For example, on a system with 2^24 scrolls that have been executed, each executing for 2^10 seconds, then using @@ -218,6 +234,7 @@ static final class StatsHolder { final MeanMetric pitMetric = new MeanMetric(); final MeanMetric suggestMetric = new MeanMetric(); final CounterMetric queryCurrent = new CounterMetric(); + final CounterMetric concurrentQueryCurrent = new CounterMetric(); final CounterMetric fetchCurrent = new CounterMetric(); final CounterMetric scrollCurrent = new CounterMetric(); final CounterMetric pitCurrent = new CounterMetric(); @@ -228,6 +245,10 @@ SearchStats.Stats stats() { queryMetric.count(), TimeUnit.NANOSECONDS.toMillis(queryMetric.sum()), queryCurrent.count(), + concurrentQueryMetric.count(), + TimeUnit.NANOSECONDS.toMillis(concurrentQueryMetric.sum()), + concurrentQueryCurrent.count(), + queryConcurrencyMetric.count(), fetchMetric.count(), TimeUnit.NANOSECONDS.toMillis(fetchMetric.sum()), fetchCurrent.count(), diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestIndicesAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestIndicesAction.java index 2781c09de8614..dea4f02aef5bd 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestIndicesAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestIndicesAction.java @@ -53,6 +53,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.time.DateFormatter; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.core.action.ActionListener; import org.opensearch.core.action.ActionResponse; import org.opensearch.core.common.Strings; @@ -574,6 +575,31 @@ protected Table getTableWithHeader(final RestRequest request) { "sibling:pri;alias:sqto,searchQueryTotal;default:false;text-align:right;desc:total query phase ops" ); table.addCell("pri.search.query_total", "default:false;text-align:right;desc:total query phase ops"); + if (FeatureFlags.isEnabled(FeatureFlags.CONCURRENT_SEGMENT_SEARCH)) { + table.addCell( + "search.concurrent_query_current", + "sibling:pri;alias:scqc,searchConcurrentQueryCurrent;default:false;text-align:right;desc:current concurrent query phase ops" + ); + table.addCell("pri.search.concurrent_query_current", "default:false;text-align:right;desc:current concurrent query phase ops"); + + table.addCell( + "search.concurrent_query_time", + "sibling:pri;alias:scqti,searchConcurrentQueryTime;default:false;text-align:right;desc:time spent in concurrent query phase" + ); + table.addCell("pri.search.concurrent_query_time", "default:false;text-align:right;desc:time spent in concurrent query phase"); + + table.addCell( + "search.concurrent_query_total", + "sibling:pri;alias:scqto,searchConcurrentQueryTotal;default:false;text-align:right;desc:total query phase ops" + ); + table.addCell("pri.search.concurrent_query_total", "default:false;text-align:right;desc:total query phase ops"); + + table.addCell( + "search.concurrent_avg_slice_count", + "sibling:pri;alias:casc,searchConcurrentAvgSliceCount;default:false;text-align:right;desc:average query concurrency" + ); + table.addCell("pri.search.concurrent_avg_slice_count", "default:false;text-align:right;desc:average query concurrency"); + } table.addCell( "search.scroll_current", @@ -883,6 +909,20 @@ Table buildTable( table.addCell(totalStats.getSearch() == null ? null : totalStats.getSearch().getTotal().getQueryCount()); table.addCell(primaryStats.getSearch() == null ? null : primaryStats.getSearch().getTotal().getQueryCount()); + if (FeatureFlags.isEnabled(FeatureFlags.CONCURRENT_SEGMENT_SEARCH)) { + table.addCell(totalStats.getSearch() == null ? null : totalStats.getSearch().getTotal().getConcurrentQueryCurrent()); + table.addCell(primaryStats.getSearch() == null ? null : primaryStats.getSearch().getTotal().getConcurrentQueryCurrent()); + + table.addCell(totalStats.getSearch() == null ? null : totalStats.getSearch().getTotal().getConcurrentQueryTime()); + table.addCell(primaryStats.getSearch() == null ? null : primaryStats.getSearch().getTotal().getConcurrentQueryTime()); + + table.addCell(totalStats.getSearch() == null ? null : totalStats.getSearch().getTotal().getConcurrentQueryCount()); + table.addCell(primaryStats.getSearch() == null ? null : primaryStats.getSearch().getTotal().getConcurrentQueryCount()); + + table.addCell(totalStats.getSearch() == null ? null : totalStats.getSearch().getTotal().getConcurrentAvgSliceCount()); + table.addCell(primaryStats.getSearch() == null ? null : primaryStats.getSearch().getTotal().getConcurrentAvgSliceCount()); + } + table.addCell(totalStats.getSearch() == null ? null : totalStats.getSearch().getTotal().getScrollCurrent()); table.addCell(primaryStats.getSearch() == null ? null : primaryStats.getSearch().getTotal().getScrollCurrent()); diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java index da4bd9d71732e..bf8b27f378311 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java @@ -47,6 +47,7 @@ import org.opensearch.common.Table; import org.opensearch.common.logging.DeprecationLogger; import org.opensearch.common.network.NetworkAddress; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.core.common.Strings; import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.core.common.unit.ByteSizeValue; @@ -303,6 +304,24 @@ protected Table getTableWithHeader(final RestRequest request) { table.addCell("search.query_current", "alias:sqc,searchQueryCurrent;default:false;text-align:right;desc:current query phase ops"); table.addCell("search.query_time", "alias:sqti,searchQueryTime;default:false;text-align:right;desc:time spent in query phase"); table.addCell("search.query_total", "alias:sqto,searchQueryTotal;default:false;text-align:right;desc:total query phase ops"); + if (FeatureFlags.isEnabled(FeatureFlags.CONCURRENT_SEGMENT_SEARCH)) { + table.addCell( + "search.concurrent_query_current", + "alias:scqc,searchConcurrentQueryCurrent;default:false;text-align:right;desc:current concurrent query phase ops" + ); + table.addCell( + "search.concurrent_query_time", + "alias:scqti,searchConcurrentQueryTime;default:false;text-align:right;desc:time spent in concurrent query phase" + ); + table.addCell( + "search.concurrent_query_total", + "alias:scqto,searchConcurrentQueryTotal;default:false;text-align:right;desc:total concurrent query phase ops" + ); + table.addCell( + "search.concurrent_avg_slice_count", + "alias:casc,searchConcurrentAvgSliceCount;default:false;text-align:right;desc:average query concurrency" + ); + } table.addCell("search.scroll_current", "alias:scc,searchScrollCurrent;default:false;text-align:right;desc:open scroll contexts"); table.addCell( "search.scroll_time", @@ -529,6 +548,12 @@ Table buildTable( table.addCell(searchStats == null ? null : searchStats.getTotal().getQueryCurrent()); table.addCell(searchStats == null ? null : searchStats.getTotal().getQueryTime()); table.addCell(searchStats == null ? null : searchStats.getTotal().getQueryCount()); + if (FeatureFlags.isEnabled(FeatureFlags.CONCURRENT_SEGMENT_SEARCH)) { + table.addCell(searchStats == null ? null : searchStats.getTotal().getConcurrentQueryCurrent()); + table.addCell(searchStats == null ? null : searchStats.getTotal().getConcurrentQueryTime()); + table.addCell(searchStats == null ? null : searchStats.getTotal().getConcurrentQueryCount()); + table.addCell(searchStats == null ? null : searchStats.getTotal().getConcurrentAvgSliceCount()); + } table.addCell(searchStats == null ? null : searchStats.getTotal().getScrollCurrent()); table.addCell(searchStats == null ? null : searchStats.getTotal().getScrollTime()); table.addCell(searchStats == null ? null : searchStats.getTotal().getScrollCount()); diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java index cadc6ef730350..06b31270f7c65 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java @@ -44,6 +44,7 @@ import org.opensearch.common.Table; import org.opensearch.common.logging.DeprecationLogger; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.core.common.Strings; import org.opensearch.index.cache.query.QueryCacheStats; import org.opensearch.index.engine.CommitStats; @@ -219,6 +220,24 @@ protected Table getTableWithHeader(final RestRequest request) { table.addCell("search.query_current", "alias:sqc,searchQueryCurrent;default:false;text-align:right;desc:current query phase ops"); table.addCell("search.query_time", "alias:sqti,searchQueryTime;default:false;text-align:right;desc:time spent in query phase"); table.addCell("search.query_total", "alias:sqto,searchQueryTotal;default:false;text-align:right;desc:total query phase ops"); + if (FeatureFlags.isEnabled(FeatureFlags.CONCURRENT_SEGMENT_SEARCH)) { + table.addCell( + "search.concurrent_query_current", + "alias:scqc,searchConcurrentQueryCurrent;default:false;text-align:right;desc:current concurrent query phase ops" + ); + table.addCell( + "search.concurrent_query_time", + "alias:scqti,searchConcurrentQueryTime;default:false;text-align:right;desc:time spent in concurrent query phase" + ); + table.addCell( + "search.concurrent_query_total", + "alias:scqto,searchConcurrentQueryTotal;default:false;text-align:right;desc:total concurrent query phase ops" + ); + table.addCell( + "search.concurrent_avg_slice_count", + "alias:casc,searchConcurrentAvgSliceCount;default:false;text-align:right;desc:average query concurrency" + ); + } table.addCell("search.scroll_current", "alias:scc,searchScrollCurrent;default:false;text-align:right;desc:open scroll contexts"); table.addCell( "search.scroll_time", @@ -399,6 +418,13 @@ Table buildTable(RestRequest request, ClusterStateResponse state, IndicesStatsRe table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getQueryCurrent())); table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getQueryTime())); table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getQueryCount())); + if (FeatureFlags.isEnabled(FeatureFlags.CONCURRENT_SEGMENT_SEARCH)) { + table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getConcurrentQueryCurrent())); + table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getConcurrentQueryTime())); + table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getConcurrentQueryCount())); + table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getConcurrentAvgSliceCount())); + + } table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getScrollCurrent())); table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getScrollTime())); table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getScrollCount())); diff --git a/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java b/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java index 7d2d8e38d066e..f65b0e231b1dd 100644 --- a/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java +++ b/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java @@ -45,9 +45,9 @@ public void testShardLevelSearchGroupStats() throws Exception { // let's create two dummy search stats with groups Map groupStats1 = new HashMap<>(); Map groupStats2 = new HashMap<>(); - groupStats2.put("group1", new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1)); - SearchStats searchStats1 = new SearchStats(new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), 0, groupStats1); - SearchStats searchStats2 = new SearchStats(new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), 0, groupStats2); + groupStats2.put("group1", new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1)); + SearchStats searchStats1 = new SearchStats(new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), 0, groupStats1); + SearchStats searchStats2 = new SearchStats(new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), 0, groupStats2); // adding these two search stats and checking group stats are correct searchStats1.add(searchStats2); @@ -69,6 +69,9 @@ private static void assertStats(Stats stats, long equalTo) { assertEquals(equalTo, stats.getQueryCount()); assertEquals(equalTo, stats.getQueryTimeInMillis()); assertEquals(equalTo, stats.getQueryCurrent()); + assertEquals(equalTo, stats.getConcurrentQueryCount()); + assertEquals(equalTo, stats.getConcurrentQueryTimeInMillis()); + assertEquals(equalTo, stats.getConcurrentQueryCurrent()); assertEquals(equalTo, stats.getFetchCount()); assertEquals(equalTo, stats.getFetchTimeInMillis()); assertEquals(equalTo, stats.getFetchCurrent()); @@ -81,6 +84,8 @@ private static void assertStats(Stats stats, long equalTo) { assertEquals(equalTo, stats.getSuggestCount()); assertEquals(equalTo, stats.getSuggestTimeInMillis()); assertEquals(equalTo, stats.getSuggestCurrent()); + // avg_concurrency is not summed up across stats + assertEquals(1, stats.getConcurrentAvgSliceCount(), 0); } }