diff --git a/CHANGELOG.md b/CHANGELOG.md index 50a1a6ea99243..5b18f02585657 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -223,6 +223,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Extract cluster management for integration tests into JUnit test rule out of OpenSearchIntegTestCase ([#11877](https://github.com/opensearch-project/OpenSearch/pull/11877)), ([#12000](https://github.com/opensearch-project/OpenSearch/pull/12000)) - Workaround for https://bugs.openjdk.org/browse/JDK-8323659 regression, introduced in JDK-21.0.2 ([#11968](https://github.com/opensearch-project/OpenSearch/pull/11968)) - Updates IpField to be searchable when only `doc_values` are enabled ([#11508](https://github.com/opensearch-project/OpenSearch/pull/11508)) +- [Query Insights] Implement Query Insights Plugin with Top N Queries Feature ([#11506](https://github.com/opensearch-project/OpenSearch/pull/11506)) ### Deprecated diff --git a/plugins/query-insights/build.gradle b/plugins/query-insights/build.gradle new file mode 100644 index 0000000000000..eabbd395bd3bd --- /dev/null +++ b/plugins/query-insights/build.gradle @@ -0,0 +1,18 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +opensearchplugin { + description 'OpenSearch Query Insights Plugin.' + classname 'org.opensearch.plugin.insights.QueryInsightsPlugin' +} + +dependencies { +} diff --git a/plugins/query-insights/src/internalClusterTest/java/org/opensearch/plugin/insights/QueryInsightsPluginTransportIT.java b/plugins/query-insights/src/internalClusterTest/java/org/opensearch/plugin/insights/QueryInsightsPluginTransportIT.java new file mode 100644 index 0000000000000..04e715444f50a --- /dev/null +++ b/plugins/query-insights/src/internalClusterTest/java/org/opensearch/plugin/insights/QueryInsightsPluginTransportIT.java @@ -0,0 +1,274 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights; + +import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; +import org.opensearch.action.admin.cluster.node.info.NodeInfo; +import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest; +import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse; +import org.opensearch.action.admin.cluster.node.info.PluginsAndModules; +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesAction; +import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesRequest; +import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesResponse; +import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.plugins.Plugin; +import org.opensearch.plugins.PluginInfo; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.junit.Assert; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +/** + * Transport Action tests for Query Insights Plugin + */ + +@OpenSearchIntegTestCase.ClusterScope(numDataNodes = 0, scope = OpenSearchIntegTestCase.Scope.TEST) +public class QueryInsightsPluginTransportIT extends OpenSearchIntegTestCase { + + private final int TOTAL_NUMBER_OF_NODES = 2; + private final int TOTAL_SEARCH_REQUESTS = 5; + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(QueryInsightsPlugin.class); + } + + /** + * Test Query Insights Plugin is installed + */ + public void testQueryInsightPluginInstalled() { + NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(); + nodesInfoRequest.addMetric(NodesInfoRequest.Metric.PLUGINS.metricName()); + NodesInfoResponse nodesInfoResponse = OpenSearchIntegTestCase.client().admin().cluster().nodesInfo(nodesInfoRequest).actionGet(); + List pluginInfos = nodesInfoResponse.getNodes() + .stream() + .flatMap( + (Function>) nodeInfo -> nodeInfo.getInfo(PluginsAndModules.class).getPluginInfos().stream() + ) + .collect(Collectors.toList()); + Assert.assertTrue( + pluginInfos.stream().anyMatch(pluginInfo -> pluginInfo.getName().equals("org.opensearch.plugin.insights.QueryInsightsPlugin")) + ); + } + + /** + * Test get top queries when feature disabled + */ + public void testGetTopQueriesWhenFeatureDisabled() { + TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY); + TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet(); + Assert.assertNotEquals(0, response.failures().size()); + Assert.assertEquals( + "Cannot get top n queries for [latency] when it is not enabled.", + response.failures().get(0).getCause().getCause().getMessage() + ); + } + + /** + * Test update top query record when feature enabled + */ + public void testUpdateRecordWhenFeatureDisabledThenEnabled() throws ExecutionException, InterruptedException { + Settings commonSettings = Settings.builder().put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "false").build(); + + logger.info("--> starting nodes for query insight testing"); + List nodes = internalCluster().startNodes(TOTAL_NUMBER_OF_NODES, Settings.builder().put(commonSettings).build()); + + logger.info("--> waiting for nodes to form a cluster"); + ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet(); + assertFalse(health.isTimedOut()); + + assertAcked( + prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 2)) + ); + ensureStableCluster(2); + logger.info("--> creating indices for query insight testing"); + for (int i = 0; i < 5; i++) { + IndexResponse response = client().prepareIndex("test_" + i).setId("" + i).setSource("field_" + i, "value_" + i).get(); + assertEquals("CREATED", response.status().toString()); + } + // making search requests to get top queries + for (int i = 0; i < TOTAL_SEARCH_REQUESTS; i++) { + SearchResponse searchResponse = internalCluster().client(randomFrom(nodes)) + .prepareSearch() + .setQuery(QueryBuilders.matchAllQuery()) + .get(); + assertEquals(searchResponse.getFailedShards(), 0); + } + + TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY); + TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet(); + Assert.assertNotEquals(0, response.failures().size()); + Assert.assertEquals( + "Cannot get top n queries for [latency] when it is not enabled.", + response.failures().get(0).getCause().getCause().getMessage() + ); + + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest().persistentSettings( + Settings.builder().put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true").build() + ); + assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).get()); + TopQueriesRequest request2 = new TopQueriesRequest(MetricType.LATENCY); + TopQueriesResponse response2 = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request2).actionGet(); + Assert.assertEquals(0, response2.failures().size()); + Assert.assertEquals(TOTAL_NUMBER_OF_NODES, response2.getNodes().size()); + for (int i = 0; i < TOTAL_NUMBER_OF_NODES; i++) { + Assert.assertEquals(0, response2.getNodes().get(i).getTopQueriesRecord().size()); + } + + internalCluster().stopAllNodes(); + } + + /** + * Test get top queries when feature enabled + */ + public void testGetTopQueriesWhenFeatureEnabled() throws InterruptedException { + Settings commonSettings = Settings.builder() + .put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true") + .put(TOP_N_LATENCY_QUERIES_SIZE.getKey(), "100") + .put(TOP_N_LATENCY_QUERIES_WINDOW_SIZE.getKey(), "600s") + .build(); + + logger.info("--> starting nodes for query insight testing"); + List nodes = internalCluster().startNodes(TOTAL_NUMBER_OF_NODES, Settings.builder().put(commonSettings).build()); + + logger.info("--> waiting for nodes to form a cluster"); + ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet(); + assertFalse(health.isTimedOut()); + + assertAcked( + prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 2)) + ); + ensureStableCluster(2); + logger.info("--> creating indices for query insight testing"); + for (int i = 0; i < 5; i++) { + IndexResponse response = client().prepareIndex("test_" + i).setId("" + i).setSource("field_" + i, "value_" + i).get(); + assertEquals("CREATED", response.status().toString()); + } + // making search requests to get top queries + for (int i = 0; i < TOTAL_SEARCH_REQUESTS; i++) { + SearchResponse searchResponse = internalCluster().client(randomFrom(nodes)) + .prepareSearch() + .setQuery(QueryBuilders.matchAllQuery()) + .get(); + assertEquals(searchResponse.getFailedShards(), 0); + } + // Sleep to wait for queue drained to top queries store + Thread.sleep(6000); + TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY); + TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet(); + Assert.assertEquals(0, response.failures().size()); + Assert.assertEquals(TOTAL_NUMBER_OF_NODES, response.getNodes().size()); + Assert.assertEquals(TOTAL_SEARCH_REQUESTS, response.getNodes().stream().mapToInt(o -> o.getTopQueriesRecord().size()).sum()); + + internalCluster().stopAllNodes(); + } + + /** + * Test get top queries with small top n size + */ + public void testGetTopQueriesWithSmallTopN() throws InterruptedException { + Settings commonSettings = Settings.builder() + .put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true") + .put(TOP_N_LATENCY_QUERIES_SIZE.getKey(), "1") + .put(TOP_N_LATENCY_QUERIES_WINDOW_SIZE.getKey(), "600s") + .build(); + + logger.info("--> starting nodes for query insight testing"); + List nodes = internalCluster().startNodes(TOTAL_NUMBER_OF_NODES, Settings.builder().put(commonSettings).build()); + + logger.info("--> waiting for nodes to form a cluster"); + ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet(); + assertFalse(health.isTimedOut()); + + assertAcked( + prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 2)) + ); + ensureStableCluster(2); + logger.info("--> creating indices for query insight testing"); + for (int i = 0; i < 5; i++) { + IndexResponse response = client().prepareIndex("test_" + i).setId("" + i).setSource("field_" + i, "value_" + i).get(); + assertEquals("CREATED", response.status().toString()); + } + // making search requests to get top queries + for (int i = 0; i < TOTAL_SEARCH_REQUESTS; i++) { + SearchResponse searchResponse = internalCluster().client(randomFrom(nodes)) + .prepareSearch() + .setQuery(QueryBuilders.matchAllQuery()) + .get(); + assertEquals(searchResponse.getFailedShards(), 0); + } + Thread.sleep(6000); + TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY); + TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet(); + Assert.assertEquals(0, response.failures().size()); + Assert.assertEquals(TOTAL_NUMBER_OF_NODES, response.getNodes().size()); + Assert.assertEquals(2, response.getNodes().stream().mapToInt(o -> o.getTopQueriesRecord().size()).sum()); + + internalCluster().stopAllNodes(); + } + + /** + * Test get top queries with small window size + */ + public void testGetTopQueriesWithSmallWindowSize() throws InterruptedException { + Settings commonSettings = Settings.builder() + .put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true") + .put(TOP_N_LATENCY_QUERIES_SIZE.getKey(), "100") + .put(TOP_N_LATENCY_QUERIES_WINDOW_SIZE.getKey(), "1m") + .build(); + + logger.info("--> starting nodes for query insight testing"); + List nodes = internalCluster().startNodes(TOTAL_NUMBER_OF_NODES, Settings.builder().put(commonSettings).build()); + + logger.info("--> waiting for nodes to form a cluster"); + ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet(); + assertFalse(health.isTimedOut()); + + assertAcked( + prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 2)) + ); + ensureStableCluster(2); + logger.info("--> creating indices for query insight testing"); + for (int i = 0; i < 5; i++) { + IndexResponse response = client().prepareIndex("test_" + i).setId("" + i).setSource("field_" + i, "value_" + i).get(); + assertEquals("CREATED", response.status().toString()); + } + // making search requests to get top queries + for (int i = 0; i < TOTAL_SEARCH_REQUESTS; i++) { + SearchResponse searchResponse = internalCluster().client(randomFrom(nodes)) + .prepareSearch() + .setQuery(QueryBuilders.matchAllQuery()) + .get(); + assertEquals(searchResponse.getFailedShards(), 0); + } + + TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY); + TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet(); + Assert.assertEquals(0, response.failures().size()); + Assert.assertEquals(TOTAL_NUMBER_OF_NODES, response.getNodes().size()); + Thread.sleep(6000); + internalCluster().stopAllNodes(); + } +} diff --git a/plugins/query-insights/src/javaRestTest/java/org/opensearch/plugin/insights/TopQueriesRestIT.java b/plugins/query-insights/src/javaRestTest/java/org/opensearch/plugin/insights/TopQueriesRestIT.java new file mode 100644 index 0000000000000..57dea6ad8d5ff --- /dev/null +++ b/plugins/query-insights/src/javaRestTest/java/org/opensearch/plugin/insights/TopQueriesRestIT.java @@ -0,0 +1,107 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights; + +import org.opensearch.client.Request; +import org.opensearch.client.Response; +import org.opensearch.common.xcontent.LoggingDeprecationHandler; +import org.opensearch.common.xcontent.json.JsonXContent; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.test.rest.OpenSearchRestTestCase; +import org.junit.Assert; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; + +/** + * Rest Action tests for Query Insights + */ +public class TopQueriesRestIT extends OpenSearchRestTestCase { + + /** + * test Query Insights is installed + * @throws IOException IOException + */ + @SuppressWarnings("unchecked") + public void testQueryInsightsPluginInstalled() throws IOException { + Request request = new Request("GET", "/_cat/plugins?s=component&h=name,component,version,description&format=json"); + Response response = client().performRequest(request); + List pluginsList = JsonXContent.jsonXContent.createParser( + NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, + response.getEntity().getContent() + ).list(); + Assert.assertTrue( + pluginsList.stream().map(o -> (Map) o).anyMatch(plugin -> plugin.get("component").equals("query-insights")) + ); + } + + /** + * test enabling top queries + * @throws IOException IOException + */ + public void testTopQueriesResponses() throws IOException { + // Enable Top N Queries feature + Request request = new Request("PUT", "/_cluster/settings"); + request.setJsonEntity(defaultTopQueriesSettings()); + Response response = client().performRequest(request); + + Assert.assertEquals(200, response.getStatusLine().getStatusCode()); + + // Create documents for search + request = new Request("POST", "/my-index-0/_doc"); + request.setJsonEntity(createDocumentsBody()); + response = client().performRequest(request); + + Assert.assertEquals(201, response.getStatusLine().getStatusCode()); + + // Do Search + request = new Request("GET", "/my-index-0/_search?size=20&pretty"); + request.setJsonEntity(searchBody()); + response = client().performRequest(request); + Assert.assertEquals(200, response.getStatusLine().getStatusCode()); + response = client().performRequest(request); + Assert.assertEquals(200, response.getStatusLine().getStatusCode()); + + // Get Top Queries + request = new Request("GET", "/_insights/top_queries?pretty"); + response = client().performRequest(request); + + Assert.assertEquals(200, response.getStatusLine().getStatusCode()); + String top_requests = new String(response.getEntity().getContent().readAllBytes(), StandardCharsets.UTF_8); + Assert.assertTrue(top_requests.contains("top_queries")); + Assert.assertEquals(2, top_requests.split("searchType", -1).length - 1); + } + + private String defaultTopQueriesSettings() { + return "{\n" + + " \"persistent\" : {\n" + + " \"search.top_n_queries.latency.enabled\" : \"true\",\n" + + " \"search.top_n_queries.latency.window_size\" : \"600s\",\n" + + " \"search.top_n_queries.latency.top_n_size\" : 5\n" + + " }\n" + + "}"; + } + + private String createDocumentsBody() { + return "{\n" + + " \"@timestamp\": \"2099-11-15T13:12:00\",\n" + + " \"message\": \"this is document 1\",\n" + + " \"user\": {\n" + + " \"id\": \"cyji\"\n" + + " }\n" + + "}"; + } + + private String searchBody() { + return "{}"; + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java new file mode 100644 index 0000000000000..4d7e0d486068a --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java @@ -0,0 +1,116 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights; + +import org.opensearch.action.ActionRequest; +import org.opensearch.client.Client; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.IndexScopedSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.settings.SettingsFilter; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.concurrent.OpenSearchExecutors; +import org.opensearch.core.action.ActionResponse; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.env.Environment; +import org.opensearch.env.NodeEnvironment; +import org.opensearch.plugin.insights.core.listener.QueryInsightsListener; +import org.opensearch.plugin.insights.core.service.QueryInsightsService; +import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesAction; +import org.opensearch.plugin.insights.rules.resthandler.top_queries.RestTopQueriesAction; +import org.opensearch.plugin.insights.rules.transport.top_queries.TransportTopQueriesAction; +import org.opensearch.plugin.insights.settings.QueryInsightsSettings; +import org.opensearch.plugins.ActionPlugin; +import org.opensearch.plugins.Plugin; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.rest.RestController; +import org.opensearch.rest.RestHandler; +import org.opensearch.script.ScriptService; +import org.opensearch.threadpool.ExecutorBuilder; +import org.opensearch.threadpool.ScalingExecutorBuilder; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.watcher.ResourceWatcherService; + +import java.util.Collection; +import java.util.List; +import java.util.function.Supplier; + +/** + * Plugin class for Query Insights. + */ +public class QueryInsightsPlugin extends Plugin implements ActionPlugin { + /** + * Default constructor + */ + public QueryInsightsPlugin() {} + + @Override + public Collection createComponents( + final Client client, + final ClusterService clusterService, + final ThreadPool threadPool, + final ResourceWatcherService resourceWatcherService, + final ScriptService scriptService, + final NamedXContentRegistry xContentRegistry, + final Environment environment, + final NodeEnvironment nodeEnvironment, + final NamedWriteableRegistry namedWriteableRegistry, + final IndexNameExpressionResolver indexNameExpressionResolver, + final Supplier repositoriesServiceSupplier + ) { + // create top n queries service + final QueryInsightsService queryInsightsService = new QueryInsightsService(threadPool); + return List.of(queryInsightsService, new QueryInsightsListener(clusterService, queryInsightsService)); + } + + @Override + public List> getExecutorBuilders(final Settings settings) { + return List.of( + new ScalingExecutorBuilder( + QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR, + 1, + Math.min((OpenSearchExecutors.allocatedProcessors(settings) + 1) / 2, QueryInsightsSettings.MAX_THREAD_COUNT), + TimeValue.timeValueMinutes(5) + ) + ); + } + + @Override + public List getRestHandlers( + final Settings settings, + final RestController restController, + final ClusterSettings clusterSettings, + final IndexScopedSettings indexScopedSettings, + final SettingsFilter settingsFilter, + final IndexNameExpressionResolver indexNameExpressionResolver, + final Supplier nodesInCluster + ) { + return List.of(new RestTopQueriesAction()); + } + + @Override + public List> getActions() { + return List.of(new ActionPlugin.ActionHandler<>(TopQueriesAction.INSTANCE, TransportTopQueriesAction.class)); + } + + @Override + public List> getSettings() { + return List.of( + // Settings for top N queries + QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED, + QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE, + QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE + ); + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java new file mode 100644 index 0000000000000..705273f52a567 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java @@ -0,0 +1,147 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.listener; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.search.SearchPhaseContext; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchRequestContext; +import org.opensearch.action.search.SearchRequestOperationsListener; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.plugin.insights.core.service.QueryInsightsService; +import org.opensearch.plugin.insights.rules.model.Attribute; +import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE; + +/** + * The listener for query insights services. + * It forwards query-related data to the appropriate query insights stores, + * either for each request or for each phase. + * + * @opensearch.internal + */ +public final class QueryInsightsListener extends SearchRequestOperationsListener { + private static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(Collections.singletonMap("pretty", "false")); + + private static final Logger log = LogManager.getLogger(QueryInsightsListener.class); + + private final QueryInsightsService queryInsightsService; + + /** + * Constructor for QueryInsightsListener + * + * @param clusterService The Node's cluster service. + * @param queryInsightsService The topQueriesByLatencyService associated with this listener + */ + @Inject + public QueryInsightsListener(final ClusterService clusterService, final QueryInsightsService queryInsightsService) { + this.queryInsightsService = queryInsightsService; + clusterService.getClusterSettings() + .addSettingsUpdateConsumer(TOP_N_LATENCY_QUERIES_ENABLED, v -> this.setEnableTopQueries(MetricType.LATENCY, v)); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer( + TOP_N_LATENCY_QUERIES_SIZE, + v -> this.queryInsightsService.getTopQueriesService(MetricType.LATENCY).setTopNSize(v), + v -> this.queryInsightsService.getTopQueriesService(MetricType.LATENCY).validateTopNSize(v) + ); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer( + TOP_N_LATENCY_QUERIES_WINDOW_SIZE, + v -> this.queryInsightsService.getTopQueriesService(MetricType.LATENCY).setWindowSize(v), + v -> this.queryInsightsService.getTopQueriesService(MetricType.LATENCY).validateWindowSize(v) + ); + this.setEnableTopQueries(MetricType.LATENCY, clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_ENABLED)); + this.queryInsightsService.getTopQueriesService(MetricType.LATENCY) + .setTopNSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_SIZE)); + this.queryInsightsService.getTopQueriesService(MetricType.LATENCY) + .setWindowSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_WINDOW_SIZE)); + } + + /** + * Enable or disable top queries insights collection for {@link MetricType} + * This function will enable or disable the corresponding listeners + * and query insights services. + * + * @param metricType {@link MetricType} + * @param enabled boolean + */ + public void setEnableTopQueries(final MetricType metricType, final boolean enabled) { + boolean isAllMetricsDisabled = !queryInsightsService.isEnabled(); + this.queryInsightsService.enableCollection(metricType, enabled); + if (!enabled) { + // disable QueryInsightsListener only if all metrics collections are disabled now. + if (!queryInsightsService.isEnabled()) { + super.setEnabled(false); + this.queryInsightsService.stop(); + } + } else { + super.setEnabled(true); + // restart QueryInsightsListener only if none of metrics collections is enabled before. + if (isAllMetricsDisabled) { + this.queryInsightsService.stop(); + this.queryInsightsService.start(); + } + } + + } + + @Override + public boolean isEnabled() { + return super.isEnabled(); + } + + @Override + public void onPhaseStart(SearchPhaseContext context) {} + + @Override + public void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} + + @Override + public void onPhaseFailure(SearchPhaseContext context) {} + + @Override + public void onRequestStart(SearchRequestContext searchRequestContext) {} + + @Override + public void onRequestEnd(final SearchPhaseContext context, final SearchRequestContext searchRequestContext) { + final SearchRequest request = context.getRequest(); + try { + Map measurements = new HashMap<>(); + if (queryInsightsService.isCollectionEnabled(MetricType.LATENCY)) { + measurements.put( + MetricType.LATENCY, + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - searchRequestContext.getAbsoluteStartNanos()) + ); + } + Map attributes = new HashMap<>(); + attributes.put(Attribute.SEARCH_TYPE, request.searchType().toString().toLowerCase(Locale.ROOT)); + attributes.put(Attribute.SOURCE, request.source().toString(FORMAT_PARAMS)); + attributes.put(Attribute.TOTAL_SHARDS, context.getNumShards()); + attributes.put(Attribute.INDICES, request.indices()); + attributes.put(Attribute.PHASE_LATENCY_MAP, searchRequestContext.phaseTookMap()); + SearchQueryRecord record = new SearchQueryRecord(request.getOrCreateAbsoluteStartMillis(), measurements, attributes); + queryInsightsService.addRecord(record); + } catch (Exception e) { + log.error(String.format(Locale.ROOT, "fail to ingest query insight data, error: %s", e)); + } + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/package-info.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/package-info.java new file mode 100644 index 0000000000000..3cb9cacf7fd1c --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Listeners for Query Insights + */ +package org.opensearch.plugin.insights.core.listener; diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java new file mode 100644 index 0000000000000..525ca0d4a3d33 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java @@ -0,0 +1,180 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.service; + +import org.opensearch.common.inject.Inject; +import org.opensearch.common.lifecycle.AbstractLifecycleComponent; +import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; +import org.opensearch.plugin.insights.settings.QueryInsightsSettings; +import org.opensearch.threadpool.Scheduler; +import org.opensearch.threadpool.ThreadPool; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * Service responsible for gathering, analyzing, storing and exporting + * information related to search queries + * + * @opensearch.internal + */ +public class QueryInsightsService extends AbstractLifecycleComponent { + /** + * The internal OpenSearch thread pool that execute async processing and exporting tasks + */ + private final ThreadPool threadPool; + + /** + * Services to capture top n queries for different metric types + */ + private final Map topQueriesServices; + + /** + * Flags for enabling insight data collection for different metric types + */ + private final Map enableCollect; + + /** + * The internal thread-safe queue to ingest the search query data and subsequently forward to processors + */ + private final LinkedBlockingQueue queryRecordsQueue; + + /** + * Holds a reference to delayed operation {@link Scheduler.Cancellable} so it can be cancelled when + * the service closed concurrently. + */ + protected volatile Scheduler.Cancellable scheduledFuture; + + /** + * Constructor of the QueryInsightsService + * + * @param threadPool The OpenSearch thread pool to run async tasks + */ + @Inject + public QueryInsightsService(final ThreadPool threadPool) { + enableCollect = new HashMap<>(); + queryRecordsQueue = new LinkedBlockingQueue<>(QueryInsightsSettings.QUERY_RECORD_QUEUE_CAPACITY); + topQueriesServices = new HashMap<>(); + for (MetricType metricType : MetricType.allMetricTypes()) { + enableCollect.put(metricType, false); + topQueriesServices.put(metricType, new TopQueriesService(metricType)); + } + this.threadPool = threadPool; + } + + /** + * Ingest the query data into in-memory stores + * + * @param record the record to ingest + */ + public boolean addRecord(final SearchQueryRecord record) { + boolean shouldAdd = false; + for (Map.Entry entry : topQueriesServices.entrySet()) { + if (!enableCollect.get(entry.getKey())) { + continue; + } + List currentSnapshot = entry.getValue().getTopQueriesCurrentSnapshot(); + // skip add to top N queries store if the incoming record is smaller than the Nth record + if (currentSnapshot.size() < entry.getValue().getTopNSize() + || SearchQueryRecord.compare(record, currentSnapshot.get(0), entry.getKey()) > 0) { + shouldAdd = true; + break; + } + } + if (shouldAdd) { + return queryRecordsQueue.offer(record); + } + return false; + } + + /** + * Drain the queryRecordsQueue into internal stores and services + */ + public void drainRecords() { + final List records = new ArrayList<>(); + queryRecordsQueue.drainTo(records); + records.sort(Comparator.comparingLong(SearchQueryRecord::getTimestamp)); + for (MetricType metricType : MetricType.allMetricTypes()) { + if (enableCollect.get(metricType)) { + // ingest the records into topQueriesService + topQueriesServices.get(metricType).consumeRecords(records); + } + } + } + + /** + * Get the top queries service based on metricType + * @param metricType {@link MetricType} + * @return {@link TopQueriesService} + */ + public TopQueriesService getTopQueriesService(final MetricType metricType) { + return topQueriesServices.get(metricType); + } + + /** + * Set flag to enable or disable Query Insights data collection + * + * @param metricType {@link MetricType} + * @param enable Flag to enable or disable Query Insights data collection + */ + public void enableCollection(final MetricType metricType, final boolean enable) { + this.enableCollect.put(metricType, enable); + this.topQueriesServices.get(metricType).setEnabled(enable); + } + + /** + * Get if the Query Insights data collection is enabled for a MetricType + * + * @param metricType {@link MetricType} + * @return if the Query Insights data collection is enabled + */ + public boolean isCollectionEnabled(final MetricType metricType) { + return this.enableCollect.get(metricType); + } + + /** + * Check if query insights service is enabled + * + * @return if query insights service is enabled + */ + public boolean isEnabled() { + for (MetricType t : MetricType.allMetricTypes()) { + if (isCollectionEnabled(t)) { + return true; + } + } + return false; + } + + @Override + protected void doStart() { + if (isEnabled()) { + scheduledFuture = threadPool.scheduleWithFixedDelay( + this::drainRecords, + QueryInsightsSettings.QUERY_RECORD_QUEUE_DRAIN_INTERVAL, + QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR + ); + } + } + + @Override + protected void doStop() { + if (scheduledFuture != null) { + scheduledFuture.cancel(); + } + } + + @Override + protected void doClose() {} +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java new file mode 100644 index 0000000000000..d2c30cbdf98e7 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java @@ -0,0 +1,282 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.service; + +import org.opensearch.common.unit.TimeValue; +import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; +import org.opensearch.plugin.insights.settings.QueryInsightsSettings; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZoneOffset; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Locale; +import java.util.PriorityQueue; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Service responsible for gathering and storing top N queries + * with high latency or resource usage + * + * @opensearch.internal + */ +public class TopQueriesService { + private boolean enabled; + /** + * The metric type to measure top n queries + */ + private final MetricType metricType; + private int topNSize; + /** + * The window size to keep the top n queries + */ + private TimeValue windowSize; + /** + * The current window start timestamp + */ + private long windowStart; + /** + * The internal thread-safe store that holds the top n queries insight data + */ + private final PriorityQueue topQueriesStore; + + /** + * The AtomicReference of a snapshot of the current window top queries for getters to consume + */ + private final AtomicReference> topQueriesCurrentSnapshot; + + /** + * The AtomicReference of a snapshot of the last window top queries for getters to consume + */ + private final AtomicReference> topQueriesHistorySnapshot; + + TopQueriesService(final MetricType metricType) { + this.enabled = false; + this.metricType = metricType; + this.topNSize = QueryInsightsSettings.DEFAULT_TOP_N_SIZE; + this.windowSize = QueryInsightsSettings.DEFAULT_WINDOW_SIZE; + this.windowStart = -1L; + topQueriesStore = new PriorityQueue<>(topNSize, (a, b) -> SearchQueryRecord.compare(a, b, metricType)); + topQueriesCurrentSnapshot = new AtomicReference<>(new ArrayList<>()); + topQueriesHistorySnapshot = new AtomicReference<>(new ArrayList<>()); + } + + /** + * Set the top N size for TopQueriesService service. + * + * @param topNSize the top N size to set + */ + public void setTopNSize(final int topNSize) { + this.topNSize = topNSize; + } + + /** + * Get the current configured top n size + * + * @return top n size + */ + public int getTopNSize() { + return topNSize; + } + + /** + * Validate the top N size based on the internal constrains + * + * @param size the wanted top N size + */ + public void validateTopNSize(final int size) { + if (size > QueryInsightsSettings.MAX_N_SIZE) { + throw new IllegalArgumentException( + "Top N size setting for [" + + metricType + + "]" + + " should be smaller than max top N size [" + + QueryInsightsSettings.MAX_N_SIZE + + "was (" + + size + + " > " + + QueryInsightsSettings.MAX_N_SIZE + + ")" + ); + } + } + + /** + * Set enable flag for the service + * @param enabled boolean + */ + public void setEnabled(final boolean enabled) { + this.enabled = enabled; + } + + /** + * Set the window size for top N queries service + * + * @param windowSize window size to set + */ + public void setWindowSize(final TimeValue windowSize) { + this.windowSize = windowSize; + // reset the window start time since the window size has changed + this.windowStart = -1L; + } + + /** + * Validate if the window size is valid, based on internal constrains. + * + * @param windowSize the window size to validate + */ + public void validateWindowSize(final TimeValue windowSize) { + if (windowSize.compareTo(QueryInsightsSettings.MAX_WINDOW_SIZE) > 0 + || windowSize.compareTo(QueryInsightsSettings.MIN_WINDOW_SIZE) < 0) { + throw new IllegalArgumentException( + "Window size setting for [" + + metricType + + "]" + + " should be between [" + + QueryInsightsSettings.MIN_WINDOW_SIZE + + "," + + QueryInsightsSettings.MAX_WINDOW_SIZE + + "]" + + "was (" + + windowSize + + ")" + ); + } + if (!(QueryInsightsSettings.VALID_WINDOW_SIZES_IN_MINUTES.contains(windowSize) || windowSize.getMinutes() % 60 == 0)) { + throw new IllegalArgumentException( + "Window size setting for [" + + metricType + + "]" + + " should be multiple of 1 hour, or one of " + + QueryInsightsSettings.VALID_WINDOW_SIZES_IN_MINUTES + + ", was (" + + windowSize + + ")" + ); + } + } + + /** + * Get all top queries records that are in the current top n queries store + * Optionally include top N records from the last window. + * + * By default, return the records in sorted order. + * + * @param includeLastWindow if the top N queries from the last window should be included + * @return List of the records that are in the query insight store + * @throws IllegalArgumentException if query insight is disabled in the cluster + */ + public List getTopQueriesRecords(final boolean includeLastWindow) throws IllegalArgumentException { + if (!enabled) { + throw new IllegalArgumentException( + String.format(Locale.ROOT, "Cannot get top n queries for [%s] when it is not enabled.", metricType.toString()) + ); + } + // read from window snapshots + final List queries = new ArrayList<>(topQueriesCurrentSnapshot.get()); + if (includeLastWindow) { + queries.addAll(topQueriesHistorySnapshot.get()); + } + return Stream.of(queries) + .flatMap(Collection::stream) + .sorted((a, b) -> SearchQueryRecord.compare(a, b, metricType) * -1) + .collect(Collectors.toList()); + } + + /** + * Consume records to top queries stores + * + * @param records a list of {@link SearchQueryRecord} + */ + void consumeRecords(final List records) { + final long currentWindowStart = calculateWindowStart(System.currentTimeMillis()); + List recordsInLastWindow = new ArrayList<>(); + List recordsInThisWindow = new ArrayList<>(); + for (SearchQueryRecord record : records) { + // skip the records that does not have the corresponding measurement + if (!record.getMeasurements().containsKey(metricType)) { + continue; + } + if (record.getTimestamp() < currentWindowStart) { + recordsInLastWindow.add(record); + } else { + recordsInThisWindow.add(record); + } + } + // add records in last window, if there are any, to the top n store + addToTopNStore(recordsInLastWindow); + // rotate window and reset window start if necessary + rotateWindowIfNecessary(currentWindowStart); + // add records in current window, if there are any, to the top n store + addToTopNStore(recordsInThisWindow); + // update the current window snapshot for getters to consume + final List newSnapShot = new ArrayList<>(topQueriesStore); + newSnapShot.sort((a, b) -> SearchQueryRecord.compare(a, b, metricType)); + topQueriesCurrentSnapshot.set(newSnapShot); + } + + private void addToTopNStore(final List records) { + topQueriesStore.addAll(records); + // remove top elements for fix sizing priority queue + while (topQueriesStore.size() > topNSize) { + topQueriesStore.poll(); + } + } + + /** + * Reset the current window and rotate the data to history snapshot for top n queries, + * This function would be invoked zero time or only once in each consumeRecords call + * + * @param newWindowStart the new windowStart to set to + */ + private void rotateWindowIfNecessary(final long newWindowStart) { + // reset window if the current window is outdated + if (windowStart < newWindowStart) { + final List history = new ArrayList<>(); + // rotate the current window to history store only if the data belongs to the last window + if (windowStart == newWindowStart - windowSize.getMillis()) { + history.addAll(topQueriesStore); + } + topQueriesHistorySnapshot.set(history); + topQueriesStore.clear(); + topQueriesCurrentSnapshot.set(new ArrayList<>()); + windowStart = newWindowStart; + } + } + + /** + * Calculate the window start for the given timestamp + * + * @param timestamp the given timestamp to calculate window start + */ + private long calculateWindowStart(final long timestamp) { + final LocalDateTime currentTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.of("UTC")); + LocalDateTime windowStartTime = currentTime.truncatedTo(ChronoUnit.HOURS); + while (!windowStartTime.plusMinutes(windowSize.getMinutes()).isAfter(currentTime)) { + windowStartTime = windowStartTime.plusMinutes(windowSize.getMinutes()); + } + return windowStartTime.toInstant(ZoneOffset.UTC).getEpochSecond() * 1000; + } + + /** + * Get the current top queries snapshot from the AtomicReference. + * + * @return a list of {@link SearchQueryRecord} + */ + public List getTopQueriesCurrentSnapshot() { + return topQueriesCurrentSnapshot.get(); + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/package-info.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/package-info.java new file mode 100644 index 0000000000000..5068f28234f6d --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Service Classes for Query Insights + */ +package org.opensearch.plugin.insights.core.service; diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/package-info.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/package-info.java new file mode 100644 index 0000000000000..04d1f9bfff7e1 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Base Package of Query Insights + */ +package org.opensearch.plugin.insights; diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/package-info.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/package-info.java new file mode 100644 index 0000000000000..9b6b5856f7d27 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Transport Actions, Requests and Responses for Query Insights + */ +package org.opensearch.plugin.insights.rules.action; diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueries.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueries.java new file mode 100644 index 0000000000000..26cff82aae52e --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueries.java @@ -0,0 +1,77 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.action.top_queries; + +import org.opensearch.action.support.nodes.BaseNodeResponse; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; + +import java.io.IOException; +import java.util.List; + +/** + * Holds all top queries records by resource usage or latency on a node + * Mainly used in the top N queries node response workflow. + * + * @opensearch.internal + */ +public class TopQueries extends BaseNodeResponse implements ToXContentObject { + /** The store to keep the top queries records */ + private final List topQueriesRecords; + + /** + * Create the TopQueries Object from StreamInput + * @param in A {@link StreamInput} object. + * @throws IOException IOException + */ + public TopQueries(final StreamInput in) throws IOException { + super(in); + topQueriesRecords = in.readList(SearchQueryRecord::new); + } + + /** + * Create the TopQueries Object + * @param node A node that is part of the cluster. + * @param searchQueryRecords A list of SearchQueryRecord associated in this TopQueries. + */ + public TopQueries(final DiscoveryNode node, final List searchQueryRecords) { + super(node); + topQueriesRecords = searchQueryRecords; + } + + @Override + public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { + if (topQueriesRecords != null) { + for (SearchQueryRecord record : topQueriesRecords) { + record.toXContent(builder, params); + } + } + return builder; + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + super.writeTo(out); + out.writeList(topQueriesRecords); + + } + + /** + * Get all top queries records + * + * @return the top queries records in this node response + */ + public List getTopQueriesRecord() { + return topQueriesRecords; + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesAction.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesAction.java new file mode 100644 index 0000000000000..b8ed69fa5692b --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesAction.java @@ -0,0 +1,32 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.action.top_queries; + +import org.opensearch.action.ActionType; + +/** + * Transport action for cluster/node level top queries information. + * + * @opensearch.internal + */ +public class TopQueriesAction extends ActionType { + + /** + * The TopQueriesAction Instance. + */ + public static final TopQueriesAction INSTANCE = new TopQueriesAction(); + /** + * The name of this Action + */ + public static final String NAME = "cluster:admin/opensearch/insights/top_queries"; + + private TopQueriesAction() { + super(NAME, TopQueriesResponse::new); + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesRequest.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesRequest.java new file mode 100644 index 0000000000000..3bdff2c403161 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesRequest.java @@ -0,0 +1,62 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.action.top_queries; + +import org.opensearch.action.support.nodes.BaseNodesRequest; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.plugin.insights.rules.model.MetricType; + +import java.io.IOException; + +/** + * A request to get cluster/node level top queries information. + * + * @opensearch.internal + */ +public class TopQueriesRequest extends BaseNodesRequest { + + final MetricType metricType; + + /** + * Constructor for TopQueriesRequest + * + * @param in A {@link StreamInput} object. + * @throws IOException if the stream cannot be deserialized. + */ + public TopQueriesRequest(final StreamInput in) throws IOException { + super(in); + this.metricType = MetricType.readFromStream(in); + } + + /** + * Get top queries from nodes based on the nodes ids specified. + * If none are passed, cluster level top queries will be returned. + * + * @param metricType {@link MetricType} + * @param nodesIds the nodeIds specified in the request + */ + public TopQueriesRequest(final MetricType metricType, final String... nodesIds) { + super(nodesIds); + this.metricType = metricType; + } + + /** + * Get the type of requested metrics + */ + public MetricType getMetricType() { + return metricType; + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(metricType.toString()); + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponse.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponse.java new file mode 100644 index 0000000000000..2e66bb7f77baf --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponse.java @@ -0,0 +1,143 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.action.top_queries; + +import org.opensearch.action.FailedNodeException; +import org.opensearch.action.support.nodes.BaseNodesResponse; +import org.opensearch.cluster.ClusterName; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.plugin.insights.rules.model.Attribute; +import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Transport response for cluster/node level top queries information. + * + * @opensearch.internal + */ +public class TopQueriesResponse extends BaseNodesResponse implements ToXContentFragment { + + private static final String CLUSTER_LEVEL_RESULTS_KEY = "top_queries"; + private final MetricType metricType; + private final int top_n_size; + + /** + * Constructor for TopQueriesResponse. + * + * @param in A {@link StreamInput} object. + * @throws IOException if the stream cannot be deserialized. + */ + public TopQueriesResponse(final StreamInput in) throws IOException { + super(in); + top_n_size = in.readInt(); + metricType = in.readEnum(MetricType.class); + } + + /** + * Constructor for TopQueriesResponse + * + * @param clusterName The current cluster name + * @param nodes A list that contains top queries results from all nodes + * @param failures A list that contains FailedNodeException + * @param top_n_size The top N size to return to the user + * @param metricType the {@link MetricType} to be returned in this response + */ + public TopQueriesResponse( + final ClusterName clusterName, + final List nodes, + final List failures, + final int top_n_size, + final MetricType metricType + ) { + super(clusterName, nodes, failures); + this.top_n_size = top_n_size; + this.metricType = metricType; + } + + @Override + protected List readNodesFrom(final StreamInput in) throws IOException { + return in.readList(TopQueries::new); + } + + @Override + protected void writeNodesTo(final StreamOutput out, final List nodes) throws IOException { + out.writeList(nodes); + out.writeLong(top_n_size); + out.writeEnum(metricType); + } + + @Override + public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { + final List results = getNodes(); + postProcess(results); + builder.startObject(); + toClusterLevelResult(builder, params, results); + return builder.endObject(); + } + + @Override + public String toString() { + try { + final XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint(); + builder.startObject(); + this.toXContent(builder, EMPTY_PARAMS); + builder.endObject(); + return builder.toString(); + } catch (IOException e) { + return "{ \"error\" : \"" + e.getMessage() + "\"}"; + } + } + + /** + * Post process the top queries results to add customized attributes + * + * @param results the top queries results + */ + private void postProcess(final List results) { + for (TopQueries topQueries : results) { + final String nodeId = topQueries.getNode().getId(); + for (SearchQueryRecord record : topQueries.getTopQueriesRecord()) { + record.addAttribute(Attribute.NODE_ID, nodeId); + } + } + } + + /** + * Merge top n queries results from nodes into cluster level results in XContent format. + * + * @param builder XContent builder + * @param params serialization parameters + * @param results top queries results from all nodes + * @throws IOException if an error occurs + */ + private void toClusterLevelResult(final XContentBuilder builder, final Params params, final List results) + throws IOException { + final List all_records = results.stream() + .map(TopQueries::getTopQueriesRecord) + .flatMap(Collection::stream) + .sorted((a, b) -> SearchQueryRecord.compare(a, b, metricType) * -1) + .limit(top_n_size) + .collect(Collectors.toList()); + builder.startArray(CLUSTER_LEVEL_RESULTS_KEY); + for (SearchQueryRecord record : all_records) { + record.toXContent(builder, params); + } + builder.endArray(); + } + +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/package-info.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/package-info.java new file mode 100644 index 0000000000000..3cc7900e5ce7d --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Transport Actions, Requests and Responses for Top N Queries + */ +package org.opensearch.plugin.insights.rules.action.top_queries; diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java new file mode 100644 index 0000000000000..c1d17edf9ff14 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java @@ -0,0 +1,74 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.model; + +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.Locale; + +/** + * Valid attributes for a search query record + * + * @opensearch.internal + */ +public enum Attribute { + /** + * The search query type + */ + SEARCH_TYPE, + /** + * The search query source + */ + SOURCE, + /** + * Total shards queried + */ + TOTAL_SHARDS, + /** + * The indices involved + */ + INDICES, + /** + * The per phase level latency map for a search query + */ + PHASE_LATENCY_MAP, + /** + * The node id for this request + */ + NODE_ID; + + /** + * Read an Attribute from a StreamInput + * + * @param in the StreamInput to read from + * @return Attribute + * @throws IOException IOException + */ + static Attribute readFromStream(final StreamInput in) throws IOException { + return Attribute.valueOf(in.readString().toUpperCase(Locale.ROOT)); + } + + /** + * Write Attribute to a StreamOutput + * + * @param out the StreamOutput to write + * @param attribute the Attribute to write + * @throws IOException IOException + */ + static void writeTo(final StreamOutput out, final Attribute attribute) throws IOException { + out.writeString(attribute.toString()); + } + + @Override + public String toString() { + return this.name().toLowerCase(Locale.ROOT); + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/MetricType.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/MetricType.java new file mode 100644 index 0000000000000..cdd090fbf4804 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/MetricType.java @@ -0,0 +1,121 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.model; + +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Locale; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Valid metric types for a search query record + * + * @opensearch.internal + */ +public enum MetricType implements Comparator { + /** + * Latency metric type + */ + LATENCY, + /** + * CPU usage metric type + */ + CPU, + /** + * JVM heap usage metric type + */ + JVM; + + /** + * Read a MetricType from a StreamInput + * + * @param in the StreamInput to read from + * @return MetricType + * @throws IOException IOException + */ + public static MetricType readFromStream(final StreamInput in) throws IOException { + return fromString(in.readString()); + } + + /** + * Create MetricType from String + * + * @param metricType the String representation of MetricType + * @return MetricType + */ + public static MetricType fromString(final String metricType) { + return MetricType.valueOf(metricType.toUpperCase(Locale.ROOT)); + } + + /** + * Write MetricType to a StreamOutput + * + * @param out the StreamOutput to write + * @param metricType the MetricType to write + * @throws IOException IOException + */ + static void writeTo(final StreamOutput out, final MetricType metricType) throws IOException { + out.writeString(metricType.toString()); + } + + @Override + public String toString() { + return this.name().toLowerCase(Locale.ROOT); + } + + /** + * Get all valid metrics + * + * @return A set of String that contains all valid metrics + */ + public static Set allMetricTypes() { + return Arrays.stream(values()).collect(Collectors.toSet()); + } + + /** + * Compare two numbers based on the metric type + * + * @param a the first Number to be compared. + * @param b the second Number to be compared. + * @return a negative integer, zero, or a positive integer as the first argument is less than, equal to, or greater than the second + */ + public int compare(final Number a, final Number b) { + switch (this) { + case LATENCY: + return Long.compare(a.longValue(), b.longValue()); + case JVM: + case CPU: + return Double.compare(a.doubleValue(), b.doubleValue()); + } + return -1; + } + + /** + * Parse a value with the correct type based on MetricType + * + * @param o the generic object to parse + * @return {@link Number} + */ + Number parseValue(final Object o) { + switch (this) { + case LATENCY: + return (Long) o; + case JVM: + case CPU: + return (Double) o; + default: + return (Number) o; + } + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java new file mode 100644 index 0000000000000..060711edb5580 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java @@ -0,0 +1,176 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.model; + +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** + * SearchQueryRecord represents a minimal atomic record stored in the Query Insight Framework, + * which contains extensive information related to a search query. + * + * @opensearch.internal + */ +public class SearchQueryRecord implements ToXContentObject, Writeable { + private final long timestamp; + private final Map measurements; + private final Map attributes; + + /** + * Constructor of SearchQueryRecord + * + * @param in the StreamInput to read the SearchQueryRecord from + * @throws IOException IOException + * @throws ClassCastException ClassCastException + */ + public SearchQueryRecord(final StreamInput in) throws IOException, ClassCastException { + this.timestamp = in.readLong(); + measurements = new HashMap<>(); + in.readMap(MetricType::readFromStream, StreamInput::readGenericValue) + .forEach(((metricType, o) -> measurements.put(metricType, metricType.parseValue(o)))); + this.attributes = in.readMap(Attribute::readFromStream, StreamInput::readGenericValue); + } + + /** + * Constructor of SearchQueryRecord + * + * @param timestamp The timestamp of the query. + * @param measurements A list of Measurement associated with this query + * @param attributes A list of Attributes associated with this query + */ + public SearchQueryRecord(final long timestamp, Map measurements, final Map attributes) { + if (measurements == null) { + throw new IllegalArgumentException("Measurements cannot be null"); + } + this.measurements = measurements; + this.attributes = attributes; + this.timestamp = timestamp; + } + + /** + * Returns the observation time of the metric. + * + * @return the observation time in milliseconds + */ + public long getTimestamp() { + return timestamp; + } + + /** + * Returns the measurement associated with the specified name. + * + * @param name the name of the measurement + * @return the measurement object, or null if not found + */ + public Number getMeasurement(final MetricType name) { + return measurements.get(name); + } + + /** + * Returns a map of all the measurements associated with the metric. + * + * @return a map of measurement names to measurement objects + */ + public Map getMeasurements() { + return measurements; + } + + /** + * Returns a map of the attributes associated with the metric. + * + * @return a map of attribute keys to attribute values + */ + public Map getAttributes() { + return attributes; + } + + /** + * Add an attribute to this record + * + * @param attribute attribute to add + * @param value the value associated with the attribute + */ + public void addAttribute(final Attribute attribute, final Object value) { + attributes.put(attribute, value); + } + + @Override + public XContentBuilder toXContent(final XContentBuilder builder, final ToXContent.Params params) throws IOException { + builder.startObject(); + builder.field("timestamp", timestamp); + for (Map.Entry entry : attributes.entrySet()) { + builder.field(entry.getKey().toString(), entry.getValue()); + } + for (Map.Entry entry : measurements.entrySet()) { + builder.field(entry.getKey().toString(), entry.getValue()); + } + return builder.endObject(); + } + + /** + * Write a SearchQueryRecord to a StreamOutput + * + * @param out the StreamOutput to write + * @throws IOException IOException + */ + @Override + public void writeTo(final StreamOutput out) throws IOException { + out.writeLong(timestamp); + out.writeMap(measurements, (stream, metricType) -> MetricType.writeTo(out, metricType), StreamOutput::writeGenericValue); + out.writeMap(attributes, (stream, attribute) -> Attribute.writeTo(out, attribute), StreamOutput::writeGenericValue); + } + + /** + * Compare two SearchQueryRecord, based on the given MetricType + * + * @param a the first SearchQueryRecord to compare + * @param b the second SearchQueryRecord to compare + * @param metricType the MetricType to compare on + * @return 0 if the first SearchQueryRecord is numerically equal to the second SearchQueryRecord; + * -1 if the first SearchQueryRecord is numerically less than the second SearchQueryRecord; + * 1 if the first SearchQueryRecord is numerically greater than the second SearchQueryRecord. + */ + public static int compare(final SearchQueryRecord a, final SearchQueryRecord b, final MetricType metricType) { + return metricType.compare(a.getMeasurement(metricType), b.getMeasurement(metricType)); + } + + /** + * Check if a SearchQueryRecord is deep equal to another record + * + * @param o the other SearchQueryRecord record + * @return true if two records are deep equal, false otherwise. + */ + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (!(o instanceof SearchQueryRecord)) { + return false; + } + final SearchQueryRecord other = (SearchQueryRecord) o; + return timestamp == other.getTimestamp() + && measurements.equals(other.getMeasurements()) + && attributes.size() == other.getAttributes().size(); + } + + @Override + public int hashCode() { + return Objects.hash(timestamp, measurements, attributes); + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/package-info.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/package-info.java new file mode 100644 index 0000000000000..c59ec1550f54b --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Data Models for Query Insight Records + */ +package org.opensearch.plugin.insights.rules.model; diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/resthandler/package-info.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/resthandler/package-info.java new file mode 100644 index 0000000000000..3787f05f65552 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/resthandler/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Rest Handlers for Query Insights + */ +package org.opensearch.plugin.insights.rules.resthandler; diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/RestTopQueriesAction.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/RestTopQueriesAction.java new file mode 100644 index 0000000000000..6aa511c626ab1 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/RestTopQueriesAction.java @@ -0,0 +1,99 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.resthandler.top_queries; + +import org.opensearch.client.node.NodeClient; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.Strings; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesAction; +import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesRequest; +import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesResponse; +import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.BytesRestResponse; +import org.opensearch.rest.RestChannel; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.RestResponse; +import org.opensearch.rest.action.RestResponseListener; + +import java.util.List; +import java.util.Locale; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_QUERIES_BASE_URI; +import static org.opensearch.rest.RestRequest.Method.GET; + +/** + * Rest action to get Top N queries by certain metric type + * + * @opensearch.api + */ +public class RestTopQueriesAction extends BaseRestHandler { + /** The metric types that are allowed in top N queries */ + static final Set ALLOWED_METRICS = MetricType.allMetricTypes().stream().map(MetricType::toString).collect(Collectors.toSet()); + + /** + * Constructor for RestTopQueriesAction + */ + public RestTopQueriesAction() {} + + @Override + public List routes() { + return List.of( + new Route(GET, TOP_QUERIES_BASE_URI), + new Route(GET, String.format(Locale.ROOT, "%s/{nodeId}", TOP_QUERIES_BASE_URI)) + ); + } + + @Override + public String getName() { + return "query_insights_top_queries_action"; + } + + @Override + public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) { + final TopQueriesRequest topQueriesRequest = prepareRequest(request); + topQueriesRequest.timeout(request.param("timeout")); + + return channel -> client.execute(TopQueriesAction.INSTANCE, topQueriesRequest, topQueriesResponse(channel)); + } + + static TopQueriesRequest prepareRequest(final RestRequest request) { + final String[] nodesIds = Strings.splitStringByCommaToArray(request.param("nodeId")); + final String metricType = request.param("type", MetricType.LATENCY.toString()); + if (!ALLOWED_METRICS.contains(metricType)) { + throw new IllegalArgumentException( + String.format(Locale.ROOT, "request [%s] contains invalid metric type [%s]", request.path(), metricType) + ); + } + return new TopQueriesRequest(MetricType.fromString(metricType), nodesIds); + } + + @Override + protected Set responseParams() { + return Settings.FORMAT_PARAMS; + } + + @Override + public boolean canTripCircuitBreaker() { + return false; + } + + private RestResponseListener topQueriesResponse(final RestChannel channel) { + return new RestResponseListener<>(channel) { + @Override + public RestResponse buildResponse(final TopQueriesResponse response) throws Exception { + return new BytesRestResponse(RestStatus.OK, response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS)); + } + }; + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/package-info.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/package-info.java new file mode 100644 index 0000000000000..087cf7d765f8c --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Rest Handlers for Top N Queries + */ +package org.opensearch.plugin.insights.rules.resthandler.top_queries; diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/transport/package-info.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/transport/package-info.java new file mode 100644 index 0000000000000..f3a1c70b9af57 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/transport/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Transport Actions for Query Insights. + */ +package org.opensearch.plugin.insights.rules.transport; diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/transport/top_queries/TransportTopQueriesAction.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/transport/top_queries/TransportTopQueriesAction.java new file mode 100644 index 0000000000000..ddf614211bc41 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/transport/top_queries/TransportTopQueriesAction.java @@ -0,0 +1,155 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.transport.top_queries; + +import org.opensearch.OpenSearchException; +import org.opensearch.action.FailedNodeException; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.nodes.TransportNodesAction; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.plugin.insights.core.service.QueryInsightsService; +import org.opensearch.plugin.insights.rules.action.top_queries.TopQueries; +import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesAction; +import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesRequest; +import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesResponse; +import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.plugin.insights.settings.QueryInsightsSettings; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportRequest; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.util.List; +import java.util.Locale; + +/** + * Transport action for cluster/node level top queries information. + * + * @opensearch.internal + */ +public class TransportTopQueriesAction extends TransportNodesAction< + TopQueriesRequest, + TopQueriesResponse, + TransportTopQueriesAction.NodeRequest, + TopQueries> { + + private final QueryInsightsService queryInsightsService; + + /** + * Create the TransportTopQueriesAction Object + + * @param threadPool The OpenSearch thread pool to run async tasks + * @param clusterService The clusterService of this node + * @param transportService The TransportService of this node + * @param queryInsightsService The topQueriesByLatencyService associated with this Transport Action + * @param actionFilters the action filters + */ + @Inject + public TransportTopQueriesAction( + final ThreadPool threadPool, + final ClusterService clusterService, + final TransportService transportService, + final QueryInsightsService queryInsightsService, + final ActionFilters actionFilters + ) { + super( + TopQueriesAction.NAME, + threadPool, + clusterService, + transportService, + actionFilters, + TopQueriesRequest::new, + NodeRequest::new, + ThreadPool.Names.GENERIC, + TopQueries.class + ); + this.queryInsightsService = queryInsightsService; + } + + @Override + protected TopQueriesResponse newResponse( + final TopQueriesRequest topQueriesRequest, + final List responses, + final List failures + ) { + if (topQueriesRequest.getMetricType() == MetricType.LATENCY) { + return new TopQueriesResponse( + clusterService.getClusterName(), + responses, + failures, + clusterService.getClusterSettings().get(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE), + MetricType.LATENCY + ); + } else { + throw new OpenSearchException(String.format(Locale.ROOT, "invalid metric type %s", topQueriesRequest.getMetricType())); + } + } + + @Override + protected NodeRequest newNodeRequest(final TopQueriesRequest request) { + return new NodeRequest(request); + } + + @Override + protected TopQueries newNodeResponse(final StreamInput in) throws IOException { + return new TopQueries(in); + } + + @Override + protected TopQueries nodeOperation(final NodeRequest nodeRequest) { + final TopQueriesRequest request = nodeRequest.request; + if (request.getMetricType() == MetricType.LATENCY) { + return new TopQueries( + clusterService.localNode(), + queryInsightsService.getTopQueriesService(MetricType.LATENCY).getTopQueriesRecords(true) + ); + } else { + throw new OpenSearchException(String.format(Locale.ROOT, "invalid metric type %s", request.getMetricType())); + } + + } + + /** + * Inner Node Top Queries Request + * + * @opensearch.internal + */ + public static class NodeRequest extends TransportRequest { + + final TopQueriesRequest request; + + /** + * Create the NodeResponse object from StreamInput + * + * @param in the StreamInput to read the object + * @throws IOException IOException + */ + public NodeRequest(StreamInput in) throws IOException { + super(in); + request = new TopQueriesRequest(in); + } + + /** + * Create the NodeResponse object from a TopQueriesRequest + * @param request the TopQueriesRequest object + */ + public NodeRequest(final TopQueriesRequest request) { + this.request = request; + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + super.writeTo(out); + request.writeTo(out); + } + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/transport/top_queries/package-info.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/transport/top_queries/package-info.java new file mode 100644 index 0000000000000..54da0980deff8 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/transport/top_queries/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Transport Actions for Top N Queries. + */ +package org.opensearch.plugin.insights.rules.transport.top_queries; diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java new file mode 100644 index 0000000000000..52cc1fbde790f --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java @@ -0,0 +1,116 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.settings; + +import org.opensearch.common.settings.Setting; +import org.opensearch.common.unit.TimeValue; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +/** + * Settings for Query Insights Plugin + * + * @opensearch.api + * @opensearch.experimental + */ +public class QueryInsightsSettings { + /** + * Executors settings + */ + public static final String QUERY_INSIGHTS_EXECUTOR = "query_insights_executor"; + /** + * Max number of thread + */ + public static final int MAX_THREAD_COUNT = 5; + /** + * Max number of requests for the consumer to collect at one time + */ + public static final int QUERY_RECORD_QUEUE_CAPACITY = 1000; + /** + * Time interval for record queue consumer to run + */ + public static final TimeValue QUERY_RECORD_QUEUE_DRAIN_INTERVAL = new TimeValue(5, TimeUnit.SECONDS); + /** + * Default Values and Settings + */ + public static final TimeValue MAX_WINDOW_SIZE = new TimeValue(1, TimeUnit.DAYS); + /** + * Minimal window size + */ + public static final TimeValue MIN_WINDOW_SIZE = new TimeValue(1, TimeUnit.MINUTES); + /** + * Valid window sizes + */ + public static final Set VALID_WINDOW_SIZES_IN_MINUTES = new HashSet<>( + Arrays.asList( + new TimeValue(1, TimeUnit.MINUTES), + new TimeValue(5, TimeUnit.MINUTES), + new TimeValue(10, TimeUnit.MINUTES), + new TimeValue(30, TimeUnit.MINUTES) + ) + ); + + /** Default N size for top N queries */ + public static final int MAX_N_SIZE = 100; + /** Default window size in seconds to keep the top N queries with latency data in query insight store */ + public static final TimeValue DEFAULT_WINDOW_SIZE = new TimeValue(60, TimeUnit.SECONDS); + /** Default top N size to keep the data in query insight store */ + public static final int DEFAULT_TOP_N_SIZE = 3; + /** + * Query Insights base uri + */ + public static final String PLUGINS_BASE_URI = "/_insights"; + + /** + * Settings for Top Queries + * + */ + public static final String TOP_QUERIES_BASE_URI = PLUGINS_BASE_URI + "/top_queries"; + /** Default prefix for top N queries feature */ + public static final String TOP_N_QUERIES_SETTING_PREFIX = "search.insights.top_queries"; + /** Default prefix for top N queries by latency feature */ + public static final String TOP_N_LATENCY_QUERIES_PREFIX = TOP_N_QUERIES_SETTING_PREFIX + ".latency"; + /** + * Boolean setting for enabling top queries by latency. + */ + public static final Setting TOP_N_LATENCY_QUERIES_ENABLED = Setting.boolSetting( + TOP_N_LATENCY_QUERIES_PREFIX + ".enabled", + false, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Int setting to define the top n size for top queries by latency. + */ + public static final Setting TOP_N_LATENCY_QUERIES_SIZE = Setting.intSetting( + TOP_N_LATENCY_QUERIES_PREFIX + ".top_n_size", + DEFAULT_TOP_N_SIZE, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Time setting to define the window size in seconds for top queries by latency. + */ + public static final Setting TOP_N_LATENCY_QUERIES_WINDOW_SIZE = Setting.positiveTimeSetting( + TOP_N_LATENCY_QUERIES_PREFIX + ".window_size", + DEFAULT_WINDOW_SIZE, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + /** + * Default constructor + */ + public QueryInsightsSettings() {} +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/package-info.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/package-info.java new file mode 100644 index 0000000000000..f3152bbf966cb --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Settings for Query Insights Plugin + */ +package org.opensearch.plugin.insights.settings; diff --git a/plugins/query-insights/src/main/resources/mappings/top_n_queries_record.json b/plugins/query-insights/src/main/resources/mappings/top_n_queries_record.json new file mode 100644 index 0000000000000..3bdfa835e813e --- /dev/null +++ b/plugins/query-insights/src/main/resources/mappings/top_n_queries_record.json @@ -0,0 +1,40 @@ +{ + "_meta" : { + "schema_version": 1 + }, + "properties" : { + "timestamp" : { + "type" : "date" + }, + "search_type" : { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword", + "ignore_above" : 256 + } + } + }, + "total_shards" : { + "type" : "integer" + }, + "indices" : { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword", + "ignore_above" : 256 + } + } + }, + "phase_latency_map" : { + "type" : "object" + }, + "property_map" : { + "type" : "object" + }, + "value" : { + "type" : "long" + } + } +} diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java new file mode 100644 index 0000000000000..681979bb5eb0b --- /dev/null +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java @@ -0,0 +1,97 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights; + +import org.opensearch.action.ActionRequest; +import org.opensearch.client.Client; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.action.ActionResponse; +import org.opensearch.plugin.insights.core.listener.QueryInsightsListener; +import org.opensearch.plugin.insights.core.service.QueryInsightsService; +import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesAction; +import org.opensearch.plugin.insights.rules.resthandler.top_queries.RestTopQueriesAction; +import org.opensearch.plugin.insights.settings.QueryInsightsSettings; +import org.opensearch.plugins.ActionPlugin; +import org.opensearch.rest.RestHandler; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.ThreadPool; +import org.junit.Before; + +import java.util.Arrays; +import java.util.List; + +import static org.mockito.Mockito.mock; + +public class QueryInsightsPluginTests extends OpenSearchTestCase { + + private QueryInsightsPlugin queryInsightsPlugin; + + private final Client client = mock(Client.class); + private ClusterService clusterService; + private final ThreadPool threadPool = mock(ThreadPool.class); + + @Before + public void setup() { + queryInsightsPlugin = new QueryInsightsPlugin(); + Settings.Builder settingsBuilder = Settings.builder(); + Settings settings = settingsBuilder.build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE); + + clusterService = new ClusterService(settings, clusterSettings, threadPool); + + } + + public void testGetSettings() { + assertEquals( + Arrays.asList( + QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED, + QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE, + QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE + ), + queryInsightsPlugin.getSettings() + ); + } + + public void testCreateComponent() { + List components = (List) queryInsightsPlugin.createComponents( + client, + clusterService, + threadPool, + null, + null, + null, + null, + null, + null, + null, + null + ); + assertEquals(2, components.size()); + assertTrue(components.get(0) instanceof QueryInsightsService); + assertTrue(components.get(1) instanceof QueryInsightsListener); + } + + public void testGetRestHandlers() { + List components = queryInsightsPlugin.getRestHandlers(Settings.EMPTY, null, null, null, null, null, null); + assertEquals(1, components.size()); + assertTrue(components.get(0) instanceof RestTopQueriesAction); + } + + public void testGetActions() { + List> components = queryInsightsPlugin.getActions(); + assertEquals(1, components.size()); + assertTrue(components.get(0).getAction() instanceof TopQueriesAction); + } + +} diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java new file mode 100644 index 0000000000000..870ef5b9c8be9 --- /dev/null +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java @@ -0,0 +1,189 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights; + +import org.opensearch.action.search.SearchType; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.util.Maps; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.plugin.insights.rules.action.top_queries.TopQueries; +import org.opensearch.plugin.insights.rules.model.Attribute; +import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; +import org.opensearch.test.VersionUtils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; +import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.opensearch.test.OpenSearchTestCase.buildNewFakeTransportAddress; +import static org.opensearch.test.OpenSearchTestCase.random; +import static org.opensearch.test.OpenSearchTestCase.randomAlphaOfLengthBetween; +import static org.opensearch.test.OpenSearchTestCase.randomArray; +import static org.opensearch.test.OpenSearchTestCase.randomDouble; +import static org.opensearch.test.OpenSearchTestCase.randomIntBetween; +import static org.opensearch.test.OpenSearchTestCase.randomLong; +import static org.opensearch.test.OpenSearchTestCase.randomLongBetween; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +final public class QueryInsightsTestUtils { + + public QueryInsightsTestUtils() {} + + public static List generateQueryInsightRecords(int count) { + return generateQueryInsightRecords(count, count, System.currentTimeMillis(), 0); + } + + /** + * Creates a List of random Query Insight Records for testing purpose + */ + public static List generateQueryInsightRecords(int lower, int upper, long startTimeStamp, long interval) { + List records = new ArrayList<>(); + int countOfRecords = randomIntBetween(lower, upper); + long timestamp = startTimeStamp; + for (int i = 0; i < countOfRecords; ++i) { + Map measurements = Map.of( + MetricType.LATENCY, + randomLongBetween(1000, 10000), + MetricType.CPU, + randomDouble(), + MetricType.JVM, + randomDouble() + ); + + Map phaseLatencyMap = new HashMap<>(); + int countOfPhases = randomIntBetween(2, 5); + for (int j = 0; j < countOfPhases; ++j) { + phaseLatencyMap.put(randomAlphaOfLengthBetween(5, 10), randomLong()); + } + Map attributes = new HashMap<>(); + attributes.put(Attribute.SEARCH_TYPE, SearchType.QUERY_THEN_FETCH.toString().toLowerCase(Locale.ROOT)); + attributes.put(Attribute.SOURCE, "{\"size\":20}"); + attributes.put(Attribute.TOTAL_SHARDS, randomIntBetween(1, 100)); + attributes.put(Attribute.INDICES, randomArray(1, 3, Object[]::new, () -> randomAlphaOfLengthBetween(5, 10))); + attributes.put(Attribute.PHASE_LATENCY_MAP, phaseLatencyMap); + + records.add(new SearchQueryRecord(timestamp, measurements, attributes)); + timestamp += interval; + } + return records; + } + + public static TopQueries createRandomTopQueries() { + DiscoveryNode node = new DiscoveryNode( + "node_for_top_queries_test", + buildNewFakeTransportAddress(), + emptyMap(), + emptySet(), + VersionUtils.randomVersion(random()) + ); + List records = generateQueryInsightRecords(10); + + return new TopQueries(node, records); + } + + public static TopQueries createFixedTopQueries() { + DiscoveryNode node = new DiscoveryNode( + "node_for_top_queries_test", + buildNewFakeTransportAddress(), + emptyMap(), + emptySet(), + VersionUtils.randomVersion(random()) + ); + List records = new ArrayList<>(); + records.add(createFixedSearchQueryRecord()); + + return new TopQueries(node, records); + } + + public static SearchQueryRecord createFixedSearchQueryRecord() { + long timestamp = 1706574180000L; + Map measurements = Map.of(MetricType.LATENCY, 1L); + + Map phaseLatencyMap = new HashMap<>(); + Map attributes = new HashMap<>(); + attributes.put(Attribute.SEARCH_TYPE, SearchType.QUERY_THEN_FETCH.toString().toLowerCase(Locale.ROOT)); + + return new SearchQueryRecord(timestamp, measurements, attributes); + } + + public static void compareJson(ToXContent param1, ToXContent param2) throws IOException { + if (param1 == null || param2 == null) { + assertNull(param1); + assertNull(param2); + return; + } + + ToXContent.Params params = ToXContent.EMPTY_PARAMS; + XContentBuilder param1Builder = jsonBuilder(); + param1.toXContent(param1Builder, params); + + XContentBuilder param2Builder = jsonBuilder(); + param2.toXContent(param2Builder, params); + + assertEquals(param1Builder.toString(), param2Builder.toString()); + } + + @SuppressWarnings("unchecked") + public static boolean checkRecordsEquals(List records1, List records2) { + if (records1.size() != records2.size()) { + return false; + } + for (int i = 0; i < records1.size(); i++) { + if (!records1.get(i).equals(records2.get(i))) { + return false; + } + Map attributes1 = records1.get(i).getAttributes(); + Map attributes2 = records2.get(i).getAttributes(); + for (Map.Entry entry : attributes1.entrySet()) { + Attribute attribute = entry.getKey(); + Object value = entry.getValue(); + if (!attributes2.containsKey(attribute)) { + return false; + } + if (value instanceof Object[] && !Arrays.deepEquals((Object[]) value, (Object[]) attributes2.get(attribute))) { + return false; + } else if (value instanceof Map + && !Maps.deepEquals((Map) value, (Map) attributes2.get(attribute))) { + return false; + } + } + } + return true; + } + + public static boolean checkRecordsEqualsWithoutOrder( + List records1, + List records2, + MetricType metricType + ) { + Set set2 = new TreeSet<>((a, b) -> SearchQueryRecord.compare(a, b, metricType)); + set2.addAll(records2); + if (records1.size() != records2.size()) { + return false; + } + for (int i = 0; i < records1.size(); i++) { + if (!set2.contains(records1.get(i))) { + return false; + } + } + return true; + } +} diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java new file mode 100644 index 0000000000000..f340950017a5c --- /dev/null +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java @@ -0,0 +1,161 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.listener; + +import org.opensearch.action.search.SearchPhaseContext; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchRequestContext; +import org.opensearch.action.search.SearchType; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.plugin.insights.core.service.QueryInsightsService; +import org.opensearch.plugin.insights.core.service.TopQueriesService; +import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.plugin.insights.settings.QueryInsightsSettings; +import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder; +import org.opensearch.search.aggregations.support.ValueType; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.test.OpenSearchTestCase; +import org.junit.Before; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Phaser; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Unit Tests for {@link QueryInsightsListener}. + */ +public class QueryInsightsListenerTests extends OpenSearchTestCase { + private final SearchRequestContext searchRequestContext = mock(SearchRequestContext.class); + private final SearchPhaseContext searchPhaseContext = mock(SearchPhaseContext.class); + private final SearchRequest searchRequest = mock(SearchRequest.class); + private final QueryInsightsService queryInsightsService = mock(QueryInsightsService.class); + private final TopQueriesService topQueriesService = mock(TopQueriesService.class); + private ClusterService clusterService; + + @Before + public void setup() { + Settings.Builder settingsBuilder = Settings.builder(); + Settings settings = settingsBuilder.build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE); + clusterService = new ClusterService(settings, clusterSettings, null); + when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(true); + when(queryInsightsService.getTopQueriesService(MetricType.LATENCY)).thenReturn(topQueriesService); + } + + public void testOnRequestEnd() throws InterruptedException { + Long timestamp = System.currentTimeMillis() - 100L; + SearchType searchType = SearchType.QUERY_THEN_FETCH; + + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.aggregation(new TermsAggregationBuilder("agg1").userValueTypeHint(ValueType.STRING).field("type.keyword")); + searchSourceBuilder.size(0); + + String[] indices = new String[] { "index-1", "index-2" }; + + Map phaseLatencyMap = new HashMap<>(); + phaseLatencyMap.put("expand", 0L); + phaseLatencyMap.put("query", 20L); + phaseLatencyMap.put("fetch", 1L); + + int numberOfShards = 10; + + QueryInsightsListener queryInsightsListener = new QueryInsightsListener(clusterService, queryInsightsService); + + when(searchRequest.getOrCreateAbsoluteStartMillis()).thenReturn(timestamp); + when(searchRequest.searchType()).thenReturn(searchType); + when(searchRequest.source()).thenReturn(searchSourceBuilder); + when(searchRequest.indices()).thenReturn(indices); + when(searchRequestContext.phaseTookMap()).thenReturn(phaseLatencyMap); + when(searchPhaseContext.getRequest()).thenReturn(searchRequest); + when(searchPhaseContext.getNumShards()).thenReturn(numberOfShards); + + queryInsightsListener.onRequestEnd(searchPhaseContext, searchRequestContext); + + verify(queryInsightsService, times(1)).addRecord(any()); + } + + public void testConcurrentOnRequestEnd() throws InterruptedException { + Long timestamp = System.currentTimeMillis() - 100L; + SearchType searchType = SearchType.QUERY_THEN_FETCH; + + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.aggregation(new TermsAggregationBuilder("agg1").userValueTypeHint(ValueType.STRING).field("type.keyword")); + searchSourceBuilder.size(0); + + String[] indices = new String[] { "index-1", "index-2" }; + + Map phaseLatencyMap = new HashMap<>(); + phaseLatencyMap.put("expand", 0L); + phaseLatencyMap.put("query", 20L); + phaseLatencyMap.put("fetch", 1L); + + int numberOfShards = 10; + + final List searchListenersList = new ArrayList<>(); + + when(searchRequest.getOrCreateAbsoluteStartMillis()).thenReturn(timestamp); + when(searchRequest.searchType()).thenReturn(searchType); + when(searchRequest.source()).thenReturn(searchSourceBuilder); + when(searchRequest.indices()).thenReturn(indices); + when(searchRequestContext.phaseTookMap()).thenReturn(phaseLatencyMap); + when(searchPhaseContext.getRequest()).thenReturn(searchRequest); + when(searchPhaseContext.getNumShards()).thenReturn(numberOfShards); + + int numRequests = 50; + Thread[] threads = new Thread[numRequests]; + Phaser phaser = new Phaser(numRequests + 1); + CountDownLatch countDownLatch = new CountDownLatch(numRequests); + + for (int i = 0; i < numRequests; i++) { + searchListenersList.add(new QueryInsightsListener(clusterService, queryInsightsService)); + } + + for (int i = 0; i < numRequests; i++) { + int finalI = i; + threads[i] = new Thread(() -> { + phaser.arriveAndAwaitAdvance(); + QueryInsightsListener thisListener = searchListenersList.get(finalI); + thisListener.onRequestEnd(searchPhaseContext, searchRequestContext); + countDownLatch.countDown(); + }); + threads[i].start(); + } + phaser.arriveAndAwaitAdvance(); + countDownLatch.await(); + + verify(queryInsightsService, times(numRequests)).addRecord(any()); + } + + public void testSetEnabled() { + when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(true); + QueryInsightsListener queryInsightsListener = new QueryInsightsListener(clusterService, queryInsightsService); + queryInsightsListener.setEnableTopQueries(MetricType.LATENCY, true); + assertTrue(queryInsightsListener.isEnabled()); + + when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(false); + when(queryInsightsService.isCollectionEnabled(MetricType.CPU)).thenReturn(false); + when(queryInsightsService.isCollectionEnabled(MetricType.JVM)).thenReturn(false); + queryInsightsListener.setEnableTopQueries(MetricType.LATENCY, false); + assertFalse(queryInsightsListener.isEnabled()); + } +} diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java new file mode 100644 index 0000000000000..c29b48b9690d1 --- /dev/null +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java @@ -0,0 +1,49 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.service; + +import org.opensearch.plugin.insights.QueryInsightsTestUtils; +import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; +import org.opensearch.plugin.insights.settings.QueryInsightsSettings; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.ThreadPool; +import org.junit.Before; + +import static org.mockito.Mockito.mock; + +/** + * Unit Tests for {@link QueryInsightsService}. + */ +public class QueryInsightsServiceTests extends OpenSearchTestCase { + private final ThreadPool threadPool = mock(ThreadPool.class); + private QueryInsightsService queryInsightsService; + + @Before + public void setup() { + queryInsightsService = new QueryInsightsService(threadPool); + queryInsightsService.enableCollection(MetricType.LATENCY, true); + queryInsightsService.enableCollection(MetricType.CPU, true); + queryInsightsService.enableCollection(MetricType.JVM, true); + } + + public void testAddRecordToLimitAndDrain() { + SearchQueryRecord record = QueryInsightsTestUtils.generateQueryInsightRecords(1, 1, System.currentTimeMillis(), 0).get(0); + for (int i = 0; i < QueryInsightsSettings.QUERY_RECORD_QUEUE_CAPACITY; i++) { + assertTrue(queryInsightsService.addRecord(record)); + } + // exceed capacity + assertFalse(queryInsightsService.addRecord(record)); + queryInsightsService.drainRecords(); + assertEquals( + QueryInsightsSettings.DEFAULT_TOP_N_SIZE, + queryInsightsService.getTopQueriesService(MetricType.LATENCY).getTopQueriesRecords(false).size() + ); + } +} diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java new file mode 100644 index 0000000000000..060df84a89485 --- /dev/null +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java @@ -0,0 +1,102 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.service; + +import org.opensearch.cluster.coordination.DeterministicTaskQueue; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.plugin.insights.QueryInsightsTestUtils; +import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; +import org.opensearch.plugin.insights.settings.QueryInsightsSettings; +import org.opensearch.test.OpenSearchTestCase; +import org.junit.Before; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * Unit Tests for {@link QueryInsightsService}. + */ +public class TopQueriesServiceTests extends OpenSearchTestCase { + private TopQueriesService topQueriesService; + + @Before + public void setup() { + topQueriesService = new TopQueriesService(MetricType.LATENCY); + topQueriesService.setTopNSize(Integer.MAX_VALUE); + topQueriesService.setWindowSize(new TimeValue(Long.MAX_VALUE)); + topQueriesService.setEnabled(true); + } + + public void testIngestQueryDataWithLargeWindow() { + final List records = QueryInsightsTestUtils.generateQueryInsightRecords(10); + topQueriesService.consumeRecords(records); + assertTrue( + QueryInsightsTestUtils.checkRecordsEqualsWithoutOrder( + topQueriesService.getTopQueriesRecords(false), + records, + MetricType.LATENCY + ) + ); + } + + public void testRollingWindows() { + List records; + // Create 5 records at Now - 10 minutes to make sure they belong to the last window + records = QueryInsightsTestUtils.generateQueryInsightRecords(5, 5, System.currentTimeMillis() - 1000 * 60 * 10, 0); + topQueriesService.setWindowSize(TimeValue.timeValueMinutes(10)); + topQueriesService.consumeRecords(records); + assertEquals(0, topQueriesService.getTopQueriesRecords(true).size()); + + // Create 10 records at now + 1 minute, to make sure they belong to the current window + records = QueryInsightsTestUtils.generateQueryInsightRecords(10, 10, System.currentTimeMillis() + 1000 * 60, 0); + topQueriesService.setWindowSize(TimeValue.timeValueMinutes(10)); + topQueriesService.consumeRecords(records); + assertEquals(10, topQueriesService.getTopQueriesRecords(true).size()); + } + + public void testSmallNSize() { + final List records = QueryInsightsTestUtils.generateQueryInsightRecords(10); + topQueriesService.setTopNSize(1); + topQueriesService.consumeRecords(records); + assertEquals(1, topQueriesService.getTopQueriesRecords(false).size()); + } + + public void testValidateTopNSize() { + assertThrows(IllegalArgumentException.class, () -> { topQueriesService.validateTopNSize(QueryInsightsSettings.MAX_N_SIZE + 1); }); + } + + public void testGetTopQueriesWhenNotEnabled() { + topQueriesService.setEnabled(false); + assertThrows(IllegalArgumentException.class, () -> { topQueriesService.getTopQueriesRecords(false); }); + } + + public void testValidateWindowSize() { + assertThrows(IllegalArgumentException.class, () -> { + topQueriesService.validateWindowSize(new TimeValue(QueryInsightsSettings.MAX_WINDOW_SIZE.getSeconds() + 1, TimeUnit.SECONDS)); + }); + assertThrows(IllegalArgumentException.class, () -> { + topQueriesService.validateWindowSize(new TimeValue(QueryInsightsSettings.MIN_WINDOW_SIZE.getSeconds() - 1, TimeUnit.SECONDS)); + }); + assertThrows(IllegalArgumentException.class, () -> { topQueriesService.validateWindowSize(new TimeValue(2, TimeUnit.DAYS)); }); + assertThrows(IllegalArgumentException.class, () -> { topQueriesService.validateWindowSize(new TimeValue(7, TimeUnit.MINUTES)); }); + } + + private static void runUntilTimeoutOrFinish(DeterministicTaskQueue deterministicTaskQueue, long duration) { + final long endTime = deterministicTaskQueue.getCurrentTimeMillis() + duration; + while (deterministicTaskQueue.getCurrentTimeMillis() < endTime + && (deterministicTaskQueue.hasRunnableTasks() || deterministicTaskQueue.hasDeferredTasks())) { + if (deterministicTaskQueue.hasDeferredTasks() && randomBoolean()) { + deterministicTaskQueue.advanceTime(); + } else if (deterministicTaskQueue.hasRunnableTasks()) { + deterministicTaskQueue.runRandomTask(); + } + } + } +} diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesRequestTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesRequestTests.java new file mode 100644 index 0000000000000..619fd4b33a3dc --- /dev/null +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesRequestTests.java @@ -0,0 +1,43 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.action.top_queries; + +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.test.OpenSearchTestCase; + +/** + * Granular tests for the {@link TopQueriesRequest} class. + */ +public class TopQueriesRequestTests extends OpenSearchTestCase { + + /** + * Check that we can set the metric type + */ + public void testSetMetricType() throws Exception { + TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY, randomAlphaOfLength(5)); + TopQueriesRequest deserializedRequest = roundTripRequest(request); + assertEquals(request.getMetricType(), deserializedRequest.getMetricType()); + } + + /** + * Serialize and deserialize a request. + * @param request A request to serialize. + * @return The deserialized, "round-tripped" request. + */ + private static TopQueriesRequest roundTripRequest(TopQueriesRequest request) throws Exception { + try (BytesStreamOutput out = new BytesStreamOutput()) { + request.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + return new TopQueriesRequest(in); + } + } + } +} diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponseTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponseTests.java new file mode 100644 index 0000000000000..eeee50d3da703 --- /dev/null +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponseTests.java @@ -0,0 +1,71 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.action.top_queries; + +import org.opensearch.cluster.ClusterName; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.xcontent.MediaTypeRegistry; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.plugin.insights.QueryInsightsTestUtils; +import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * Granular tests for the {@link TopQueriesResponse} class. + */ +public class TopQueriesResponseTests extends OpenSearchTestCase { + + /** + * Check serialization and deserialization + */ + public void testSerialize() throws Exception { + TopQueries topQueries = QueryInsightsTestUtils.createRandomTopQueries(); + ClusterName clusterName = new ClusterName("test-cluster"); + TopQueriesResponse response = new TopQueriesResponse(clusterName, List.of(topQueries), new ArrayList<>(), 10, MetricType.LATENCY); + TopQueriesResponse deserializedResponse = roundTripResponse(response); + assertEquals(response.toString(), deserializedResponse.toString()); + } + + public void testToXContent() throws IOException { + char[] expectedXcontent = + "{\"top_queries\":[{\"timestamp\":1706574180000,\"node_id\":\"node_for_top_queries_test\",\"search_type\":\"query_then_fetch\",\"latency\":1}]}" + .toCharArray(); + TopQueries topQueries = QueryInsightsTestUtils.createFixedTopQueries(); + ClusterName clusterName = new ClusterName("test-cluster"); + TopQueriesResponse response = new TopQueriesResponse(clusterName, List.of(topQueries), new ArrayList<>(), 10, MetricType.LATENCY); + XContentBuilder builder = MediaTypeRegistry.contentBuilder(MediaTypeRegistry.JSON); + char[] xContent = BytesReference.bytes(response.toXContent(builder, ToXContent.EMPTY_PARAMS)).utf8ToString().toCharArray(); + Arrays.sort(expectedXcontent); + Arrays.sort(xContent); + + assertEquals(Arrays.hashCode(expectedXcontent), Arrays.hashCode(xContent)); + } + + /** + * Serialize and deserialize a TopQueriesResponse. + * @param response A response to serialize. + * @return The deserialized, "round-tripped" response. + */ + private static TopQueriesResponse roundTripResponse(TopQueriesResponse response) throws Exception { + try (BytesStreamOutput out = new BytesStreamOutput()) { + response.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + return new TopQueriesResponse(in); + } + } + } +} diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesTests.java new file mode 100644 index 0000000000000..7db08b53ad1df --- /dev/null +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesTests.java @@ -0,0 +1,35 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.action.top_queries; + +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.plugin.insights.QueryInsightsTestUtils; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; + +/** + * Tests for {@link TopQueries}. + */ +public class TopQueriesTests extends OpenSearchTestCase { + + public void testTopQueries() throws IOException { + TopQueries topQueries = QueryInsightsTestUtils.createRandomTopQueries(); + try (BytesStreamOutput out = new BytesStreamOutput()) { + topQueries.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + TopQueries readTopQueries = new TopQueries(in); + assertTrue( + QueryInsightsTestUtils.checkRecordsEquals(topQueries.getTopQueriesRecord(), readTopQueries.getTopQueriesRecord()) + ); + } + } + } +} diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecordTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecordTests.java new file mode 100644 index 0000000000000..793d5878e2300 --- /dev/null +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecordTests.java @@ -0,0 +1,71 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.model; + +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.plugin.insights.QueryInsightsTestUtils; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * Granular tests for the {@link SearchQueryRecord} class. + */ +public class SearchQueryRecordTests extends OpenSearchTestCase { + + /** + * Check that if the serialization, deserialization and equals functions are working as expected + */ + public void testSerializationAndEquals() throws Exception { + List records = QueryInsightsTestUtils.generateQueryInsightRecords(10); + List copiedRecords = new ArrayList<>(); + for (SearchQueryRecord record : records) { + copiedRecords.add(roundTripRecord(record)); + } + assertTrue(QueryInsightsTestUtils.checkRecordsEquals(records, copiedRecords)); + + } + + public void testAllMetricTypes() { + Set allMetrics = MetricType.allMetricTypes(); + Set expected = new HashSet<>(Arrays.asList(MetricType.LATENCY, MetricType.CPU, MetricType.JVM)); + assertEquals(expected, allMetrics); + } + + public void testCompare() { + SearchQueryRecord record1 = QueryInsightsTestUtils.createFixedSearchQueryRecord(); + SearchQueryRecord record2 = QueryInsightsTestUtils.createFixedSearchQueryRecord(); + assertEquals(0, SearchQueryRecord.compare(record1, record2, MetricType.LATENCY)); + } + + public void testEqual() { + SearchQueryRecord record1 = QueryInsightsTestUtils.createFixedSearchQueryRecord(); + SearchQueryRecord record2 = QueryInsightsTestUtils.createFixedSearchQueryRecord(); + assertEquals(record1, record2); + } + + /** + * Serialize and deserialize a SearchQueryRecord. + * @param record A SearchQueryRecord to serialize. + * @return The deserialized, "round-tripped" record. + */ + private static SearchQueryRecord roundTripRecord(SearchQueryRecord record) throws Exception { + try (BytesStreamOutput out = new BytesStreamOutput()) { + record.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + return new SearchQueryRecord(in); + } + } + } +} diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/RestTopQueriesActionTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/RestTopQueriesActionTests.java new file mode 100644 index 0000000000000..627d794731eed --- /dev/null +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/RestTopQueriesActionTests.java @@ -0,0 +1,62 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.resthandler.top_queries; + +import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesRequest; +import org.opensearch.rest.RestRequest; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.rest.FakeRestRequest; + +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; + +import static org.opensearch.plugin.insights.rules.resthandler.top_queries.RestTopQueriesAction.ALLOWED_METRICS; + +public class RestTopQueriesActionTests extends OpenSearchTestCase { + + public void testEmptyNodeIdsValidType() { + Map params = new HashMap<>(); + params.put("type", randomFrom(ALLOWED_METRICS)); + + RestRequest restRequest = buildRestRequest(params); + TopQueriesRequest actual = RestTopQueriesAction.prepareRequest(restRequest); + assertEquals(0, actual.nodesIds().length); + } + + public void testNodeIdsValid() { + Map params = new HashMap<>(); + params.put("type", randomFrom(ALLOWED_METRICS)); + String[] nodes = randomArray(1, 10, String[]::new, () -> randomAlphaOfLengthBetween(5, 10)); + params.put("nodeId", String.join(",", nodes)); + + RestRequest restRequest = buildRestRequest(params); + TopQueriesRequest actual = RestTopQueriesAction.prepareRequest(restRequest); + assertArrayEquals(nodes, actual.nodesIds()); + } + + public void testInValidType() { + Map params = new HashMap<>(); + params.put("type", randomAlphaOfLengthBetween(5, 10).toUpperCase(Locale.ROOT)); + + RestRequest restRequest = buildRestRequest(params); + Exception exception = assertThrows(IllegalArgumentException.class, () -> { RestTopQueriesAction.prepareRequest(restRequest); }); + assertEquals( + String.format(Locale.ROOT, "request [/_insights/top_queries] contains invalid metric type [%s]", params.get("type")), + exception.getMessage() + ); + } + + private FakeRestRequest buildRestRequest(Map params) { + return new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.GET) + .withPath("/_insights/top_queries") + .withParams(params) + .build(); + } +} diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/transport/top_queries/TransportTopQueriesActionTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/transport/top_queries/TransportTopQueriesActionTests.java new file mode 100644 index 0000000000000..a5f36b6e8cce0 --- /dev/null +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/transport/top_queries/TransportTopQueriesActionTests.java @@ -0,0 +1,84 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.transport.top_queries; + +import org.opensearch.action.support.ActionFilters; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.plugin.insights.core.service.QueryInsightsService; +import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesRequest; +import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesResponse; +import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.plugin.insights.settings.QueryInsightsSettings; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; +import org.junit.Before; + +import java.util.List; + +import static org.mockito.Mockito.mock; + +public class TransportTopQueriesActionTests extends OpenSearchTestCase { + + private final ThreadPool threadPool = mock(ThreadPool.class); + + private final Settings.Builder settingsBuilder = Settings.builder(); + private final Settings settings = settingsBuilder.build(); + private final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + private final ClusterService clusterService = new ClusterService(settings, clusterSettings, threadPool); + private final TransportService transportService = mock(TransportService.class); + private final QueryInsightsService topQueriesByLatencyService = mock(QueryInsightsService.class); + private final ActionFilters actionFilters = mock(ActionFilters.class); + private final TransportTopQueriesAction transportTopQueriesAction = new TransportTopQueriesAction( + threadPool, + clusterService, + transportService, + topQueriesByLatencyService, + actionFilters + ); + private final DummyParentAction dummyParentAction = new DummyParentAction( + threadPool, + clusterService, + transportService, + topQueriesByLatencyService, + actionFilters + ); + + class DummyParentAction extends TransportTopQueriesAction { + public DummyParentAction( + ThreadPool threadPool, + ClusterService clusterService, + TransportService transportService, + QueryInsightsService topQueriesByLatencyService, + ActionFilters actionFilters + ) { + super(threadPool, clusterService, transportService, topQueriesByLatencyService, actionFilters); + } + + public TopQueriesResponse createNewResponse() { + TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY); + return newResponse(request, List.of(), List.of()); + } + } + + @Before + public void setup() { + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE); + } + + public void testNewResponse() { + TopQueriesResponse response = dummyParentAction.createNewResponse(); + assertNotNull(response); + } + +} diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequest.java b/server/src/main/java/org/opensearch/action/search/SearchRequest.java index 96cea17ff4972..f738c182c06da 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequest.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequest.java @@ -359,7 +359,7 @@ boolean isFinalReduce() { * request. When created through {@link #subSearchRequest(SearchRequest, String[], String, long, boolean)}, this method returns * the provided current time, otherwise it will return {@link System#currentTimeMillis()}. */ - long getOrCreateAbsoluteStartMillis() { + public long getOrCreateAbsoluteStartMillis() { return absoluteStartMillis == DEFAULT_ABSOLUTE_START_MILLIS ? System.currentTimeMillis() : absoluteStartMillis; } diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java b/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java index eceac7204b196..383d9b5e82fe2 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java @@ -22,7 +22,7 @@ * @opensearch.internal */ @InternalApi -class SearchRequestContext { +public class SearchRequestContext { private final SearchRequestOperationsListener searchRequestOperationsListener; private long absoluteStartNanos; private final Map phaseTookMap; @@ -47,7 +47,7 @@ void updatePhaseTookMap(String phaseName, Long tookTime) { this.phaseTookMap.put(phaseName, tookTime); } - Map phaseTookMap() { + public Map phaseTookMap() { return phaseTookMap; } @@ -70,7 +70,7 @@ void setAbsoluteStartNanos(long absoluteStartNanos) { /** * Request start time in nanos */ - long getAbsoluteStartNanos() { + public long getAbsoluteStartNanos() { return absoluteStartNanos; } diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java b/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java index 2a09cc084f79f..2acb35af667f0 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java @@ -31,21 +31,21 @@ protected SearchRequestOperationsListener(final boolean enabled) { this.enabled = enabled; } - abstract void onPhaseStart(SearchPhaseContext context); + protected abstract void onPhaseStart(SearchPhaseContext context); - abstract void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext); + protected abstract void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext); - abstract void onPhaseFailure(SearchPhaseContext context); + protected abstract void onPhaseFailure(SearchPhaseContext context); - void onRequestStart(SearchRequestContext searchRequestContext) {} + protected void onRequestStart(SearchRequestContext searchRequestContext) {} - void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} + protected void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} - boolean isEnabled(SearchRequest searchRequest) { + protected boolean isEnabled(SearchRequest searchRequest) { return isEnabled(); } - boolean isEnabled() { + protected boolean isEnabled() { return enabled; } @@ -69,7 +69,7 @@ static final class CompositeListener extends SearchRequestOperationsListener { } @Override - void onPhaseStart(SearchPhaseContext context) { + protected void onPhaseStart(SearchPhaseContext context) { for (SearchRequestOperationsListener listener : listeners) { try { listener.onPhaseStart(context); @@ -80,7 +80,7 @@ void onPhaseStart(SearchPhaseContext context) { } @Override - void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { + protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { for (SearchRequestOperationsListener listener : listeners) { try { listener.onPhaseEnd(context, searchRequestContext); @@ -91,7 +91,7 @@ void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestCo } @Override - void onPhaseFailure(SearchPhaseContext context) { + protected void onPhaseFailure(SearchPhaseContext context) { for (SearchRequestOperationsListener listener : listeners) { try { listener.onPhaseFailure(context); @@ -102,7 +102,7 @@ void onPhaseFailure(SearchPhaseContext context) { } @Override - void onRequestStart(SearchRequestContext searchRequestContext) { + protected void onRequestStart(SearchRequestContext searchRequestContext) { for (SearchRequestOperationsListener listener : listeners) { try { listener.onRequestStart(searchRequestContext); diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java b/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java index 7f25f9026f215..74e04d976cb1c 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java @@ -134,19 +134,19 @@ public SearchRequestSlowLog(ClusterService clusterService) { } @Override - void onPhaseStart(SearchPhaseContext context) {} + protected void onPhaseStart(SearchPhaseContext context) {} @Override - void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} + protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} @Override - void onPhaseFailure(SearchPhaseContext context) {} + protected void onPhaseFailure(SearchPhaseContext context) {} @Override - void onRequestStart(SearchRequestContext searchRequestContext) {} + protected void onRequestStart(SearchRequestContext searchRequestContext) {} @Override - void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { + protected void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { long tookInNanos = System.nanoTime() - searchRequestContext.getAbsoluteStartNanos(); if (warnThreshold >= 0 && tookInNanos > warnThreshold && level.isLevelEnabledFor(SlowLogLevel.WARN)) { diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java b/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java index 88d599a0dcdaa..ac32b08afb7f6 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java @@ -58,12 +58,12 @@ public long getPhaseMetric(SearchPhaseName searchPhaseName) { } @Override - void onPhaseStart(SearchPhaseContext context) { + protected void onPhaseStart(SearchPhaseContext context) { phaseStatsMap.get(context.getCurrentPhase().getSearchPhaseName()).current.inc(); } @Override - void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { + protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { StatsHolder phaseStats = phaseStatsMap.get(context.getCurrentPhase().getSearchPhaseName()); phaseStats.current.dec(); phaseStats.total.inc(); @@ -71,7 +71,7 @@ void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestCo } @Override - void onPhaseFailure(SearchPhaseContext context) { + protected void onPhaseFailure(SearchPhaseContext context) { phaseStatsMap.get(context.getCurrentPhase().getSearchPhaseName()).current.dec(); } diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactoryTests.java b/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactoryTests.java index 78c5ba4412c68..1cb336e18b12c 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactoryTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactoryTests.java @@ -119,13 +119,13 @@ public void testStandardListenerAndPerRequestListenerDisabled() { public SearchRequestOperationsListener createTestSearchRequestOperationsListener() { return new SearchRequestOperationsListener() { @Override - void onPhaseStart(SearchPhaseContext context) {} + protected void onPhaseStart(SearchPhaseContext context) {} @Override - void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} + protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} @Override - void onPhaseFailure(SearchPhaseContext context) {} + protected void onPhaseFailure(SearchPhaseContext context) {} }; } }