From 416d41e6e4c064da99701f8ccb9f0d2c235ac154 Mon Sep 17 00:00:00 2001 From: Chenyang Ji Date: Fri, 23 Feb 2024 17:51:03 -0800 Subject: [PATCH] Accurate instrumentation: correlate queries and tasks results on cluster manager node --- .../resourcetracker/TaskResourceUsage.java | 3 +- .../common/CommonAnalysisModulePlugin.java | 4 +- .../painless/PainlessModulePlugin.java | 4 +- .../index/reindex/ReindexModulePlugin.java | 4 +- .../systemd/SystemdModulePlugin.java | 4 +- .../plugin/insights/QueryInsightsPlugin.java | 22 ++- .../core/listener/QueryInsightsListener.java | 7 +- .../listener/ResourceTrackingListener.java | 62 ++++++++ .../core/service/QueryInsightsService.java | 104 ++++++++++++- .../action/top_queries/SearchMetadata.java | 90 +++++++++++ .../top_queries/SearchMetadataAction.java | 32 ++++ .../top_queries/SearchMetadataRequest.java | 50 ++++++ .../top_queries/SearchMetadataResponse.java | 94 ++++++++++++ .../top_queries/TopQueriesResponse.java | 15 -- .../insights/rules/model/MetricType.java | 3 +- .../rules/model/SearchQueryRecord.java | 20 ++- .../rules/model/SearchTaskMetadata.java | 101 +++++++++++++ .../top_queries/RestTopQueriesAction.java | 15 +- .../TransportSearchMetadataAction.java | 142 ++++++++++++++++++ .../settings/QueryInsightsSettings.java | 3 +- .../action/search/FetchSearchPhase.java | 1 - .../main/java/org/opensearch/node/Node.java | 62 ++++---- .../java/org/opensearch/plugins/Plugin.java | 4 +- .../tasks/TaskResourceTrackingService.java | 22 +++ 24 files changed, 790 insertions(+), 78 deletions(-) create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/ResourceTrackingListener.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/SearchMetadata.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/SearchMetadataAction.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/SearchMetadataRequest.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/SearchMetadataResponse.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchTaskMetadata.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/transport/top_queries/TransportSearchMetadataAction.java diff --git a/libs/core/src/main/java/org/opensearch/core/tasks/resourcetracker/TaskResourceUsage.java b/libs/core/src/main/java/org/opensearch/core/tasks/resourcetracker/TaskResourceUsage.java index 654f1c5695937..de20995442c20 100644 --- a/libs/core/src/main/java/org/opensearch/core/tasks/resourcetracker/TaskResourceUsage.java +++ b/libs/core/src/main/java/org/opensearch/core/tasks/resourcetracker/TaskResourceUsage.java @@ -90,7 +90,8 @@ public static TaskResourceUsage fromXContent(XContentParser parser) { @Override public String toString() { - return Strings.toString(MediaTypeRegistry.JSON, this, true, true); + // TODO revert after debugging + return Strings.toString(MediaTypeRegistry.JSON, this, false, true); } // Implements equals and hashcode for testing diff --git a/modules/analysis-common/src/main/java/org/opensearch/analysis/common/CommonAnalysisModulePlugin.java b/modules/analysis-common/src/main/java/org/opensearch/analysis/common/CommonAnalysisModulePlugin.java index cf2736a8583d2..a28a4f93f2826 100644 --- a/modules/analysis-common/src/main/java/org/opensearch/analysis/common/CommonAnalysisModulePlugin.java +++ b/modules/analysis-common/src/main/java/org/opensearch/analysis/common/CommonAnalysisModulePlugin.java @@ -153,6 +153,7 @@ import org.opensearch.repositories.RepositoriesService; import org.opensearch.script.ScriptContext; import org.opensearch.script.ScriptService; +import org.opensearch.tasks.TaskResourceTrackingService; import org.opensearch.threadpool.ThreadPool; import org.opensearch.watcher.ResourceWatcherService; @@ -187,7 +188,8 @@ public Collection createComponents( NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry, IndexNameExpressionResolver expressionResolver, - Supplier repositoriesServiceSupplier + Supplier repositoriesServiceSupplier, + TaskResourceTrackingService taskResourceTrackingService ) { this.scriptService.set(scriptService); return Collections.emptyList(); diff --git a/modules/lang-painless/src/main/java/org/opensearch/painless/PainlessModulePlugin.java b/modules/lang-painless/src/main/java/org/opensearch/painless/PainlessModulePlugin.java index c7638b3c41c63..9adef1b9c77fa 100644 --- a/modules/lang-painless/src/main/java/org/opensearch/painless/PainlessModulePlugin.java +++ b/modules/lang-painless/src/main/java/org/opensearch/painless/PainlessModulePlugin.java @@ -66,6 +66,7 @@ import org.opensearch.script.ScriptEngine; import org.opensearch.script.ScriptService; import org.opensearch.search.aggregations.pipeline.MovingFunctionScript; +import org.opensearch.tasks.TaskResourceTrackingService; import org.opensearch.threadpool.ThreadPool; import org.opensearch.watcher.ResourceWatcherService; @@ -140,7 +141,8 @@ public Collection createComponents( NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry, IndexNameExpressionResolver expressionResolver, - Supplier repositoriesServiceSupplier + Supplier repositoriesServiceSupplier, + TaskResourceTrackingService taskResourceTrackingService ) { // this is a hack to bind the painless script engine in guice (all components are added to guice), so that // the painless context api. this is a temporary measure until transport actions do no require guice diff --git a/modules/reindex/src/main/java/org/opensearch/index/reindex/ReindexModulePlugin.java b/modules/reindex/src/main/java/org/opensearch/index/reindex/ReindexModulePlugin.java index c211f937c1dd9..61bb26359cf97 100644 --- a/modules/reindex/src/main/java/org/opensearch/index/reindex/ReindexModulePlugin.java +++ b/modules/reindex/src/main/java/org/opensearch/index/reindex/ReindexModulePlugin.java @@ -58,6 +58,7 @@ import org.opensearch.rest.RestHandler; import org.opensearch.script.ScriptService; import org.opensearch.tasks.Task; +import org.opensearch.tasks.TaskResourceTrackingService; import org.opensearch.threadpool.ThreadPool; import org.opensearch.watcher.ResourceWatcherService; @@ -122,7 +123,8 @@ public Collection createComponents( NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry, IndexNameExpressionResolver expressionResolver, - Supplier repositoriesServiceSupplier + Supplier repositoriesServiceSupplier, + TaskResourceTrackingService taskResourceTrackingService ) { return Collections.singletonList(new ReindexSslConfig(environment.settings(), environment, resourceWatcherService)); } diff --git a/modules/systemd/src/main/java/org/opensearch/systemd/SystemdModulePlugin.java b/modules/systemd/src/main/java/org/opensearch/systemd/SystemdModulePlugin.java index 6e291027fa35f..a6acf4f3797c0 100644 --- a/modules/systemd/src/main/java/org/opensearch/systemd/SystemdModulePlugin.java +++ b/modules/systemd/src/main/java/org/opensearch/systemd/SystemdModulePlugin.java @@ -47,6 +47,7 @@ import org.opensearch.plugins.Plugin; import org.opensearch.repositories.RepositoriesService; import org.opensearch.script.ScriptService; +import org.opensearch.tasks.TaskResourceTrackingService; import org.opensearch.threadpool.Scheduler; import org.opensearch.threadpool.ThreadPool; import org.opensearch.watcher.ResourceWatcherService; @@ -100,7 +101,8 @@ public Collection createComponents( final NodeEnvironment nodeEnvironment, final NamedWriteableRegistry namedWriteableRegistry, final IndexNameExpressionResolver expressionResolver, - final Supplier repositoriesServiceSupplier + final Supplier repositoriesServiceSupplier, + TaskResourceTrackingService taskResourceTrackingService ) { if (enabled == false) { extender.set(null); diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java index 4d7e0d486068a..8bb3b42b886c5 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java @@ -26,9 +26,12 @@ import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; import org.opensearch.plugin.insights.core.listener.QueryInsightsListener; +import org.opensearch.plugin.insights.core.listener.ResourceTrackingListener; import org.opensearch.plugin.insights.core.service.QueryInsightsService; +import org.opensearch.plugin.insights.rules.action.top_queries.SearchMetadataAction; import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesAction; import org.opensearch.plugin.insights.rules.resthandler.top_queries.RestTopQueriesAction; +import org.opensearch.plugin.insights.rules.transport.top_queries.TransportSearchMetadataAction; import org.opensearch.plugin.insights.rules.transport.top_queries.TransportTopQueriesAction; import org.opensearch.plugin.insights.settings.QueryInsightsSettings; import org.opensearch.plugins.ActionPlugin; @@ -37,6 +40,7 @@ import org.opensearch.rest.RestController; import org.opensearch.rest.RestHandler; import org.opensearch.script.ScriptService; +import org.opensearch.tasks.TaskResourceTrackingService; import org.opensearch.threadpool.ExecutorBuilder; import org.opensearch.threadpool.ScalingExecutorBuilder; import org.opensearch.threadpool.ThreadPool; @@ -67,11 +71,16 @@ public Collection createComponents( final NodeEnvironment nodeEnvironment, final NamedWriteableRegistry namedWriteableRegistry, final IndexNameExpressionResolver indexNameExpressionResolver, - final Supplier repositoriesServiceSupplier + final Supplier repositoriesServiceSupplier, + final TaskResourceTrackingService taskResourceTrackingService ) { // create top n queries service - final QueryInsightsService queryInsightsService = new QueryInsightsService(threadPool); - return List.of(queryInsightsService, new QueryInsightsListener(clusterService, queryInsightsService)); + final QueryInsightsService queryInsightsService = new QueryInsightsService(threadPool, client, clusterService); + return List.of( + queryInsightsService, + new QueryInsightsListener(clusterService, queryInsightsService), + new ResourceTrackingListener(queryInsightsService, taskResourceTrackingService) + ); } @Override @@ -96,12 +105,15 @@ public List getRestHandlers( final IndexNameExpressionResolver indexNameExpressionResolver, final Supplier nodesInCluster ) { - return List.of(new RestTopQueriesAction()); + return List.of(new RestTopQueriesAction(nodesInCluster)); } @Override public List> getActions() { - return List.of(new ActionPlugin.ActionHandler<>(TopQueriesAction.INSTANCE, TransportTopQueriesAction.class)); + return List.of( + new ActionPlugin.ActionHandler<>(TopQueriesAction.INSTANCE, TransportTopQueriesAction.class), + new ActionPlugin.ActionHandler<>(SearchMetadataAction.INSTANCE, TransportSearchMetadataAction.class) + ); } @Override diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java index 705273f52a567..02be204ca2651 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java @@ -123,6 +123,10 @@ public void onRequestStart(SearchRequestContext searchRequestContext) {} @Override public void onRequestEnd(final SearchPhaseContext context, final SearchRequestContext searchRequestContext) { + long taskGroupId = context.getTask().getParentTaskId().getId(); + if (taskGroupId == -1) { + taskGroupId = context.getTask().getId(); + } final SearchRequest request = context.getRequest(); try { Map measurements = new HashMap<>(); @@ -138,7 +142,8 @@ public void onRequestEnd(final SearchPhaseContext context, final SearchRequestCo attributes.put(Attribute.TOTAL_SHARDS, context.getNumShards()); attributes.put(Attribute.INDICES, request.indices()); attributes.put(Attribute.PHASE_LATENCY_MAP, searchRequestContext.phaseTookMap()); - SearchQueryRecord record = new SearchQueryRecord(request.getOrCreateAbsoluteStartMillis(), measurements, attributes); + attributes.put(Attribute.NODE_ID, this.queryInsightsService.clusterService.localNode().getId()); + SearchQueryRecord record = new SearchQueryRecord(request.getOrCreateAbsoluteStartMillis(), taskGroupId, measurements, attributes); queryInsightsService.addRecord(record); } catch (Exception e) { log.error(String.format(Locale.ROOT, "fail to ingest query insight data, error: %s", e)); diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/ResourceTrackingListener.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/ResourceTrackingListener.java new file mode 100644 index 0000000000000..215ca45292cab --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/ResourceTrackingListener.java @@ -0,0 +1,62 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.listener; + +import org.opensearch.core.tasks.resourcetracker.ResourceStats; +import org.opensearch.plugin.insights.core.service.QueryInsightsService; +import org.opensearch.plugin.insights.rules.model.SearchTaskMetadata; +import org.opensearch.tasks.Task; +import org.opensearch.tasks.TaskResourceTrackingService; +import org.opensearch.tasks.TaskResourceTrackingService.TaskCompletionListener; +import org.opensearch.tasks.TaskResourceTrackingService.TaskStartListener; + +import java.util.concurrent.atomic.AtomicInteger; + +public class ResourceTrackingListener implements TaskCompletionListener, TaskStartListener { + + private final TaskResourceTrackingService taskResourceTrackingService; + private final QueryInsightsService queryInsightsService; + + public ResourceTrackingListener ( + QueryInsightsService queryInsightsService, + TaskResourceTrackingService taskResourceTrackingService + ) { + this.queryInsightsService = queryInsightsService; + this.taskResourceTrackingService = taskResourceTrackingService; + this.taskResourceTrackingService.addTaskCompletionListener(this); + this.taskResourceTrackingService.addTaskStartListener(this); + } + @Override + public void onTaskCompleted(Task task) { + long taskGroupId = task.getParentTaskId().getId(); + if (taskGroupId == -1) { + taskGroupId = task.getId(); + } + SearchTaskMetadata info = new SearchTaskMetadata( + task.getAction(), task.getId(), taskGroupId, task.getTotalResourceStats() + ); + + int pendingTaskCount = this.queryInsightsService.taskStatusMap.get(taskGroupId).decrementAndGet(); + if (pendingTaskCount == 0) { + this.queryInsightsService.taskStatusMap.remove(taskGroupId); + } + queryInsightsService.taskRecordsQueue.add(info); + System.out.println(String.format("id = %s, parent = %s, resource = %s, action = %s, total CPU and MEM: %s, %s", task.getId(), task.getParentTaskId(), task.getResourceStats(), task.getAction(),task.getTotalResourceUtilization(ResourceStats.CPU),task.getTotalResourceUtilization(ResourceStats.MEMORY) )); + } + + @Override + public void onTaskStarts(Task task) { + long taskGroupId = task.getParentTaskId().getId(); + if (taskGroupId == -1) { + taskGroupId = task.getId(); + } + this.queryInsightsService.taskStatusMap.putIfAbsent(taskGroupId, new AtomicInteger(0)); + this.queryInsightsService.taskStatusMap.get(taskGroupId).incrementAndGet(); + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java index 525ca0d4a3d33..937edaf38f3c7 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java @@ -8,10 +8,17 @@ package org.opensearch.plugin.insights.core.service; +import org.opensearch.client.Client; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; +import org.opensearch.core.action.ActionListener; +import org.opensearch.plugin.insights.rules.action.top_queries.SearchMetadataAction; +import org.opensearch.plugin.insights.rules.action.top_queries.SearchMetadataRequest; +import org.opensearch.plugin.insights.rules.action.top_queries.SearchMetadataResponse; import org.opensearch.plugin.insights.rules.model.MetricType; import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; +import org.opensearch.plugin.insights.rules.model.SearchTaskMetadata; import org.opensearch.plugin.insights.settings.QueryInsightsSettings; import org.opensearch.threadpool.Scheduler; import org.opensearch.threadpool.ThreadPool; @@ -21,7 +28,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; /** * Service responsible for gathering, analyzing, storing and exporting @@ -34,6 +44,9 @@ public class QueryInsightsService extends AbstractLifecycleComponent { * The internal OpenSearch thread pool that execute async processing and exporting tasks */ private final ThreadPool threadPool; + private final Client client; + + public final ClusterService clusterService; /** * Services to capture top n queries for different metric types @@ -50,6 +63,11 @@ public class QueryInsightsService extends AbstractLifecycleComponent { */ private final LinkedBlockingQueue queryRecordsQueue; + // TODO Move these to top queries service and change to private + public final LinkedBlockingQueue taskRecordsQueue = new LinkedBlockingQueue<>(); + + public final ConcurrentHashMap taskStatusMap = new ConcurrentHashMap<>(); + /** * Holds a reference to delayed operation {@link Scheduler.Cancellable} so it can be cancelled when * the service closed concurrently. @@ -62,7 +80,7 @@ public class QueryInsightsService extends AbstractLifecycleComponent { * @param threadPool The OpenSearch thread pool to run async tasks */ @Inject - public QueryInsightsService(final ThreadPool threadPool) { + public QueryInsightsService(final ThreadPool threadPool, final Client client, final ClusterService clusterService) { enableCollect = new HashMap<>(); queryRecordsQueue = new LinkedBlockingQueue<>(QueryInsightsSettings.QUERY_RECORD_QUEUE_CAPACITY); topQueriesServices = new HashMap<>(); @@ -71,6 +89,8 @@ public QueryInsightsService(final ThreadPool threadPool) { topQueriesServices.put(metricType, new TopQueriesService(metricType)); } this.threadPool = threadPool; + this.client = client; + this.clusterService = clusterService; } /** @@ -102,17 +122,68 @@ public boolean addRecord(final SearchQueryRecord record) { * Drain the queryRecordsQueue into internal stores and services */ public void drainRecords() { - final List records = new ArrayList<>(); - queryRecordsQueue.drainTo(records); - records.sort(Comparator.comparingLong(SearchQueryRecord::getTimestamp)); + SearchMetadataRequest request = new SearchMetadataRequest(); + + // Am on Cluster Manager Node, get all top queries and tasks data from all nodes and correlate them + client.execute(SearchMetadataAction.INSTANCE, request, new ActionListener() { + @Override + public void onResponse(SearchMetadataResponse searchMetadataResponse) { + List clusterQueryRecordsList = searchMetadataResponse.getNodes().stream().flatMap(a -> a.queryRecordList.stream()).collect(Collectors.toList()); + List clusterTasksList = searchMetadataResponse.getNodes().stream().flatMap(a -> a.taskMetadataList.stream()).collect(Collectors.toList()); + Map clusterTasksMap = searchMetadataResponse.getNodes().stream().flatMap(a -> a.taskStatusMap.entrySet().stream()).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Integer::sum)); + drain(clusterQueryRecordsList, clusterTasksList, clusterTasksMap); + } + + @Override + public void onFailure(Exception e) { + // TODO + } + }); + } + + private void drain(List clusterQueryRecords, List clusterTaskRecords, Map clusterTasksStatusMap) { + final List finishedQueryRecord = correlateTasks(clusterQueryRecords, clusterTaskRecords, clusterTasksStatusMap); + finishedQueryRecord.sort(Comparator.comparingLong(SearchQueryRecord::getTimestamp)); for (MetricType metricType : MetricType.allMetricTypes()) { if (enableCollect.get(metricType)) { // ingest the records into topQueriesService - topQueriesServices.get(metricType).consumeRecords(records); + topQueriesServices.get(metricType).consumeRecords(finishedQueryRecord); } } } + + + public List correlateTasks(List clusterQueryRecords, List clusterTaskRecords, Map clusterTaskStatusMap) { + List finalResults = new ArrayList<>(); + // group taskRecords by parent task + Map> taskIdToResources = new HashMap<>(); + for (SearchTaskMetadata info : clusterTaskRecords) { + taskIdToResources.putIfAbsent(info.parentTaskId, new ArrayList<>()); + taskIdToResources.get(info.parentTaskId).add(info); + } + for (SearchQueryRecord record : clusterQueryRecords) { + if (!taskIdToResources.containsKey(record.taskId)) { + // TODO: No task info for a request, this shouldn't happen - something is wrong. + continue; + } + // parent task has finished + // TODO can remove first check after debugging + if (!clusterTaskStatusMap.containsKey(record.taskId) || clusterTaskStatusMap.get(record.taskId) == 0) { + long cpuUsage = taskIdToResources.get(record.taskId).stream().map(r -> r.taskResourceUsage.getCpuTimeInNanos()).reduce(0L, Long::sum); + long memUsage = taskIdToResources.get(record.taskId).stream().map(r -> r.taskResourceUsage.getMemoryInBytes()).reduce(0L, Long::sum); + record.measurements.put(MetricType.CPU, cpuUsage); + record.measurements.put(MetricType.JVM, memUsage); + finalResults.add(record); + } else { + // write back since the task information is not completed + queryRecordsQueue.offer(record); + taskRecordsQueue.addAll(taskIdToResources.get(record.taskId)); + } + } + return finalResults; + } + /** * Get the top queries service based on metricType * @param metricType {@link MetricType} @@ -122,6 +193,27 @@ public TopQueriesService getTopQueriesService(final MetricType metricType) { return topQueriesServices.get(metricType); } + public List getQueryRecordsList() { + final List queryRecords = new ArrayList<>(); + queryRecordsQueue.drainTo(queryRecords); + + return queryRecords; + } + + public List getTaskRecordsList() { + final List taskRecords = new ArrayList<>(); + taskRecordsQueue.drainTo(taskRecords); + + return taskRecords; + } + + public Map getTaskStatusMap() { + Map res = taskStatusMap.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e-> e.getValue().get())); +// taskStatusMap.clear(); + + return res; + } + /** * Set flag to enable or disable Query Insights data collection * @@ -159,7 +251,7 @@ public boolean isEnabled() { @Override protected void doStart() { - if (isEnabled()) { + if (isEnabled() && clusterService.state().nodes().isLocalNodeElectedClusterManager()) { scheduledFuture = threadPool.scheduleWithFixedDelay( this::drainRecords, QueryInsightsSettings.QUERY_RECORD_QUEUE_DRAIN_INTERVAL, diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/SearchMetadata.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/SearchMetadata.java new file mode 100644 index 0000000000000..5a7adec66ff4b --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/SearchMetadata.java @@ -0,0 +1,90 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.action.top_queries; + +import org.opensearch.action.support.nodes.BaseNodeResponse; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.annotation.PublicApi; +import org.opensearch.core.common.Strings; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.tasks.resourcetracker.TaskResourceUsage; +import org.opensearch.core.xcontent.MediaTypeRegistry; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; +import org.opensearch.plugin.insights.rules.model.SearchTaskMetadata; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Task resource usage information with minimal information about the task + *

+ * Writeable TaskResourceInfo objects are used to represent resource usage + * information of running tasks, which can be propagated to coordinator node + * to infer query-level resource usage + * + * @opensearch.api + */ +@PublicApi(since = "2.1.0") +public class SearchMetadata extends BaseNodeResponse implements Writeable, ToXContentObject { + public List queryRecordList; + public List taskMetadataList; + public Map taskStatusMap; + + /** + * Create the TopQueries Object from StreamInput + * @param in A {@link StreamInput} object. + * @throws IOException IOException + */ + public SearchMetadata(final StreamInput in) throws IOException { + super(in); + queryRecordList = in.readList(SearchQueryRecord::new); + taskMetadataList = in.readList(SearchTaskMetadata::new); + taskStatusMap = in.readMap(StreamInput::readLong, StreamInput::readInt); + } + + /** + * Create the TopQueries Object + * @param node A node that is part of the cluster. + * @param taskMetadataList A list of SearchQueryRecord associated in this TopQueries. + */ + public SearchMetadata(final DiscoveryNode node, List queryRecordList, List taskMetadataList, Map taskStatusMap) { + super(node); + this.queryRecordList = queryRecordList; + this.taskMetadataList = taskMetadataList; + this.taskStatusMap = taskStatusMap; + + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeList(queryRecordList); + out.writeList(taskMetadataList); + out.writeMap(taskStatusMap, StreamOutput::writeLong, StreamOutput::writeInt); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + if (taskMetadataList != null) { + for (SearchTaskMetadata metadata : taskMetadataList) { + metadata.toXContent(builder, params); + } + } + return builder.endObject(); + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/SearchMetadataAction.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/SearchMetadataAction.java new file mode 100644 index 0000000000000..524fb5b0d7882 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/SearchMetadataAction.java @@ -0,0 +1,32 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.action.top_queries; + +import org.opensearch.action.ActionType; + +/** + * Transport action for cluster/node level top queries information. + * + * @opensearch.internal + */ +public class SearchMetadataAction extends ActionType { + + /** + * The TopQueriesAction Instance. + */ + public static final SearchMetadataAction INSTANCE = new SearchMetadataAction(); + /** + * The name of this Action + */ + public static final String NAME = "cluster:admin/opensearch/insights/search_metadata"; + + private SearchMetadataAction() { + super(NAME, SearchMetadataResponse::new); + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/SearchMetadataRequest.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/SearchMetadataRequest.java new file mode 100644 index 0000000000000..28434844404c0 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/SearchMetadataRequest.java @@ -0,0 +1,50 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.action.top_queries; + +import org.opensearch.action.support.nodes.BaseNodesRequest; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.plugin.insights.rules.model.MetricType; + +import java.io.IOException; + +/** + * A request to get cluster/node level top queries information. + * + * @opensearch.internal + */ +public class SearchMetadataRequest extends BaseNodesRequest { + + /** + * Constructor for TopQueriesRequest + * + * @param in A {@link StreamInput} object. + * @throws IOException if the stream cannot be deserialized. + */ + public SearchMetadataRequest(final StreamInput in) throws IOException { + super(in); + } + + /** + * Get top queries from nodes based on the nodes ids specified. + * If none are passed, cluster level top queries will be returned. + * + * @param nodesIds the nodeIds specified in the request + */ + // TODO remove this + public SearchMetadataRequest(final String... nodesIds) { + super(nodesIds); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + super.writeTo(out); + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/SearchMetadataResponse.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/SearchMetadataResponse.java new file mode 100644 index 0000000000000..bfd05b9e6261f --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/SearchMetadataResponse.java @@ -0,0 +1,94 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.action.top_queries; + +import org.opensearch.action.FailedNodeException; +import org.opensearch.action.support.nodes.BaseNodesResponse; +import org.opensearch.cluster.ClusterName; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.plugin.insights.rules.model.Attribute; +import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Transport response for cluster/node level top queries information. + * + * @opensearch.internal + */ +public class SearchMetadataResponse extends BaseNodesResponse implements ToXContentObject { + /** + * Constructor for TopQueriesResponse. + * + * @param in A {@link StreamInput} object. + * @throws IOException if the stream cannot be deserialized. + */ + public SearchMetadataResponse(final StreamInput in) throws IOException { + super(in); + } + + /** + * Constructor for TopQueriesResponse + * + * @param clusterName The current cluster name + * @param nodes A list that contains top queries results from all nodes + * @param failures A list that contains FailedNodeException + */ + public SearchMetadataResponse( + final ClusterName clusterName, + final List nodes, + final List failures + ) { + super(clusterName, nodes, failures); + } + + @Override + protected List readNodesFrom(final StreamInput in) throws IOException { + return in.readList(SearchMetadata::new); + } + + @Override + protected void writeNodesTo(final StreamOutput out, final List nodes) throws IOException { + out.writeList(nodes); + } + + @Override + public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { + final List results = getNodes(); + builder.startObject(); + builder.startArray(); + for (SearchMetadata result : results) { + result.toXContent(builder, params); + } + builder.endArray(); + return builder.endObject(); + } + + @Override + public String toString() { + try { + final XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint(); + builder.startObject(); + this.toXContent(builder, EMPTY_PARAMS); + builder.endObject(); + return builder.toString(); + } catch (IOException e) { + return "{ \"error\" : \"" + e.getMessage() + "\"}"; + } + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponse.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponse.java index 2e66bb7f77baf..d2358c34fffc9 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponse.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponse.java @@ -84,7 +84,6 @@ protected void writeNodesTo(final StreamOutput out, final List nodes @Override public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { final List results = getNodes(); - postProcess(results); builder.startObject(); toClusterLevelResult(builder, params, results); return builder.endObject(); @@ -103,20 +102,6 @@ public String toString() { } } - /** - * Post process the top queries results to add customized attributes - * - * @param results the top queries results - */ - private void postProcess(final List results) { - for (TopQueries topQueries : results) { - final String nodeId = topQueries.getNode().getId(); - for (SearchQueryRecord record : topQueries.getTopQueriesRecord()) { - record.addAttribute(Attribute.NODE_ID, nodeId); - } - } - } - /** * Merge top n queries results from nodes into cluster level results in XContent format. * diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/MetricType.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/MetricType.java index cdd090fbf4804..d4c5e9623c8f0 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/MetricType.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/MetricType.java @@ -110,10 +110,9 @@ public int compare(final Number a, final Number b) { Number parseValue(final Object o) { switch (this) { case LATENCY: - return (Long) o; case JVM: case CPU: - return (Double) o; + return (Long) o; default: return (Number) o; } diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java index 060711edb5580..51136959d46bf 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java @@ -28,8 +28,9 @@ */ public class SearchQueryRecord implements ToXContentObject, Writeable { private final long timestamp; - private final Map measurements; - private final Map attributes; + public long taskId; + public Map measurements; + public Map attributes; /** * Constructor of SearchQueryRecord @@ -40,6 +41,7 @@ public class SearchQueryRecord implements ToXContentObject, Writeable { */ public SearchQueryRecord(final StreamInput in) throws IOException, ClassCastException { this.timestamp = in.readLong(); + this.taskId = in.readLong(); measurements = new HashMap<>(); in.readMap(MetricType::readFromStream, StreamInput::readGenericValue) .forEach(((metricType, o) -> measurements.put(metricType, metricType.parseValue(o)))); @@ -53,13 +55,15 @@ public SearchQueryRecord(final StreamInput in) throws IOException, ClassCastExce * @param measurements A list of Measurement associated with this query * @param attributes A list of Attributes associated with this query */ - public SearchQueryRecord(final long timestamp, Map measurements, final Map attributes) { + public SearchQueryRecord(final long timestamp, final long taskId, Map measurements, final Map attributes) { + this.timestamp = timestamp; + this.taskId = taskId; if (measurements == null) { throw new IllegalArgumentException("Measurements cannot be null"); } this.measurements = measurements; this.attributes = attributes; - this.timestamp = timestamp; + } /** @@ -113,10 +117,11 @@ public void addAttribute(final Attribute attribute, final Object value) { public XContentBuilder toXContent(final XContentBuilder builder, final ToXContent.Params params) throws IOException { builder.startObject(); builder.field("timestamp", timestamp); - for (Map.Entry entry : attributes.entrySet()) { + builder.field("taskId", taskId); + for (Map.Entry entry : measurements.entrySet()) { builder.field(entry.getKey().toString(), entry.getValue()); } - for (Map.Entry entry : measurements.entrySet()) { + for (Map.Entry entry : attributes.entrySet()) { builder.field(entry.getKey().toString(), entry.getValue()); } return builder.endObject(); @@ -131,6 +136,7 @@ public XContentBuilder toXContent(final XContentBuilder builder, final ToXConten @Override public void writeTo(final StreamOutput out) throws IOException { out.writeLong(timestamp); + out.writeLong(taskId); out.writeMap(measurements, (stream, metricType) -> MetricType.writeTo(out, metricType), StreamOutput::writeGenericValue); out.writeMap(attributes, (stream, attribute) -> Attribute.writeTo(out, attribute), StreamOutput::writeGenericValue); } @@ -171,6 +177,6 @@ public boolean equals(final Object o) { @Override public int hashCode() { - return Objects.hash(timestamp, measurements, attributes); + return Objects.hash(timestamp, taskId, measurements, attributes); } } diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchTaskMetadata.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchTaskMetadata.java new file mode 100644 index 0000000000000..06d739313c7e3 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchTaskMetadata.java @@ -0,0 +1,101 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.model; + +import org.opensearch.common.annotation.PublicApi; +import org.opensearch.core.common.Strings; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.tasks.resourcetracker.TaskResourceUsage; +import org.opensearch.core.xcontent.MediaTypeRegistry; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Objects; + +/** + * Task resource usage information with minimal information about the task + *

+ * Writeable TaskResourceInfo objects are used to represent resource usage + * information of running tasks, which can be propagated to coordinator node + * to infer query-level resource usage + * + * @opensearch.api + */ +@PublicApi(since = "2.1.0") +public class SearchTaskMetadata implements Writeable, ToXContentObject { + public TaskResourceUsage taskResourceUsage; + public String action; + public long taskId; + public long parentTaskId; + + + /** + * Create the TopQueries Object from StreamInput + * @param in A {@link StreamInput} object. + * @throws IOException IOException + */ + public SearchTaskMetadata(final StreamInput in) throws IOException { + action = in.readString(); + taskId = in.readLong(); + taskResourceUsage = TaskResourceUsage.readFromStream(in); + parentTaskId = in.readLong(); + } + + /** + * Create the TopQueries Object + */ + public SearchTaskMetadata(String action, long taskId, long parentTaskId, TaskResourceUsage taskResourceUsage) { + this.action = action; + this.taskId = taskId; + this.taskResourceUsage = taskResourceUsage; + this.parentTaskId = parentTaskId; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(action); + out.writeLong(taskId); + taskResourceUsage.writeTo(out); + out.writeLong(parentTaskId); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + // TODO: change to a constant + builder.field("Action", action); + builder.field("TaskId", taskId); + builder.field("ParentTaskId", parentTaskId); + taskResourceUsage.toXContent(builder, params); + return builder.endObject(); + } + + @Override + public String toString() { + return Strings.toString(MediaTypeRegistry.JSON, this, false, true); + } + + // Implements equals and hashcode for testing + @Override + public boolean equals(Object obj) { + if (obj == null || obj.getClass() != SearchTaskMetadata.class) { + return false; + } + SearchTaskMetadata other = (SearchTaskMetadata) obj; + return action.equals(other.action) && taskId == other.taskId && taskResourceUsage.equals(other.taskResourceUsage); + } + + @Override + public int hashCode() { + return Objects.hash(action, taskId, taskResourceUsage); + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/RestTopQueriesAction.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/RestTopQueriesAction.java index 6aa511c626ab1..3050e251d15a1 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/RestTopQueriesAction.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/RestTopQueriesAction.java @@ -9,6 +9,8 @@ package org.opensearch.plugin.insights.rules.resthandler.top_queries; import org.opensearch.client.node.NodeClient; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Settings; import org.opensearch.core.common.Strings; import org.opensearch.core.rest.RestStatus; @@ -27,6 +29,7 @@ import java.util.List; import java.util.Locale; import java.util.Set; +import java.util.function.Supplier; import java.util.stream.Collectors; import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_QUERIES_BASE_URI; @@ -40,11 +43,14 @@ public class RestTopQueriesAction extends BaseRestHandler { /** The metric types that are allowed in top N queries */ static final Set ALLOWED_METRICS = MetricType.allMetricTypes().stream().map(MetricType::toString).collect(Collectors.toSet()); + private Supplier nodes; /** * Constructor for RestTopQueriesAction */ - public RestTopQueriesAction() {} + public RestTopQueriesAction(Supplier nodes) { + this.nodes = nodes; + } @Override public List routes() { @@ -67,15 +73,16 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC return channel -> client.execute(TopQueriesAction.INSTANCE, topQueriesRequest, topQueriesResponse(channel)); } - static TopQueriesRequest prepareRequest(final RestRequest request) { - final String[] nodesIds = Strings.splitStringByCommaToArray(request.param("nodeId")); + private TopQueriesRequest prepareRequest(final RestRequest request) { +// final String[] nodesIds = Strings.splitStringByCommaToArray(request.param("nodeId")); final String metricType = request.param("type", MetricType.LATENCY.toString()); if (!ALLOWED_METRICS.contains(metricType)) { throw new IllegalArgumentException( String.format(Locale.ROOT, "request [%s] contains invalid metric type [%s]", request.path(), metricType) ); } - return new TopQueriesRequest(MetricType.fromString(metricType), nodesIds); +// return new TopQueriesRequest(MetricType.fromString(metricType), nodesIds); + return new TopQueriesRequest(MetricType.fromString(metricType), nodes.get().getClusterManagerNodeId()); } @Override diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/transport/top_queries/TransportSearchMetadataAction.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/transport/top_queries/TransportSearchMetadataAction.java new file mode 100644 index 0000000000000..169c1a02774c0 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/transport/top_queries/TransportSearchMetadataAction.java @@ -0,0 +1,142 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.transport.top_queries; + +import org.opensearch.action.FailedNodeException; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.nodes.TransportNodesAction; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.plugin.insights.core.service.QueryInsightsService; +import org.opensearch.plugin.insights.rules.action.top_queries.SearchMetadata; +import org.opensearch.plugin.insights.rules.action.top_queries.SearchMetadataAction; +import org.opensearch.plugin.insights.rules.action.top_queries.SearchMetadataRequest; +import org.opensearch.plugin.insights.rules.action.top_queries.SearchMetadataResponse; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportRequest; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.util.List; + +/** + * Transport action for cluster/node level top queries information. + * + * @opensearch.internal + */ +public class TransportSearchMetadataAction extends TransportNodesAction< + SearchMetadataRequest, + SearchMetadataResponse, + TransportSearchMetadataAction.NodeRequest, + SearchMetadata> { + + private final QueryInsightsService queryInsightsService; + + /** + * Create the TransportTopQueriesAction Object + + * @param threadPool The OpenSearch thread pool to run async tasks + * @param clusterService The clusterService of this node + * @param transportService The TransportService of this node + * @param queryInsightsService The topQueriesByLatencyService associated with this Transport Action + * @param actionFilters the action filters + */ + @Inject + public TransportSearchMetadataAction( + final ThreadPool threadPool, + final ClusterService clusterService, + final TransportService transportService, + final QueryInsightsService queryInsightsService, + final ActionFilters actionFilters + ) { + super( + SearchMetadataAction.NAME, + threadPool, + clusterService, + transportService, + actionFilters, + SearchMetadataRequest::new, + NodeRequest::new, + ThreadPool.Names.GENERIC, + SearchMetadata.class + ); + this.queryInsightsService = queryInsightsService; + } + + @Override + protected SearchMetadataResponse newResponse( + final SearchMetadataRequest searchMetadataRequest, + final List responses, + final List failures + ) { + return new SearchMetadataResponse( + clusterService.getClusterName(), + responses, + failures + ); + } + + @Override + protected NodeRequest newNodeRequest(final SearchMetadataRequest request) { + return new NodeRequest(request); + } + + @Override + protected SearchMetadata newNodeResponse(final StreamInput in) throws IOException { + return new SearchMetadata(in); + } + + @Override + protected SearchMetadata nodeOperation(final NodeRequest nodeRequest) { + final SearchMetadataRequest request = nodeRequest.request; + return new SearchMetadata( + clusterService.localNode(), + queryInsightsService.getQueryRecordsList(), + queryInsightsService.getTaskRecordsList(), + queryInsightsService.getTaskStatusMap() + ); + } + + /** + * Inner Node Top Queries Request + * + * @opensearch.internal + */ + public static class NodeRequest extends TransportRequest { + + final SearchMetadataRequest request; + + /** + * Create the NodeResponse object from StreamInput + * + * @param in the StreamInput to read the object + * @throws IOException IOException + */ + public NodeRequest(StreamInput in) throws IOException { + super(in); + request = new SearchMetadataRequest(in); + } + + /** + * Create the NodeResponse object from a TopQueriesRequest + * @param request the TopQueriesRequest object + */ + public NodeRequest(final SearchMetadataRequest request) { + this.request = request; + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + super.writeTo(out); + request.writeTo(out); + } + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java index 52cc1fbde790f..09594a3e21e7c 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java @@ -38,7 +38,8 @@ public class QueryInsightsSettings { /** * Time interval for record queue consumer to run */ - public static final TimeValue QUERY_RECORD_QUEUE_DRAIN_INTERVAL = new TimeValue(5, TimeUnit.SECONDS); + // TODO revert after debugging + public static final TimeValue QUERY_RECORD_QUEUE_DRAIN_INTERVAL = new TimeValue(10, TimeUnit.SECONDS); /** * Default Values and Settings */ diff --git a/server/src/main/java/org/opensearch/action/search/FetchSearchPhase.java b/server/src/main/java/org/opensearch/action/search/FetchSearchPhase.java index ebb2f33f8f37d..7a6aa88111d1d 100644 --- a/server/src/main/java/org/opensearch/action/search/FetchSearchPhase.java +++ b/server/src/main/java/org/opensearch/action/search/FetchSearchPhase.java @@ -110,7 +110,6 @@ final class FetchSearchPhase extends SearchPhase { this.resultConsumer = resultConsumer; this.progressListener = context.getTask().getProgressListener(); } - @Override public void run() { context.execute(new AbstractRunnable() { diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 547f610f4a752..029db778e176b 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -864,36 +864,6 @@ protected Node( metadataCreateIndexService ); - Collection pluginComponents = pluginsService.filterPlugins(Plugin.class) - .stream() - .flatMap( - p -> p.createComponents( - client, - clusterService, - threadPool, - resourceWatcherService, - scriptService, - xContentRegistry, - environment, - nodeEnvironment, - namedWriteableRegistry, - clusterModule.getIndexNameExpressionResolver(), - repositoriesServiceReference::get - ).stream() - ) - .collect(Collectors.toList()); - - // register all standard SearchRequestOperationsCompositeListenerFactory to the SearchRequestOperationsCompositeListenerFactory - final SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory = - new SearchRequestOperationsCompositeListenerFactory( - Stream.concat( - Stream.of(searchRequestStats, searchRequestSlowLog), - pluginComponents.stream() - .filter(p -> p instanceof SearchRequestOperationsListener) - .map(p -> (SearchRequestOperationsListener) p) - ).toArray(SearchRequestOperationsListener[]::new) - ); - ActionModule actionModule = new ActionModule( settings, clusterModule.getIndexNameExpressionResolver(), @@ -1025,6 +995,38 @@ protected Node( transportService.getTaskManager() ); + Collection pluginComponents = pluginsService.filterPlugins(Plugin.class) + .stream() + .flatMap( + p -> p.createComponents( + client, + clusterService, + threadPool, + resourceWatcherService, + scriptService, + xContentRegistry, + environment, + nodeEnvironment, + namedWriteableRegistry, + clusterModule.getIndexNameExpressionResolver(), + repositoriesServiceReference::get, + taskResourceTrackingService + ).stream() + ) + .collect(Collectors.toList()); + + // register all standard SearchRequestOperationsCompositeListenerFactory to the SearchRequestOperationsCompositeListenerFactory + final SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory = + new SearchRequestOperationsCompositeListenerFactory( + Stream.concat( + Stream.of(searchRequestStats, searchRequestSlowLog), + pluginComponents.stream() + .filter(p -> p instanceof SearchRequestOperationsListener) + .map(p -> (SearchRequestOperationsListener) p) + ).toArray(SearchRequestOperationsListener[]::new) + ); + + final SegmentReplicationStatsTracker segmentReplicationStatsTracker = new SegmentReplicationStatsTracker(indicesService); RepositoriesModule repositoriesModule = new RepositoriesModule( this.environment, diff --git a/server/src/main/java/org/opensearch/plugins/Plugin.java b/server/src/main/java/org/opensearch/plugins/Plugin.java index 48486a6b55dfd..89df44f28ffbd 100644 --- a/server/src/main/java/org/opensearch/plugins/Plugin.java +++ b/server/src/main/java/org/opensearch/plugins/Plugin.java @@ -56,6 +56,7 @@ import org.opensearch.index.shard.IndexSettingProvider; import org.opensearch.repositories.RepositoriesService; import org.opensearch.script.ScriptService; +import org.opensearch.tasks.TaskResourceTrackingService; import org.opensearch.threadpool.ExecutorBuilder; import org.opensearch.threadpool.ThreadPool; import org.opensearch.watcher.ResourceWatcherService; @@ -150,7 +151,8 @@ public Collection createComponents( NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry, IndexNameExpressionResolver indexNameExpressionResolver, - Supplier repositoriesServiceSupplier + Supplier repositoriesServiceSupplier, + TaskResourceTrackingService taskResourceTrackingService ) { return Collections.emptyList(); } diff --git a/server/src/main/java/org/opensearch/tasks/TaskResourceTrackingService.java b/server/src/main/java/org/opensearch/tasks/TaskResourceTrackingService.java index f32559f6314c0..53202204cb8ba 100644 --- a/server/src/main/java/org/opensearch/tasks/TaskResourceTrackingService.java +++ b/server/src/main/java/org/opensearch/tasks/TaskResourceTrackingService.java @@ -56,6 +56,7 @@ public class TaskResourceTrackingService implements RunnableTaskExecutionListene private final ConcurrentMapLong resourceAwareTasks = ConcurrentCollections.newConcurrentMapLongWithAggressiveConcurrency(); private final List taskCompletionListeners = new ArrayList<>(); + private final List taskStartListeners = new ArrayList<>(); private final ThreadPool threadPool; private volatile boolean taskResourceTrackingEnabled; @@ -96,6 +97,17 @@ public ThreadContext.StoredContext startTracking(Task task) { logger.debug("Starting resource tracking for task: {}", task.getId()); resourceAwareTasks.put(task.getId(), task); + + List exceptions = new ArrayList<>(); + for (TaskStartListener listener : taskStartListeners) { + try { + listener.onTaskStarts(task); + } catch (Exception e) { + exceptions.add(e); + } + } + ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptions); + return addTaskIdToThreadContext(task); } @@ -268,7 +280,17 @@ public interface TaskCompletionListener { void onTaskCompleted(Task task); } + /** + * Listener that gets invoked when a task execution starts. + */ + public interface TaskStartListener { + void onTaskStarts(Task task); + } + public void addTaskCompletionListener(TaskCompletionListener listener) { this.taskCompletionListeners.add(listener); } + public void addTaskStartListener(TaskStartListener listener) { + this.taskStartListeners.add(listener); + } }