Skip to content

Commit

Permalink
HLRC: reindex API with wait_for_completion false
Browse files Browse the repository at this point in the history
Extend High Level Rest Client Reindex API to support requests with
wait_for_completion=false. This method will return a TaskID and results
can be queried with Task API

refers: elastic#27205
  • Loading branch information
pgomulka committed Nov 2, 2018
1 parent 70e939e commit ec3c93d
Show file tree
Hide file tree
Showing 4 changed files with 249 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -474,9 +474,18 @@ static Request rankEval(RankEvalRequest rankEvalRequest) throws IOException {
}

static Request reindex(ReindexRequest reindexRequest) throws IOException {
return prepareReindexRequest(reindexRequest, true);
}

static Request submitReindex(ReindexRequest reindexRequest) throws IOException {
return prepareReindexRequest(reindexRequest, false);
}

private static Request prepareReindexRequest(ReindexRequest reindexRequest, boolean waitForCompletion) throws IOException {
String endpoint = new EndpointBuilder().addPathPart("_reindex").build();
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
Params params = new Params(request)
.withWaitForCompletion(waitForCompletion)
.withRefresh(reindexRequest.isRefresh())
.withTimeout(reindexRequest.getTimeout())
.withWaitForActiveShards(reindexRequest.getWaitForActiveShards())
Expand Down Expand Up @@ -885,11 +894,8 @@ Params withDetailed(boolean detailed) {
return this;
}

Params withWaitForCompletion(boolean waitForCompletion) {
if (waitForCompletion) {
return putParam("wait_for_completion", Boolean.TRUE.toString());
}
return this;
Params withWaitForCompletion(Boolean waitForCompletion) {
return putParam("wait_for_completion", waitForCompletion.toString());
}

Params withNodes(String[] nodes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.core.TermVectorsResponse;
import org.elasticsearch.client.core.TermVectorsRequest;
import org.elasticsearch.client.reindex.ReindexSubmissionResponse;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.ParseField;
Expand Down Expand Up @@ -447,6 +448,20 @@ public final BulkByScrollResponse reindex(ReindexRequest reindexRequest, Request
);
}

/**
* Submits a reindex task.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html">Reindex API on elastic.co</a>
* @param reindexRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the submission response
* @throws IOException in case there is a problem sending the request or parsing back the response
*/
public final ReindexSubmissionResponse submitReindexTask(ReindexRequest reindexRequest, RequestOptions options) throws IOException {
return performRequestAndParseEntity(
reindexRequest, RequestConverters::submitReindex, options, ReindexSubmissionResponse::fromXContent, emptySet()
);
}

/**
* Asynchronously executes a reindex request.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html">Reindex API on elastic.co</a>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.reindex;

import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.tasks.TaskId;

import java.io.IOException;
import java.util.Objects;

public class ReindexSubmissionResponse extends ActionResponse implements ToXContentObject {
private static final ParseField TASK = new ParseField("task");
public static final ConstructingObjectParser<ReindexSubmissionResponse, Void> PARSER = new ConstructingObjectParser<>(
"reindex_submission_response",
true, a -> new ReindexSubmissionResponse((TaskId) a[0]));

static {
PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), TaskId.parser(), TASK, ObjectParser.ValueType.STRING);
}

public static ReindexSubmissionResponse fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}

private final TaskId task;

ReindexSubmissionResponse(@Nullable TaskId task) {
this.task = task;
}

/**
* Get the task id
*
* @return the id of the reindex task.
*/
public TaskId getTask() {
return task;
}

@Override
public int hashCode() {
return Objects.hash(task);
}

@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other == null || getClass() != other.getClass()) {
return false;
}
ReindexSubmissionResponse that = (ReindexSubmissionResponse) other;
return Objects.equals(task, that.task);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (task != null) {
builder.field(TASK.getPreferredName(), task.toString());
}
builder.endObject();
return builder;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.client;

import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.reindex.ReindexSubmissionResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.IdsQueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.TaskId;

import java.io.IOException;
import java.util.Collections;
import java.util.function.BooleanSupplier;

public class ReindexIT extends ESRestHighLevelClientTestCase {

//TODO taken from CrudIT - should these scenarios be moved here?
public void testReindex() throws IOException {
final String sourceIndex = "source1";
final String destinationIndex = "dest";
{
// Prepare
Settings settings = Settings.builder()
.put("number_of_shards", 1)
.put("number_of_replicas", 0)
.build();
createIndex(sourceIndex, settings);
createIndex(destinationIndex, settings);
BulkRequest bulkRequest = new BulkRequest()
.add(new IndexRequest(sourceIndex, "type", "1").source(Collections.singletonMap("foo", "bar"), XContentType.JSON))
.add(new IndexRequest(sourceIndex, "type", "2").source(Collections.singletonMap("foo2", "bar2"), XContentType.JSON))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
assertEquals(
RestStatus.OK,
highLevelClient().bulk(
bulkRequest,
RequestOptions.DEFAULT
).status()
);
}
{
// test1: create one doc in dest
ReindexRequest reindexRequest = new ReindexRequest();
reindexRequest.setSourceIndices(sourceIndex);
reindexRequest.setDestIndex(destinationIndex);
reindexRequest.setSourceQuery(new IdsQueryBuilder().addIds("1").types("type"));
reindexRequest.setRefresh(true);
BulkByScrollResponse bulkResponse = execute(reindexRequest, highLevelClient()::reindex, highLevelClient()::reindexAsync);
assertEquals(1, bulkResponse.getCreated());
assertEquals(1, bulkResponse.getTotal());
assertEquals(0, bulkResponse.getDeleted());
assertEquals(0, bulkResponse.getNoops());
assertEquals(0, bulkResponse.getVersionConflicts());
assertEquals(1, bulkResponse.getBatches());
assertTrue(bulkResponse.getTook().getMillis() > 0);
assertEquals(1, bulkResponse.getBatches());
assertEquals(0, bulkResponse.getBulkFailures().size());
assertEquals(0, bulkResponse.getSearchFailures().size());
}
}

public void testReindexTask() throws IOException, InterruptedException {
final String sourceIndex = "source123";
final String destinationIndex = "dest2";
{
// Prepare
Settings settings = Settings.builder()
.put("number_of_shards", 1)
.put("number_of_replicas", 0)
.build();
createIndex(sourceIndex, settings);
createIndex(destinationIndex, settings);
BulkRequest bulkRequest = new BulkRequest()
.add(new IndexRequest(sourceIndex, "type", "1").source(Collections.singletonMap("foo", "bar"), XContentType.JSON))
.add(new IndexRequest(sourceIndex, "type", "2").source(Collections.singletonMap("foo2", "bar2"), XContentType.JSON))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
assertEquals(
RestStatus.OK,
highLevelClient().bulk(
bulkRequest,
RequestOptions.DEFAULT
).status()
);
}
{
// test1: create one doc in dest
ReindexRequest reindexRequest = new ReindexRequest();
reindexRequest.setSourceIndices(sourceIndex);
reindexRequest.setDestIndex(destinationIndex);
reindexRequest.setSourceQuery(new IdsQueryBuilder().addIds("1").types("type"));
reindexRequest.setRefresh(true);

ReindexSubmissionResponse reindexSubmission = highLevelClient().submitReindexTask(reindexRequest, RequestOptions.DEFAULT);

BooleanSupplier hasUpgradeCompleted = checkCompletionStatus(reindexSubmission.getTask());
awaitBusy(hasUpgradeCompleted);
}
}

private BooleanSupplier checkCompletionStatus(TaskId taskId) {
return () -> {
try {
Response response = client().performRequest(new Request("GET", "/_tasks/" + taskId.toString()));
return (boolean) entityAsMap(response).get("completed");
} catch (IOException e) {
fail(e.getMessage());
return false;
}
};
}
}

0 comments on commit ec3c93d

Please sign in to comment.