Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query insights exporters implementation #12982

Merged
merged 6 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add capability to disable source recovery_source for an index ([#13590](https://github.com/opensearch-project/OpenSearch/pull/13590))
- Add remote routing table for remote state publication with experimental feature flag ([#13304](https://github.com/opensearch-project/OpenSearch/pull/13304))
- [Remote Store] Add support to disable flush based on translog reader count ([#14027](https://github.com/opensearch-project/OpenSearch/pull/14027))
- [Query Insights] Add exporter support for top n queries ([#12982](https://github.com/opensearch-project/OpenSearch/pull/12982))

### Dependencies
- Bump `com.github.spullara.mustache.java:compiler` from 0.9.10 to 0.9.13 ([#13329](https://github.com/opensearch-project/OpenSearch/pull/13329), [#13559](https://github.com/opensearch-project/OpenSearch/pull/13559))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public Collection<Object> createComponents(
final Supplier<RepositoriesService> repositoriesServiceSupplier
) {
// create top n queries service
final QueryInsightsService queryInsightsService = new QueryInsightsService(threadPool);
final QueryInsightsService queryInsightsService = new QueryInsightsService(clusterService.getClusterSettings(), threadPool, client);
return List.of(queryInsightsService, new QueryInsightsListener(clusterService, queryInsightsService));
}

Expand Down Expand Up @@ -110,7 +110,8 @@ public List<Setting<?>> getSettings() {
// Settings for top N queries
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED,
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE,
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE,
QueryInsightsSettings.TOP_N_LATENCY_EXPORTER_SETTINGS
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.insights.core.exporter;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;

import java.util.List;

/**
* Debug exporter for development purpose
*/
public final class DebugExporter implements QueryInsightsExporter {
/**
* Logger of the debug exporter
*/
private final Logger logger = LogManager.getLogger();

/**
* Constructor of DebugExporter
*/
private DebugExporter() {}

private static class InstanceHolder {
private static final DebugExporter INSTANCE = new DebugExporter();
}

/**
Get the singleton instance of DebugExporter
*
@return DebugExporter instance
*/
public static DebugExporter getInstance() {
return InstanceHolder.INSTANCE;
}

/**
* Write the list of SearchQueryRecord to debug log
*
* @param records list of {@link SearchQueryRecord}
*/
@Override
public void export(final List<SearchQueryRecord> records) {
logger.debug("QUERY_INSIGHTS_RECORDS: " + records.toString());
}

/**
* Close the debugger exporter sink
*/
@Override
public void close() {
logger.debug("Closing the DebugExporter..");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.insights.core.exporter;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.bulk.BulkRequestBuilder;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.client.Client;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormatter;

import java.util.List;

/**
* Local index exporter for exporting query insights data to local OpenSearch indices.
*/
public final class LocalIndexExporter implements QueryInsightsExporter {
/**
* Logger of the local index exporter
*/
private final Logger logger = LogManager.getLogger();
private final Client client;
private DateTimeFormatter indexPattern;

/**
* Constructor of LocalIndexExporter
*
* @param client OS client
* @param indexPattern the pattern of index to export to
*/
public LocalIndexExporter(final Client client, final DateTimeFormatter indexPattern) {
this.indexPattern = indexPattern;
this.client = client;
}

/**
* Getter of indexPattern
*
* @return indexPattern
*/
public DateTimeFormatter getIndexPattern() {
return indexPattern;
}

/**
* Setter of indexPattern
*
* @param indexPattern index pattern
* @return the current LocalIndexExporter
*/
public LocalIndexExporter setIndexPattern(DateTimeFormatter indexPattern) {
this.indexPattern = indexPattern;
return this;
}

/**
* Export a list of SearchQueryRecord to a local index
*
* @param records list of {@link SearchQueryRecord}
*/
@Override
public void export(final List<SearchQueryRecord> records) {
if (records == null || records.size() == 0) {
return;
ansjcy marked this conversation as resolved.
Show resolved Hide resolved
}
try {
final String index = getDateTimeFromFormat();
ansjcy marked this conversation as resolved.
Show resolved Hide resolved
final BulkRequestBuilder bulkRequestBuilder = client.prepareBulk().setTimeout(TimeValue.timeValueMinutes(1));
for (SearchQueryRecord record : records) {
bulkRequestBuilder.add(
ansjcy marked this conversation as resolved.
Show resolved Hide resolved
ansjcy marked this conversation as resolved.
Show resolved Hide resolved
new IndexRequest(index).source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
);
}
bulkRequestBuilder.execute(new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkItemResponses) {}

Check warning on line 90 in plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java

View check run for this annotation

Codecov / codecov/patch

plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java#L90

Added line #L90 was not covered by tests

@Override
public void onFailure(Exception e) {
logger.error("Failed to execute bulk operation for query insights data: ", e);
}

Check warning on line 95 in plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java

View check run for this annotation

Codecov / codecov/patch

plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java#L94-L95

Added lines #L94 - L95 were not covered by tests
});
} catch (final Exception e) {
logger.error("Unable to index query insights data: ", e);

Check warning on line 98 in plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java

View check run for this annotation

Codecov / codecov/patch

plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java#L97-L98

Added lines #L97 - L98 were not covered by tests
}
}

/**
* Close the exporter sink
*/
@Override
public void close() {
logger.debug("Closing the LocalIndexExporter..");
}

private String getDateTimeFromFormat() {
return indexPattern.print(DateTime.now(DateTimeZone.UTC));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.insights.core.exporter;

import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;

import java.io.Closeable;
import java.util.List;

/**
* Base interface for Query Insights exporters
*/
public interface QueryInsightsExporter extends Closeable {
/**
* Export a list of SearchQueryRecord to the exporter sink
*
* @param records list of {@link SearchQueryRecord}
*/
void export(final List<SearchQueryRecord> records);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.insights.core.exporter;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;
import org.joda.time.format.DateTimeFormat;

import java.io.IOException;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_N_LATENCY_QUERIES_INDEX_PATTERN;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_QUERIES_EXPORTER_TYPE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORTER_TYPE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORT_INDEX;

/**
* Factory class for validating and creating exporters based on provided settings
*/
public class QueryInsightsExporterFactory {
/**
* Logger of the query insights exporter factory
*/
private final Logger logger = LogManager.getLogger();
final private Client client;
final private Set<QueryInsightsExporter> exporters;

/**
* Constructor of QueryInsightsExporterFactory
*
* @param client OS client
*/
public QueryInsightsExporterFactory(final Client client) {
this.client = client;
this.exporters = new HashSet<>();
}

/**
* Validate exporter sink config
*
* @param settings exporter sink config {@link Settings}
* @throws IllegalArgumentException if provided exporter sink config settings are invalid
*/
public void validateExporterConfig(final Settings settings) throws IllegalArgumentException {
// Disable exporter if the EXPORTER_TYPE setting is null
if (settings.get(EXPORTER_TYPE) == null) {
return;
}
SinkType type;
try {
type = SinkType.parse(settings.get(EXPORTER_TYPE, DEFAULT_TOP_QUERIES_EXPORTER_TYPE));
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(
String.format(
Locale.ROOT,
"Invalid exporter type [%s], type should be one of %s",
settings.get(EXPORTER_TYPE),
SinkType.allSinkTypes()
)
);
}
switch (type) {
case LOCAL_INDEX:
final String indexPattern = settings.get(EXPORT_INDEX, DEFAULT_TOP_N_LATENCY_QUERIES_INDEX_PATTERN);
if (indexPattern.length() == 0) {
throw new IllegalArgumentException("Empty index pattern configured for the exporter");
}
try {
DateTimeFormat.forPattern(indexPattern);

Check warning on line 79 in plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactory.java

View check run for this annotation

Codecov / codecov/patch

plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactory.java#L79

Added line #L79 was not covered by tests
ansjcy marked this conversation as resolved.
Show resolved Hide resolved
} catch (Exception e) {
throw new IllegalArgumentException(
String.format(Locale.ROOT, "Invalid index pattern [%s] configured for the exporter", indexPattern)
);
}

Check warning on line 84 in plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactory.java

View check run for this annotation

Codecov / codecov/patch

plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactory.java#L84

Added line #L84 was not covered by tests
}
}

Check warning on line 86 in plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactory.java

View check run for this annotation

Codecov / codecov/patch

plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactory.java#L86

Added line #L86 was not covered by tests

/**
* Create an exporter based on provided parameters
*
* @param type The type of exporter to create
* @param indexPattern the index pattern if creating a index exporter
* @return QueryInsightsExporter the created exporter sink
*/
public QueryInsightsExporter createExporter(SinkType type, String indexPattern) {
if (SinkType.LOCAL_INDEX.equals(type)) {
QueryInsightsExporter exporter = new LocalIndexExporter(client, DateTimeFormat.forPattern(indexPattern));
this.exporters.add(exporter);
return exporter;
}
return DebugExporter.getInstance();
}

/**
* Update an exporter based on provided parameters
*
* @param exporter The exporter to update
* @param indexPattern the index pattern if creating a index exporter
* @return QueryInsightsExporter the updated exporter sink
*/
public QueryInsightsExporter updateExporter(QueryInsightsExporter exporter, String indexPattern) {
if (exporter.getClass() == LocalIndexExporter.class) {
((LocalIndexExporter) exporter).setIndexPattern(DateTimeFormat.forPattern(indexPattern));
}
return exporter;
}

/**
* Close an exporter
*
* @param exporter the exporter to close
*/
public void closeExporter(QueryInsightsExporter exporter) throws IOException {
if (exporter != null) {
exporter.close();
this.exporters.remove(exporter);
}
}

/**
* Close all exporters
*
*/
public void closeAllExporters() {
for (QueryInsightsExporter exporter : exporters) {
try {
closeExporter(exporter);
} catch (IOException e) {
logger.error("Fail to close query insights exporter, error: ", e);
}
}

Check warning on line 141 in plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactory.java

View check run for this annotation

Codecov / codecov/patch

plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactory.java#L137-L141

Added lines #L137 - L141 were not covered by tests
}
}
Loading
Loading