Skip to content

Commit

Permalink
SQL in search API
Browse files Browse the repository at this point in the history
Signed-off-by: Vamsi Manohar <reddyvam@amazon.com>

SQL in search API

Signed-off-by: Vamsi Manohar <reddyvam@amazon.com>
  • Loading branch information
vamsi-amazon committed Feb 19, 2024
1 parent 84750b3 commit 187206b
Show file tree
Hide file tree
Showing 11 changed files with 466 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.search.SearchExtBuilder;
import org.opensearch.search.SearchHits;
import org.opensearch.search.aggregations.Aggregations;
import org.opensearch.search.externalengine.QueryEngineExtBuilder;
import org.opensearch.search.profile.ProfileShardResult;
import org.opensearch.search.profile.SearchProfileShardResults;
import org.opensearch.search.suggest.Suggest;
Expand Down Expand Up @@ -74,6 +75,7 @@ public class SearchResponseSections implements ToXContentFragment {
protected final Boolean terminatedEarly;
protected final int numReducePhases;
protected final List<SearchExtBuilder> searchExtBuilders = new ArrayList<>();
protected final List<QueryEngineExtBuilder> queryEngineExtBuilders = new ArrayList<>();

public SearchResponseSections(
SearchHits hits,
Expand All @@ -84,7 +86,7 @@ public SearchResponseSections(
SearchProfileShardResults profileResults,
int numReducePhases
) {
this(hits, aggregations, suggest, timedOut, terminatedEarly, profileResults, numReducePhases, Collections.emptyList());
this(hits, aggregations, suggest, timedOut, terminatedEarly, profileResults, numReducePhases, Collections.emptyList(), Collections.emptyList());
}

public SearchResponseSections(
Expand All @@ -107,6 +109,28 @@ public SearchResponseSections(
this.searchExtBuilders.addAll(Objects.requireNonNull(searchExtBuilders, "searchExtBuilders must not be null"));
}

public SearchResponseSections(
SearchHits hits,
Aggregations aggregations,
Suggest suggest,
boolean timedOut,
Boolean terminatedEarly,
SearchProfileShardResults profileResults,
int numReducePhases,
List<SearchExtBuilder> searchExtBuilders,
List<QueryEngineExtBuilder> queryEngineExtBuilders
) {
this.hits = hits;
this.aggregations = aggregations;
this.suggest = suggest;
this.profileResults = profileResults;
this.timedOut = timedOut;
this.terminatedEarly = terminatedEarly;
this.numReducePhases = numReducePhases;
this.searchExtBuilders.addAll(Objects.requireNonNull(searchExtBuilders, "searchExtBuilders must not be null"));
this.queryEngineExtBuilders.addAll(Objects.requireNonNull(queryEngineExtBuilders, "queryEngineExtBuilders must not be null"));
}

public final boolean timedOut() {
return this.timedOut;
}
Expand Down Expand Up @@ -166,6 +190,13 @@ public final XContentBuilder toXContent(XContentBuilder builder, Params params)
}
builder.endObject();
}

if(!queryEngineExtBuilders.isEmpty()) {
for (QueryEngineExtBuilder queryEngineExtBuilder: queryEngineExtBuilders) {
queryEngineExtBuilder.toXContent(builder, params);
}
}

return builder;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,11 @@ private void executeRequest(
relativeStartNanos,
System::nanoTime
);
// taking over by query engine.
if (!originalSearchRequest.source().queryEngines().isEmpty()) {
originalSearchRequest.source().queryEngines().get(0).executeQuery(originalSearchRequest, originalListener);
return;
}
if (originalSearchRequest.isPhaseTook() == null) {
originalSearchRequest.setPhaseTook(clusterService.getClusterSettings().get(SEARCH_PHASE_TOOK_ENABLED));
}
Expand Down
48 changes: 48 additions & 0 deletions server/src/main/java/org/opensearch/plugins/SearchPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@
import org.opensearch.search.aggregations.pipeline.MovAvgPipelineAggregator;
import org.opensearch.search.aggregations.pipeline.PipelineAggregator;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
import org.opensearch.search.externalengine.QueryEngineParser;
import org.opensearch.search.externalengine.QueryEngine;
import org.opensearch.search.fetch.FetchSubPhase;
import org.opensearch.search.fetch.subphase.highlight.Highlighter;
import org.opensearch.search.query.QueryPhaseSearcher;
Expand Down Expand Up @@ -216,6 +218,10 @@ default Optional<ExecutorServiceProvider> getIndexSearcherExecutorProvider() {
return Optional.empty();
}

default List<QueryEngineSpec<?>> getQueryEnginesSpecs() {
return emptyList();
}

/**
* Executor service provider
*/
Expand Down Expand Up @@ -877,4 +883,46 @@ public Map<String, Highlighter> getHighlighters() {
return highlighters;
}
}

/**
* Specification for a {@link SearchExtBuilder} which represents an additional section that can be
* parsed in a search request (within the ext element).
*/
class QueryEngineSpec<T extends QueryEngine> extends SearchExtensionSpec<T, QueryEngineParser<T>> {
/**
* Specification of custom {@link SearchExtBuilder}.
*
* @param name holds the names by which this search ext might be parsed. The {@link ParseField#getPreferredName()} is special as it
* is the name by under which the reader is registered. So it is the name that the search ext should use as its
* {@link NamedWriteable#getWriteableName()} too. It is an error if {@link ParseField#getPreferredName()} conflicts with
* another registered name, including names from other plugins.
* @param reader the reader registered for this search ext's builder. Typically, a reference to a constructor that takes a
* {@link StreamInput}
* @param parser the parser function that reads the search ext builder from xcontent
*/
public QueryEngineSpec(
ParseField name,
Writeable.Reader<? extends T> reader,
QueryEngineParser<T> parser
) {
super(name, reader, parser);
}

/**
* Specification of custom {@link SearchExtBuilder}.
*
* @param name the name by which this search ext might be parsed or deserialized. Make sure that the search ext builder returns this name for
* {@link NamedWriteable#getWriteableName()}. It is an error if this name conflicts with another registered name, including
* names from other plugins.
* @param reader the reader registered for this search ext's builder. Typically, a reference to a constructor that takes a
* {@link StreamInput}
* @param parser the parser function that reads the search ext builder from xcontent
*/
public QueryEngineSpec(String name,
Writeable.Reader<? extends T> reader,
QueryEngineParser<T> parser) {
super(name, reader, parser);
}

}
}
14 changes: 14 additions & 0 deletions server/src/main/java/org/opensearch/search/SearchModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
import org.opensearch.plugins.SearchPlugin.SignificanceHeuristicSpec;
import org.opensearch.plugins.SearchPlugin.SortSpec;
import org.opensearch.plugins.SearchPlugin.SuggesterSpec;
import org.opensearch.plugins.SearchPlugin.QueryEngineSpec;
import org.opensearch.search.aggregations.AggregationBuilder;
import org.opensearch.search.aggregations.BaseAggregationBuilder;
import org.opensearch.search.aggregations.InternalAggregation;
Expand Down Expand Up @@ -240,6 +241,8 @@
import org.opensearch.search.aggregations.pipeline.StatsBucketPipelineAggregationBuilder;
import org.opensearch.search.aggregations.pipeline.SumBucketPipelineAggregationBuilder;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
import org.opensearch.search.externalengine.QueryEngine;
import org.opensearch.search.externalengine.SQLQueryEngine;
import org.opensearch.search.fetch.FetchPhase;
import org.opensearch.search.fetch.FetchSubPhase;
import org.opensearch.search.fetch.subphase.ExplainPhase;
Expand Down Expand Up @@ -340,6 +343,7 @@ public SearchModule(Settings settings, List<SearchPlugin> plugins) {
registerQueryParsers(plugins);
registerRescorers(plugins);
registerSortParsers(plugins);
registerQueryEngines(plugins);
registerValueFormats();
registerSignificanceHeuristics(plugins);
this.valuesSourceRegistry = registerAggregations(plugins);
Expand Down Expand Up @@ -838,6 +842,16 @@ private void registerRescorers(List<SearchPlugin> plugins) {
registerFromPlugin(plugins, SearchPlugin::getRescorers, this::registerRescorer);
}

private void registerQueryEngines(List<SearchPlugin> plugins) {
registerQueryEngine(new SearchPlugin.QueryEngineSpec<>(SQLQueryEngine.NAME, SQLQueryEngine::new, SQLQueryEngine::fromXContent));
registerFromPlugin(plugins, SearchPlugin::getQueryEnginesSpecs, this::registerQueryEngine);
}

private void registerQueryEngine(SearchPlugin.QueryEngineSpec<?> spec) {
namedXContents.add(new NamedXContentRegistry.Entry(QueryEngine.class, spec.getName(), (p, c) -> spec.getParser().fromXContent(p)));
namedWriteables.add(new NamedWriteableRegistry.Entry(QueryEngine.class, spec.getName().getPreferredName(), spec.getReader()));
}

private void registerRescorer(RescorerSpec<?> spec) {
namedXContents.add(new NamedXContentRegistry.Entry(RescorerBuilder.class, spec.getName(), (p, c) -> spec.getParser().apply(p)));
namedWriteables.add(new NamedWriteableRegistry.Entry(RescorerBuilder.class, spec.getName().getPreferredName(), spec.getReader()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.opensearch.search.aggregations.AggregatorFactories;
import org.opensearch.search.aggregations.PipelineAggregationBuilder;
import org.opensearch.search.collapse.CollapseBuilder;
import org.opensearch.search.externalengine.QueryEngine;
import org.opensearch.search.fetch.StoredFieldsContext;
import org.opensearch.search.fetch.subphase.FetchSourceContext;
import org.opensearch.search.fetch.subphase.FieldAndFormat;
Expand Down Expand Up @@ -216,6 +217,7 @@ public static HighlightBuilder highlight() {
private PointInTimeBuilder pointInTimeBuilder = null;

private Map<String, Object> searchPipelineSource = null;
private List<QueryEngine> queryEngines = new ArrayList<>();

/**
* Constructs a new search source builder.
Expand Down Expand Up @@ -1039,6 +1041,15 @@ public SearchSourceBuilder searchPipelineSource(Map<String, Object> searchPipeli
return this;
}

public List<QueryEngine> queryEngines() {
return queryEngines;
}

public List<QueryEngine> queryEngines(List<QueryEngine> queryEngines) {
this.queryEngines = queryEngines;
return queryEngines;
}

/**
* Rewrites this search source builder into its primitive form. e.g. by
* rewriting the QueryBuilder. If the builder did not change the identity
Expand Down Expand Up @@ -1282,11 +1293,17 @@ public void parseXContent(XContentParser parser, boolean checkTrailingTokens) th
} else if (SEARCH_PIPELINE.match(currentFieldName, parser.getDeprecationHandler())) {
searchPipelineSource = parser.mapOrdered();
} else {
throw new ParsingException(
parser.getTokenLocation(),
"Unknown key for a " + token + " in [" + currentFieldName + "].",
parser.getTokenLocation()
);
QueryEngine queryEngine = parser.namedObject(QueryEngine.class, currentFieldName, null);
if (queryEngine != null) {
queryEngines.add(queryEngine);
}
else {
throw new ParsingException(
parser.getTokenLocation(),
"Unknown key for a " + token + " in [" + currentFieldName + "].",
parser.getTokenLocation()
);
}
}
} else if (token == XContentParser.Token.START_ARRAY) {
if (STORED_FIELDS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.externalengine;

import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.NamedWriteable;
import org.opensearch.core.xcontent.ToXContentObject;

/**
* QueryEngine abstract interface.
*/
public abstract class QueryEngine implements NamedWriteable, ToXContentObject {
public abstract void executeQuery(SearchRequest searchRequest,
ActionListener<SearchResponse> actionListener);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.search.externalengine;

import org.opensearch.common.CheckedFunction;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.common.io.stream.NamedWriteable;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.plugins.SearchPlugin;
import org.opensearch.plugins.SearchPlugin.SearchExtSpec;

/**
* Intermediate serializable representation of a search ext section. To be subclassed by plugins that support
* a custom section as part of a search request, which will be provided within the ext element.
* Any state needs to be serialized as part of the {@link Writeable#writeTo(StreamOutput)} method and
* read from the incoming stream, usually done adding a constructor that takes {@link StreamInput} as
* an argument.
* <p>
* Registration happens through {@link SearchPlugin#getSearchExts()}, which also needs a {@link CheckedFunction} that's able to parse
* the incoming request from the REST layer into the proper {@link QueryEngineExtBuilder} subclass.
* <p>
* {@link #getWriteableName()} must return the same name as the one used for the registration
* of the {@link SearchExtSpec}.
*
* @see SearchExtSpec
*
* @opensearch.api
*/
@PublicApi(since = "1.0.0")
public abstract class QueryEngineExtBuilder implements NamedWriteable, ToXContentFragment {

public abstract int hashCode();

public abstract boolean equals(Object obj);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.externalengine;

import java.io.IOException;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.query.QueryBuilder;


/**
* Query Engine Parser.
* @param <T> extend QuerEgnien.
*/
@FunctionalInterface
public interface QueryEngineParser<T extends QueryEngine> {

/**
* Creates a new {@link QueryBuilder} from the query held by the
* {@link XContentParser}. The state on the parser contained in this context
* will be changed as a side effect of this method call
*/
T fromXContent(XContentParser parser) throws IOException;

}
Loading

0 comments on commit 187206b

Please sign in to comment.