Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

REST high-level client: add flush API #28852

Merged
merged 2 commits into from
Mar 1, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
Expand Down Expand Up @@ -237,6 +239,26 @@ public void refreshAsync(RefreshRequest refreshRequest, ActionListener<RefreshRe
listener, emptySet(), headers);
}

/**
* Flush one or more indices using the Flush API
* <p>
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-flush.html"> Flush API on elastic.co</a>
*/
public FlushResponse flush(FlushRequest flushRequest, Header... headers) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(flushRequest, Request::flush, FlushResponse::fromXContent,
emptySet(), headers);
}

/**
* Asynchronously flush one or more indices using the Flush API
* <p>
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-flush.html"> Flush API on elastic.co</a>
*/
public void flushAsync(FlushRequest flushRequest, ActionListener<FlushResponse> listener, Header... headers) {
restHighLevelClient.performRequestAsyncAndParseEntity(flushRequest, Request::flush, FlushResponse::fromXContent,
listener, emptySet(), headers);
}

/**
* Checks if the index (indices) exists or not.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
Expand Down Expand Up @@ -219,10 +220,17 @@ static Request putMapping(PutMappingRequest putMappingRequest) throws IOExceptio

static Request refresh(RefreshRequest refreshRequest) {
String endpoint = endpoint(refreshRequest.indices(), "_refresh");

Params parameters = Params.builder();
parameters.withIndicesOptions(refreshRequest.indicesOptions());
return new Request(HttpPost.METHOD_NAME, endpoint, parameters.getParams(), null);
}

static Request flush(FlushRequest flushRequest) {
String endpoint = endpoint(flushRequest.indices(), "_flush");
Params parameters = Params.builder();
parameters.withIndicesOptions(flushRequest.indicesOptions());
parameters.putParam("wait_if_ongoing", Boolean.toString(flushRequest.waitIfOngoing()));
parameters.putParam("force", Boolean.toString(flushRequest.force()));
return new Request(HttpPost.METHOD_NAME, endpoint, parameters.getParams(), null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
Expand Down Expand Up @@ -410,6 +412,32 @@ public void testRefresh() throws IOException {
}
}

public void testFlush() throws IOException {
{
String index = "index";
Settings settings = Settings.builder()
.put("number_of_shards", 1)
.put("number_of_replicas", 0)
.build();
createIndex(index, settings);
FlushRequest flushRequest = new FlushRequest(index);
FlushResponse flushResponse =
execute(flushRequest, highLevelClient().indices()::flush, highLevelClient().indices()::flushAsync);
assertThat(flushResponse.getTotalShards(), equalTo(1));
assertThat(flushResponse.getSuccessfulShards(), equalTo(1));
assertThat(flushResponse.getFailedShards(), equalTo(0));
assertThat(flushResponse.getShardFailures(), equalTo(BroadcastResponse.EMPTY));
}
{
String nonExistentIndex = "non_existent_index";
assertFalse(indexExists(nonExistentIndex));
FlushRequest refreshRequest = new FlushRequest(nonExistentIndex);
ElasticsearchException exception = expectThrows(ElasticsearchException.class,
() -> execute(refreshRequest, highLevelClient().indices()::flush, highLevelClient().indices()::flushAsync));
assertEquals(RestStatus.NOT_FOUND, exception.status());
}
}

public void testExistsAlias() throws IOException {
GetAliasesRequest getAliasesRequest = new GetAliasesRequest("alias");
assertFalse(execute(getAliasesRequest, highLevelClient().indices()::existsAlias, highLevelClient().indices()::existsAliasAsync));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
Expand Down Expand Up @@ -537,15 +538,43 @@ public void testIndex() throws IOException {
}

public void testRefresh() {
String[] indices = randomIndicesNames(1, 5);
String[] indices = randomIndicesNames(0, 5);
RefreshRequest refreshRequest = new RefreshRequest(indices);

Map<String, String> expectedParams = new HashMap<>();
setRandomIndicesOptions(refreshRequest::indicesOptions, refreshRequest::indicesOptions, expectedParams);

Request request = Request.refresh(refreshRequest);
StringJoiner endpoint = new StringJoiner("/", "/", "").add(String.join(",", indices)).add("_refresh");
assertThat(endpoint.toString(), equalTo(request.getEndpoint()));
StringJoiner endpoint = new StringJoiner("/", "/", "");
if (indices.length > 0) {
endpoint.add(String.join(",", indices));
}
endpoint.add("_refresh");
assertThat(request.getEndpoint(), equalTo(endpoint.toString()));
assertThat(request.getParameters(), equalTo(expectedParams));
assertThat(request.getEntity(), nullValue());
assertThat(request.getMethod(), equalTo(HttpPost.METHOD_NAME));
}

public void testFlush() {
String[] indices = randomIndicesNames(0, 5);
FlushRequest flushRequest = new FlushRequest(indices);
Map<String, String> expectedParams = new HashMap<>();
setRandomIndicesOptions(flushRequest::indicesOptions, flushRequest::indicesOptions, expectedParams);
if (randomBoolean()) {
flushRequest.force(randomBoolean());
}
expectedParams.put("force", Boolean.toString(flushRequest.force()));
if (randomBoolean()) {
flushRequest.waitIfOngoing(randomBoolean());
}
expectedParams.put("wait_if_ongoing", Boolean.toString(flushRequest.waitIfOngoing()));

Request request = Request.flush(flushRequest);
StringJoiner endpoint = new StringJoiner("/", "/", "");
if (indices.length > 0) {
endpoint.add(String.join(",", indices));
}
endpoint.add("_flush");
assertThat(request.getEndpoint(), equalTo(endpoint.toString()));
assertThat(request.getParameters(), equalTo(expectedParams));
assertThat(request.getEntity(), nullValue());
assertThat(request.getMethod(), equalTo(HttpPost.METHOD_NAME));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
Expand Down Expand Up @@ -691,6 +693,82 @@ public void onFailure(Exception e) {
}
}

public void testFlushIndex() throws Exception {
RestHighLevelClient client = highLevelClient();

{
createIndex("index1", Settings.EMPTY);
}

{
// tag::flush-request
FlushRequest request = new FlushRequest("index1"); // <1>
FlushRequest requestMultiple = new FlushRequest("index1", "index2"); // <2>
FlushRequest requestAll = new FlushRequest(); // <3>
// end::flush-request

// tag::flush-request-indicesOptions
request.indicesOptions(IndicesOptions.lenientExpandOpen()); // <1>
// end::flush-request-indicesOptions

// tag::flush-request-wait
request.waitIfOngoing(true); // <1>
// end::flush-request-wait

// tag::flush-request-force
request.force(true); // <1>
// end::flush-request-force

// tag::flush-execute
FlushResponse flushResponse = client.indices().flush(request);
// end::flush-execute

// tag::flush-response
int totalShards = flushResponse.getTotalShards(); // <1>
int successfulShards = flushResponse.getSuccessfulShards(); // <2>
int failedShards = flushResponse.getFailedShards(); // <3>
DefaultShardOperationFailedException[] failures = flushResponse.getShardFailures(); // <4>
// end::flush-response

// tag::flush-execute-listener
ActionListener<FlushResponse> listener = new ActionListener<FlushResponse>() {
@Override
public void onResponse(FlushResponse refreshResponse) {
// <1>
}

@Override
public void onFailure(Exception e) {
// <2>
}
};
// end::flush-execute-listener

// Replace the empty listener by a blocking listener in test
final CountDownLatch latch = new CountDownLatch(1);
listener = new LatchedActionListener<>(listener, latch);

// tag::flush-execute-async
client.indices().flushAsync(request, listener); // <1>
// end::flush-execute-async

assertTrue(latch.await(30L, TimeUnit.SECONDS));
}

{
// tag::flush-notfound
try {
FlushRequest request = new FlushRequest("does_not_exist");
client.indices().flush(request);
} catch (ElasticsearchException exception) {
if (exception.status() == RestStatus.NOT_FOUND) {
// <1>
}
}
// end::flush-notfound
}
}

public void testCloseIndex() throws Exception {
RestHighLevelClient client = highLevelClient();

Expand Down
96 changes: 96 additions & 0 deletions docs/java-rest/high-level/indices/flush.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
[[java-rest-high-flush]]
=== Flush API

[[java-rest-high-flush-request]]
==== Flush Request

A `FlushRequest` can be applied to one or more indices, or even on `_all` the indices:

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-request]
--------------------------------------------------
<1> Flush one index
<2> Flush multiple indices
<3> Flush all the indices

==== Optional arguments

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-request-indicesOptions]
--------------------------------------------------
<1> Setting `IndicesOptions` controls how unavailable indices are resolved and
how wildcard expressions are expanded

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-request-wait]
--------------------------------------------------
<1> Set the `wait_if_ongoing` flag to `true`

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-request-force]
--------------------------------------------------
<1> Set the `force` flag to `true`

[[java-rest-high-flush-sync]]
==== Synchronous Execution

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-execute]
--------------------------------------------------

[[java-rest-high-flush-async]]
==== Asynchronous Execution

The asynchronous execution of a flush request requires both the `FlushRequest`
instance and an `ActionListener` instance to be passed to the asynchronous
method:

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-execute-async]
--------------------------------------------------
<1> The `FlushRequest` to execute and the `ActionListener` to use when
the execution completes

The asynchronous method does not block and returns immediately. Once it is
completed the `ActionListener` is called back using the `onResponse` method
if the execution successfully completed or using the `onFailure` method if
it failed.

A typical listener for `FlushResponse` looks like:

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-execute-listener]
--------------------------------------------------
<1> Called when the execution is successfully completed. The response is
provided as an argument
<2> Called in case of failure. The raised exception is provided as an argument

[[java-rest-high-flush-response]]
==== Flush Response

The returned `FlushResponse` allows to retrieve information about the
executed operation as follows:

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-response]
--------------------------------------------------
<1> Total number of shards hit by the flush request
<2> Number of shards where the flush has succeeded
<3> Number of shards where the flush has failed
<4> A list of failures if the operation failed on one or more shards

By default, if the indices were not found, an `ElasticsearchException` will be thrown:

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-notfound]
--------------------------------------------------
<1> Do something if the indices to be flushed were not found
2 changes: 2 additions & 0 deletions docs/java-rest/high-level/supported-apis.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ Index Management::
* <<java-rest-high-shrink-index>>
* <<java-rest-high-split-index>>
* <<java-rest-high-refresh>>
* <<java-rest-high-flush>>
* <<java-rest-high-rollover-index>>

Mapping Management::
Expand All @@ -70,6 +71,7 @@ include::indices/close_index.asciidoc[]
include::indices/shrink_index.asciidoc[]
include::indices/split_index.asciidoc[]
include::indices/refresh.asciidoc[]
include::indices/flush.asciidoc[]
include::indices/rollover.asciidoc[]
include::indices/put_mapping.asciidoc[]
include::indices/update_aliases.asciidoc[]
Expand Down
Loading