From e390c3292f78258c88bafba2a02e3f13cb487ebb Mon Sep 17 00:00:00 2001 From: Vamsi Manohar Date: Thu, 15 Feb 2024 09:53:52 -0800 Subject: [PATCH] PPL in search API Signed-off-by: Vamsi Manohar --- .../org/opensearch/sql/plugin/SQLPlugin.java | 21 +- .../plugin/queryengine/PPLQueryEngine.java | 196 +++++++++++++++++ .../plugin/queryengine/SQLQueryEngine.java | 197 ++++++++++++++++++ 3 files changed, 412 insertions(+), 2 deletions(-) create mode 100644 plugin/src/main/java/org/opensearch/sql/plugin/queryengine/PPLQueryEngine.java create mode 100644 plugin/src/main/java/org/opensearch/sql/plugin/queryengine/SQLQueryEngine.java diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index 2b75a8b2c9..b696e51f7f 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -42,6 +42,7 @@ import org.opensearch.plugins.ActionPlugin; import org.opensearch.plugins.Plugin; import org.opensearch.plugins.ScriptPlugin; +import org.opensearch.plugins.SearchPlugin; import org.opensearch.repositories.RepositoriesService; import org.opensearch.rest.RestController; import org.opensearch.rest.RestHandler; @@ -71,12 +72,15 @@ import org.opensearch.sql.opensearch.storage.script.ExpressionScriptEngine; import org.opensearch.sql.opensearch.storage.serialization.DefaultExpressionSerializer; import org.opensearch.sql.plugin.config.OpenSearchPluginModule; +import org.opensearch.sql.plugin.queryengine.PPLQueryEngine; +import org.opensearch.sql.plugin.queryengine.SQLQueryEngine; import org.opensearch.sql.plugin.rest.RestPPLQueryAction; import org.opensearch.sql.plugin.rest.RestPPLStatsAction; import org.opensearch.sql.plugin.rest.RestQuerySettingsAction; import org.opensearch.sql.plugin.transport.PPLQueryAction; import org.opensearch.sql.plugin.transport.TransportPPLQueryAction; import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse; +import org.opensearch.sql.ppl.PPLService; import org.opensearch.sql.prometheus.storage.PrometheusStorageFactory; import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService; import org.opensearch.sql.spark.cluster.ClusterManagerEventListener; @@ -89,13 +93,14 @@ import org.opensearch.sql.spark.transport.model.CancelAsyncQueryActionResponse; import org.opensearch.sql.spark.transport.model.CreateAsyncQueryActionResponse; import org.opensearch.sql.spark.transport.model.GetAsyncQueryResultActionResponse; +import org.opensearch.sql.sql.SQLService; import org.opensearch.sql.storage.DataSourceFactory; import org.opensearch.threadpool.ExecutorBuilder; import org.opensearch.threadpool.FixedExecutorBuilder; import org.opensearch.threadpool.ThreadPool; import org.opensearch.watcher.ResourceWatcherService; -public class SQLPlugin extends Plugin implements ActionPlugin, ScriptPlugin { +public class SQLPlugin extends Plugin implements ActionPlugin, ScriptPlugin, SearchPlugin { private static final Logger LOGGER = LogManager.getLogger(SQLPlugin.class); @@ -109,7 +114,7 @@ public class SQLPlugin extends Plugin implements ActionPlugin, ScriptPlugin { private Injector injector; public String name() { - return "sql"; + return "sql_plugin"; } public String description() { @@ -222,6 +227,8 @@ public Collection createComponents( OpenSearchSettings.RESULT_INDEX_TTL_SETTING, OpenSearchSettings.AUTO_INDEX_MANAGEMENT_ENABLED_SETTING, environment.settings()); + PPLQueryEngine.initialize(injector.getInstance(PPLService.class)); + SQLQueryEngine.initialize(injector.getInstance(SQLService.class)); return ImmutableList.of( dataSourceService, injector.getInstance(AsyncQueryExecutorService.class), @@ -283,4 +290,14 @@ private DataSourceServiceImpl createDataSourceService() { dataSourceMetadataStorage, dataSourceUserAuthorizationHelper); } + + @Override + public List> getQueryEnginesSpecs() { + return Arrays.asList( new QueryEngineSpec<>( + PPLQueryEngine.NAME, PPLQueryEngine::new, PPLQueryEngine::fromXContent), + new QueryEngineSpec<>( + SQLQueryEngine.NAME, SQLQueryEngine::new, SQLQueryEngine::fromXContent) + ); + } + } diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/queryengine/PPLQueryEngine.java b/plugin/src/main/java/org/opensearch/sql/plugin/queryengine/PPLQueryEngine.java new file mode 100644 index 0000000000..519e92afdc --- /dev/null +++ b/plugin/src/main/java/org/opensearch/sql/plugin/queryengine/PPLQueryEngine.java @@ -0,0 +1,196 @@ +package org.opensearch.sql.plugin.queryengine; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import org.apache.lucene.search.TotalHits; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.search.ShardSearchFailure; +import org.opensearch.core.ParseField; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.search.SearchHit; +import org.opensearch.search.SearchHits; +import org.opensearch.search.aggregations.InternalAggregations; +import org.opensearch.search.externalengine.QueryEngine; +import org.opensearch.search.externalengine.QueryEngineExtBuilder; +import org.opensearch.search.internal.InternalSearchResponse; +import org.opensearch.sql.common.response.ResponseListener; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.executor.ExecutionEngine; +import org.opensearch.sql.ppl.PPLService; +import org.opensearch.sql.ppl.domain.PPLQueryRequest; + +public class PPLQueryEngine extends QueryEngine { + + public static final String NAME = "ppl"; + private static PPLService pplService; + private String query; + + public static void initialize(PPLService pplService) { + PPLQueryEngine.pplService = pplService; + } + + @Override + public void executeQuery( + SearchRequest searchRequest, ActionListener actionListener) { + PPLQueryRequest pplQueryRequest = new PPLQueryRequest(query, null, "_search", "json"); + pplService.execute( + pplQueryRequest, + new ResponseListener<>() { + @Override + public void onResponse(ExecutionEngine.QueryResponse queryResponse) { + SearchResponse searchResponse = + transformFromQueryResponseToSearchResponse(queryResponse); + actionListener.onResponse(searchResponse); + } + + @Override + public void onFailure(Exception e) { + actionListener.onFailure(e); + } + }); + } + + private SearchResponse transformFromQueryResponseToSearchResponse( + ExecutionEngine.QueryResponse queryResponse) { + SearchHit[] hits = new SearchHit[0]; + return new SearchResponse( + new InternalSearchResponse( + new SearchHits(hits, new TotalHits(0L, TotalHits.Relation.EQUAL_TO), 0.0F), + (InternalAggregations) null, + null, + null, + false, + (Boolean) null, + 1, + Collections.emptyList(), + List.of(new PPLQueryEngine.PPLResponseExternalBuilder(queryResponse))), + (String) null, + 0, + 0, + 0, + 0L, + ShardSearchFailure.EMPTY_ARRAY, + SearchResponse.Clusters.EMPTY, + null); + } + + static class PPLResponseExternalBuilder extends QueryEngineExtBuilder { + + static ParseField DUMMY_FIELD = new ParseField("ppl"); + + protected final ExecutionEngine.QueryResponse queryResponse; + + public PPLResponseExternalBuilder(ExecutionEngine.QueryResponse queryResponse) { + this.queryResponse = queryResponse; + } + + public PPLResponseExternalBuilder(StreamInput in) throws IOException { + this.queryResponse = null; + } + + @Override + public String getWriteableName() { + return DUMMY_FIELD.getPreferredName(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString("1"); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + // Serialize the schema + builder.startObject(NAME); + ArrayList columnNames = new ArrayList<>(); + builder.startArray("schema"); + for (ExecutionEngine.Schema.Column column : queryResponse.getSchema().getColumns()) { + builder.startObject(); + String columnName = getColumnName(column); + columnNames.add(columnName); + builder.field("name", columnName); + builder.field("type", column.getExprType().typeName().toLowerCase(Locale.ROOT)); + builder.endObject(); + } + builder.endArray(); + builder.startArray("datarows"); + for (ExprValue result : queryResponse.getResults()) { + builder.startArray(); + for (String columnName : columnNames) { + builder.value(result.tupleValue().get(columnName).value()); + } + builder.endArray(); + } + builder.endArray(); + builder.field("total", queryResponse.getResults().size()); + builder.field("size", queryResponse.getResults().size()); + builder.endObject(); + return builder; + } + + @Override + public int hashCode() { + return 0; + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + return true; + } + + public static PPLQueryEngine.PPLResponseExternalBuilder parse(XContentParser parser) + throws IOException { + return null; + } + + private String getColumnName(ExecutionEngine.Schema.Column column) { + return (column.getAlias() != null) ? column.getAlias() : column.getName(); + } + + } + + public PPLQueryEngine(String query) { + this.query = query; + } + + public PPLQueryEngine(StreamInput in) {} + + @Override + public String getWriteableName() { + return NAME; + } + + @Override + public void writeTo(StreamOutput out) throws IOException {} + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return null; + } + + public static QueryEngine fromXContent(XContentParser parser) throws IOException { + XContentParser.Token token; + String query = ""; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + String fieldName = parser.currentName(); + token = parser.nextToken(); + if (fieldName.equals("query")) { + query = parser.textOrNull(); + } + } + return new PPLQueryEngine(query); + } + + +} diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/queryengine/SQLQueryEngine.java b/plugin/src/main/java/org/opensearch/sql/plugin/queryengine/SQLQueryEngine.java new file mode 100644 index 0000000000..49f2a12ba9 --- /dev/null +++ b/plugin/src/main/java/org/opensearch/sql/plugin/queryengine/SQLQueryEngine.java @@ -0,0 +1,197 @@ +package org.opensearch.sql.plugin.queryengine; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import org.apache.lucene.search.TotalHits; +import org.json.JSONObject; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.search.ShardSearchFailure; +import org.opensearch.core.ParseField; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.search.SearchHit; +import org.opensearch.search.SearchHits; +import org.opensearch.search.aggregations.InternalAggregations; +import org.opensearch.search.externalengine.QueryEngine; +import org.opensearch.search.externalengine.QueryEngineExtBuilder; +import org.opensearch.search.internal.InternalSearchResponse; +import org.opensearch.sql.common.response.ResponseListener; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.executor.ExecutionEngine; +import org.opensearch.sql.sql.SQLService; +import org.opensearch.sql.sql.domain.SQLQueryRequest; + +public class SQLQueryEngine extends QueryEngine { + + public static final String NAME = "sql"; + private static SQLService sqlService; + private String query; + + public static void initialize(SQLService sqlService) { + SQLQueryEngine.sqlService = sqlService; + } + + @Override + public void executeQuery( + SearchRequest searchRequest, ActionListener actionListener) { + SQLQueryRequest sqlQueryRequest = new SQLQueryRequest(new JSONObject(), query, "_search", "json"); + sqlService.execute( + sqlQueryRequest, + new ResponseListener<>() { + @Override + public void onResponse(ExecutionEngine.QueryResponse queryResponse) { + SearchResponse searchResponse = + transformFromQueryResponseToSearchResponse(queryResponse); + actionListener.onResponse(searchResponse); + } + + @Override + public void onFailure(Exception e) { + actionListener.onFailure(e); + } + }); + } + + private SearchResponse transformFromQueryResponseToSearchResponse( + ExecutionEngine.QueryResponse queryResponse) { + SearchHit[] hits = new SearchHit[0]; + return new SearchResponse( + new InternalSearchResponse( + new SearchHits(hits, new TotalHits(0L, TotalHits.Relation.EQUAL_TO), 0.0F), + (InternalAggregations) null, + null, + null, + false, + (Boolean) null, + 1, + Collections.emptyList(), + List.of(new SQLResponseExternalBuilder(queryResponse))), + (String) null, + 0, + 0, + 0, + 0L, + ShardSearchFailure.EMPTY_ARRAY, + SearchResponse.Clusters.EMPTY, + null); + } + + static class SQLResponseExternalBuilder extends QueryEngineExtBuilder { + + static ParseField DUMMY_FIELD = new ParseField("sql"); + + protected final ExecutionEngine.QueryResponse queryResponse; + + public SQLResponseExternalBuilder(ExecutionEngine.QueryResponse queryResponse) { + this.queryResponse = queryResponse; + } + + public SQLResponseExternalBuilder(StreamInput in) throws IOException { + this.queryResponse = null; + } + + @Override + public String getWriteableName() { + return DUMMY_FIELD.getPreferredName(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString("1"); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + // Serialize the schema + builder.startObject(NAME); + ArrayList columnNames = new ArrayList<>(); + builder.startArray("schema"); + for (ExecutionEngine.Schema.Column column : queryResponse.getSchema().getColumns()) { + builder.startObject(); + String columnName = getColumnName(column); + columnNames.add(columnName); + builder.field("name", columnName); + builder.field("type", column.getExprType().typeName().toLowerCase(Locale.ROOT)); + builder.endObject(); + } + builder.endArray(); + builder.startArray("datarows"); + for (ExprValue result : queryResponse.getResults()) { + builder.startArray(); + for (String columnName : columnNames) { + builder.value(result.tupleValue().get(columnName).value()); + } + builder.endArray(); + } + builder.endArray(); + builder.field("total", queryResponse.getResults().size()); + builder.field("size", queryResponse.getResults().size()); + builder.endObject(); + return builder; + } + + @Override + public int hashCode() { + return 0; + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + return true; + } + + public static SQLResponseExternalBuilder parse(XContentParser parser) + throws IOException { + return null; + } + + private String getColumnName(ExecutionEngine.Schema.Column column) { + return (column.getAlias() != null) ? column.getAlias() : column.getName(); + } + + } + + public SQLQueryEngine(String query) { + this.query = query; + } + + public SQLQueryEngine(StreamInput in) {} + + @Override + public String getWriteableName() { + return NAME; + } + + @Override + public void writeTo(StreamOutput out) throws IOException {} + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return null; + } + + public static QueryEngine fromXContent(XContentParser parser) throws IOException { + XContentParser.Token token; + String query = ""; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + String fieldName = parser.currentName(); + token = parser.nextToken(); + if (fieldName.equals("query")) { + query = parser.textOrNull(); + } + } + return new SQLQueryEngine(query); + } + + +}