diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java index 3acb8d8297306..f1d37691a796c 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java @@ -474,9 +474,18 @@ static Request rankEval(RankEvalRequest rankEvalRequest) throws IOException { } static Request reindex(ReindexRequest reindexRequest) throws IOException { + return prepareReindexRequest(reindexRequest, true); + } + + static Request submitReindex(ReindexRequest reindexRequest) throws IOException { + return prepareReindexRequest(reindexRequest, false); + } + + private static Request prepareReindexRequest(ReindexRequest reindexRequest, boolean waitForCompletion) throws IOException { String endpoint = new EndpointBuilder().addPathPart("_reindex").build(); Request request = new Request(HttpPost.METHOD_NAME, endpoint); Params params = new Params(request) + .withWaitForCompletion(waitForCompletion) .withRefresh(reindexRequest.isRefresh()) .withTimeout(reindexRequest.getTimeout()) .withWaitForActiveShards(reindexRequest.getWaitForActiveShards()) @@ -885,11 +894,8 @@ Params withDetailed(boolean detailed) { return this; } - Params withWaitForCompletion(boolean waitForCompletion) { - if (waitForCompletion) { - return putParam("wait_for_completion", Boolean.TRUE.toString()); - } - return this; + Params withWaitForCompletion(Boolean waitForCompletion) { + return putParam("wait_for_completion", waitForCompletion.toString()); } Params withNodes(String[] nodes) { diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java index bd1adf634a518..7f0d7644d675b 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java @@ -58,6 +58,7 @@ import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.core.TermVectorsResponse; import org.elasticsearch.client.core.TermVectorsRequest; +import org.elasticsearch.client.reindex.ReindexSubmissionResponse; import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.CheckedFunction; import org.elasticsearch.common.ParseField; @@ -447,6 +448,20 @@ public final BulkByScrollResponse reindex(ReindexRequest reindexRequest, Request ); } + /** + * Submits a reindex task. + * See Reindex API on elastic.co + * @param reindexRequest the request + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @return the submission response + * @throws IOException in case there is a problem sending the request or parsing back the response + */ + public final ReindexSubmissionResponse submitReindexTask(ReindexRequest reindexRequest, RequestOptions options) throws IOException { + return performRequestAndParseEntity( + reindexRequest, RequestConverters::submitReindex, options, ReindexSubmissionResponse::fromXContent, emptySet() + ); + } + /** * Asynchronously executes a reindex request. * See Reindex API on elastic.co diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/reindex/ReindexSubmissionResponse.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/reindex/ReindexSubmissionResponse.java new file mode 100644 index 0000000000000..522e48f3c2e6b --- /dev/null +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/reindex/ReindexSubmissionResponse.java @@ -0,0 +1,89 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.client.reindex; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.tasks.TaskId; + +import java.io.IOException; +import java.util.Objects; + +public class ReindexSubmissionResponse extends ActionResponse implements ToXContentObject { + private static final ParseField TASK = new ParseField("task"); + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "reindex_submission_response", + true, a -> new ReindexSubmissionResponse((TaskId) a[0])); + + static { + PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), TaskId.parser(), TASK, ObjectParser.ValueType.STRING); + } + + public static ReindexSubmissionResponse fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + + private final TaskId task; + + ReindexSubmissionResponse(@Nullable TaskId task) { + this.task = task; + } + + /** + * Get the task id + * + * @return the id of the reindex task. + */ + public TaskId getTask() { + return task; + } + + @Override + public int hashCode() { + return Objects.hash(task); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (other == null || getClass() != other.getClass()) { + return false; + } + ReindexSubmissionResponse that = (ReindexSubmissionResponse) other; + return Objects.equals(task, that.task); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + if (task != null) { + builder.field(TASK.getPreferredName(), task.toString()); + } + builder.endObject(); + return builder; + } +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java new file mode 100644 index 0000000000000..87b5c03a319a8 --- /dev/null +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java @@ -0,0 +1,134 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client; + +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.client.reindex.ReindexSubmissionResponse; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.query.IdsQueryBuilder; +import org.elasticsearch.index.reindex.BulkByScrollResponse; +import org.elasticsearch.index.reindex.ReindexRequest; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.tasks.TaskId; + +import java.io.IOException; +import java.util.Collections; +import java.util.function.BooleanSupplier; + +public class ReindexIT extends ESRestHighLevelClientTestCase { + + //TODO taken from CrudIT - should these scenarios be moved here? + public void testReindex() throws IOException { + final String sourceIndex = "source1"; + final String destinationIndex = "dest"; + { + // Prepare + Settings settings = Settings.builder() + .put("number_of_shards", 1) + .put("number_of_replicas", 0) + .build(); + createIndex(sourceIndex, settings); + createIndex(destinationIndex, settings); + BulkRequest bulkRequest = new BulkRequest() + .add(new IndexRequest(sourceIndex, "type", "1").source(Collections.singletonMap("foo", "bar"), XContentType.JSON)) + .add(new IndexRequest(sourceIndex, "type", "2").source(Collections.singletonMap("foo2", "bar2"), XContentType.JSON)) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + assertEquals( + RestStatus.OK, + highLevelClient().bulk( + bulkRequest, + RequestOptions.DEFAULT + ).status() + ); + } + { + // test1: create one doc in dest + ReindexRequest reindexRequest = new ReindexRequest(); + reindexRequest.setSourceIndices(sourceIndex); + reindexRequest.setDestIndex(destinationIndex); + reindexRequest.setSourceQuery(new IdsQueryBuilder().addIds("1").types("type")); + reindexRequest.setRefresh(true); + BulkByScrollResponse bulkResponse = execute(reindexRequest, highLevelClient()::reindex, highLevelClient()::reindexAsync); + assertEquals(1, bulkResponse.getCreated()); + assertEquals(1, bulkResponse.getTotal()); + assertEquals(0, bulkResponse.getDeleted()); + assertEquals(0, bulkResponse.getNoops()); + assertEquals(0, bulkResponse.getVersionConflicts()); + assertEquals(1, bulkResponse.getBatches()); + assertTrue(bulkResponse.getTook().getMillis() > 0); + assertEquals(1, bulkResponse.getBatches()); + assertEquals(0, bulkResponse.getBulkFailures().size()); + assertEquals(0, bulkResponse.getSearchFailures().size()); + } + } + + public void testReindexTask() throws IOException, InterruptedException { + final String sourceIndex = "source123"; + final String destinationIndex = "dest2"; + { + // Prepare + Settings settings = Settings.builder() + .put("number_of_shards", 1) + .put("number_of_replicas", 0) + .build(); + createIndex(sourceIndex, settings); + createIndex(destinationIndex, settings); + BulkRequest bulkRequest = new BulkRequest() + .add(new IndexRequest(sourceIndex, "type", "1").source(Collections.singletonMap("foo", "bar"), XContentType.JSON)) + .add(new IndexRequest(sourceIndex, "type", "2").source(Collections.singletonMap("foo2", "bar2"), XContentType.JSON)) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + assertEquals( + RestStatus.OK, + highLevelClient().bulk( + bulkRequest, + RequestOptions.DEFAULT + ).status() + ); + } + { + // test1: create one doc in dest + ReindexRequest reindexRequest = new ReindexRequest(); + reindexRequest.setSourceIndices(sourceIndex); + reindexRequest.setDestIndex(destinationIndex); + reindexRequest.setSourceQuery(new IdsQueryBuilder().addIds("1").types("type")); + reindexRequest.setRefresh(true); + + ReindexSubmissionResponse reindexSubmission = highLevelClient().submitReindexTask(reindexRequest, RequestOptions.DEFAULT); + + BooleanSupplier hasUpgradeCompleted = checkCompletionStatus(reindexSubmission.getTask()); + awaitBusy(hasUpgradeCompleted); + } + } + + private BooleanSupplier checkCompletionStatus(TaskId taskId) { + return () -> { + try { + Response response = client().performRequest(new Request("GET", "/_tasks/" + taskId.toString())); + return (boolean) entityAsMap(response).get("completed"); + } catch (IOException e) { + fail(e.getMessage()); + return false; + } + }; + } +}