forked from opensearch-project/OpenSearch
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Initial search pipelines implementation
This commit includes the basic features of search pipelines (see opensearch-project/search-processor#80). Search pipelines are modeled after ingest pipelines and provide a simple, clean API for components to modify search requests and responses. With this commit we can: 1. Can create, retrieve, update, and delete search pipelines. 2. Transform search requests and responses by explicitly referencing a pipeline. Later work will include: 1. Adding an index setting to specify a default search pipeline. 2. Allowing search pipelines to be defined within a search request (for development/testing purposes, akin to simulating an ingest pipeline). 3. Adding a collection of search pipeline processors to support common useful transformations. (Suggestions welcome!) Signed-off-by: Michael Froh <froh@amazon.com>
- Loading branch information
Showing
65 changed files
with
3,993 additions
and
15 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
153 changes: 153 additions & 0 deletions
153
client/rest-high-level/src/main/java/org/opensearch/client/SearchPipelinesClient.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,153 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.client; | ||
|
||
import org.opensearch.action.ActionListener; | ||
import org.opensearch.action.search.DeleteSearchPipelineRequest; | ||
import org.opensearch.action.search.GetSearchPipelineRequest; | ||
import org.opensearch.action.search.GetSearchPipelineResponse; | ||
import org.opensearch.action.search.PutSearchPipelineRequest; | ||
import org.opensearch.action.support.master.AcknowledgedResponse; | ||
|
||
import java.io.IOException; | ||
import java.util.Collections; | ||
|
||
import static java.util.Collections.emptySet; | ||
|
||
public final class SearchPipelinesClient { | ||
private final RestHighLevelClient restHighLevelClient; | ||
|
||
SearchPipelinesClient(RestHighLevelClient restHighLevelClient) { | ||
this.restHighLevelClient = restHighLevelClient; | ||
} | ||
|
||
/** | ||
* Add a pipeline or update an existing pipeline. | ||
* | ||
* @param request the request | ||
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized | ||
* @return the response | ||
* @throws IOException in case there is a problem sending the request or parsing back the response | ||
*/ | ||
public AcknowledgedResponse putPipeline(PutSearchPipelineRequest request, RequestOptions options) throws IOException { | ||
return restHighLevelClient.performRequestAndParseEntity( | ||
request, | ||
SearchPipelinesRequestConverters::putPipeline, | ||
options, | ||
AcknowledgedResponse::fromXContent, | ||
emptySet() | ||
); | ||
} | ||
|
||
/** | ||
* Asynchronously add a pipeline or update an existing pipeline. | ||
* | ||
* @param request the request | ||
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized | ||
* @param listener the listener to be notified upon request completion | ||
* @return cancellable that may be used to cancel the request | ||
*/ | ||
public Cancellable putPipelineAsync( | ||
PutSearchPipelineRequest request, | ||
RequestOptions options, | ||
ActionListener<AcknowledgedResponse> listener | ||
) { | ||
return restHighLevelClient.performRequestAsyncAndParseEntity( | ||
request, | ||
SearchPipelinesRequestConverters::putPipeline, | ||
options, | ||
AcknowledgedResponse::fromXContent, | ||
listener, | ||
emptySet() | ||
); | ||
} | ||
|
||
/** | ||
* Get an existing pipeline. | ||
* | ||
* @param request the request | ||
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized | ||
* @return the response | ||
* @throws IOException in case there is a problem sending the request or parsing back the response | ||
*/ | ||
public GetSearchPipelineResponse getPipeline(GetSearchPipelineRequest request, RequestOptions options) throws IOException { | ||
return restHighLevelClient.performRequestAndParseEntity( | ||
request, | ||
SearchPipelinesRequestConverters::getPipeline, | ||
options, | ||
GetSearchPipelineResponse::fromXContent, | ||
Collections.singleton(404) | ||
); | ||
} | ||
|
||
/** | ||
* Asynchronously get an existing pipeline. | ||
* | ||
* @param request the request | ||
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized | ||
* @param listener the listener to be notified upon request completion | ||
* @return cancellable that may be used to cancel the request | ||
*/ | ||
public Cancellable getPipelineAsync( | ||
GetSearchPipelineRequest request, | ||
RequestOptions options, | ||
ActionListener<GetSearchPipelineResponse> listener | ||
) { | ||
return restHighLevelClient.performRequestAsyncAndParseEntity( | ||
request, | ||
SearchPipelinesRequestConverters::getPipeline, | ||
options, | ||
GetSearchPipelineResponse::fromXContent, | ||
listener, | ||
Collections.singleton(404) | ||
); | ||
} | ||
|
||
/** | ||
* Delete an existing pipeline. | ||
* | ||
* @param request the request | ||
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized | ||
* @return the response | ||
* @throws IOException in case there is a problem sending the request or parsing back the response | ||
*/ | ||
public AcknowledgedResponse deletePipeline(DeleteSearchPipelineRequest request, RequestOptions options) throws IOException { | ||
return restHighLevelClient.performRequestAndParseEntity( | ||
request, | ||
SearchPipelinesRequestConverters::deletePipeline, | ||
options, | ||
AcknowledgedResponse::fromXContent, | ||
emptySet() | ||
); | ||
} | ||
|
||
/** | ||
* Asynchronously delete an existing pipeline. | ||
* | ||
* @param request the request | ||
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized | ||
* @param listener the listener to be notified upon request completion | ||
* @return cancellable that may be used to cancel the request | ||
*/ | ||
public Cancellable deletePipelineAsync( | ||
DeleteSearchPipelineRequest request, | ||
RequestOptions options, | ||
ActionListener<AcknowledgedResponse> listener | ||
) { | ||
return restHighLevelClient.performRequestAsyncAndParseEntity( | ||
request, | ||
SearchPipelinesRequestConverters::deletePipeline, | ||
options, | ||
AcknowledgedResponse::fromXContent, | ||
listener, | ||
emptySet() | ||
); | ||
} | ||
|
||
} |
61 changes: 61 additions & 0 deletions
61
...rest-high-level/src/main/java/org/opensearch/client/SearchPipelinesRequestConverters.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.client; | ||
|
||
import org.apache.hc.client5.http.classic.methods.HttpDelete; | ||
import org.apache.hc.client5.http.classic.methods.HttpGet; | ||
import org.apache.hc.client5.http.classic.methods.HttpPut; | ||
import org.opensearch.action.search.DeleteSearchPipelineRequest; | ||
import org.opensearch.action.search.GetSearchPipelineRequest; | ||
import org.opensearch.action.search.PutSearchPipelineRequest; | ||
|
||
import java.io.IOException; | ||
|
||
final class SearchPipelinesRequestConverters { | ||
private SearchPipelinesRequestConverters() {} | ||
|
||
static Request putPipeline(PutSearchPipelineRequest putPipelineRequest) throws IOException { | ||
String endpoint = new RequestConverters.EndpointBuilder().addPathPartAsIs("_search/pipeline") | ||
.addPathPart(putPipelineRequest.getId()) | ||
.build(); | ||
Request request = new Request(HttpPut.METHOD_NAME, endpoint); | ||
|
||
RequestConverters.Params params = new RequestConverters.Params(); | ||
params.withTimeout(putPipelineRequest.timeout()); | ||
params.withClusterManagerTimeout(putPipelineRequest.clusterManagerNodeTimeout()); | ||
request.addParameters(params.asMap()); | ||
request.setEntity(RequestConverters.createEntity(putPipelineRequest, RequestConverters.REQUEST_BODY_CONTENT_TYPE)); | ||
return request; | ||
} | ||
|
||
static Request deletePipeline(DeleteSearchPipelineRequest deletePipelineRequest) { | ||
String endpoint = new RequestConverters.EndpointBuilder().addPathPartAsIs("_search/pipeline") | ||
.addPathPart(deletePipelineRequest.getId()) | ||
.build(); | ||
Request request = new Request(HttpDelete.METHOD_NAME, endpoint); | ||
|
||
RequestConverters.Params parameters = new RequestConverters.Params(); | ||
parameters.withTimeout(deletePipelineRequest.timeout()); | ||
parameters.withClusterManagerTimeout(deletePipelineRequest.clusterManagerNodeTimeout()); | ||
request.addParameters(parameters.asMap()); | ||
return request; | ||
} | ||
|
||
static Request getPipeline(GetSearchPipelineRequest getPipelineRequest) { | ||
String endpoint = new RequestConverters.EndpointBuilder().addPathPartAsIs("_search/pipeline") | ||
.addCommaSeparatedPathParts(getPipelineRequest.getIds()) | ||
.build(); | ||
Request request = new Request(HttpGet.METHOD_NAME, endpoint); | ||
|
||
RequestConverters.Params parameters = new RequestConverters.Params(); | ||
parameters.withClusterManagerTimeout(getPipelineRequest.clusterManagerNodeTimeout()); | ||
request.addParameters(parameters.asMap()); | ||
return request; | ||
} | ||
} |
116 changes: 116 additions & 0 deletions
116
client/rest-high-level/src/test/java/org/opensearch/client/SearchPipelinesClientIT.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,116 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.client; | ||
|
||
import org.opensearch.action.search.DeleteSearchPipelineRequest; | ||
import org.opensearch.action.search.GetSearchPipelineRequest; | ||
import org.opensearch.action.search.GetSearchPipelineResponse; | ||
import org.opensearch.action.search.PutSearchPipelineRequest; | ||
import org.opensearch.action.support.master.AcknowledgedResponse; | ||
import org.opensearch.common.bytes.BytesReference; | ||
import org.opensearch.common.xcontent.XContentType; | ||
import org.opensearch.core.xcontent.XContentBuilder; | ||
import org.opensearch.search.pipeline.Pipeline; | ||
|
||
import java.io.IOException; | ||
|
||
public class SearchPipelinesClientIT extends OpenSearchRestHighLevelClientTestCase { | ||
|
||
public void testPutPipeline() throws IOException { | ||
String id = "some_pipeline_id"; | ||
XContentBuilder pipelineBuilder = buildSearchPipeline(); | ||
PutSearchPipelineRequest request = new PutSearchPipelineRequest( | ||
id, | ||
BytesReference.bytes(pipelineBuilder), | ||
pipelineBuilder.contentType() | ||
); | ||
createPipeline(request); | ||
} | ||
|
||
private static void createPipeline(PutSearchPipelineRequest request) throws IOException { | ||
AcknowledgedResponse response = execute( | ||
request, | ||
highLevelClient().searchPipelines()::putPipeline, | ||
highLevelClient().searchPipelines()::putPipelineAsync | ||
); | ||
assertTrue(response.isAcknowledged()); | ||
} | ||
|
||
public void testGetPipeline() throws IOException { | ||
String id = "some_pipeline_id"; | ||
XContentBuilder pipelineBuilder = buildSearchPipeline(); | ||
PutSearchPipelineRequest request = new PutSearchPipelineRequest( | ||
id, | ||
BytesReference.bytes(pipelineBuilder), | ||
pipelineBuilder.contentType() | ||
); | ||
createPipeline(request); | ||
|
||
GetSearchPipelineRequest getRequest = new GetSearchPipelineRequest(id); | ||
GetSearchPipelineResponse response = execute( | ||
getRequest, | ||
highLevelClient().searchPipelines()::getPipeline, | ||
highLevelClient().searchPipelines()::getPipelineAsync | ||
); | ||
assertTrue(response.isFound()); | ||
assertEquals(1, response.pipelines().size()); | ||
assertEquals(id, response.pipelines().get(0).getId()); | ||
} | ||
|
||
public void testDeletePipeline() throws IOException { | ||
String id = "some_pipeline_id"; | ||
XContentBuilder pipelineBuilder = buildSearchPipeline(); | ||
PutSearchPipelineRequest request = new PutSearchPipelineRequest( | ||
id, | ||
BytesReference.bytes(pipelineBuilder), | ||
pipelineBuilder.contentType() | ||
); | ||
createPipeline(request); | ||
|
||
DeleteSearchPipelineRequest deleteRequest = new DeleteSearchPipelineRequest(id); | ||
AcknowledgedResponse response = execute( | ||
deleteRequest, | ||
highLevelClient().searchPipelines()::deletePipeline, | ||
highLevelClient().searchPipelines()::deletePipelineAsync | ||
); | ||
assertTrue(response.isAcknowledged()); | ||
} | ||
|
||
private static XContentBuilder buildSearchPipeline() throws IOException { | ||
XContentType xContentType = randomFrom(XContentType.values()); | ||
XContentBuilder pipelineBuilder = XContentBuilder.builder(xContentType.xContent()); | ||
return buildSearchPipeline(pipelineBuilder); | ||
} | ||
|
||
private static XContentBuilder buildSearchPipeline(XContentBuilder builder) throws IOException { | ||
builder.startObject(); | ||
{ | ||
builder.field("description", "a pipeline description"); | ||
builder.startArray(Pipeline.REQUEST_PROCESSORS_KEY); | ||
{ | ||
builder.startObject().startObject("filter_query"); | ||
{ | ||
builder.startObject("query"); | ||
{ | ||
builder.startObject("term"); | ||
{ | ||
builder.field("field", "value"); | ||
} | ||
builder.endObject(); | ||
} | ||
builder.endObject(); | ||
} | ||
builder.endObject().endObject(); | ||
} | ||
builder.endArray(); | ||
} | ||
builder.endObject(); | ||
return builder; | ||
} | ||
} |
Oops, something went wrong.