Skip to content

Commit

Permalink
Async search: rename REST parameters (#54198)
Browse files Browse the repository at this point in the history
This commit renames wait_for_completion to wait_for_completion_timeout in submit async search and get async search.
Also it renames clean_on_completion to keep_on_completion and turns around its behaviour.

Closes #54069
  • Loading branch information
javanna authored Mar 26, 2020
1 parent a65e95e commit 1c48214
Show file tree
Hide file tree
Showing 16 changed files with 87 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@ static Request submitAsyncSearch(SubmitAsyncSearchRequest asyncSearchRequest) th
request.setEntity(RequestConverters.createEntity(asyncSearchRequest.getSearchSource(), REQUEST_BODY_CONTENT_TYPE));
}
// set async search submit specific parameters
if (asyncSearchRequest.isCleanOnCompletion() != null) {
params.putParam("clean_on_completion", asyncSearchRequest.isCleanOnCompletion().toString());
if (asyncSearchRequest.isKeepOnCompletion() != null) {
params.putParam("keep_on_completion", asyncSearchRequest.isKeepOnCompletion().toString());
}
if (asyncSearchRequest.getKeepAlive() != null) {
params.putParam("keep_alive", asyncSearchRequest.getKeepAlive().getStringRep());
}
if (asyncSearchRequest.getWaitForCompletion() != null) {
params.putParam("wait_for_completion", asyncSearchRequest.getWaitForCompletion().getStringRep());
if (asyncSearchRequest.getWaitForCompletionTimeout() != null) {
params.putParam("wait_for_completion_timeout", asyncSearchRequest.getWaitForCompletionTimeout().getStringRep());
}
request.addParameters(params.asMap());
return request;
Expand All @@ -76,7 +76,7 @@ static void addSearchRequestParams(Params params, SubmitAsyncSearchRequest reque
params.withBatchedReduceSize(request.getBatchedReduceSize());
}

static Request getAsyncSearch(GetAsyncSearchRequest asyncSearchRequest) throws IOException {
static Request getAsyncSearch(GetAsyncSearchRequest asyncSearchRequest) {
String endpoint = new RequestConverters.EndpointBuilder()
.addPathPartAsIs("_async_search")
.addPathPart(asyncSearchRequest.getId())
Expand All @@ -87,13 +87,13 @@ static Request getAsyncSearch(GetAsyncSearchRequest asyncSearchRequest) throws I
params.putParam("keep_alive", asyncSearchRequest.getKeepAlive().getStringRep());
}
if (asyncSearchRequest.getWaitForCompletion() != null) {
params.putParam("wait_for_completion", asyncSearchRequest.getWaitForCompletion().getStringRep());
params.putParam("wait_for_completion_timeout", asyncSearchRequest.getWaitForCompletion().getStringRep());
}
request.addParameters(params.asMap());
return request;
}

static Request deleteAsyncSearch(DeleteAsyncSearchRequest deleteAsyncSearchRequest) throws IOException {
static Request deleteAsyncSearch(DeleteAsyncSearchRequest deleteAsyncSearchRequest) {
String endpoint = new RequestConverters.EndpointBuilder()
.addPathPartAsIs("_async_search")
.addPathPart(deleteAsyncSearchRequest.getId())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ public class SubmitAsyncSearchRequest implements Validatable {

public static long MIN_KEEP_ALIVE = TimeValue.timeValueMinutes(1).millis();

private TimeValue waitForCompletion;
private Boolean cleanOnCompletion;
private TimeValue waitForCompletionTimeout;
private Boolean keepOnCompletion;
private TimeValue keepAlive;
private final SearchRequest searchRequest;

Expand All @@ -70,29 +70,29 @@ public String[] getIndices() {
/**
* Get the minimum time that the request should wait before returning a partial result (defaults to 1 second).
*/
public TimeValue getWaitForCompletion() {
return waitForCompletion;
public TimeValue getWaitForCompletionTimeout() {
return waitForCompletionTimeout;
}

/**
* Sets the minimum time that the request should wait before returning a partial result (defaults to 1 second).
*/
public void setWaitForCompletion(TimeValue waitForCompletion) {
this.waitForCompletion = waitForCompletion;
public void setWaitForCompletionTimeout(TimeValue waitForCompletionTimeout) {
this.waitForCompletionTimeout = waitForCompletionTimeout;
}

/**
* Returns whether the resource resource should be removed on completion or failure (defaults to true).
*/
public Boolean isCleanOnCompletion() {
return cleanOnCompletion;
public Boolean isKeepOnCompletion() {
return keepOnCompletion;
}

/**
* Determines if the resource should be removed on completion or failure (defaults to true).
*/
public void setCleanOnCompletion(boolean cleanOnCompletion) {
this.cleanOnCompletion = cleanOnCompletion;
public void setKeepOnCompletion(boolean keepOnCompletion) {
this.keepOnCompletion = keepOnCompletion;
}

/**
Expand Down Expand Up @@ -273,12 +273,12 @@ public boolean equals(Object o) {
SubmitAsyncSearchRequest request = (SubmitAsyncSearchRequest) o;
return Objects.equals(searchRequest, request.searchRequest)
&& Objects.equals(getKeepAlive(), request.getKeepAlive())
&& Objects.equals(getWaitForCompletion(), request.getWaitForCompletion())
&& Objects.equals(isCleanOnCompletion(), request.isCleanOnCompletion());
&& Objects.equals(getWaitForCompletionTimeout(), request.getWaitForCompletionTimeout())
&& Objects.equals(isKeepOnCompletion(), request.isKeepOnCompletion());
}

@Override
public int hashCode() {
return Objects.hash(searchRequest, getKeepAlive(), getWaitForCompletion(), isCleanOnCompletion());
return Objects.hash(searchRequest, getKeepAlive(), getWaitForCompletionTimeout(), isKeepOnCompletion());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,19 @@ public void testSubmitAsyncSearch() throws Exception {
setRandomIndicesOptions(submitRequest::setIndicesOptions, submitRequest::getIndicesOptions, expectedParams);

if (randomBoolean()) {
boolean cleanOnCompletion = randomBoolean();
submitRequest.setCleanOnCompletion(cleanOnCompletion);
expectedParams.put("clean_on_completion", Boolean.toString(cleanOnCompletion));
boolean keepOnCompletion = randomBoolean();
submitRequest.setKeepOnCompletion(keepOnCompletion);
expectedParams.put("keep_on_completion", Boolean.toString(keepOnCompletion));
}
if (randomBoolean()) {
TimeValue keepAlive = TimeValue.parseTimeValue(randomTimeValue(), "test");
submitRequest.setKeepAlive(keepAlive);
expectedParams.put("keep_alive", keepAlive.getStringRep());
}
if (randomBoolean()) {
TimeValue waitForCompletion = TimeValue.parseTimeValue(randomTimeValue(), "test");
submitRequest.setWaitForCompletion(waitForCompletion);
expectedParams.put("wait_for_completion", waitForCompletion.getStringRep());
TimeValue waitForCompletionTimeout = TimeValue.parseTimeValue(randomTimeValue(), "test");
submitRequest.setWaitForCompletionTimeout(waitForCompletionTimeout);
expectedParams.put("wait_for_completion_timeout", waitForCompletionTimeout.getStringRep());
}

Request request = AsyncSearchRequestConverters.submitAsyncSearch(submitRequest);
Expand Down Expand Up @@ -128,7 +128,7 @@ public void testGetAsyncSearch() throws Exception {
if (randomBoolean()) {
TimeValue waitForCompletion = TimeValue.parseTimeValue(randomTimeValue(), "test");
submitRequest.setWaitForCompletion(waitForCompletion);
expectedParams.put("wait_for_completion", waitForCompletion.getStringRep());
expectedParams.put("wait_for_completion_timeout", waitForCompletion.getStringRep());
}

Request request = AsyncSearchRequestConverters.getAsyncSearch(submitRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public void testAsyncSearch() throws IOException {

SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(QueryBuilders.matchAllQuery());
SubmitAsyncSearchRequest submitRequest = new SubmitAsyncSearchRequest(sourceBuilder, index);
submitRequest.setCleanOnCompletion(false);
submitRequest.setKeepOnCompletion(true);
AsyncSearchResponse submitResponse = highLevelClient().asyncSearch().submit(submitRequest, RequestOptions.DEFAULT);
assertNotNull(submitResponse.getId());
assertFalse(submitResponse.isPartial());
Expand Down
17 changes: 9 additions & 8 deletions docs/reference/search/async-search.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ POST /sales*/_async_search?size=0
}
--------------------------------------------------
// TEST[setup:sales]
// TEST[s/size=0/size=0&wait_for_completion=10s&clean_on_completion=false/]
// TEST[s/size=0/size=0&wait_for_completion_timeout=10s&keep_on_completion=true/]

The response contains an identifier of the search being executed.
You can use this ID to later retrieve the search's final results.
Expand Down Expand Up @@ -88,8 +88,8 @@ results are returned as part of the <<search-api-response-body,`response`>> obje
<6> How many documents are currently matching the query, which belong to the shards that have already completed the search

It is possible to block and wait until the search is completed up to a certain
timeout by providing the `wait_for_completion` parameter, which defaults to
`1` second.
timeout by providing the `wait_for_completion_timeout` parameter, which
defaults to `1` second.

You can also specify how long the async search needs to be
available through the `keep_alive` parameter, which defaults to `5d` (five days).
Expand Down Expand Up @@ -193,11 +193,12 @@ first.
<6> Partial aggregations results, coming from the shards that have already
completed the execution of the query.

The `wait_for_completion` parameter, which defaults to `1`, can also be provided
when calling the Get Async Search API, in order to wait for the search to be
completed up until the provided timeout. Final results will be returned if
available before the timeout expires, otherwise the currently available results
will be returned once the timeout expires.
The `wait_for_completion_timeout` parameter can also be provided when calling
the Get Async Search API, in order to wait for the search to be completed up
until the provided timeout. Final results will be returned if available before
the timeout expires, otherwise the currently available results will be
returned once the timeout expires. By default no timeout is set meaning that
the currently available results will be returned without any additional wait.

The `keep_alive` parameter specifies how long the async search should be
available in the cluster. When not specified, the `keep_alive` set with the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,16 +127,16 @@ static Response submitAsyncSearch(String indexName, String query, TimeValue wait
final Request request = new Request("POST", indexName + "/_async_search");
setRunAsHeader(request, user);
request.addParameter("q", query);
request.addParameter("wait_for_completion", waitForCompletion.toString());
request.addParameter("wait_for_completion_timeout", waitForCompletion.toString());
// we do the cleanup explicitly
request.addParameter("clean_on_completion", "false");
request.addParameter("keep_on_completion", "true");
return client().performRequest(request);
}

static Response getAsyncSearch(String id, String user) throws IOException {
final Request request = new Request("GET", "/_async_search/" + id);
setRunAsHeader(request, user);
request.addParameter("wait_for_completion", "0ms");
request.addParameter("wait_for_completion_timeout", "0ms");
return client().performRequest(request);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ public String getName() {
@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
GetAsyncSearchAction.Request get = new GetAsyncSearchAction.Request(request.param("id"));
if (request.hasParam("wait_for_completion")) {
get.setWaitForCompletion(request.paramAsTime("wait_for_completion", get.getWaitForCompletion()));
if (request.hasParam("wait_for_completion_timeout")) {
get.setWaitForCompletion(request.paramAsTime("wait_for_completion_timeout", get.getWaitForCompletion()));
}
if (request.hasParam("keep_alive")) {
get.setKeepAlive(request.paramAsTime("keep_alive", get.getKeepAlive()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,14 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
request.withContentOrSourceParamParserOrNull(parser ->
parseSearchRequest(submit.getSearchRequest(), request, parser, setSize));

if (request.hasParam("wait_for_completion")) {
submit.setWaitForCompletion(request.paramAsTime("wait_for_completion", submit.getWaitForCompletion()));
if (request.hasParam("wait_for_completion_timeout")) {
submit.setWaitForCompletionTimeout(request.paramAsTime("wait_for_completion_timeout", submit.getWaitForCompletionTimeout()));
}
if (request.hasParam("keep_alive")) {
submit.setKeepAlive(request.paramAsTime("keep_alive", submit.getKeepAlive()));
}
if (request.hasParam("clean_on_completion")) {
submit.setCleanOnCompletion(request.paramAsBoolean("clean_on_completion", submit.isCleanOnCompletion()));
if (request.hasParam("keep_on_completion")) {
submit.setKeepOnCompletion(request.paramAsBoolean("keep_on_completion", submit.isKeepOnCompletion()));
}
return channel -> {
RestStatusToXContentListener<AsyncSearchResponse> listener = new RestStatusToXContentListener<>(channel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ protected void doExecute(Task task, SubmitAsyncSearchRequest request, ActionList
new ActionListener<>() {
@Override
public void onResponse(AsyncSearchResponse searchResponse) {
if (searchResponse.isRunning() || request.isCleanOnCompletion() == false) {
if (searchResponse.isRunning() || request.isKeepOnCompletion()) {
// the task is still running and the user cannot wait more so we create
// a document for further retrieval
try {
Expand Down Expand Up @@ -126,7 +126,7 @@ public void onFailure(Exception exc) {
public void onFailure(Exception exc) {
submitListener.onFailure(exc);
}
}, request.getWaitForCompletion());
}, request.getWaitForCompletionTimeout());
}

private SearchRequest createSearchRequest(SubmitAsyncSearchRequest request, CancellableTask submitTask, TimeValue keepAlive) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,14 +238,14 @@ public void testInvalidId() throws Exception {

public void testNoIndex() throws Exception {
SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest("invalid-*");
request.setWaitForCompletion(TimeValue.timeValueMillis(1));
request.setWaitForCompletionTimeout(TimeValue.timeValueMillis(1));
AsyncSearchResponse response = submitAsyncSearch(request);
assertNotNull(response.getSearchResponse());
assertFalse(response.isRunning());
assertThat(response.getSearchResponse().getTotalShards(), equalTo(0));

request = new SubmitAsyncSearchRequest("invalid");
request.setWaitForCompletion(TimeValue.timeValueMillis(1));
request.setWaitForCompletionTimeout(TimeValue.timeValueMillis(1));
response = submitAsyncSearch(request);
assertNull(response.getSearchResponse());
assertNotNull(response.getFailure());
Expand All @@ -259,7 +259,7 @@ public void testCancellation() throws Exception {
request.getSearchRequest().source(
new SearchSourceBuilder().aggregation(new CancellingAggregationBuilder("test"))
);
request.setWaitForCompletion(TimeValue.timeValueMillis(1));
request.setWaitForCompletionTimeout(TimeValue.timeValueMillis(1));
AsyncSearchResponse response = submitAsyncSearch(request);
assertNotNull(response.getSearchResponse());
assertTrue(response.isRunning());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ protected SearchResponseIterator assertBlockingIterator(String indexName,
int progressStep) throws Exception {
SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(source, indexName);
request.setBatchedReduceSize(progressStep);
request.setWaitForCompletion(TimeValue.timeValueMillis(1));
request.setWaitForCompletionTimeout(TimeValue.timeValueMillis(1));
ClusterSearchShardsResponse response = dataNodeClient().admin().cluster()
.prepareSearchShards(request.getSearchRequest().indices()).get();
AtomicInteger failures = new AtomicInteger(numFailures);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ protected SubmitAsyncSearchRequest createTestInstance() {
searchRequest = new SubmitAsyncSearchRequest();
}
if (randomBoolean()) {
searchRequest.setWaitForCompletion(TimeValue.parseTimeValue(randomPositiveTimeValue(), "wait_for_completion"));
searchRequest.setWaitForCompletionTimeout(TimeValue.parseTimeValue(randomPositiveTimeValue(), "wait_for_completion"));
}
searchRequest.setCleanOnCompletion(randomBoolean());
searchRequest.setKeepOnCompletion(randomBoolean());
if (randomBoolean()) {
searchRequest.setKeepAlive(TimeValue.parseTimeValue(randomPositiveTimeValue(), "keep_alive"));
}
Expand Down
Loading

0 comments on commit 1c48214

Please sign in to comment.