Skip to content

Commit

Permalink
Add HLRC support for enrich execute policy API (#47991)
Browse files Browse the repository at this point in the history
This PR also includes HLRC docs for the enrich stats api.

Relates to #32789
  • Loading branch information
martijnvg committed Oct 14, 2019
1 parent d4901a7 commit 7cc73f6
Show file tree
Hide file tree
Showing 10 changed files with 379 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.core.AcknowledgedResponse;
import org.elasticsearch.client.enrich.DeletePolicyRequest;
import org.elasticsearch.client.enrich.ExecutePolicyRequest;
import org.elasticsearch.client.enrich.ExecutePolicyResponse;
import org.elasticsearch.client.enrich.GetPolicyRequest;
import org.elasticsearch.client.enrich.GetPolicyResponse;
import org.elasticsearch.client.enrich.PutPolicyRequest;
Expand Down Expand Up @@ -224,4 +226,49 @@ public Cancellable statsAsync(StatsRequest request,
Collections.emptySet()
);
}

/**
* Executes the execute policy api, which executes an enrich policy.
*
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/enrich-policy-apis.html#execute-policy">
* the docs</a> for more.
*
* @param request the {@link ExecutePolicyRequest}
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
* @throws IOException in case there is a problem sending the request or parsing back the response
*/
public ExecutePolicyResponse executePolicy(ExecutePolicyRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(
request,
EnrichRequestConverters::executePolicy,
options,
ExecutePolicyResponse::fromXContent,
Collections.emptySet()
);
}

/**
* Asynchronously executes the execute policy api, which executes an enrich policy.
*
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/enrich-policy-apis.html#execute-policy">
* the docs</a> for more.
*
* @param request the {@link ExecutePolicyRequest}
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
* @return cancellable that may be used to cancel the request
*/
public Cancellable executePolicyAsync(ExecutePolicyRequest request,
RequestOptions options,
ActionListener<ExecutePolicyResponse> listener) {
return restHighLevelClient.performRequestAsyncAndParseEntity(
request,
EnrichRequestConverters::executePolicy,
options,
ExecutePolicyResponse::fromXContent,
listener,
Collections.emptySet()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@

import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.elasticsearch.client.enrich.DeletePolicyRequest;
import org.elasticsearch.client.enrich.ExecutePolicyRequest;
import org.elasticsearch.client.enrich.GetPolicyRequest;
import org.elasticsearch.client.enrich.PutPolicyRequest;
import org.elasticsearch.client.enrich.StatsRequest;
Expand Down Expand Up @@ -66,4 +68,17 @@ static Request stats(StatsRequest statsRequest) {
return new Request(HttpGet.METHOD_NAME, endpoint);
}

static Request executePolicy(ExecutePolicyRequest executePolicyRequest) {
String endpoint = new RequestConverters.EndpointBuilder()
.addPathPartAsIs("_enrich", "policy")
.addPathPart(executePolicyRequest.getName())
.addPathPartAsIs("_execute")
.build();
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
if (executePolicyRequest.getWaitForCompletion() != null) {
request.addParameter("wait_for_completion", executePolicyRequest.getWaitForCompletion().toString());
}
return request;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.enrich;

import org.elasticsearch.client.Validatable;

public final class ExecutePolicyRequest implements Validatable {

private final String name;
private Boolean waitForCompletion;

public ExecutePolicyRequest(String name) {
this.name = name;
}

public String getName() {
return name;
}

public Boolean getWaitForCompletion() {
return waitForCompletion;
}

public void setWaitForCompletion(boolean waitForCompletion) {
this.waitForCompletion = waitForCompletion;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.enrich;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;

public final class ExecutePolicyResponse {

private static final ParseField TASK_FIELD = new ParseField("task");
private static final ParseField STATUS_FIELD = new ParseField("status");

private static final ConstructingObjectParser<ExecutePolicyResponse, Void> PARSER = new ConstructingObjectParser<>(
"execute_policy_response",
true,
args -> new ExecutePolicyResponse((String) args[0], (ExecutionStatus) args[1])
);

static {
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), TASK_FIELD);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), ExecutionStatus.PARSER, STATUS_FIELD);
}

public static ExecutePolicyResponse fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}

private final String taskId;
private final ExecutionStatus executionStatus;

ExecutePolicyResponse(String taskId, ExecutionStatus executionStatus) {
this.taskId = taskId;
this.executionStatus = executionStatus;
}

public String getTaskId() {
return taskId;
}

public ExecutionStatus getExecutionStatus() {
return executionStatus;
}

public static final class ExecutionStatus {

private static final ParseField PHASE_FIELD = new ParseField("phase");

private static final ConstructingObjectParser<ExecutionStatus, Void> PARSER = new ConstructingObjectParser<>(
"execution_status",
true,
args -> new ExecutionStatus((String) args[0])
);

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), PHASE_FIELD);
}

private final String phase;

ExecutionStatus(String phase) {
this.phase = phase;
}

public String getPhase() {
return phase;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@

import org.elasticsearch.client.core.AcknowledgedResponse;
import org.elasticsearch.client.enrich.DeletePolicyRequest;
import org.elasticsearch.client.enrich.ExecutePolicyRequest;
import org.elasticsearch.client.enrich.ExecutePolicyResponse;
import org.elasticsearch.client.enrich.GetPolicyRequest;
import org.elasticsearch.client.enrich.GetPolicyResponse;
import org.elasticsearch.client.enrich.PutPolicyRequest;
import org.elasticsearch.client.enrich.StatsRequest;
import org.elasticsearch.client.enrich.StatsResponse;
import org.elasticsearch.client.indices.CreateIndexRequest;

import java.util.Collections;

Expand All @@ -36,6 +39,11 @@
public class EnrichIT extends ESRestHighLevelClientTestCase {

public void testCRUD() throws Exception {
CreateIndexRequest createIndexRequest = new CreateIndexRequest("my-index")
.mapping(Collections.singletonMap("properties", Collections.singletonMap("enrich_key",
Collections.singletonMap("type", "keyword"))));
highLevelClient().indices().create(createIndexRequest, RequestOptions.DEFAULT);

final EnrichClient enrichClient = highLevelClient().enrich();
PutPolicyRequest putPolicyRequest = new PutPolicyRequest("my-policy", "match",
Collections.singletonList("my-index"), "enrich_key", Collections.singletonList("enrich_value"));
Expand All @@ -60,6 +68,11 @@ public void testCRUD() throws Exception {
assertThat(statsResponse.getCoordinatorStats().get(0).getRemoteRequestsTotal(), greaterThanOrEqualTo(0L));
assertThat(statsResponse.getCoordinatorStats().get(0).getExecutedSearchesTotal(), greaterThanOrEqualTo(0L));

ExecutePolicyRequest executePolicyRequest = new ExecutePolicyRequest("my-policy");
ExecutePolicyResponse executePolicyResponse =
execute(executePolicyRequest, enrichClient::executePolicy, enrichClient::executePolicyAsync);
assertThat(executePolicyResponse.getExecutionStatus().getPhase(), equalTo("COMPLETE"));

DeletePolicyRequest deletePolicyRequest = new DeletePolicyRequest("my-policy");
AcknowledgedResponse deletePolicyResponse =
execute(deletePolicyRequest, enrichClient::deletePolicy, enrichClient::deletePolicyAsync);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@

import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.elasticsearch.client.enrich.DeletePolicyRequest;
import org.elasticsearch.client.enrich.ExecutePolicyRequest;
import org.elasticsearch.client.enrich.GetPolicyRequest;
import org.elasticsearch.client.enrich.PutPolicyRequest;
import org.elasticsearch.client.enrich.PutPolicyRequestTests;
Expand Down Expand Up @@ -89,4 +91,24 @@ public void testStats() {
assertThat(result.getEntity(), nullValue());
}

public void testExecutePolicy() {
ExecutePolicyRequest request = new ExecutePolicyRequest(randomAlphaOfLength(4));
Request result = EnrichRequestConverters.executePolicy(request);

assertThat(result.getMethod(), equalTo(HttpPost.METHOD_NAME));
assertThat(result.getEndpoint(), equalTo("/_enrich/policy/" + request.getName() + "/_execute"));
assertThat(result.getParameters().size(), equalTo(0));
assertThat(result.getEntity(), nullValue());

request = new ExecutePolicyRequest(randomAlphaOfLength(4));
request.setWaitForCompletion(randomBoolean());
result = EnrichRequestConverters.executePolicy(request);

assertThat(result.getMethod(), equalTo(HttpPost.METHOD_NAME));
assertThat(result.getEndpoint(), equalTo("/_enrich/policy/" + request.getName() + "/_execute"));
assertThat(result.getParameters().size(), equalTo(1));
assertThat(result.getParameters().get("wait_for_completion"), equalTo(request.getWaitForCompletion().toString()));
assertThat(result.getEntity(), nullValue());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.core.AcknowledgedResponse;
import org.elasticsearch.client.enrich.DeletePolicyRequest;
import org.elasticsearch.client.enrich.ExecutePolicyRequest;
import org.elasticsearch.client.enrich.ExecutePolicyResponse;
import org.elasticsearch.client.enrich.NamedPolicy;
import org.elasticsearch.client.enrich.GetPolicyRequest;
import org.elasticsearch.client.enrich.GetPolicyResponse;
Expand All @@ -33,6 +35,7 @@
import org.elasticsearch.client.enrich.StatsResponse;
import org.elasticsearch.client.enrich.StatsResponse.CoordinatorStats;
import org.elasticsearch.client.enrich.StatsResponse.ExecutingPolicy;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.junit.After;

import java.util.Arrays;
Expand Down Expand Up @@ -156,7 +159,7 @@ public void testGetPolicy() throws Exception {
RestHighLevelClient client = highLevelClient();

PutPolicyRequest putPolicyRequest = new PutPolicyRequest(
"users-policy", "exact_match", Collections.singletonList("users"),
"users-policy", "match", Collections.singletonList("users"),
"email", Arrays.asList("address", "zip", "city", "state"));
client.enrich().putPolicy(putPolicyRequest, RequestOptions.DEFAULT);

Expand Down Expand Up @@ -251,4 +254,61 @@ public void onFailure(Exception e) {
assertTrue(latch.await(30L, TimeUnit.SECONDS));
}

public void testExecutePolicy() throws Exception {
RestHighLevelClient client = highLevelClient();

{
CreateIndexRequest createIndexRequest = new CreateIndexRequest("users")
.mapping(Collections.singletonMap("properties", Collections.singletonMap("email",
Collections.singletonMap("type", "keyword"))));
client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
PutPolicyRequest putPolicyRequest = new PutPolicyRequest(
"users-policy", "match", Collections.singletonList("users"),
"email", Arrays.asList("address", "zip", "city", "state"));
client.enrich().putPolicy(putPolicyRequest, RequestOptions.DEFAULT);
}

// tag::enrich-execute-policy-request
ExecutePolicyRequest request =
new ExecutePolicyRequest("users-policy");
// end::enrich-execute-policy-request

// tag::enrich-execute-policy-execute
ExecutePolicyResponse response =
client.enrich().executePolicy(request, RequestOptions.DEFAULT);
// end::enrich-execute-policy-execute

// tag::enrich-execute-policy-response
ExecutePolicyResponse.ExecutionStatus status =
response.getExecutionStatus();
// end::enrich-execute-policy-response

// tag::enrich-execute-policy-execute-listener
ActionListener<ExecutePolicyResponse> listener =
new ActionListener<ExecutePolicyResponse>() {
@Override
public void onResponse(ExecutePolicyResponse response) { // <1>
ExecutePolicyResponse.ExecutionStatus status =
response.getExecutionStatus();
}

@Override
public void onFailure(Exception e) {
// <2>
}
};
// end::enrich-execute-policy-execute-listener

// Replace the empty listener by a blocking listener in test
final CountDownLatch latch = new CountDownLatch(1);
listener = new LatchedActionListener<>(listener, latch);

// tag::enrich-execute-policy-execute-async
client.enrich().executePolicyAsync(request, RequestOptions.DEFAULT,
listener); // <1>
// end::enrich-execute-policy-execute-async

assertTrue(latch.await(30L, TimeUnit.SECONDS));
}

}
Loading

0 comments on commit 7cc73f6

Please sign in to comment.