Skip to content

Commit

Permalink
Add UpdateRequest support to High Level Rest client (#23266)
Browse files Browse the repository at this point in the history
This commit adds support for UpdateRequest to the High Level Rest client
  • Loading branch information
tlrx authored Feb 22, 2017
1 parent eeaa0cc commit 09734e7
Show file tree
Hide file tree
Showing 5 changed files with 480 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,16 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
Expand Down Expand Up @@ -118,6 +122,48 @@ static Request index(IndexRequest indexRequest) {
return new Request(method, endpoint, parameters.getParams(), entity);
}

static Request update(UpdateRequest updateRequest) throws IOException {
String endpoint = endpoint(updateRequest.index(), updateRequest.type(), updateRequest.id(), "_update");

Params parameters = Params.builder();
parameters.withRouting(updateRequest.routing());
parameters.withParent(updateRequest.parent());
parameters.withTimeout(updateRequest.timeout());
parameters.withRefreshPolicy(updateRequest.getRefreshPolicy());
parameters.withWaitForActiveShards(updateRequest.waitForActiveShards());
parameters.withDocAsUpsert(updateRequest.docAsUpsert());
parameters.withFetchSourceContext(updateRequest.fetchSource());
parameters.withRetryOnConflict(updateRequest.retryOnConflict());
parameters.withVersion(updateRequest.version());
parameters.withVersionType(updateRequest.versionType());

// The Java API allows update requests with different content types
// set for the partial document and the upsert document. This client
// only accepts update requests that have the same content types set
// for both doc and upsert.
XContentType xContentType = null;
if (updateRequest.doc() != null) {
xContentType = updateRequest.doc().getContentType();
}
if (updateRequest.upsertRequest() != null) {
XContentType upsertContentType = updateRequest.upsertRequest().getContentType();
if ((xContentType != null) && (xContentType != upsertContentType)) {
throw new IllegalStateException("Update request cannot have different content types for doc [" + xContentType + "]" +
" and upsert [" + upsertContentType + "] documents");
} else {
xContentType = upsertContentType;
}
}
if (xContentType == null) {
xContentType = Requests.INDEX_CONTENT_TYPE;
}

BytesRef source = XContentHelper.toXContent(updateRequest, xContentType, false).toBytesRef();
HttpEntity entity = new ByteArrayEntity(source.bytes, source.offset, source.length, ContentType.create(xContentType.mediaType()));

return new Request(HttpPost.METHOD_NAME, endpoint, parameters.getParams(), entity);
}

/**
* Utility method to build request's endpoint.
*/
Expand Down Expand Up @@ -160,6 +206,13 @@ Params putParam(String key, TimeValue value) {
return this;
}

Params withDocAsUpsert(boolean docAsUpsert) {
if (docAsUpsert) {
return putParam("doc_as_upsert", Boolean.TRUE.toString());
}
return this;
}

Params withFetchSourceContext(FetchSourceContext fetchSourceContext) {
if (fetchSourceContext != null) {
if (fetchSourceContext.fetchSource() == false) {
Expand Down Expand Up @@ -203,7 +256,14 @@ Params withRefresh(boolean refresh) {

Params withRefreshPolicy(WriteRequest.RefreshPolicy refreshPolicy) {
if (refreshPolicy != WriteRequest.RefreshPolicy.NONE) {
putParam("refresh", refreshPolicy.getValue());
return putParam("refresh", refreshPolicy.getValue());
}
return this;
}

Params withRetryOnConflict(int retryOnConflict) {
if (retryOnConflict > 0) {
return putParam("retry_on_conflict", String.valueOf(retryOnConflict));
}
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.main.MainRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
Expand All @@ -39,10 +41,8 @@
import org.elasticsearch.rest.RestStatus;

import java.io.IOException;
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;

import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;
Expand Down Expand Up @@ -121,14 +121,35 @@ public void indexAsync(IndexRequest indexRequest, ActionListener<IndexResponse>
performRequestAsyncAndParseEntity(indexRequest, Request::index, IndexResponse::fromXContent, listener, emptySet(), headers);
}

private <Req extends ActionRequest, Resp> Resp performRequestAndParseEntity(Req request, Function<Req, Request> requestConverter,
CheckedFunction<XContentParser, Resp, IOException> entityParser, Set<Integer> ignores, Header... headers) throws IOException {
return performRequest(request, requestConverter, (response) -> parseEntity(response.getEntity(), entityParser), ignores, headers);
/**
* Updates a document using the Update API
* <p>
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html">Update API on elastic.co</a>
*/
public UpdateResponse update(UpdateRequest updateRequest, Header... headers) throws IOException {
return performRequestAndParseEntity(updateRequest, Request::update, UpdateResponse::fromXContent, emptySet(), headers);
}

/**
* Asynchronously updates a document using the Update API
* <p>
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html">Update API on elastic.co</a>
*/
public void updateAsync(UpdateRequest updateRequest, ActionListener<UpdateResponse> listener, Header... headers) {
performRequestAsyncAndParseEntity(updateRequest, Request::update, UpdateResponse::fromXContent, listener, emptySet(), headers);
}

<Req extends ActionRequest, Resp> Resp performRequest(Req request, Function<Req, Request> requestConverter,
CheckedFunction<Response, Resp, IOException> responseConverter, Set<Integer> ignores, Header... headers) throws IOException {
private <Req extends ActionRequest, Resp> Resp performRequestAndParseEntity(Req request,
CheckedFunction<Req, Request, IOException> requestConverter,
CheckedFunction<XContentParser, Resp, IOException> entityParser,
Set<Integer> ignores, Header... headers) throws IOException {
return performRequest(request, requestConverter, (response) -> parseEntity(response.getEntity(), entityParser), ignores, headers);
}

<Req extends ActionRequest, Resp> Resp performRequest(Req request,
CheckedFunction<Req, Request, IOException> requestConverter,
CheckedFunction<Response, Resp, IOException> responseConverter,
Set<Integer> ignores, Header... headers) throws IOException {
ActionRequestValidationException validationException = request.validate();
if (validationException != null) {
throw validationException;
Expand All @@ -154,22 +175,31 @@ <Req extends ActionRequest, Resp> Resp performRequest(Req request, Function<Req,
}
}

private <Req extends ActionRequest, Resp> void performRequestAsyncAndParseEntity(Req request, Function<Req, Request> requestConverter,
CheckedFunction<XContentParser, Resp, IOException> entityParser, ActionListener<Resp> listener,
Set<Integer> ignores, Header... headers) {
private <Req extends ActionRequest, Resp> void performRequestAsyncAndParseEntity(Req request,
CheckedFunction<Req, Request, IOException> requestConverter,
CheckedFunction<XContentParser, Resp, IOException> entityParser,
ActionListener<Resp> listener, Set<Integer> ignores, Header... headers) {
performRequestAsync(request, requestConverter, (response) -> parseEntity(response.getEntity(), entityParser),
listener, ignores, headers);
}

<Req extends ActionRequest, Resp> void performRequestAsync(Req request, Function<Req, Request> requestConverter,
CheckedFunction<Response, Resp, IOException> responseConverter, ActionListener<Resp> listener,
Set<Integer> ignores, Header... headers) {
<Req extends ActionRequest, Resp> void performRequestAsync(Req request,
CheckedFunction<Req, Request, IOException> requestConverter,
CheckedFunction<Response, Resp, IOException> responseConverter,
ActionListener<Resp> listener, Set<Integer> ignores, Header... headers) {
ActionRequestValidationException validationException = request.validate();
if (validationException != null) {
listener.onFailure(validationException);
return;
}
Request req = requestConverter.apply(request);
Request req;
try {
req = requestConverter.apply(request);
} catch (Exception e) {
listener.onFailure(e);
return;
}

ResponseListener responseListener = wrapResponseListener(responseConverter, listener, ignores);
client.performRequestAsync(req.method, req.endpoint, req.params, req.entity, responseListener, headers);
}
Expand Down
Loading

0 comments on commit 09734e7

Please sign in to comment.