Skip to content

Commit

Permalink
Add UpdateRequest support to High Level Rest client
Browse files Browse the repository at this point in the history
  • Loading branch information
tlrx committed Feb 21, 2017
1 parent 3a0fc52 commit e101f40
Show file tree
Hide file tree
Showing 4 changed files with 449 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,27 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.StringJoiner;

import static org.elasticsearch.common.xcontent.XContentHelper.createParser;

final class Request {

private static final String DELIMITER = "/";
Expand Down Expand Up @@ -118,6 +127,74 @@ static Request index(IndexRequest indexRequest) {
return new Request(method, endpoint, parameters.getParams(), entity);
}

static Request update(UpdateRequest updateRequest) {
String endpoint = endpoint(updateRequest.index(), updateRequest.type(), updateRequest.id(), "_update");

Params parameters = Params.builder();
parameters.withRouting(updateRequest.routing());
parameters.withParent(updateRequest.parent());
parameters.withTimeout(updateRequest.timeout());
parameters.withRefreshPolicy(updateRequest.getRefreshPolicy());
parameters.withWaitForActiveShards(updateRequest.waitForActiveShards());
parameters.withDocAsUpsert(updateRequest.docAsUpsert());
parameters.withFetchSourceContext(updateRequest.fetchSource());
parameters.withRetryOnConflict(updateRequest.retryOnConflict());
parameters.withVersion(updateRequest.version());
parameters.withVersionType(updateRequest.versionType());

XContentType xContentType = null;
if (updateRequest.doc() != null) {
xContentType = updateRequest.doc().getContentType();
} else if (updateRequest.upsertRequest() != null) {
xContentType = updateRequest.upsertRequest().getContentType();
} else {
xContentType = Requests.INDEX_CONTENT_TYPE;
}

return new Request(HttpPost.METHOD_NAME, endpoint, parameters.getParams(), toHttpEntity(updateRequest, xContentType));
}

static ByteArrayEntity toHttpEntity(UpdateRequest updateRequest, XContentType xContentType) {
try (XContentBuilder builder = XContentBuilder.builder(xContentType.xContent())) {
builder.startObject();
if (updateRequest.docAsUpsert()) {
builder.field("doc_as_upsert", updateRequest.docAsUpsert());
}
IndexRequest doc = updateRequest.doc();
if (doc != null) {
try (XContentParser parser = createParser(NamedXContentRegistry.EMPTY, doc.source(), doc.getContentType())) {
builder.field("doc").copyCurrentStructure(parser);
}
}
Script script = updateRequest.script();
if (script != null) {
builder.field("script", script);
}
IndexRequest upsert = updateRequest.upsertRequest();
if (upsert != null) {
try (XContentParser parser = createParser(NamedXContentRegistry.EMPTY, upsert.source(), upsert.getContentType())) {
builder.field("upsert").copyCurrentStructure(parser);
}
}
if (updateRequest.scriptedUpsert()) {
builder.field("scripted_upsert", updateRequest.scriptedUpsert());
}
if (updateRequest.detectNoop() == false) {
builder.field("detect_noop", updateRequest.detectNoop());
}
if (updateRequest.fetchSource() != null) {
builder.field("_source", updateRequest.fetchSource());
}
builder.endObject();

BytesRef requestBody = builder.bytes().toBytesRef();
ContentType contentType = ContentType.create(xContentType.mediaType());
return new ByteArrayEntity(requestBody.bytes, requestBody.offset, requestBody.length, contentType);
} catch (IOException e) {
throw new IllegalStateException("Failed to build HTTP entity from update request", e);
}
}

/**
* Utility method to build request's endpoint.
*/
Expand Down Expand Up @@ -160,6 +237,13 @@ Params putParam(String key, TimeValue value) {
return this;
}

Params withDocAsUpsert(boolean docAsUpsert) {
if (docAsUpsert) {
return putParam("doc_as_upsert", Boolean.TRUE.toString());
}
return this;
}

Params withFetchSourceContext(FetchSourceContext fetchSourceContext) {
if (fetchSourceContext != null) {
if (fetchSourceContext.fetchSource() == false) {
Expand Down Expand Up @@ -203,7 +287,14 @@ Params withRefresh(boolean refresh) {

Params withRefreshPolicy(WriteRequest.RefreshPolicy refreshPolicy) {
if (refreshPolicy != WriteRequest.RefreshPolicy.NONE) {
putParam("refresh", refreshPolicy.getValue());
return putParam("refresh", refreshPolicy.getValue());
}
return this;
}

Params withRetryOnConflict(int retryOnConflict) {
if (retryOnConflict > 0) {
return putParam("retry_on_conflict", String.valueOf(retryOnConflict));
}
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.main.MainRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
Expand All @@ -39,7 +41,6 @@
import org.elasticsearch.rest.RestStatus;

import java.io.IOException;
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
Expand Down Expand Up @@ -121,6 +122,24 @@ public void indexAsync(IndexRequest indexRequest, ActionListener<IndexResponse>
performRequestAsyncAndParseEntity(indexRequest, Request::index, IndexResponse::fromXContent, listener, emptySet(), headers);
}

/**
* Updates a document using the Update API
* <p>
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html">Update API on elastic.co</a>
*/
public UpdateResponse update(UpdateRequest updateRequest, Header... headers) throws IOException {
return performRequestAndParseEntity(updateRequest, Request::update, UpdateResponse::fromXContent, emptySet(), headers);
}

/**
* Asynchronously updates a document using the Update API
* <p>
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html">Update API on elastic.co</a>
*/
public void updateAsync(UpdateRequest updateRequest, ActionListener<UpdateResponse> listener, Header... headers) {
performRequestAsyncAndParseEntity(updateRequest, Request::update, UpdateResponse::fromXContent, listener, emptySet(), headers);
}

private <Req extends ActionRequest, Resp> Resp performRequestAndParseEntity(Req request, Function<Req, Request> requestConverter,
CheckedFunction<XContentParser, Resp, IOException> entityParser, Set<Integer> ignores, Header... headers) throws IOException {
return performRequest(request, requestConverter, (response) -> parseEntity(response.getEntity(), entityParser), ignores, headers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,23 @@
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;

import static org.hamcrest.CoreMatchers.containsString;
import static java.util.Collections.singletonMap;

public class CrudIT extends ESRestHighLevelClientTestCase {

Expand Down Expand Up @@ -262,4 +267,167 @@ public void testIndex() throws IOException {
"version conflict, document already exists (current version [1])]", exception.getMessage());
}
}

public void testUpdate() throws IOException {
{
UpdateRequest updateRequest = new UpdateRequest("index", "type", "does_not_exist");
updateRequest.doc(singletonMap("field", "value"), randomFrom(XContentType.values()));

ElasticsearchStatusException exception = expectThrows(ElasticsearchStatusException.class, () ->
execute(updateRequest, highLevelClient()::update, highLevelClient()::updateAsync));
assertEquals(RestStatus.NOT_FOUND, exception.status());
assertEquals("Elasticsearch exception [type=document_missing_exception, reason=[type][does_not_exist]: document missing]",
exception.getMessage());
}
{
IndexRequest indexRequest = new IndexRequest("index", "type", "id");
indexRequest.source(singletonMap("field", "value"));
IndexResponse indexResponse = highLevelClient().index(indexRequest);
assertEquals(RestStatus.CREATED, indexResponse.status());

UpdateRequest updateRequest = new UpdateRequest("index", "type", "id");
updateRequest.doc(singletonMap("field", "updated"), randomFrom(XContentType.values()));

UpdateResponse updateResponse = execute(updateRequest, highLevelClient()::update, highLevelClient()::updateAsync);
assertEquals(RestStatus.OK, updateResponse.status());
assertEquals(indexResponse.getVersion() + 1, updateResponse.getVersion());

UpdateRequest updateRequestConflict = new UpdateRequest("index", "type", "id");
updateRequestConflict.doc(singletonMap("field", "with_version_conflict"), randomFrom(XContentType.values()));
updateRequestConflict.version(indexResponse.getVersion());

ElasticsearchStatusException exception = expectThrows(ElasticsearchStatusException.class, () ->
execute(updateRequestConflict, highLevelClient()::update, highLevelClient()::updateAsync));
assertEquals(RestStatus.CONFLICT, exception.status());
assertEquals("Elasticsearch exception [type=version_conflict_engine_exception, reason=[type][id]: version conflict, " +
"current version [2] is different than the one provided [1]]", exception.getMessage());
}
{
ElasticsearchStatusException exception = expectThrows(ElasticsearchStatusException.class, () -> {
UpdateRequest updateRequest = new UpdateRequest("index", "type", "id");
updateRequest.doc(singletonMap("field", "updated"), randomFrom(XContentType.values()));
if (randomBoolean()) {
updateRequest.parent("missing");
} else {
updateRequest.routing("missing");
}
execute(updateRequest, highLevelClient()::update, highLevelClient()::updateAsync);
});

assertEquals(RestStatus.NOT_FOUND, exception.status());
assertEquals("Elasticsearch exception [type=document_missing_exception, reason=[type][id]: document missing]",
exception.getMessage());
}
{
IndexRequest indexRequest = new IndexRequest("index", "type", "with_script");
indexRequest.source(singletonMap("counter", 12));
IndexResponse indexResponse = highLevelClient().index(indexRequest);
assertEquals(RestStatus.CREATED, indexResponse.status());

UpdateRequest updateRequest = new UpdateRequest("index", "type", "with_script");
Script script = new Script(ScriptType.INLINE, "painless", "ctx._source.counter += params.count", singletonMap("count", 8));
updateRequest.script(script);
updateRequest.fetchSource(true);

UpdateResponse updateResponse = execute(updateRequest, highLevelClient()::update, highLevelClient()::updateAsync);
assertEquals(RestStatus.OK, updateResponse.status());
assertEquals(DocWriteResponse.Result.UPDATED, updateResponse.getResult());
assertEquals(2L, updateResponse.getVersion());
assertEquals(20, updateResponse.getGetResult().sourceAsMap().get("counter"));

}
{
IndexRequest indexRequest = new IndexRequest("index", "type", "with_doc");
indexRequest.source("field_1", "one", "field_3", "three");
indexRequest.version(12L);
indexRequest.versionType(VersionType.EXTERNAL);
IndexResponse indexResponse = highLevelClient().index(indexRequest);
assertEquals(RestStatus.CREATED, indexResponse.status());
assertEquals(12L, indexResponse.getVersion());

UpdateRequest updateRequest = new UpdateRequest("index", "type", "with_doc");
updateRequest.doc(singletonMap("field_2", "two"), randomFrom(XContentType.values()));
updateRequest.fetchSource("field_*", "field_3");

UpdateResponse updateResponse = execute(updateRequest, highLevelClient()::update, highLevelClient()::updateAsync);
assertEquals(RestStatus.OK, updateResponse.status());
assertEquals(DocWriteResponse.Result.UPDATED, updateResponse.getResult());
assertEquals(13L, updateResponse.getVersion());
GetResult getResult = updateResponse.getGetResult();
assertEquals(13L, updateResponse.getVersion());
Map<String, Object> sourceAsMap = getResult.sourceAsMap();
assertEquals("one", sourceAsMap.get("field_1"));
assertEquals("two", sourceAsMap.get("field_2"));
assertFalse(sourceAsMap.containsKey("field_3"));
}
{
IndexRequest indexRequest = new IndexRequest("index", "type", "noop");
indexRequest.source("field", "value");
IndexResponse indexResponse = highLevelClient().index(indexRequest);
assertEquals(RestStatus.CREATED, indexResponse.status());
assertEquals(1L, indexResponse.getVersion());

UpdateRequest updateRequest = new UpdateRequest("index", "type", "noop");
updateRequest.doc(singletonMap("field", "value"), randomFrom(XContentType.values()));

UpdateResponse updateResponse = execute(updateRequest, highLevelClient()::update, highLevelClient()::updateAsync);
assertEquals(RestStatus.OK, updateResponse.status());
assertEquals(DocWriteResponse.Result.NOOP, updateResponse.getResult());
assertEquals(1L, updateResponse.getVersion());

updateRequest.detectNoop(false);

updateResponse = execute(updateRequest, highLevelClient()::update, highLevelClient()::updateAsync);
assertEquals(RestStatus.OK, updateResponse.status());
assertEquals(DocWriteResponse.Result.UPDATED, updateResponse.getResult());
assertEquals(2L, updateResponse.getVersion());
}
{
UpdateRequest updateRequest = new UpdateRequest("index", "type", "with_upsert");
updateRequest.upsert(singletonMap("doc_status", "created"));
updateRequest.doc(singletonMap("doc_status", "updated"));
updateRequest.fetchSource(true);

UpdateResponse updateResponse = execute(updateRequest, highLevelClient()::update, highLevelClient()::updateAsync);
assertEquals(RestStatus.CREATED, updateResponse.status());
assertEquals("index", updateResponse.getIndex());
assertEquals("type", updateResponse.getType());
assertEquals("with_upsert", updateResponse.getId());
GetResult getResult = updateResponse.getGetResult();
assertEquals(1L, updateResponse.getVersion());
assertEquals("created", getResult.sourceAsMap().get("doc_status"));
}
{
UpdateRequest updateRequest = new UpdateRequest("index", "type", "with_doc_as_upsert");
updateRequest.doc(singletonMap("field", "initialized"));
updateRequest.fetchSource(true);
updateRequest.docAsUpsert(true);

UpdateResponse updateResponse = execute(updateRequest, highLevelClient()::update, highLevelClient()::updateAsync);
assertEquals(RestStatus.CREATED, updateResponse.status());
assertEquals("index", updateResponse.getIndex());
assertEquals("type", updateResponse.getType());
assertEquals("with_doc_as_upsert", updateResponse.getId());
GetResult getResult = updateResponse.getGetResult();
assertEquals(1L, updateResponse.getVersion());
assertEquals("initialized", getResult.sourceAsMap().get("field"));
}
{
UpdateRequest updateRequest = new UpdateRequest("index", "type", "with_scripted_upsert");
updateRequest.fetchSource(true);
updateRequest.script(new Script(ScriptType.INLINE, "painless", "ctx._source.level = params.test", singletonMap("test", "C")));
updateRequest.scriptedUpsert(true);
updateRequest.upsert(singletonMap("level", "A"));

UpdateResponse updateResponse = execute(updateRequest, highLevelClient()::update, highLevelClient()::updateAsync);
assertEquals(RestStatus.CREATED, updateResponse.status());
assertEquals("index", updateResponse.getIndex());
assertEquals("type", updateResponse.getType());
assertEquals("with_scripted_upsert", updateResponse.getId());

GetResult getResult = updateResponse.getGetResult();
assertEquals(1L, updateResponse.getVersion());
assertEquals("C", getResult.sourceAsMap().get("level"));
}
}
}
Loading

0 comments on commit e101f40

Please sign in to comment.