diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/eql/EqlSearchRequest.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/eql/EqlSearchRequest.java index 8eb27b30596fa..876d2a8d25bec 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/eql/EqlSearchRequest.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/eql/EqlSearchRequest.java @@ -21,6 +21,7 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Validatable; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -44,6 +45,11 @@ public class EqlSearchRequest implements Validatable, ToXContentObject { private SearchAfterBuilder searchAfterBuilder; private String query; + // Async settings + private TimeValue waitForCompletionTimeout; + private boolean keepOnCompletion; + private TimeValue keepAlive; + static final String KEY_FILTER = "filter"; static final String KEY_TIMESTAMP_FIELD = "timestamp_field"; static final String KEY_EVENT_CATEGORY_FIELD = "event_category_field"; @@ -51,6 +57,9 @@ public class EqlSearchRequest implements Validatable, ToXContentObject { static final String KEY_SIZE = "size"; static final String KEY_SEARCH_AFTER = "search_after"; static final String KEY_QUERY = "query"; + static final String KEY_WAIT_FOR_COMPLETION_TIMEOUT = "wait_for_completion_timeout"; + static final String KEY_KEEP_ALIVE = "keep_alive"; + static final String KEY_KEEP_ON_COMPLETION = "keep_on_completion"; public EqlSearchRequest(String indices, String query) { indices(indices); @@ -75,6 +84,13 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par } builder.field(KEY_QUERY, query); + if (waitForCompletionTimeout != null) { + builder.field(KEY_WAIT_FOR_COMPLETION_TIMEOUT, waitForCompletionTimeout); + } + if (keepAlive != null) { + builder.field(KEY_KEEP_ALIVE, keepAlive); + } + builder.field(KEY_KEEP_ON_COMPLETION, keepOnCompletion); builder.endObject(); return builder; } @@ -166,6 +182,32 @@ public EqlSearchRequest query(String query) { return this; } + public TimeValue waitForCompletionTimeout() { + return waitForCompletionTimeout; + } + + public EqlSearchRequest waitForCompletionTimeout(TimeValue waitForCompletionTimeout) { + this.waitForCompletionTimeout = waitForCompletionTimeout; + return this; + } + + public Boolean keepOnCompletion() { + return keepOnCompletion; + } + + public void keepOnCompletion(Boolean keepOnCompletion) { + this.keepOnCompletion = keepOnCompletion; + } + + public TimeValue keepAlive() { + return keepAlive; + } + + public EqlSearchRequest keepAlive(TimeValue keepAlive) { + this.keepAlive = keepAlive; + return this; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -183,7 +225,10 @@ public boolean equals(Object o) { Objects.equals(eventCategoryField, that.eventCategoryField) && Objects.equals(implicitJoinKeyField, that.implicitJoinKeyField) && Objects.equals(searchAfterBuilder, that.searchAfterBuilder) && - Objects.equals(query, that.query); + Objects.equals(query, that.query) && + Objects.equals(waitForCompletionTimeout, that.waitForCompletionTimeout) && + Objects.equals(keepAlive, that.keepAlive) && + Objects.equals(keepOnCompletion, that.keepOnCompletion); } @Override @@ -197,7 +242,10 @@ public int hashCode() { eventCategoryField, implicitJoinKeyField, searchAfterBuilder, - query); + query, + waitForCompletionTimeout, + keepAlive, + keepOnCompletion); } public String[] indices() { diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/eql/EqlSearchResponse.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/eql/EqlSearchResponse.java index 76d224342739c..f359f3813107a 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/eql/EqlSearchResponse.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/eql/EqlSearchResponse.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.InstantiatingObjectParser; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; @@ -32,43 +33,56 @@ import java.util.List; import java.util.Objects; +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; + public class EqlSearchResponse { private final Hits hits; private final long tookInMillis; private final boolean isTimeout; + private final String asyncExecutionId; + private final boolean isRunning; + private final boolean isPartial; private static final class Fields { static final String TOOK = "took"; static final String TIMED_OUT = "timed_out"; static final String HITS = "hits"; + static final String ID = "id"; + static final String IS_RUNNING = "is_running"; + static final String IS_PARTIAL = "is_partial"; } private static final ParseField TOOK = new ParseField(Fields.TOOK); private static final ParseField TIMED_OUT = new ParseField(Fields.TIMED_OUT); private static final ParseField HITS = new ParseField(Fields.HITS); + private static final ParseField ID = new ParseField(Fields.ID); + private static final ParseField IS_RUNNING = new ParseField(Fields.IS_RUNNING); + private static final ParseField IS_PARTIAL = new ParseField(Fields.IS_PARTIAL); - private static final ConstructingObjectParser PARSER = - new ConstructingObjectParser<>("eql/search_response", true, - args -> { - int i = 0; - Hits hits = (Hits) args[i++]; - Long took = (Long) args[i++]; - Boolean timeout = (Boolean) args[i]; - return new EqlSearchResponse(hits, took, timeout); - }); - + private static final InstantiatingObjectParser PARSER; static { - PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> Hits.fromXContent(p), HITS); - PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOOK); - PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), TIMED_OUT); + InstantiatingObjectParser.Builder parser = + InstantiatingObjectParser.builder("eql/search_response", true, EqlSearchResponse.class); + parser.declareObject(constructorArg(), (p, c) -> Hits.fromXContent(p), HITS); + parser.declareLong(constructorArg(), TOOK); + parser.declareBoolean(constructorArg(), TIMED_OUT); + parser.declareString(optionalConstructorArg(), ID); + parser.declareBoolean(constructorArg(), IS_RUNNING); + parser.declareBoolean(constructorArg(), IS_PARTIAL); + PARSER = parser.build(); } - public EqlSearchResponse(Hits hits, long tookInMillis, boolean isTimeout) { + public EqlSearchResponse(Hits hits, long tookInMillis, boolean isTimeout, String asyncExecutionId, + boolean isRunning, boolean isPartial) { super(); this.hits = hits == null ? Hits.EMPTY : hits; this.tookInMillis = tookInMillis; this.isTimeout = isTimeout; + this.asyncExecutionId = asyncExecutionId; + this.isRunning = isRunning; + this.isPartial = isPartial; } public static EqlSearchResponse fromXContent(XContentParser parser) { @@ -87,6 +101,18 @@ public Hits hits() { return hits; } + public String id() { + return asyncExecutionId; + } + + public boolean isRunning() { + return isRunning; + } + + public boolean isPartial() { + return isPartial; + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/eql/EqlSearchResponseTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/eql/EqlSearchResponseTests.java index c4baf7cd6233c..784964aa2bb9c 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/eql/EqlSearchResponseTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/eql/EqlSearchResponseTests.java @@ -57,7 +57,12 @@ public static org.elasticsearch.xpack.eql.action.EqlSearchResponse createRandomE if (randomBoolean()) { hits = new org.elasticsearch.xpack.eql.action.EqlSearchResponse.Hits(randomEvents(), null, null, totalHits); } - return new org.elasticsearch.xpack.eql.action.EqlSearchResponse(hits, randomIntBetween(0, 1001), randomBoolean()); + if (randomBoolean()) { + return new org.elasticsearch.xpack.eql.action.EqlSearchResponse(hits, randomIntBetween(0, 1001), randomBoolean()); + } else { + return new org.elasticsearch.xpack.eql.action.EqlSearchResponse(hits, randomIntBetween(0, 1001), randomBoolean(), + randomAlphaOfLength(10), randomBoolean(), randomBoolean()); + } } public static org.elasticsearch.xpack.eql.action.EqlSearchResponse createRandomSequencesResponse(TotalHits totalHits) { @@ -77,7 +82,12 @@ public static org.elasticsearch.xpack.eql.action.EqlSearchResponse createRandomS if (randomBoolean()) { hits = new org.elasticsearch.xpack.eql.action.EqlSearchResponse.Hits(null, seq, null, totalHits); } - return new org.elasticsearch.xpack.eql.action.EqlSearchResponse(hits, randomIntBetween(0, 1001), randomBoolean()); + if (randomBoolean()) { + return new org.elasticsearch.xpack.eql.action.EqlSearchResponse(hits, randomIntBetween(0, 1001), randomBoolean()); + } else { + return new org.elasticsearch.xpack.eql.action.EqlSearchResponse(hits, randomIntBetween(0, 1001), randomBoolean(), + randomAlphaOfLength(10), randomBoolean(), randomBoolean()); + } } public static org.elasticsearch.xpack.eql.action.EqlSearchResponse createRandomCountResponse(TotalHits totalHits) { @@ -97,7 +107,12 @@ public static org.elasticsearch.xpack.eql.action.EqlSearchResponse createRandomC if (randomBoolean()) { hits = new org.elasticsearch.xpack.eql.action.EqlSearchResponse.Hits(null, null, cn, totalHits); } - return new org.elasticsearch.xpack.eql.action.EqlSearchResponse(hits, randomIntBetween(0, 1001), randomBoolean()); + if (randomBoolean()) { + return new org.elasticsearch.xpack.eql.action.EqlSearchResponse(hits, randomIntBetween(0, 1001), randomBoolean()); + } else { + return new org.elasticsearch.xpack.eql.action.EqlSearchResponse(hits, randomIntBetween(0, 1001), randomBoolean(), + randomAlphaOfLength(10), randomBoolean(), randomBoolean()); + } } public static org.elasticsearch.xpack.eql.action.EqlSearchResponse createRandomInstance(TotalHits totalHits) { diff --git a/docs/reference/eql/eql-search-api.asciidoc b/docs/reference/eql/eql-search-api.asciidoc index 87ba38cba8564..b176979d10d30 100644 --- a/docs/reference/eql/eql-search-api.asciidoc +++ b/docs/reference/eql/eql-search-api.asciidoc @@ -328,7 +328,9 @@ the https://en.wikipedia.org/wiki/Unix_time[Unix epoch], in ascending order. ] } ] - } + }, + "is_partial": false, + "is_running": false } ---- -// TESTRESPONSE[s/"took": 6/"took": $body.took/] \ No newline at end of file +// TESTRESPONSE[s/"took": 6/"took": $body.took/] diff --git a/docs/reference/eql/search.asciidoc b/docs/reference/eql/search.asciidoc index f458d0d30e404..84f3e1f28f0b6 100644 --- a/docs/reference/eql/search.asciidoc +++ b/docs/reference/eql/search.asciidoc @@ -121,7 +121,9 @@ https://en.wikipedia.org/wiki/Unix_time[Unix epoch], in ascending order. ] } ] - } + }, + "is_partial": false, + "is_running": false } ---- // TESTRESPONSE[s/"took": 60/"took": $body.took/] diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java index 1324dd1f4624b..1b3238ea9d528 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java @@ -97,7 +97,7 @@ public void onResponse(AsyncSearchResponse searchResponse) { // creates the fallback response if the node crashes/restarts in the middle of the request // TODO: store intermediate results ? AsyncSearchResponse initialResp = searchResponse.clone(searchResponse.getId()); - store.storeInitialResponse(docId, searchTask.getOriginHeaders(), initialResp, + store.createResponse(docId, searchTask.getOriginHeaders(), initialResp, new ActionListener<>() { @Override public void onResponse(IndexResponse r) { @@ -191,7 +191,7 @@ private void onFinalResponse(CancellableTask submitTask, } try { - store.storeFinalResponse(searchTask.getExecutionId().getDocId(), threadContext.getResponseHeaders(),response, + store.updateResponse(searchTask.getExecutionId().getDocId(), threadContext.getResponseHeaders(),response, ActionListener.wrap(resp -> unregisterTaskAndMoveOn(searchTask, nextAction), exc -> { Throwable cause = ExceptionsHelper.unwrapCause(exc); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ClientHelper.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ClientHelper.java index 49a457f0a7f55..3fcc0b4cb8b2e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ClientHelper.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ClientHelper.java @@ -53,6 +53,7 @@ public final class ClientHelper { public static final String ENRICH_ORIGIN = "enrich"; public static final String TRANSFORM_ORIGIN = "transform"; public static final String ASYNC_SEARCH_ORIGIN = "async_search"; + public static final String ASYNC_EQL_SEARCH_ORIGIN = "async_eql_search"; public static final String IDP_ORIGIN = "idp"; private ClientHelper() {} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java index 9afb9b3a53642..607d967548c91 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java @@ -163,10 +163,10 @@ void createIndexIfNecessary(ActionListener listener) { * Stores the initial response with the original headers of the authenticated user * and the expected expiration time. */ - public void storeInitialResponse(String docId, - Map headers, - R response, - ActionListener listener) throws IOException { + public void createResponse(String docId, + Map headers, + R response, + ActionListener listener) throws IOException { Map source = new HashMap<>(); source.put(HEADERS_FIELD, headers); source.put(EXPIRATION_TIME_FIELD, response.getExpirationTime()); @@ -181,10 +181,10 @@ public void storeInitialResponse(String docId, /** * Stores the final response if the place-holder document is still present (update). */ - public void storeFinalResponse(String docId, - Map> responseHeaders, - R response, - ActionListener listener) throws IOException { + public void updateResponse(String docId, + Map> responseHeaders, + R response, + ActionListener listener) throws IOException { Map source = new HashMap<>(); source.put(RESPONSE_HEADERS_FIELD, responseHeaders); source.put(RESULT_FIELD, encodeResponse(response)); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskManagementService.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskManagementService.java new file mode 100644 index 0000000000000..a143f1e118adb --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskManagementService.java @@ -0,0 +1,236 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.async; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.engine.DocumentMissingException; +import org.elasticsearch.index.engine.VersionConflictEngineException; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskAwareRequest; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.tasks.TaskManager; +import org.elasticsearch.threadpool.Scheduler; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +public class AsyncTaskManagementService< + Request extends TaskAwareRequest, + Response extends ActionResponse, + T extends CancellableTask & AsyncTask> { + + private static final Logger logger = LogManager.getLogger(AsyncTaskManagementService.class); + + private final TaskManager taskManager; + private final String action; + private final AsyncTaskIndexService> asyncTaskIndexService; + private final AsyncOperation operation; + private final ThreadPool threadPool; + private final ClusterService clusterService; + private final Class taskClass; + + public interface AsyncOperation { + + T createTask(Request request, long id, String type, String action, TaskId parentTaskId, Map headers, + Map originHeaders, AsyncExecutionId asyncExecutionId); + + void operation(Request request, T task, ActionListener listener); + + Response initialResponse(T task); + + Response readResponse(StreamInput inputStream) throws IOException; + } + + /** + * Wrapper for EqlSearchRequest that creates an async version of EqlSearchTask + */ + private class AsyncRequestWrapper implements TaskAwareRequest { + private final Request request; + private final String doc; + private final String node; + + AsyncRequestWrapper(Request request, String node) { + this.request = request; + this.doc = UUIDs.randomBase64UUID(); + this.node = node; + } + + @Override + public void setParentTask(TaskId taskId) { + request.setParentTask(taskId); + } + + @Override + public TaskId getParentTask() { + return request.getParentTask(); + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return operation.createTask(request, id, type, action, parentTaskId, headers, threadPool.getThreadContext().getHeaders(), + new AsyncExecutionId(doc, new TaskId(node, id))); + } + + @Override + public String getDescription() { + return request.getDescription(); + } + } + + public AsyncTaskManagementService(String index, Client client, String origin, NamedWriteableRegistry registry, TaskManager taskManager, + String action, AsyncOperation operation, Class taskClass, + ClusterService clusterService, + ThreadPool threadPool + ) { + this.taskManager = taskManager; + this.action = action; + this.operation = operation; + this.taskClass = taskClass; + this.asyncTaskIndexService = new AsyncTaskIndexService<>(index, clusterService, threadPool.getThreadContext(), client, + origin, i -> new StoredAsyncResponse<>(operation::readResponse, i), registry); + this.clusterService = clusterService; + this.threadPool = threadPool; + } + + public void asyncExecute(Request request, TimeValue waitForCompletionTimeout, TimeValue keepAlive, boolean keepOnCompletion, + ActionListener listener) { + String nodeId = clusterService.localNode().getId(); + @SuppressWarnings("unchecked") + T searchTask = (T) taskManager.register("transport", action + "[a]", new AsyncRequestWrapper(request, nodeId)); + boolean operationStarted = false; + try { + operation.operation(request, searchTask, + wrapStoringListener(searchTask, waitForCompletionTimeout, keepAlive, keepOnCompletion, listener)); + operationStarted = true; + } finally { + // If we didn't start operation for any reason, we need to clean up the task that we have created + if (operationStarted == false) { + taskManager.unregister(searchTask); + } + } + } + + // TODO: For tests for now, will be merged into comprehensive get operation later + T getTask(AsyncExecutionId asyncExecutionId) throws IOException { + return asyncTaskIndexService.getTask(taskManager,asyncExecutionId, taskClass); + } + + // TODO: For tests for now, will be removed when the final get operation is added + void getResponse(AsyncExecutionId id, ActionListener listener) { + asyncTaskIndexService.getResponse(id, true, ActionListener.wrap(r -> { + if (r.getException() != null) { + listener.onFailure(r.getException()); + } else { + listener.onResponse(r.getResponse()); + } + }, listener::onFailure)); + } + + private ActionListener wrapStoringListener(T searchTask, + TimeValue waitForCompletionTimeout, + TimeValue keepAlive, + boolean keepOnCompletion, + ActionListener listener) { + AtomicReference> exclusiveListener = new AtomicReference<>(listener); + // This is will performed in case of timeout + Scheduler.ScheduledCancellable timeoutHandler = threadPool.schedule(() -> { + ActionListener acquiredListener = exclusiveListener.getAndSet(null); + if (acquiredListener != null) { + acquiredListener.onResponse(operation.initialResponse(searchTask)); + } + }, waitForCompletionTimeout, ThreadPool.Names.SEARCH); + // This will be performed at the end of normal execution + return ActionListener.wrap(response -> { + ActionListener acquiredListener = exclusiveListener.getAndSet(null); + if (acquiredListener != null) { + // We finished before timeout + timeoutHandler.cancel(); + if (keepOnCompletion) { + storeResults(searchTask, + new StoredAsyncResponse<>(response, threadPool.absoluteTimeInMillis() + keepAlive.getMillis()), + ActionListener.wrap(() -> acquiredListener.onResponse(response))); + } else { + taskManager.unregister(searchTask); + acquiredListener.onResponse(response); + } + } else { + // We finished after timeout - saving results + storeResults(searchTask, new StoredAsyncResponse<>(response, threadPool.absoluteTimeInMillis() + keepAlive.getMillis())); + } + }, e -> { + ActionListener acquiredListener = exclusiveListener.getAndSet(null); + if (acquiredListener != null) { + // We finished before timeout + timeoutHandler.cancel(); + if (keepOnCompletion) { + storeResults(searchTask, + new StoredAsyncResponse<>(e, threadPool.absoluteTimeInMillis() + keepAlive.getMillis()), + ActionListener.wrap(() -> acquiredListener.onFailure(e))); + } else { + taskManager.unregister(searchTask); + acquiredListener.onFailure(e); + } + } else { + // We finished after timeout - saving exception + storeResults(searchTask, new StoredAsyncResponse<>(e, threadPool.absoluteTimeInMillis() + keepAlive.getMillis())); + } + }); + } + + private void storeResults(T searchTask, StoredAsyncResponse storedResponse) { + storeResults(searchTask, storedResponse, null); + } + + private void storeResults(T searchTask, StoredAsyncResponse storedResponse, ActionListener finalListener) { + try { + asyncTaskIndexService.createResponse(searchTask.getExecutionId().getDocId(), + threadPool.getThreadContext().getHeaders(), storedResponse, ActionListener.wrap( + // We should only unregister after the result is saved + resp -> { + taskManager.unregister(searchTask); + logger.trace(() -> new ParameterizedMessage("stored eql search results for [{}]", + searchTask.getExecutionId().getEncoded())); + if (finalListener != null) { + finalListener.onResponse(null); + } + }, + exc -> { + taskManager.unregister(searchTask); + Throwable cause = ExceptionsHelper.unwrapCause(exc); + if (cause instanceof DocumentMissingException == false && + cause instanceof VersionConflictEngineException == false) { + logger.error(() -> new ParameterizedMessage("failed to store eql search results for [{}]", + searchTask.getExecutionId().getEncoded()), exc); + } + if (finalListener != null) { + finalListener.onFailure(exc); + } + })); + } catch (Exception exc) { + taskManager.unregister(searchTask); + logger.error(() -> new ParameterizedMessage("failed to store eql search results for [{}]", + searchTask.getExecutionId().getEncoded()), exc); + + } + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/StoredAsyncResponse.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/StoredAsyncResponse.java new file mode 100644 index 0000000000000..1096d9f27f487 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/StoredAsyncResponse.java @@ -0,0 +1,93 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.async; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; + +import java.io.IOException; +import java.util.Objects; + +/** + * Internal class for temporary storage of eql search results + */ +public class StoredAsyncResponse implements AsyncResponse> { + private final R response; + private final Exception exception; + private final long expirationTimeMillis; + + public StoredAsyncResponse(R response, long expirationTimeMillis) { + this(response, null, expirationTimeMillis); + } + + public StoredAsyncResponse(Exception exception, long expirationTimeMillis) { + this(null, exception, expirationTimeMillis); + } + + public StoredAsyncResponse(Writeable.Reader reader, StreamInput input) throws IOException { + expirationTimeMillis = input.readLong(); + this.response = input.readOptionalWriteable(reader); + this.exception = input.readException(); + } + + private StoredAsyncResponse(R response, Exception exception, long expirationTimeMillis) { + this.response = response; + this.exception = exception; + this.expirationTimeMillis = expirationTimeMillis; + } + + @Override + public long getExpirationTime() { + return expirationTimeMillis; + } + + @Override + public StoredAsyncResponse withExpirationTime(long expirationTimeMillis) { + return new StoredAsyncResponse<>(this.response, this.exception, expirationTimeMillis); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeLong(expirationTimeMillis); + out.writeOptionalWriteable(response); + out.writeException(exception); + } + + public R getResponse() { + return response; + } + + public Exception getException() { + return exception; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + StoredAsyncResponse response1 = (StoredAsyncResponse) o; + if (exception != null && response1.exception != null) { + if (Objects.equals(exception.getClass(), response1.exception.getClass()) == false || + Objects.equals(exception.getMessage(), response1.exception.getMessage()) == false) { + return false; + } + } else { + if (Objects.equals(exception, response1.exception) == false) { + return false; + } + } + return expirationTimeMillis == response1.expirationTimeMillis && + Objects.equals(response, response1.response); + } + + @Override + public int hashCode() { + return Objects.hash(response, exception == null ? null : exception.getClass(), + exception == null ? null : exception.getMessage(), expirationTimeMillis); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/index/RestrictedIndicesNames.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/index/RestrictedIndicesNames.java index 2d7f660bd188e..fee71c5ad43c6 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/index/RestrictedIndicesNames.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/index/RestrictedIndicesNames.java @@ -26,16 +26,20 @@ public final class RestrictedIndicesNames { public static final String ASYNC_SEARCH_PREFIX = ".async-search"; private static final Automaton ASYNC_SEARCH_AUTOMATON = Automatons.patterns(ASYNC_SEARCH_PREFIX + "*"); + public static final String ASYNC_EQL_SEARCH_PREFIX = ".async-eql-search"; + private static final Automaton ASYNC_EQL_SEARCH_AUTOMATON = Automatons.patterns(ASYNC_EQL_SEARCH_PREFIX + "*"); + // public for tests public static final Set RESTRICTED_NAMES = Collections.unmodifiableSet(Sets.newHashSet(SECURITY_MAIN_ALIAS, INTERNAL_SECURITY_MAIN_INDEX_6, INTERNAL_SECURITY_MAIN_INDEX_7, INTERNAL_SECURITY_TOKENS_INDEX_7, SECURITY_TOKENS_ALIAS)); public static boolean isRestricted(String concreteIndexName) { - return RESTRICTED_NAMES.contains(concreteIndexName) || concreteIndexName.startsWith(ASYNC_SEARCH_PREFIX); + return RESTRICTED_NAMES.contains(concreteIndexName) || concreteIndexName.startsWith(ASYNC_SEARCH_PREFIX) || + concreteIndexName.startsWith(ASYNC_EQL_SEARCH_PREFIX); } public static final Automaton NAMES_AUTOMATON = Automatons.unionAndMinimize(Arrays.asList(Automatons.patterns(RESTRICTED_NAMES), - ASYNC_SEARCH_AUTOMATON)); + ASYNC_SEARCH_AUTOMATON, ASYNC_EQL_SEARCH_AUTOMATON)); private RestrictedIndicesNames() { } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/AsyncEqlSearchUser.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/AsyncEqlSearchUser.java new file mode 100644 index 0000000000000..a690fbc7d9485 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/AsyncEqlSearchUser.java @@ -0,0 +1,53 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.security.user; + +import org.elasticsearch.xpack.core.security.authz.RoleDescriptor; +import org.elasticsearch.xpack.core.security.authz.permission.Role; +import org.elasticsearch.xpack.core.security.index.RestrictedIndicesNames; +import org.elasticsearch.xpack.core.security.support.MetadataUtils; + +public class AsyncEqlSearchUser extends User { + + public static final String NAME = UsernamesField.ASYNC_SEARCH_NAME; + public static final AsyncEqlSearchUser INSTANCE = new AsyncEqlSearchUser(); + public static final String ROLE_NAME = UsernamesField.ASYNC_SEARCH_ROLE; + public static final Role ROLE = Role.builder(new RoleDescriptor(ROLE_NAME, + null, + new RoleDescriptor.IndicesPrivileges[] { + RoleDescriptor.IndicesPrivileges.builder() + .indices(RestrictedIndicesNames.ASYNC_EQL_SEARCH_PREFIX + "*") + .privileges("all") + .allowRestrictedIndices(true).build(), + }, + null, + null, + null, + MetadataUtils.DEFAULT_RESERVED_METADATA, + null), null).build(); + + private AsyncEqlSearchUser() { + super(NAME, ROLE_NAME); + } + + @Override + public boolean equals(Object o) { + return INSTANCE == o; + } + + @Override + public int hashCode() { + return System.identityHashCode(this); + } + + public static boolean is(User user) { + return INSTANCE.equals(user); + } + + public static boolean is(String principal) { + return NAME.equals(principal); + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncTaskManagementServiceTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncTaskManagementServiceTests.java new file mode 100644 index 0000000000000..69e457c0ffd3e --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncTaskManagementServiceTests.java @@ -0,0 +1,234 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.async; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.test.junit.annotations.TestLogging; +import org.elasticsearch.transport.TransportService; +import org.junit.After; +import org.junit.Before; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +// TODO: test CRUD operations +public class AsyncTaskManagementServiceTests extends ESSingleNodeTestCase { + private ClusterService clusterService; + private TransportService transportService; + + private final ExecutorService executorService = Executors.newFixedThreadPool(1); + + + public static class TestRequest extends ActionRequest { + private final String string; + + public TestRequest(String string) { + this.string = string; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + } + + public static class TestResponse extends ActionResponse { + private final String string; + private final String id; + + public TestResponse(String string, String id) { + this.string = string; + this.id = id; + } + + public TestResponse(StreamInput input) throws IOException { + this.string = input.readOptionalString(); + this.id = input.readOptionalString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeOptionalString(string); + out.writeOptionalString(id); + } + } + + public static class TestTask extends CancellableTask implements AsyncTask { + + private final Map originHeaders; + private final AsyncExecutionId asyncExecutionId; + + public TestTask(long id, String type, String action, String description, TaskId parentTaskId, Map headers, + Map originHeaders, AsyncExecutionId asyncExecutionId) { + super(id, type, action, description, parentTaskId, headers); + this.originHeaders = originHeaders; + this.asyncExecutionId = asyncExecutionId; + } + + @Override + public boolean shouldCancelChildrenOnCancellation() { + return true; + } + + @Override + public Map getOriginHeaders() { + return originHeaders; + } + + @Override + public AsyncExecutionId getExecutionId() { + return asyncExecutionId; + } + } + + public static class TestOperation implements AsyncTaskManagementService.AsyncOperation { + + @Override + public TestTask createTask(TestRequest request, long id, String type, String action, TaskId parentTaskId, + Map headers, Map originHeaders, AsyncExecutionId asyncExecutionId) { + return new TestTask(id, type, action, request.getDescription(), parentTaskId, headers, originHeaders, asyncExecutionId); + } + + @Override + public void operation(TestRequest request, TestTask task, ActionListener listener) { + if (request.string.equals("die")) { + listener.onFailure(new IllegalArgumentException("test exception")); + } else { + listener.onResponse(new TestResponse("response for [" + request.string + "]", task.asyncExecutionId.getEncoded())); + } + } + + @Override + public TestResponse initialResponse(TestTask task) { + return new TestResponse(null, task.asyncExecutionId.getEncoded()); + } + + @Override + public TestResponse readResponse(StreamInput inputStream) throws IOException { + return new TestResponse(inputStream); + } + } + + public String index = "test-index"; + + @Before + public void setup() { + clusterService = getInstanceFromNode(ClusterService.class); + transportService = getInstanceFromNode(TransportService.class); + } + + /** + * Shutdown the executor so we don't leak threads into other test runs. + */ + @After + public void shutdownExec() { + executorService.shutdown(); + } + + private AsyncTaskManagementService createService( + AsyncTaskManagementService.AsyncOperation operation) { + return new AsyncTaskManagementService<>(index, client(), "test_origin", writableRegistry(), + transportService.getTaskManager(), "test_action", operation, TestTask.class, clusterService, transportService.getThreadPool()); + } + + public void testReturnBeforeTimeout() throws Exception { + AsyncTaskManagementService service = createService(new TestOperation()); + boolean success = randomBoolean(); + boolean keepOnCompletion = randomBoolean(); + CountDownLatch latch = new CountDownLatch(1); + TestRequest request = new TestRequest(success ? randomAlphaOfLength(10) : "die"); + service.asyncExecute(request, TimeValue.timeValueMinutes(1), TimeValue.timeValueMinutes(10), keepOnCompletion, + ActionListener.wrap(r -> { + assertThat(success, equalTo(true)); + assertThat(r.string, equalTo("response for [" + request.string + "]")); + assertThat(r.id, notNullValue()); + latch.countDown(); + }, e -> { + assertThat(success, equalTo(false)); + assertThat(e.getMessage(), equalTo("test exception")); + latch.countDown(); + })); + assertThat(latch.await(10, TimeUnit.SECONDS), equalTo(true)); + } + + @TestLogging(value = "org.elasticsearch.xpack.core.async:trace", reason = "remove me") + public void testReturnAfterTimeout() throws Exception { + CountDownLatch executionLatch = new CountDownLatch(1); + AsyncTaskManagementService service = createService(new TestOperation() { + @Override + public void operation(TestRequest request, TestTask task, ActionListener listener) { + executorService.submit(() -> { + try { + assertThat(executionLatch.await(10, TimeUnit.SECONDS), equalTo(true)); + } catch (InterruptedException ex) { + fail("Shouldn't be here"); + } + super.operation(request, task, listener); + }); + } + }); + boolean success = randomBoolean(); + boolean keepOnCompletion = randomBoolean(); + CountDownLatch latch = new CountDownLatch(1); + TestRequest request = new TestRequest(success ? randomAlphaOfLength(10) : "die"); + AtomicReference responseHolder = new AtomicReference<>(); + service.asyncExecute(request, TimeValue.timeValueMillis(0), TimeValue.timeValueMinutes(10), keepOnCompletion, + ActionListener.wrap(r -> { + assertThat(r.string, nullValue()); + assertThat(r.id, notNullValue()); + assertThat(responseHolder.getAndSet(r), nullValue()); + latch.countDown(); + }, e -> { + fail("Shouldn't be here"); + })); + assertThat(latch.await(10, TimeUnit.SECONDS), equalTo(true)); + executionLatch.countDown(); + assertThat(responseHolder.get(), notNullValue()); + AsyncExecutionId id = AsyncExecutionId.decode(responseHolder.get().id); + assertThat(service.getTask(id), notNullValue()); + + CountDownLatch responseLatch = new CountDownLatch(1); + + // Wait until task finishes + assertBusy(() -> { + TestTask t = service.getTask(id); + logger.info(t); + assertThat(t, nullValue()); + }); + + ensureGreen(index); + logger.info("Getting the the response back"); + service.getResponse(id, ActionListener.wrap( + r -> { + assertThat(r.string, equalTo("response for [" + request.string + "]")); + responseLatch.countDown(); + }, + e -> { + assertThat(e.getMessage(), equalTo("test exception")); + responseLatch.countDown(); + })); + assertThat(responseLatch.await(10, TimeUnit.SECONDS), equalTo(true)); + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/StoredAsyncResponseTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/StoredAsyncResponseTests.java new file mode 100644 index 0000000000000..ed56514cc8a2c --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/StoredAsyncResponseTests.java @@ -0,0 +1,84 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.async; + +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.search.SearchModule; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +import java.io.IOException; +import java.util.Collections; +import java.util.Objects; + +public class StoredAsyncResponseTests extends AbstractWireSerializingTestCase> { + + public static class TestResponse implements Writeable { + private final String string; + + public TestResponse(String string) { + this.string = string; + } + + public TestResponse(StreamInput input) throws IOException { + this.string = input.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(string); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + TestResponse that = (TestResponse) o; + return Objects.equals(string, that.string); + } + + @Override + public int hashCode() { + return Objects.hash(string); + } + } + + @Override + protected NamedWriteableRegistry getNamedWriteableRegistry() { + SearchModule searchModule = new SearchModule(Settings.EMPTY, Collections.emptyList()); + return new NamedWriteableRegistry(searchModule.getNamedWriteables()); + } + + @Override + protected NamedXContentRegistry xContentRegistry() { + SearchModule searchModule = new SearchModule(Settings.EMPTY, Collections.emptyList()); + return new NamedXContentRegistry(searchModule.getNamedXContents()); + } + + @Override + protected StoredAsyncResponse createTestInstance() { + if (randomBoolean()) { + return new StoredAsyncResponse<>(new IllegalArgumentException(randomAlphaOfLength(10)), randomNonNegativeLong()); + } else { + return new StoredAsyncResponse<>(new TestResponse(randomAlphaOfLength(10)), randomNonNegativeLong()); + } + } + + @Override + protected Writeable.Reader> instanceReader() { + return in -> new StoredAsyncResponse<>(TestResponse::new, in); + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStoreTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStoreTests.java index 803bc234b5aac..745f3b3c2382f 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStoreTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStoreTests.java @@ -262,9 +262,12 @@ public void testSnapshotUserRole() { } assertThat(snapshotUserRole.indices().allowedIndicesMatcher(GetIndexAction.NAME).test( RestrictedIndicesNames.ASYNC_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)), is(true)); + assertThat(snapshotUserRole.indices().allowedIndicesMatcher(GetIndexAction.NAME).test( + RestrictedIndicesNames.ASYNC_EQL_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)), is(true)); assertNoAccessAllowed(snapshotUserRole, RestrictedIndicesNames.RESTRICTED_NAMES); assertNoAccessAllowed(snapshotUserRole, RestrictedIndicesNames.ASYNC_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)); + assertNoAccessAllowed(snapshotUserRole, RestrictedIndicesNames.ASYNC_EQL_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)); } public void testIngestAdminRole() { @@ -295,6 +298,7 @@ public void testIngestAdminRole() { assertNoAccessAllowed(ingestAdminRole, RestrictedIndicesNames.RESTRICTED_NAMES); assertNoAccessAllowed(ingestAdminRole, RestrictedIndicesNames.ASYNC_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)); + assertNoAccessAllowed(ingestAdminRole, RestrictedIndicesNames.ASYNC_EQL_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)); } public void testKibanaSystemRole() { @@ -434,6 +438,7 @@ public void testKibanaSystemRole() { assertNoAccessAllowed(kibanaRole, RestrictedIndicesNames.RESTRICTED_NAMES); assertNoAccessAllowed(kibanaRole, RestrictedIndicesNames.ASYNC_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)); + assertNoAccessAllowed(kibanaRole, RestrictedIndicesNames.ASYNC_EQL_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)); } public void testKibanaAdminRole() { @@ -523,6 +528,7 @@ public void testKibanaUserRole() { assertNoAccessAllowed(kibanaUserRole, RestrictedIndicesNames.RESTRICTED_NAMES); assertNoAccessAllowed(kibanaUserRole, RestrictedIndicesNames.ASYNC_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)); + assertNoAccessAllowed(kibanaUserRole, RestrictedIndicesNames.ASYNC_EQL_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)); } public void testMonitoringUserRole() { @@ -571,6 +577,7 @@ public void testMonitoringUserRole() { assertNoAccessAllowed(monitoringUserRole, RestrictedIndicesNames.RESTRICTED_NAMES); assertNoAccessAllowed(monitoringUserRole, RestrictedIndicesNames.ASYNC_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)); + assertNoAccessAllowed(monitoringUserRole, RestrictedIndicesNames.ASYNC_EQL_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)); final String kibanaApplicationWithRandomIndex = "kibana-" + randomFrom(randomAlphaOfLengthBetween(8, 24), ".kibana"); assertThat(monitoringUserRole.application().grants( @@ -646,6 +653,7 @@ public void testRemoteMonitoringAgentRole() { assertNoAccessAllowed(remoteMonitoringAgentRole, RestrictedIndicesNames.RESTRICTED_NAMES); assertNoAccessAllowed(remoteMonitoringAgentRole, RestrictedIndicesNames.ASYNC_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)); + assertNoAccessAllowed(remoteMonitoringAgentRole, RestrictedIndicesNames.ASYNC_EQL_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)); } public void testRemoteMonitoringCollectorRole() { @@ -697,52 +705,38 @@ public void testRemoteMonitoringCollectorRole() { // These tests might need to change if we add new non-security restricted indices that the monitoring user isn't supposed to see // (but ideally, the monitoring user should see all indices). - assertThat(remoteMonitoringAgentRole.indices().allowedIndicesMatcher(GetSettingsAction.NAME) - .test(randomFrom(RestrictedIndicesNames.RESTRICTED_NAMES)), is(true)); - assertThat(remoteMonitoringAgentRole.indices().allowedIndicesMatcher(GetSettingsAction.NAME) - .test(RestrictedIndicesNames.ASYNC_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)), is(true)); - assertThat(remoteMonitoringAgentRole.indices().allowedIndicesMatcher(IndicesShardStoresAction.NAME) - .test(randomFrom(RestrictedIndicesNames.RESTRICTED_NAMES)), is(true)); - assertThat(remoteMonitoringAgentRole.indices().allowedIndicesMatcher(IndicesShardStoresAction.NAME) - .test(RestrictedIndicesNames.ASYNC_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)), is(true)); - assertThat(remoteMonitoringAgentRole.indices().allowedIndicesMatcher(UpgradeStatusAction.NAME) - .test(randomFrom(RestrictedIndicesNames.RESTRICTED_NAMES)), is(true)); - assertThat(remoteMonitoringAgentRole.indices().allowedIndicesMatcher(UpgradeStatusAction.NAME) - .test(RestrictedIndicesNames.ASYNC_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)), is(true)); - assertThat(remoteMonitoringAgentRole.indices().allowedIndicesMatcher(RecoveryAction.NAME) - .test(randomFrom(RestrictedIndicesNames.RESTRICTED_NAMES)), is(true)); - assertThat(remoteMonitoringAgentRole.indices().allowedIndicesMatcher(RecoveryAction.NAME) - .test(RestrictedIndicesNames.ASYNC_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)), is(true)); - assertThat(remoteMonitoringAgentRole.indices().allowedIndicesMatcher(IndicesStatsAction.NAME) - .test(randomFrom(RestrictedIndicesNames.RESTRICTED_NAMES)), is(true)); - assertThat(remoteMonitoringAgentRole.indices().allowedIndicesMatcher(IndicesStatsAction.NAME) - .test(RestrictedIndicesNames.ASYNC_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)), is(true)); - assertThat(remoteMonitoringAgentRole.indices().allowedIndicesMatcher(IndicesSegmentsAction.NAME) - .test(randomFrom(RestrictedIndicesNames.RESTRICTED_NAMES)), is(true)); - assertThat(remoteMonitoringAgentRole.indices().allowedIndicesMatcher(IndicesSegmentsAction.NAME) - .test(RestrictedIndicesNames.ASYNC_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)), is(true)); - - assertThat(remoteMonitoringAgentRole.indices().allowedIndicesMatcher(SearchAction.NAME) - .test(randomFrom(RestrictedIndicesNames.RESTRICTED_NAMES)), is(false)); - assertThat(remoteMonitoringAgentRole.indices().allowedIndicesMatcher(SearchAction.NAME) - .test(RestrictedIndicesNames.ASYNC_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)), is(false)); - assertThat(remoteMonitoringAgentRole.indices().allowedIndicesMatcher(GetAction.NAME) - .test(randomFrom(RestrictedIndicesNames.RESTRICTED_NAMES)), is(false)); - assertThat(remoteMonitoringAgentRole.indices().allowedIndicesMatcher(GetAction.NAME) - .test(RestrictedIndicesNames.ASYNC_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)), is(false)); - assertThat(remoteMonitoringAgentRole.indices().allowedIndicesMatcher(DeleteAction.NAME) - .test(randomFrom(RestrictedIndicesNames.RESTRICTED_NAMES)), is(false)); - assertThat(remoteMonitoringAgentRole.indices().allowedIndicesMatcher(DeleteAction.NAME) - .test(RestrictedIndicesNames.ASYNC_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)), is(false)); - assertThat(remoteMonitoringAgentRole.indices().allowedIndicesMatcher(IndexAction.NAME) - .test(randomFrom(RestrictedIndicesNames.RESTRICTED_NAMES)), is(false)); - assertThat(remoteMonitoringAgentRole.indices().allowedIndicesMatcher(IndexAction.NAME) - .test(RestrictedIndicesNames.ASYNC_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)), is(false)); - + for (String index : new String[] { + randomFrom(RestrictedIndicesNames.RESTRICTED_NAMES), + RestrictedIndicesNames.ASYNC_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2), + RestrictedIndicesNames.ASYNC_EQL_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2) + }) { + assertThat(remoteMonitoringAgentRole.indices().allowedIndicesMatcher(GetSettingsAction.NAME) + .test(index), is(true)); + assertThat(remoteMonitoringAgentRole.indices().allowedIndicesMatcher(IndicesShardStoresAction.NAME) + .test(index), is(true)); + assertThat(remoteMonitoringAgentRole.indices().allowedIndicesMatcher(UpgradeStatusAction.NAME) + .test(index), is(true)); + assertThat(remoteMonitoringAgentRole.indices().allowedIndicesMatcher(RecoveryAction.NAME) + .test(index), is(true)); + assertThat(remoteMonitoringAgentRole.indices().allowedIndicesMatcher(IndicesStatsAction.NAME) + .test(index), is(true)); + assertThat(remoteMonitoringAgentRole.indices().allowedIndicesMatcher(IndicesSegmentsAction.NAME) + .test(index), is(true)); + + assertThat(remoteMonitoringAgentRole.indices().allowedIndicesMatcher(SearchAction.NAME) + .test(index), is(false)); + assertThat(remoteMonitoringAgentRole.indices().allowedIndicesMatcher(GetAction.NAME) + .test(index), is(false)); + assertThat(remoteMonitoringAgentRole.indices().allowedIndicesMatcher(DeleteAction.NAME) + .test(index), is(false)); + assertThat(remoteMonitoringAgentRole.indices().allowedIndicesMatcher(IndexAction.NAME) + .test(index), is(false)); + } assertMonitoringOnRestrictedIndices(remoteMonitoringAgentRole); assertNoAccessAllowed(remoteMonitoringAgentRole, RestrictedIndicesNames.RESTRICTED_NAMES); assertNoAccessAllowed(remoteMonitoringAgentRole, RestrictedIndicesNames.ASYNC_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)); + assertNoAccessAllowed(remoteMonitoringAgentRole, RestrictedIndicesNames.ASYNC_EQL_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)); } private void assertMonitoringOnRestrictedIndices(Role role) { @@ -812,6 +806,7 @@ public void testReportingUserRole() { assertNoAccessAllowed(reportingUserRole, RestrictedIndicesNames.RESTRICTED_NAMES); assertNoAccessAllowed(reportingUserRole, RestrictedIndicesNames.ASYNC_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)); + assertNoAccessAllowed(reportingUserRole, RestrictedIndicesNames.ASYNC_EQL_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)); } public void testKibanaDashboardOnlyUserRole() { @@ -850,6 +845,7 @@ public void testKibanaDashboardOnlyUserRole() { assertNoAccessAllowed(dashboardsOnlyUserRole, RestrictedIndicesNames.RESTRICTED_NAMES); assertNoAccessAllowed(dashboardsOnlyUserRole, RestrictedIndicesNames.ASYNC_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)); + assertNoAccessAllowed(dashboardsOnlyUserRole, RestrictedIndicesNames.ASYNC_EQL_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)); } public void testSuperuserRole() { @@ -951,6 +947,7 @@ public void testLogstashSystemRole() { assertNoAccessAllowed(logstashSystemRole, RestrictedIndicesNames.RESTRICTED_NAMES); assertNoAccessAllowed(logstashSystemRole, RestrictedIndicesNames.ASYNC_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)); + assertNoAccessAllowed(logstashSystemRole, RestrictedIndicesNames.ASYNC_EQL_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)); } public void testBeatsAdminRole() { @@ -992,6 +989,7 @@ public void testBeatsAdminRole() { assertNoAccessAllowed(beatsAdminRole, RestrictedIndicesNames.RESTRICTED_NAMES); assertNoAccessAllowed(beatsAdminRole, RestrictedIndicesNames.ASYNC_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)); + assertNoAccessAllowed(beatsAdminRole, RestrictedIndicesNames.ASYNC_EQL_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)); } public void testBeatsSystemRole() { @@ -1028,6 +1026,7 @@ public void testBeatsSystemRole() { assertNoAccessAllowed(beatsSystemRole, RestrictedIndicesNames.RESTRICTED_NAMES); assertNoAccessAllowed(beatsSystemRole, RestrictedIndicesNames.ASYNC_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)); + assertNoAccessAllowed(beatsSystemRole, RestrictedIndicesNames.ASYNC_EQL_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)); } public void testAPMSystemRole() { @@ -1068,7 +1067,7 @@ public void testAPMSystemRole() { "indices:data/write/index:op_type/" + randomAlphaOfLengthBetween(3,5)).test(index), is(false)); assertNoAccessAllowed(APMSystemRole, RestrictedIndicesNames.RESTRICTED_NAMES); - assertNoAccessAllowed(APMSystemRole, RestrictedIndicesNames.ASYNC_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)); + assertNoAccessAllowed(APMSystemRole, RestrictedIndicesNames.ASYNC_EQL_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)); } public void testAPMUserRole() { @@ -1163,6 +1162,7 @@ public void testMachineLearningAdminRole() { assertNoAccessAllowed(role, RestrictedIndicesNames.RESTRICTED_NAMES); assertNoAccessAllowed(role, RestrictedIndicesNames.ASYNC_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)); + assertNoAccessAllowed(role, RestrictedIndicesNames.ASYNC_EQL_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)); final String kibanaApplicationWithRandomIndex = "kibana-" + randomFrom(randomAlphaOfLengthBetween(8, 24), ".kibana"); assertThat(role.application().grants( @@ -1250,6 +1250,7 @@ public void testMachineLearningUserRole() { assertNoAccessAllowed(role, RestrictedIndicesNames.RESTRICTED_NAMES); assertNoAccessAllowed(role, RestrictedIndicesNames.ASYNC_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)); + assertNoAccessAllowed(role, RestrictedIndicesNames.ASYNC_EQL_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)); final String kibanaApplicationWithRandomIndex = "kibana-" + randomFrom(randomAlphaOfLengthBetween(8, 24), ".kibana"); @@ -1297,7 +1298,7 @@ public void testTransformAdminRole() { assertNoAccessAllowed(role, TransformInternalIndexConstants.LATEST_INDEX_NAME); // internal use only assertNoAccessAllowed(role, RestrictedIndicesNames.RESTRICTED_NAMES); - assertNoAccessAllowed(role, RestrictedIndicesNames.ASYNC_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)); + assertNoAccessAllowed(role, RestrictedIndicesNames.ASYNC_EQL_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)); final String kibanaApplicationWithRandomIndex = "kibana-" + randomFrom(randomAlphaOfLengthBetween(8, 24), ".kibana"); assertThat(role.application().grants( @@ -1350,7 +1351,7 @@ public void testDataFrameTransformsUserRole() { assertNoAccessAllowed(role, TransformInternalIndexConstants.LATEST_INDEX_NAME); assertNoAccessAllowed(role, RestrictedIndicesNames.RESTRICTED_NAMES); - assertNoAccessAllowed(role, RestrictedIndicesNames.ASYNC_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)); + assertNoAccessAllowed(role, RestrictedIndicesNames.ASYNC_EQL_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)); final String kibanaApplicationWithRandomIndex = "kibana-" + randomFrom(randomAlphaOfLengthBetween(8, 24), ".kibana"); assertThat(role.application().grants( @@ -1401,7 +1402,7 @@ public void testWatcherAdminRole() { } assertNoAccessAllowed(role, RestrictedIndicesNames.RESTRICTED_NAMES); - assertNoAccessAllowed(role, RestrictedIndicesNames.ASYNC_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)); + assertNoAccessAllowed(role, RestrictedIndicesNames.ASYNC_EQL_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)); } public void testWatcherUserRole() { @@ -1435,7 +1436,7 @@ public void testWatcherUserRole() { } assertNoAccessAllowed(role, RestrictedIndicesNames.RESTRICTED_NAMES); - assertNoAccessAllowed(role, RestrictedIndicesNames.ASYNC_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)); + assertNoAccessAllowed(role, RestrictedIndicesNames.ASYNC_EQL_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)); } private void assertReadWriteDocsButNotDeleteIndexAllowed(Role role, String index) { @@ -1460,7 +1461,7 @@ private void assertOnlyReadAllowed(Role role, String index) { assertThat(role.indices().allowedIndicesMatcher(BulkAction.NAME).test(index), is(false)); assertNoAccessAllowed(role, RestrictedIndicesNames.RESTRICTED_NAMES); - assertNoAccessAllowed(role, RestrictedIndicesNames.ASYNC_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)); + assertNoAccessAllowed(role, RestrictedIndicesNames.ASYNC_EQL_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)); } private void assertNoAccessAllowed(Role role, Collection indices) { diff --git a/x-pack/plugin/eql/qa/rest/src/test/resources/rest-api-spec/test/eql/10_basic.yml b/x-pack/plugin/eql/qa/rest/src/test/resources/rest-api-spec/test/eql/10_basic.yml index 79610f784c670..ca8197e57dd94 100644 --- a/x-pack/plugin/eql/qa/rest/src/test/resources/rest-api-spec/test/eql/10_basic.yml +++ b/x-pack/plugin/eql/qa/rest/src/test/resources/rest-api-spec/test/eql/10_basic.yml @@ -26,3 +26,16 @@ setup: - match: {hits.total.relation: "eq"} - match: {hits.events.0._source.user: "SYSTEM"} +--- +"Execute some EQL in async mode": + - do: + eql.search: + index: eql_test + wait_for_completion_timeout: "0ms" + body: + query: "process where user = 'SYSTEM'" + + - match: {is_running: true} + - match: {is_partial: true} + - is_true: id + diff --git a/x-pack/plugin/eql/src/internalClusterTest/java/org/elasticsearch/xpack/eql/action/AbstractEqlBlockingIntegTestCase.java b/x-pack/plugin/eql/src/internalClusterTest/java/org/elasticsearch/xpack/eql/action/AbstractEqlBlockingIntegTestCase.java new file mode 100644 index 0000000000000..e8e9f7aa5ebac --- /dev/null +++ b/x-pack/plugin/eql/src/internalClusterTest/java/org/elasticsearch/xpack/eql/action/AbstractEqlBlockingIntegTestCase.java @@ -0,0 +1,221 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.eql.action; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; +import org.elasticsearch.action.fieldcaps.FieldCapabilitiesAction; +import org.elasticsearch.action.support.ActionFilter; +import org.elasticsearch.action.support.ActionFilterChain; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexModule; +import org.elasticsearch.index.shard.SearchOperationListener; +import org.elasticsearch.plugins.ActionPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.PluginsService; +import org.elasticsearch.search.internal.SearchContext; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.tasks.TaskInfo; +import org.elasticsearch.test.ESIntegTestCase; + +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.elasticsearch.test.ESIntegTestCase.Scope.SUITE; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasSize; + +/** + * IT tests that can block EQL execution at different places + */ +@ESIntegTestCase.ClusterScope(scope = SUITE, numDataNodes = 0, numClientNodes = 0, maxNumDataNodes = 0) +public abstract class AbstractEqlBlockingIntegTestCase extends AbstractEqlIntegTestCase { + + protected List initBlockFactory(boolean searchBlock, boolean fieldCapsBlock) { + List plugins = new ArrayList<>(); + for (PluginsService pluginsService : internalCluster().getInstances(PluginsService.class)) { + plugins.addAll(pluginsService.filterPlugins(SearchBlockPlugin.class)); + } + for (SearchBlockPlugin plugin : plugins) { + plugin.reset(); + if (searchBlock) { + plugin.enableSearchBlock(); + } + if (fieldCapsBlock) { + plugin.enableFieldCapBlock(); + } + } + return plugins; + } + + protected void disableBlocks(List plugins) { + for (SearchBlockPlugin plugin : plugins) { + plugin.disableSearchBlock(); + plugin.disableFieldCapBlock(); + } + } + + protected void awaitForBlockedSearches(List plugins, String index) throws Exception { + int numberOfShards = getNumShards(index).numPrimaries; + assertBusy(() -> { + int numberOfBlockedPlugins = getNumberOfContexts(plugins); + logger.trace("The plugin blocked on {} out of {} shards", numberOfBlockedPlugins, numberOfShards); + assertThat(numberOfBlockedPlugins, greaterThan(0)); + }); + } + + protected int getNumberOfContexts(List plugins) throws Exception { + int count = 0; + for (SearchBlockPlugin plugin : plugins) { + count += plugin.contexts.get(); + } + return count; + } + + protected int getNumberOfFieldCaps(List plugins) throws Exception { + int count = 0; + for (SearchBlockPlugin plugin : plugins) { + count += plugin.fieldCaps.get(); + } + return count; + } + + protected void awaitForBlockedFieldCaps(List plugins) throws Exception { + assertBusy(() -> { + int numberOfBlockedPlugins = getNumberOfFieldCaps(plugins); + logger.trace("The plugin blocked on {} nodes", numberOfBlockedPlugins); + assertThat(numberOfBlockedPlugins, greaterThan(0)); + }); + } + + public static class SearchBlockPlugin extends Plugin implements ActionPlugin { + protected final Logger logger = LogManager.getLogger(getClass()); + + private final AtomicInteger contexts = new AtomicInteger(); + + private final AtomicInteger fieldCaps = new AtomicInteger(); + + private final AtomicBoolean shouldBlockOnSearch = new AtomicBoolean(false); + + private final AtomicBoolean shouldBlockOnFieldCapabilities = new AtomicBoolean(false); + + private final String nodeId; + + public void reset() { + contexts.set(0); + fieldCaps.set(0); + } + + public void disableSearchBlock() { + shouldBlockOnSearch.set(false); + } + + public void enableSearchBlock() { + shouldBlockOnSearch.set(true); + } + + + public void disableFieldCapBlock() { + shouldBlockOnFieldCapabilities.set(false); + } + + public void enableFieldCapBlock() { + shouldBlockOnFieldCapabilities.set(true); + } + + public SearchBlockPlugin(Settings settings, Path configPath) throws Exception { + nodeId = settings.get("node.name"); + } + + @Override + public void onIndexModule(IndexModule indexModule) { + super.onIndexModule(indexModule); + indexModule.addSearchOperationListener(new SearchOperationListener() { + @Override + public void onNewContext(SearchContext context) { + contexts.incrementAndGet(); + try { + logger.trace("blocking search on " + nodeId); + assertBusy(() -> assertFalse(shouldBlockOnSearch.get())); + logger.trace("unblocking search on " + nodeId); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + } + + @Override + public List getActionFilters() { + List list = new ArrayList<>(); + list.add(new ActionFilter() { + @Override + public int order() { + return 0; + } + + @Override + public void apply( + Task task, String action, Request request, ActionListener listener, + ActionFilterChain chain) { + if (action.equals(FieldCapabilitiesAction.NAME)) { + try { + fieldCaps.incrementAndGet(); + logger.trace("blocking field caps on " + nodeId); + assertBusy(() -> assertFalse(shouldBlockOnFieldCapabilities.get())); + logger.trace("unblocking field caps on " + nodeId); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + chain.proceed(task, action, request, listener); + } + }); + return list; + } + } + + @Override + protected Collection> nodePlugins() { + List> plugins = new ArrayList<>(super.nodePlugins()); + plugins.add(SearchBlockPlugin.class); + return plugins; + } + + protected TaskId findTaskWithXOpaqueId(String id, String action) { + ListTasksResponse tasks = client().admin().cluster().prepareListTasks().setActions(action).get(); + for (TaskInfo task : tasks.getTasks()) { + if (id.equals(task.getHeaders().get(Task.X_OPAQUE_ID))) { + return task.getTaskId(); + } + } + return null; + } + + protected TaskId cancelTaskWithXOpaqueId(String id, String action) { + TaskId taskId = findTaskWithXOpaqueId(id, action); + assertNotNull(taskId); + logger.trace("Cancelling task " + taskId); + CancelTasksResponse response = client().admin().cluster().prepareCancelTasks().setTaskId(taskId).get(); + assertThat(response.getTasks(), hasSize(1)); + assertThat(response.getTasks().get(0).getAction(), equalTo(action)); + logger.trace("Task is cancelled " + taskId); + return taskId; + } + +} diff --git a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/action/AbstractEqlIntegTestCase.java b/x-pack/plugin/eql/src/internalClusterTest/java/org/elasticsearch/xpack/eql/action/AbstractEqlIntegTestCase.java similarity index 100% rename from x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/action/AbstractEqlIntegTestCase.java rename to x-pack/plugin/eql/src/internalClusterTest/java/org/elasticsearch/xpack/eql/action/AbstractEqlIntegTestCase.java diff --git a/x-pack/plugin/eql/src/internalClusterTest/java/org/elasticsearch/xpack/eql/action/AsyncEqlSearchActionIT.java b/x-pack/plugin/eql/src/internalClusterTest/java/org/elasticsearch/xpack/eql/action/AsyncEqlSearchActionIT.java new file mode 100644 index 0000000000000..9152cc2d94c52 --- /dev/null +++ b/x-pack/plugin/eql/src/internalClusterTest/java/org/elasticsearch/xpack/eql/action/AsyncEqlSearchActionIT.java @@ -0,0 +1,250 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.eql.action; + +import org.elasticsearch.Version; +import org.elasticsearch.action.NoShardAvailableActionException; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.ByteBufferStreamInput; +import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.script.MockScriptPlugin; +import org.elasticsearch.search.SearchModule; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.xpack.core.async.AsyncExecutionId; +import org.elasticsearch.xpack.core.async.StoredAsyncResponse; +import org.elasticsearch.xpack.eql.plugin.EqlPlugin; +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.junit.After; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Base64; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.function.Function; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +public class AsyncEqlSearchActionIT extends AbstractEqlBlockingIntegTestCase { + + private final ExecutorService executorService = Executors.newFixedThreadPool(1); + + NamedWriteableRegistry registry = new NamedWriteableRegistry(new SearchModule(Settings.EMPTY, + Collections.emptyList()).getNamedWriteables()); + + /** + * Shutdown the executor so we don't leak threads into other test runs. + */ + @After + public void shutdownExec() { + executorService.shutdown(); + } + + private void prepareIndex() throws Exception { + assertAcked(client().admin().indices().prepareCreate("test") + .setMapping("val", "type=integer", "event_type", "type=keyword", "@timestamp", "type=date", "i", "type=integer") + .get()); + createIndex("idx_unmapped"); + + int numDocs = randomIntBetween(6, 20); + + List builders = new ArrayList<>(); + + for (int i = 0; i < numDocs; i++) { + int fieldValue = randomIntBetween(0, 10); + builders.add(client().prepareIndex("test").setSource( + jsonBuilder().startObject() + .field("val", fieldValue) + .field("event_type", "my_event") + .field("@timestamp", "2020-04-09T12:35:48Z") + .field("i", i) + .endObject())); + } + indexRandom(true, builders); + } + + + public void testGoingAsync() throws Exception { + prepareIndex(); + + boolean success = randomBoolean(); + String query = success ? "my_event where i=1" : "my_event where 10/i=1"; + EqlSearchRequest request = new EqlSearchRequest().indices("test").query(query).eventCategoryField("event_type") + .waitForCompletionTimeout(TimeValue.timeValueMillis(1)); + + boolean customKeepAlive = randomBoolean(); + TimeValue keepAliveValue; + if (customKeepAlive) { + keepAliveValue = TimeValue.parseTimeValue(randomTimeValue(1, 5, "d"), "test"); + request.keepAlive(keepAliveValue); + } else { + keepAliveValue = EqlSearchRequest.DEFAULT_KEEP_ALIVE; + } + + List plugins = initBlockFactory(true, false); + + String opaqueId = randomAlphaOfLength(10); + logger.trace("Starting async search"); + EqlSearchResponse response = client().filterWithHeader(Collections.singletonMap(Task.X_OPAQUE_ID, opaqueId)) + .execute(EqlSearchAction.INSTANCE, request).get(); + assertThat(response.isRunning(), is(true)); + assertThat(response.isPartial(), is(true)); + assertThat(response.id(), notNullValue()); + + logger.trace("Waiting for block to be established"); + awaitForBlockedSearches(plugins, "test"); + logger.trace("Block is established"); + + String id = response.id(); + TaskId taskId = findTaskWithXOpaqueId(opaqueId, EqlSearchAction.NAME + "[a]"); + assertThat(taskId, notNullValue()); + + disableBlocks(plugins); + + assertBusy(() -> assertThat(findTaskWithXOpaqueId(opaqueId, EqlSearchAction.NAME + "[a]"), nullValue())); + StoredAsyncResponse doc = getStoredRecord(id); + // Make sure that the expiration time is not more than 1 min different from the current time + keep alive + assertThat(System.currentTimeMillis() + keepAliveValue.getMillis() - doc.getExpirationTime(), + lessThan(doc.getExpirationTime() + TimeValue.timeValueMinutes(1).getMillis())); + if (success) { + assertThat(doc.getException(), nullValue()); + assertThat(doc.getResponse(), notNullValue()); + assertThat(doc.getResponse().hits().events().size(), equalTo(1)); + } else { + assertThat(doc.getException(), notNullValue()); + assertThat(doc.getResponse(), nullValue()); + assertThat(doc.getException().getCause().getMessage(), containsString("by zero")); + } + } + + public void testFinishingBeforeTimeout() throws Exception { + prepareIndex(); + + boolean success = randomBoolean(); + boolean keepOnCompletion = randomBoolean(); + String query = success ? "my_event where i=1" : "my_event where 10/i=1"; + EqlSearchRequest request = new EqlSearchRequest().indices("test").query(query).eventCategoryField("event_type") + .waitForCompletionTimeout(TimeValue.timeValueSeconds(10)); + if (keepOnCompletion || randomBoolean()) { + request.keepOnCompletion(keepOnCompletion); + } + + if (success) { + EqlSearchResponse response = client().execute(EqlSearchAction.INSTANCE, request).get(); + assertThat(response.isRunning(), is(false)); + assertThat(response.isPartial(), is(false)); + assertThat(response.id(), notNullValue()); + assertThat(response.hits().events().size(), equalTo(1)); + if (keepOnCompletion) { + StoredAsyncResponse doc = getStoredRecord(response.id()); + assertThat(doc, notNullValue()); + assertThat(doc.getException(), nullValue()); + assertThat(doc.getResponse(), notNullValue()); + assertThat(doc.getResponse().hits().events().size(), equalTo(1)); + } + } else { + Exception ex = expectThrows(Exception.class, + () -> client().execute(EqlSearchAction.INSTANCE, request).get()); + assertThat(ex.getMessage(), containsString("by zero")); + } + + + } + + public StoredAsyncResponse getStoredRecord(String id) throws Exception { + try { + GetResponse doc = client().prepareGet(EqlPlugin.INDEX, AsyncExecutionId.decode(id).getDocId()).get(); + if (doc.isExists()) { + String value = doc.getSource().get("result").toString(); + try (ByteBufferStreamInput buf = new ByteBufferStreamInput(ByteBuffer.wrap(Base64.getDecoder().decode(value)))) { + try (StreamInput in = new NamedWriteableAwareStreamInput(buf, registry)) { + in.setVersion(Version.readVersion(in)); + return new StoredAsyncResponse<>(EqlSearchResponse::new, in); + } + } + } + } catch (IndexNotFoundException | NoShardAvailableActionException ex) { + return null; + } + return null; + } + + public static org.hamcrest.Matcher eqlSearchResponseMatcherEqualTo(EqlSearchResponse eqlSearchResponse) { + return new BaseMatcher<>() { + + @Override + public void describeTo(Description description) { + description.appendText(Strings.toString(eqlSearchResponse)); + } + + @Override + public boolean matches(Object o) { + if (eqlSearchResponse == o) { + return true; + } + if (o == null || EqlSearchResponse.class != o.getClass()) { + return false; + } + EqlSearchResponse that = (EqlSearchResponse) o; + // We don't compare took since it might deffer + return Objects.equals(eqlSearchResponse.hits(), that.hits()) + && Objects.equals(eqlSearchResponse.isTimeout(), that.isTimeout()); + } + }; + } + + public static class FakePainlessScriptPlugin extends MockScriptPlugin { + + @Override + protected Map, Object>> pluginScripts() { + Map, Object>> scripts = new HashMap<>(); + scripts.put("InternalQlScriptUtils.nullSafeFilter(InternalQlScriptUtils.eq(InternalQlScriptUtils.div(" + + "params.v0,InternalQlScriptUtils.docValue(doc,params.v1)),params.v2))", FakePainlessScriptPlugin::fail); + return scripts; + } + + public static Object fail(Map arg) { + throw new ArithmeticException("Division by zero"); + } + + public String pluginScriptLang() { + // Faking painless + return "painless"; + } + } + + @Override + protected Collection> nodePlugins() { + List> plugins = new ArrayList<>(super.nodePlugins()); + plugins.add(FakePainlessScriptPlugin.class); + return plugins; + } + +} diff --git a/x-pack/plugin/eql/src/internalClusterTest/java/org/elasticsearch/xpack/eql/action/EqlCancellationIT.java b/x-pack/plugin/eql/src/internalClusterTest/java/org/elasticsearch/xpack/eql/action/EqlCancellationIT.java index 37ac0022bb201..abe3dd0929535 100644 --- a/x-pack/plugin/eql/src/internalClusterTest/java/org/elasticsearch/xpack/eql/action/EqlCancellationIT.java +++ b/x-pack/plugin/eql/src/internalClusterTest/java/org/elasticsearch/xpack/eql/action/EqlCancellationIT.java @@ -6,50 +6,26 @@ package org.elasticsearch.xpack.eql.action; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRequest; -import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; -import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; -import org.elasticsearch.action.fieldcaps.FieldCapabilitiesAction; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.SearchPhaseExecutionException; -import org.elasticsearch.action.support.ActionFilter; -import org.elasticsearch.action.support.ActionFilterChain; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.IndexModule; -import org.elasticsearch.index.shard.SearchOperationListener; -import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.plugins.PluginsService; -import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskCancelledException; -import org.elasticsearch.tasks.TaskId; -import org.elasticsearch.tasks.TaskInfo; import org.junit.After; -import java.nio.file.Path; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; -public class EqlCancellationIT extends AbstractEqlIntegTestCase { +public class EqlCancellationIT extends AbstractEqlBlockingIntegTestCase { private final ExecutorService executorService = Executors.newFixedThreadPool(1); @@ -96,20 +72,8 @@ public void testCancellation() throws Exception { awaitForBlockedFieldCaps(plugins); } logger.trace("Block is established"); - ListTasksResponse tasks = client().admin().cluster().prepareListTasks().setActions(EqlSearchAction.NAME).get(); - TaskId taskId = null; - for (TaskInfo task : tasks.getTasks()) { - if (id.equals(task.getHeaders().get(Task.X_OPAQUE_ID))) { - taskId = task.getTaskId(); - break; - } - } - assertNotNull(taskId); - logger.trace("Cancelling task " + taskId); - CancelTasksResponse response = client().admin().cluster().prepareCancelTasks().setTaskId(taskId).get(); - assertThat(response.getTasks(), hasSize(1)); - assertThat(response.getTasks().get(0).getAction(), equalTo(EqlSearchAction.NAME)); - logger.trace("Task is cancelled " + taskId); + cancelTaskWithXOpaqueId(id, EqlSearchAction.NAME); + disableBlocks(plugins); Exception exception = expectThrows(Exception.class, future::get); Throwable inner = ExceptionsHelper.unwrap(exception, SearchPhaseExecutionException.class); @@ -126,155 +90,4 @@ public void testCancellation() throws Exception { assertNotNull(cancellationException); } } - - private List initBlockFactory(boolean searchBlock, boolean fieldCapsBlock) { - List plugins = new ArrayList<>(); - for (PluginsService pluginsService : internalCluster().getDataNodeInstances(PluginsService.class)) { - plugins.addAll(pluginsService.filterPlugins(SearchBlockPlugin.class)); - } - for (SearchBlockPlugin plugin : plugins) { - plugin.reset(); - if (searchBlock) { - plugin.enableSearchBlock(); - } - if (fieldCapsBlock) { - plugin.enableFieldCapBlock(); - } - } - return plugins; - } - - private void disableBlocks(List plugins) { - for (SearchBlockPlugin plugin : plugins) { - plugin.disableSearchBlock(); - plugin.disableFieldCapBlock(); - } - } - - private void awaitForBlockedSearches(List plugins, String index) throws Exception { - int numberOfShards = getNumShards(index).numPrimaries; - assertBusy(() -> { - int numberOfBlockedPlugins = getNumberOfContexts(plugins); - logger.trace("The plugin blocked on {} out of {} shards", numberOfBlockedPlugins, numberOfShards); - assertThat(numberOfBlockedPlugins, greaterThan(0)); - }); - } - - private int getNumberOfContexts(List plugins) throws Exception { - int count = 0; - for (SearchBlockPlugin plugin : plugins) { - count += plugin.contexts.get(); - } - return count; - } - - private int getNumberOfFieldCaps(List plugins) throws Exception { - int count = 0; - for (SearchBlockPlugin plugin : plugins) { - count += plugin.fieldCaps.get(); - } - return count; - } - - private void awaitForBlockedFieldCaps(List plugins) throws Exception { - assertBusy(() -> { - int numberOfBlockedPlugins = getNumberOfFieldCaps(plugins); - logger.trace("The plugin blocked on {} nodes", numberOfBlockedPlugins); - assertThat(numberOfBlockedPlugins, greaterThan(0)); - }); - } - - public static class SearchBlockPlugin extends LocalStateEQLXPackPlugin { - protected final Logger logger = LogManager.getLogger(getClass()); - - private final AtomicInteger contexts = new AtomicInteger(); - - private final AtomicInteger fieldCaps = new AtomicInteger(); - - private final AtomicBoolean shouldBlockOnSearch = new AtomicBoolean(false); - - private final AtomicBoolean shouldBlockOnFieldCapabilities = new AtomicBoolean(false); - - private final String nodeId; - - public void reset() { - contexts.set(0); - fieldCaps.set(0); - } - - public void disableSearchBlock() { - shouldBlockOnSearch.set(false); - } - - public void enableSearchBlock() { - shouldBlockOnSearch.set(true); - } - - - public void disableFieldCapBlock() { - shouldBlockOnFieldCapabilities.set(false); - } - - public void enableFieldCapBlock() { - shouldBlockOnFieldCapabilities.set(true); - } - - public SearchBlockPlugin(Settings settings, Path configPath) throws Exception { - super(settings, configPath); - nodeId = settings.get("node.name"); - } - - @Override - public void onIndexModule(IndexModule indexModule) { - super.onIndexModule(indexModule); - indexModule.addSearchOperationListener(new SearchOperationListener() { - @Override - public void onNewContext(SearchContext context) { - contexts.incrementAndGet(); - try { - logger.trace("blocking search on " + nodeId); - assertBusy(() -> assertFalse(shouldBlockOnSearch.get())); - logger.trace("unblocking search on " + nodeId); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }); - } - - @Override - public List getActionFilters() { - List list = new ArrayList<>(super.getActionFilters()); - list.add(new ActionFilter() { - @Override - public int order() { - return 0; - } - - @Override - public void apply( - Task task, String action, Request request, ActionListener listener, - ActionFilterChain chain) { - if (action.equals(FieldCapabilitiesAction.NAME)) { - try { - fieldCaps.incrementAndGet(); - logger.trace("blocking field caps on " + nodeId); - assertBusy(() -> assertFalse(shouldBlockOnFieldCapabilities.get())); - logger.trace("unblocking field caps on " + nodeId); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - chain.proceed(task, action, request, listener); - } - }); - return list; - } - } - - @Override - protected Collection> nodePlugins() { - return Collections.singletonList(SearchBlockPlugin.class); - } - } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchRequest.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchRequest.java index ead9e5722c39d..5761e088f2914 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchRequest.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchRequest.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.eql.action; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.IndicesRequest; @@ -13,6 +14,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -32,11 +34,14 @@ import static org.elasticsearch.action.ValidateActions.addValidationError; import static org.elasticsearch.xpack.eql.action.RequestDefaults.FETCH_SIZE; import static org.elasticsearch.xpack.eql.action.RequestDefaults.FIELD_EVENT_CATEGORY; -import static org.elasticsearch.xpack.eql.action.RequestDefaults.FIELD_TIMESTAMP; import static org.elasticsearch.xpack.eql.action.RequestDefaults.FIELD_IMPLICIT_JOIN_KEY; +import static org.elasticsearch.xpack.eql.action.RequestDefaults.FIELD_TIMESTAMP; public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Replaceable, ToXContent { + public static long MIN_KEEP_ALIVE = TimeValue.timeValueMinutes(1).millis(); + public static TimeValue DEFAULT_KEEP_ALIVE = TimeValue.timeValueDays(5); + private String[] indices; private IndicesOptions indicesOptions = IndicesOptions.fromOptions(false, false, true, false); @@ -50,6 +55,11 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re private String query; private boolean isCaseSensitive = false; + // Async settings + private TimeValue waitForCompletionTimeout = null; + private TimeValue keepAlive = DEFAULT_KEEP_ALIVE; + private boolean keepOnCompletion; + static final String KEY_FILTER = "filter"; static final String KEY_TIMESTAMP_FIELD = "timestamp_field"; static final String KEY_EVENT_CATEGORY_FIELD = "event_category_field"; @@ -57,6 +67,9 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re static final String KEY_SIZE = "size"; static final String KEY_SEARCH_AFTER = "search_after"; static final String KEY_QUERY = "query"; + static final String KEY_WAIT_FOR_COMPLETION_TIMEOUT = "wait_for_completion_timeout"; + static final String KEY_KEEP_ALIVE = "keep_alive"; + static final String KEY_KEEP_ON_COMPLETION = "keep_on_completion"; static final String KEY_CASE_SENSITIVE = "case_sensitive"; static final ParseField FILTER = new ParseField(KEY_FILTER); @@ -66,6 +79,9 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re static final ParseField SIZE = new ParseField(KEY_SIZE); static final ParseField SEARCH_AFTER = new ParseField(KEY_SEARCH_AFTER); static final ParseField QUERY = new ParseField(KEY_QUERY); + static final ParseField WAIT_FOR_COMPLETION_TIMEOUT = new ParseField(KEY_WAIT_FOR_COMPLETION_TIMEOUT); + static final ParseField KEEP_ALIVE = new ParseField(KEY_KEEP_ALIVE); + static final ParseField KEEP_ON_COMPLETION = new ParseField(KEY_KEEP_ON_COMPLETION); static final ParseField CASE_SENSITIVE = new ParseField(KEY_CASE_SENSITIVE); private static final ObjectParser PARSER = objectParser(EqlSearchRequest::new); @@ -85,6 +101,11 @@ public EqlSearchRequest(StreamInput in) throws IOException { fetchSize = in.readVInt(); searchAfterBuilder = in.readOptionalWriteable(SearchAfterBuilder::new); query = in.readString(); + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { // TODO: Remove after backport + this.waitForCompletionTimeout = in.readOptionalTimeValue(); + this.keepAlive = in.readOptionalTimeValue(); + this.keepOnCompletion = in.readBoolean(); + } isCaseSensitive = in.readBoolean(); } @@ -127,6 +148,11 @@ public ActionRequestValidationException validate() { validationException = addValidationError("size must be greater than 0", validationException); } + if (keepAlive != null && keepAlive.getMillis() < MIN_KEEP_ALIVE) { + validationException = + addValidationError("[keep_alive] must be greater than 1 minute, got:" + keepAlive.toString(), validationException); + } + return validationException; } @@ -147,6 +173,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } builder.field(KEY_QUERY, query); + if (waitForCompletionTimeout != null) { + builder.field(KEY_WAIT_FOR_COMPLETION_TIMEOUT, waitForCompletionTimeout); + } + if (keepAlive != null) { + builder.field(KEY_KEEP_ALIVE, keepAlive); + } + builder.field(KEY_KEEP_ON_COMPLETION, keepOnCompletion); builder.field(KEY_CASE_SENSITIVE, isCaseSensitive); return builder; @@ -167,6 +200,12 @@ protected static ObjectParser objectParser parser.declareField(EqlSearchRequest::setSearchAfter, SearchAfterBuilder::fromXContent, SEARCH_AFTER, ObjectParser.ValueType.OBJECT_ARRAY); parser.declareString(EqlSearchRequest::query, QUERY); + parser.declareField(EqlSearchRequest::waitForCompletionTimeout, + (p, c) -> TimeValue.parseTimeValue(p.text(), KEY_WAIT_FOR_COMPLETION_TIMEOUT), WAIT_FOR_COMPLETION_TIMEOUT, + ObjectParser.ValueType.VALUE); + parser.declareField(EqlSearchRequest::keepAlive, + (p, c) -> TimeValue.parseTimeValue(p.text(), KEY_KEEP_ALIVE), KEEP_ALIVE, ObjectParser.ValueType.VALUE); + parser.declareBoolean(EqlSearchRequest::keepOnCompletion, KEEP_ON_COMPLETION); parser.declareBoolean(EqlSearchRequest::isCaseSensitive, CASE_SENSITIVE); return parser; } @@ -236,6 +275,33 @@ public EqlSearchRequest query(String query) { return this; } + public TimeValue waitForCompletionTimeout() { + return waitForCompletionTimeout; + } + + public EqlSearchRequest waitForCompletionTimeout(TimeValue waitForCompletionTimeout) { + this.waitForCompletionTimeout = waitForCompletionTimeout; + return this; + } + + public TimeValue keepAlive() { + return keepAlive; + } + + public EqlSearchRequest keepAlive(TimeValue keepAlive) { + this.keepAlive = keepAlive; + return this; + } + + public boolean keepOnCompletion() { + return keepOnCompletion; + } + + public EqlSearchRequest keepOnCompletion(boolean keepOnCompletion) { + this.keepOnCompletion = keepOnCompletion; + return this; + } + public boolean isCaseSensitive() { return this.isCaseSensitive; } public EqlSearchRequest isCaseSensitive(boolean isCaseSensitive) { @@ -255,6 +321,11 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVInt(fetchSize); out.writeOptionalWriteable(searchAfterBuilder); out.writeString(query); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { // TODO: Remove after backport + out.writeOptionalTimeValue(waitForCompletionTimeout); + out.writeOptionalTimeValue(keepAlive); + out.writeBoolean(keepOnCompletion); + } out.writeBoolean(isCaseSensitive); } @@ -276,6 +347,8 @@ public boolean equals(Object o) { Objects.equals(implicitJoinKeyField, that.implicitJoinKeyField) && Objects.equals(searchAfterBuilder, that.searchAfterBuilder) && Objects.equals(query, that.query) && + Objects.equals(waitForCompletionTimeout, that.waitForCompletionTimeout) && + Objects.equals(keepAlive, that.keepAlive) && Objects.equals(isCaseSensitive, that.isCaseSensitive); } @@ -290,6 +363,8 @@ public int hashCode() { implicitJoinKeyField, searchAfterBuilder, query, + waitForCompletionTimeout, + keepAlive, isCaseSensitive); } @@ -310,13 +385,16 @@ public IndicesOptions indicesOptions() { @Override public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { - return new EqlSearchTask(id, type, action, () -> { - StringBuilder sb = new StringBuilder(); - sb.append("indices["); - Strings.arrayToDelimitedString(indices, ",", sb); - sb.append("], "); - sb.append(query); - return sb.toString(); - }, parentTaskId, headers); + return new EqlSearchTask(id, type, action, getDescription(), parentTaskId, headers, null, null); + } + + @Override + public String getDescription() { + StringBuilder sb = new StringBuilder(); + sb.append("indices["); + Strings.arrayToDelimitedString(indices, ",", sb); + sb.append("], "); + sb.append(query); + return sb.toString(); } } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchResponse.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchResponse.java index e88e6d6b8f40e..4e01799d8cdf1 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchResponse.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchResponse.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.eql.action; import org.apache.lucene.search.TotalHits; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; @@ -15,6 +16,7 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.InstantiatingObjectParser; import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -27,43 +29,61 @@ import java.util.List; import java.util.Objects; +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; + public class EqlSearchResponse extends ActionResponse implements ToXContentObject { private final Hits hits; private final long tookInMillis; private final boolean isTimeout; + private final String asyncExecutionId; + private final boolean isRunning; + private final boolean isPartial; + private static final class Fields { static final String TOOK = "took"; static final String TIMED_OUT = "timed_out"; static final String HITS = "hits"; + static final String ID = "id"; + static final String IS_RUNNING = "is_running"; + static final String IS_PARTIAL = "is_partial"; } private static final ParseField TOOK = new ParseField(Fields.TOOK); private static final ParseField TIMED_OUT = new ParseField(Fields.TIMED_OUT); private static final ParseField HITS = new ParseField(Fields.HITS); + private static final ParseField ID = new ParseField(Fields.ID); + private static final ParseField IS_RUNNING = new ParseField(Fields.IS_RUNNING); + private static final ParseField IS_PARTIAL = new ParseField(Fields.IS_PARTIAL); - private static final ConstructingObjectParser PARSER = - new ConstructingObjectParser<>("eql/search_response", true, - args -> { - int i = 0; - Hits hits = (Hits) args[i++]; - Long took = (Long) args[i++]; - Boolean timeout = (Boolean) args[i]; - return new EqlSearchResponse(hits, took, timeout); - }); - + private static final InstantiatingObjectParser PARSER; static { - PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> Hits.fromXContent(p), HITS); - PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOOK); - PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), TIMED_OUT); + InstantiatingObjectParser.Builder parser = + InstantiatingObjectParser.builder("eql/search_response", true, EqlSearchResponse.class); + parser.declareObject(constructorArg(), (p, c) -> Hits.fromXContent(p), HITS); + parser.declareLong(constructorArg(), TOOK); + parser.declareBoolean(constructorArg(), TIMED_OUT); + parser.declareString(optionalConstructorArg(), ID); + parser.declareBoolean(constructorArg(), IS_RUNNING); + parser.declareBoolean(constructorArg(), IS_PARTIAL); + PARSER = parser.build(); } public EqlSearchResponse(Hits hits, long tookInMillis, boolean isTimeout) { + this(hits, tookInMillis, isTimeout, null, false, false); + } + + public EqlSearchResponse(Hits hits, long tookInMillis, boolean isTimeout, String asyncExecutionId, + boolean isRunning, boolean isPartial) { super(); this.hits = hits == null ? Hits.EMPTY : hits; this.tookInMillis = tookInMillis; this.isTimeout = isTimeout; + this.asyncExecutionId = asyncExecutionId; + this.isRunning = isRunning; + this.isPartial = isPartial; } public EqlSearchResponse(StreamInput in) throws IOException { @@ -71,6 +91,15 @@ public EqlSearchResponse(StreamInput in) throws IOException { tookInMillis = in.readVLong(); isTimeout = in.readBoolean(); hits = new Hits(in); + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { //TODO: Remove after backport + asyncExecutionId = in.readOptionalString(); + isPartial = in.readBoolean(); + isRunning = in.readBoolean(); + } else { + asyncExecutionId = null; + isPartial = false; + isRunning = false; + } } public static EqlSearchResponse fromXContent(XContentParser parser) { @@ -82,6 +111,11 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVLong(tookInMillis); out.writeBoolean(isTimeout); hits.writeTo(out); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { //TODO: Remove after backport + out.writeOptionalString(asyncExecutionId); + out.writeBoolean(isPartial); + out.writeBoolean(isRunning); + } } @Override @@ -92,6 +126,11 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } private XContentBuilder innerToXContent(XContentBuilder builder, Params params) throws IOException { + if (asyncExecutionId != null) { + builder.field(ID.getPreferredName(), asyncExecutionId); + } + builder.field(IS_PARTIAL.getPreferredName(), isPartial); + builder.field(IS_RUNNING.getPreferredName(), isRunning); builder.field(TOOK.getPreferredName(), tookInMillis); builder.field(TIMED_OUT.getPreferredName(), isTimeout); hits.toXContent(builder, params); @@ -110,6 +149,18 @@ public Hits hits() { return hits; } + public String id() { + return asyncExecutionId; + } + + public boolean isRunning() { + return isRunning; + } + + public boolean isPartial() { + return isPartial; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -121,12 +172,13 @@ public boolean equals(Object o) { EqlSearchResponse that = (EqlSearchResponse) o; return Objects.equals(hits, that.hits) && Objects.equals(tookInMillis, that.tookInMillis) - && Objects.equals(isTimeout, that.isTimeout); + && Objects.equals(isTimeout, that.isTimeout) + && Objects.equals(asyncExecutionId, that.asyncExecutionId); } @Override public int hashCode() { - return Objects.hash(hits, tookInMillis, isTimeout); + return Objects.hash(hits, tookInMillis, isTimeout, asyncExecutionId); } @Override @@ -253,9 +305,9 @@ private static final class Fields { }); static { - PARSER.declareInt(ConstructingObjectParser.constructorArg(), COUNT); - PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), KEYS); - PARSER.declareFloat(ConstructingObjectParser.constructorArg(), PERCENT); + PARSER.declareInt(constructorArg(), COUNT); + PARSER.declareStringArray(constructorArg(), KEYS); + PARSER.declareFloat(constructorArg(), PERCENT); } public Count(int count, List keys, float percent) { diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchTask.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchTask.java index e0c813718cd13..5730117f80d9c 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchTask.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchTask.java @@ -8,17 +8,22 @@ import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.xpack.core.async.AsyncExecutionId; +import org.elasticsearch.xpack.core.async.AsyncTask; import java.util.Map; -import java.util.function.Supplier; -public class EqlSearchTask extends CancellableTask { - private final Supplier descriptionSupplier; +public class EqlSearchTask extends CancellableTask implements AsyncTask { + private final String description; + private final AsyncExecutionId asyncExecutionId; + private final Map originHeaders; - public EqlSearchTask(long id, String type, String action, Supplier descriptionSupplier, TaskId parentTaskId, - Map headers) { + public EqlSearchTask(long id, String type, String action, String description, TaskId parentTaskId, + Map headers, Map originHeaders, AsyncExecutionId asyncExecutionId) { super(id, type, action, null, parentTaskId, headers); - this.descriptionSupplier = descriptionSupplier; + this.description = description; + this.asyncExecutionId = asyncExecutionId; + this.originHeaders = originHeaders; } @Override @@ -28,6 +33,16 @@ public boolean shouldCancelChildrenOnCancellation() { @Override public String getDescription() { - return descriptionSupplier.get(); + return description; + } + + @Override + public Map getOriginHeaders() { + return originHeaders; + } + + @Override + public AsyncExecutionId getExecutionId() { + return asyncExecutionId; } } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlPlugin.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlPlugin.java index c63fa17efd1a9..db811132b0d45 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlPlugin.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlPlugin.java @@ -46,6 +46,7 @@ import java.util.function.Supplier; public class EqlPlugin extends Plugin implements ActionPlugin { + public static final String INDEX = ".async-eql-search"; private final boolean enabled; diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/RestEqlSearchAction.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/RestEqlSearchAction.java index 7e26b7859bbab..dfe2e8feefe5b 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/RestEqlSearchAction.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/RestEqlSearchAction.java @@ -46,6 +46,14 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli eqlRequest = EqlSearchRequest.fromXContent(parser); eqlRequest.indices(Strings.splitStringByCommaToArray(request.param("index"))); eqlRequest.indicesOptions(IndicesOptions.fromRequest(request, eqlRequest.indicesOptions())); + if (request.hasParam("wait_for_completion_timeout")) { + eqlRequest.waitForCompletionTimeout( + request.paramAsTime("wait_for_completion_timeout", eqlRequest.waitForCompletionTimeout())); + } + if (request.hasParam("keep_alive")) { + eqlRequest.keepAlive(request.paramAsTime("keep_alive", eqlRequest.keepAlive())); + } + eqlRequest.keepOnCompletion(request.paramAsBoolean("keep_on_completion", eqlRequest.keepOnCompletion())); } return channel -> client.execute(EqlSearchAction.INSTANCE, eqlRequest, new RestResponseListener<>(channel) { diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java index bb5e9bfd8a46b..6f86de1b07da5 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java @@ -8,8 +8,11 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.time.DateUtils; import org.elasticsearch.common.unit.TimeValue; @@ -19,6 +22,8 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.async.AsyncExecutionId; +import org.elasticsearch.xpack.core.async.AsyncTaskManagementService; import org.elasticsearch.xpack.core.security.SecurityContext; import org.elasticsearch.xpack.eql.action.EqlSearchAction; import org.elasticsearch.xpack.eql.action.EqlSearchRequest; @@ -29,32 +34,72 @@ import org.elasticsearch.xpack.eql.session.EqlConfiguration; import org.elasticsearch.xpack.eql.session.Results; +import java.io.IOException; import java.time.ZoneId; +import java.util.Map; import static org.elasticsearch.action.ActionListener.wrap; +import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_EQL_SEARCH_ORIGIN; + +public class TransportEqlSearchAction extends HandledTransportAction + implements AsyncTaskManagementService.AsyncOperation { -public class TransportEqlSearchAction extends HandledTransportAction { private final SecurityContext securityContext; private final ClusterService clusterService; private final PlanExecutor planExecutor; + private final ThreadPool threadPool; + private final AsyncTaskManagementService asyncTaskManagementService; @Inject public TransportEqlSearchAction(Settings settings, ClusterService clusterService, TransportService transportService, - ThreadPool threadPool, ActionFilters actionFilters, PlanExecutor planExecutor) { + ThreadPool threadPool, ActionFilters actionFilters, PlanExecutor planExecutor, + NamedWriteableRegistry registry, Client client) { super(EqlSearchAction.NAME, transportService, actionFilters, EqlSearchRequest::new); this.securityContext = XPackSettings.SECURITY_ENABLED.get(settings) ? new SecurityContext(settings, threadPool.getThreadContext()) : null; this.clusterService = clusterService; this.planExecutor = planExecutor; + this.threadPool = threadPool; + + this.asyncTaskManagementService = new AsyncTaskManagementService<>(EqlPlugin.INDEX, client, ASYNC_EQL_SEARCH_ORIGIN, registry, + taskManager, EqlSearchAction.INSTANCE.name(), this, EqlSearchTask.class, clusterService, threadPool); } @Override - protected void doExecute(Task task, EqlSearchRequest request, ActionListener listener) { - operation(planExecutor, (EqlSearchTask) task, request, username(securityContext), clusterName(clusterService), + public EqlSearchTask createTask(EqlSearchRequest request, long id, String type, String action, TaskId parentTaskId, + Map headers, Map originHeaders, AsyncExecutionId asyncExecutionId) { + return new EqlSearchTask(id, type, action, request.getDescription(), parentTaskId, headers, originHeaders, asyncExecutionId); + } + + @Override + public void operation(EqlSearchRequest request, EqlSearchTask task, ActionListener listener) { + operation(planExecutor, task, request, username(securityContext), clusterName(clusterService), clusterService.localNode().getId(), listener); } + @Override + public EqlSearchResponse initialResponse(EqlSearchTask task) { + return new EqlSearchResponse(EqlSearchResponse.Hits.EMPTY, + threadPool.relativeTimeInMillis() - task.getStartTime(), false, task.getExecutionId().getEncoded(), true, true); + } + + @Override + public EqlSearchResponse readResponse(StreamInput inputStream) throws IOException { + return new EqlSearchResponse(inputStream); + } + + @Override + protected void doExecute(Task task, EqlSearchRequest request, ActionListener listener) { + if (request.waitForCompletionTimeout() != null && request.waitForCompletionTimeout().getMillis() >= 0) { + asyncTaskManagementService.asyncExecute(request, request.waitForCompletionTimeout(), request.keepAlive(), + request.keepOnCompletion(), listener); + } else { + operation(planExecutor, (EqlSearchTask) task, request, username(securityContext), clusterName(clusterService), + clusterService.localNode().getId(), listener); + } + } + public static void operation(PlanExecutor planExecutor, EqlSearchTask task, EqlSearchRequest request, String username, String clusterName, String nodeId, ActionListener listener) { // TODO: these should be sent by the client @@ -70,15 +115,19 @@ public static void operation(PlanExecutor planExecutor, EqlSearchTask task, EqlS .implicitJoinKey(request.implicitJoinKeyField()); EqlConfiguration cfg = new EqlConfiguration(request.indices(), zoneId, username, clusterName, filter, timeout, request.fetchSize(), - includeFrozen, request.isCaseSensitive(), clientId, new TaskId(nodeId, task.getId()), task::isCancelled); - planExecutor.eql(cfg, request.query(), params, wrap(r -> listener.onResponse(createResponse(r)), listener::onFailure)); + includeFrozen, request.isCaseSensitive(), clientId, new TaskId(nodeId, task.getId()), task); + planExecutor.eql(cfg, request.query(), params, wrap(r -> listener.onResponse(createResponse(r, task.getExecutionId())), + listener::onFailure)); } - static EqlSearchResponse createResponse(Results results) { + static EqlSearchResponse createResponse(Results results, AsyncExecutionId id) { EqlSearchResponse.Hits hits = new EqlSearchResponse.Hits(results.searchHits(), results.sequences(), results.counts(), results - .totalHits()); - - return new EqlSearchResponse(hits, results.tookTime().getMillis(), results.timedOut()); + .totalHits()); + if (id != null) { + return new EqlSearchResponse(hits, results.tookTime().getMillis(), results.timedOut(), id.getEncoded(), false, false); + } else { + return new EqlSearchResponse(hits, results.tookTime().getMillis(), results.timedOut()); + } } static String username(SecurityContext securityContext) { diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/EqlConfiguration.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/EqlConfiguration.java index a81c80b80c2d6..e0bd8f4082a3a 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/EqlConfiguration.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/EqlConfiguration.java @@ -10,9 +10,9 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.xpack.eql.action.EqlSearchTask; import java.time.ZoneId; -import java.util.function.Supplier; public class EqlConfiguration extends org.elasticsearch.xpack.ql.session.Configuration { @@ -21,17 +21,15 @@ public class EqlConfiguration extends org.elasticsearch.xpack.ql.session.Configu private final int size; private final String clientId; private final boolean includeFrozenIndices; - private final Supplier isCancelled; private final TaskId taskId; + private final EqlSearchTask task; private final boolean isCaseSensitive; @Nullable private final QueryBuilder filter; public EqlConfiguration(String[] indices, ZoneId zi, String username, String clusterName, QueryBuilder filter, TimeValue requestTimeout, - int size, boolean includeFrozen, boolean isCaseSensitive, String clientId, TaskId taskId, - Supplier isCancelled) { - + int size, boolean includeFrozen, boolean isCaseSensitive, String clientId, TaskId taskId, EqlSearchTask task) { super(zi, username, clusterName); this.indices = indices; @@ -42,7 +40,7 @@ public EqlConfiguration(String[] indices, ZoneId zi, String username, String clu this.includeFrozenIndices = includeFrozen; this.isCaseSensitive = isCaseSensitive; this.taskId = taskId; - this.isCancelled = isCancelled; + this.task = task; } public String[] indices() { @@ -74,7 +72,7 @@ public boolean isCaseSensitive() { } public boolean isCancelled() { - return isCancelled.get(); + return task.isCancelled(); } public TaskId getTaskId() { diff --git a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/EqlTestUtils.java b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/EqlTestUtils.java index b2cacf9775347..ff0fa90a369b3 100644 --- a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/EqlTestUtils.java +++ b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/EqlTestUtils.java @@ -8,6 +8,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.xpack.core.async.AsyncExecutionId; import org.elasticsearch.xpack.eql.action.EqlSearchAction; import org.elasticsearch.xpack.eql.action.EqlSearchTask; import org.elasticsearch.xpack.eql.session.EqlConfiguration; @@ -28,7 +29,7 @@ private EqlTestUtils() { public static final EqlConfiguration TEST_CFG = new EqlConfiguration(new String[]{"none"}, org.elasticsearch.xpack.ql.util.DateUtils.UTC, "nobody", "cluster", null, TimeValue.timeValueSeconds(30), -1, false, false, "", - new TaskId(randomAlphaOfLength(10), randomNonNegativeLong()), () -> false); + new TaskId(randomAlphaOfLength(10), randomNonNegativeLong()), randomTask()); public static EqlConfiguration randomConfiguration() { return new EqlConfiguration(new String[]{randomAlphaOfLength(16)}, @@ -42,7 +43,7 @@ public static EqlConfiguration randomConfiguration() { randomBoolean(), randomAlphaOfLength(16), new TaskId(randomAlphaOfLength(10), randomNonNegativeLong()), - () -> false); + randomTask()); } public static EqlConfiguration randomConfigurationWithCaseSensitive(boolean isCaseSensitive) { @@ -57,10 +58,11 @@ public static EqlConfiguration randomConfigurationWithCaseSensitive(boolean isCa isCaseSensitive, randomAlphaOfLength(16), new TaskId(randomAlphaOfLength(10), randomNonNegativeLong()), - () -> false); + randomTask()); } public static EqlSearchTask randomTask() { - return new EqlSearchTask(randomLong(), "transport", EqlSearchAction.NAME, () -> "", null, Collections.emptyMap()); + return new EqlSearchTask(randomLong(), "transport", EqlSearchAction.NAME, "", null, Collections.emptyMap(), Collections.emptyMap(), + new AsyncExecutionId("", new TaskId(randomAlphaOfLength(10), 1))); } } diff --git a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/action/EqlSearchResponseTests.java b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/action/EqlSearchResponseTests.java index f4afe8128378e..4cc10e7814a71 100644 --- a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/action/EqlSearchResponseTests.java +++ b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/action/EqlSearchResponseTests.java @@ -35,11 +35,7 @@ static List randomEvents() { @Override protected EqlSearchResponse createTestInstance() { - TotalHits totalHits = null; - if (randomBoolean()) { - totalHits = new TotalHits(randomIntBetween(100, 1000), TotalHits.Relation.EQUAL_TO); - } - return createRandomInstance(totalHits); + return randomEqlSearchResponse(); } @Override @@ -47,12 +43,25 @@ protected Writeable.Reader instanceReader() { return EqlSearchResponse::new; } + public static EqlSearchResponse randomEqlSearchResponse() { + TotalHits totalHits = null; + if (randomBoolean()) { + totalHits = new TotalHits(randomIntBetween(100, 1000), TotalHits.Relation.EQUAL_TO); + } + return createRandomInstance(totalHits); + } + public static EqlSearchResponse createRandomEventsResponse(TotalHits totalHits) { EqlSearchResponse.Hits hits = null; if (randomBoolean()) { hits = new EqlSearchResponse.Hits(randomEvents(), null, null, totalHits); } - return new EqlSearchResponse(hits, randomIntBetween(0, 1001), randomBoolean()); + if (randomBoolean()) { + return new EqlSearchResponse(hits, randomIntBetween(0, 1001), randomBoolean()); + } else { + return new EqlSearchResponse(hits, randomIntBetween(0, 1001), randomBoolean(), + randomAlphaOfLength(10), randomBoolean(), randomBoolean()); + } } public static EqlSearchResponse createRandomSequencesResponse(TotalHits totalHits) { @@ -72,7 +81,12 @@ public static EqlSearchResponse createRandomSequencesResponse(TotalHits totalHit if (randomBoolean()) { hits = new EqlSearchResponse.Hits(null, seq, null, totalHits); } - return new EqlSearchResponse(hits, randomIntBetween(0, 1001), randomBoolean()); + if (randomBoolean()) { + return new EqlSearchResponse(hits, randomIntBetween(0, 1001), randomBoolean()); + } else { + return new EqlSearchResponse(hits, randomIntBetween(0, 1001), randomBoolean(), + randomAlphaOfLength(10), randomBoolean(), randomBoolean()); + } } public static EqlSearchResponse createRandomCountResponse(TotalHits totalHits) { @@ -92,7 +106,12 @@ public static EqlSearchResponse createRandomCountResponse(TotalHits totalHits) { if (randomBoolean()) { hits = new EqlSearchResponse.Hits(null, null, cn, totalHits); } - return new EqlSearchResponse(hits, randomIntBetween(0, 1001), randomBoolean()); + if (randomBoolean()) { + return new EqlSearchResponse(hits, randomIntBetween(0, 1001), randomBoolean()); + } else { + return new EqlSearchResponse(hits, randomIntBetween(0, 1001), randomBoolean(), + randomAlphaOfLength(10), randomBoolean(), randomBoolean()); + } } public static EqlSearchResponse createRandomInstance(TotalHits totalHits) { diff --git a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchActionTests.java b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchActionTests.java new file mode 100644 index 0000000000000..77ec076424152 --- /dev/null +++ b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchActionTests.java @@ -0,0 +1,36 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.eql.plugin; + +import org.elasticsearch.client.Client; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.Before; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TransportEqlSearchActionTests extends ESTestCase { + + private XPackLicenseState licenseState; + private Client client; + + + @Before + public void init() throws Exception { + licenseState = mock(XPackLicenseState.class); + client = mock(Client.class); + ThreadPool threadPool = mock(ThreadPool.class); + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + when(threadPool.getThreadContext()).thenReturn(threadContext); + when(client.threadPool()).thenReturn(threadPool); + } + +} diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java index 77d8c9d9737bb..f6992e83ca7e3 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java @@ -12,6 +12,7 @@ import org.elasticsearch.xpack.core.security.authc.Authentication; import org.elasticsearch.xpack.core.security.authc.AuthenticationField; import org.elasticsearch.xpack.core.security.support.Automatons; +import org.elasticsearch.xpack.core.security.user.AsyncEqlSearchUser; import org.elasticsearch.xpack.core.security.user.AsyncSearchUser; import org.elasticsearch.xpack.core.security.user.XPackSecurityUser; import org.elasticsearch.xpack.core.security.user.XPackUser; @@ -20,6 +21,7 @@ import java.util.function.Predicate; import static org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction.TASKS_ORIGIN; +import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_EQL_SEARCH_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.ENRICH_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.IDP_ORIGIN; @@ -123,6 +125,9 @@ public static void switchUserBasedOnActionOriginAndExecute(ThreadContext threadC case ASYNC_SEARCH_ORIGIN: securityContext.executeAsUser(AsyncSearchUser.INSTANCE, consumer, Version.CURRENT); break; + case ASYNC_EQL_SEARCH_ORIGIN: + securityContext.executeAsUser(AsyncEqlSearchUser.INSTANCE, consumer, Version.CURRENT); + break; default: assert false : "action.origin [" + actionOrigin + "] is unknown!"; throw new IllegalStateException("action.origin [" + actionOrigin + "] should always be a known value"); diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/accesscontrol/IndicesPermissionTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/accesscontrol/IndicesPermissionTests.java index 9d2905e165976..5ebcb6c9fe5ad 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/accesscontrol/IndicesPermissionTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/accesscontrol/IndicesPermissionTests.java @@ -323,8 +323,9 @@ public void testSecurityIndicesPermissions() { } public void testAsyncSearchIndicesPermissions() { + String prefix = randomFrom(RestrictedIndicesNames.ASYNC_SEARCH_PREFIX, RestrictedIndicesNames.ASYNC_EQL_SEARCH_PREFIX); final Settings indexSettings = Settings.builder().put("index.version.created", Version.CURRENT).build(); - final String asyncSearchIndex = RestrictedIndicesNames.ASYNC_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2); + final String asyncSearchIndex = prefix + randomAlphaOfLengthBetween(0, 2); final Metadata metadata = new Metadata.Builder() .put(new IndexMetadata.Builder(asyncSearchIndex) .settings(indexSettings) diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/user/AsyncEqlSearchUserTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/user/AsyncEqlSearchUserTests.java new file mode 100644 index 0000000000000..3444540a26f86 --- /dev/null +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/user/AsyncEqlSearchUserTests.java @@ -0,0 +1,64 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.security.user; + +import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsAction; +import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; +import org.elasticsearch.action.admin.cluster.stats.ClusterStatsAction; +import org.elasticsearch.action.delete.DeleteAction; +import org.elasticsearch.action.get.GetAction; +import org.elasticsearch.action.index.IndexAction; +import org.elasticsearch.action.search.SearchAction; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.xpack.core.security.authc.Authentication; +import org.elasticsearch.xpack.core.security.index.RestrictedIndicesNames; +import org.elasticsearch.xpack.core.security.user.AsyncEqlSearchUser; +import org.elasticsearch.xpack.core.security.user.AsyncEqlSearchUser; +import org.elasticsearch.xpack.core.watcher.transport.actions.get.GetWatchAction; +import org.hamcrest.Matchers; + +import java.util.Arrays; +import java.util.function.Predicate; + +import static org.mockito.Mockito.mock; + +public class AsyncEqlSearchUserTests extends ESTestCase { + + public void testAsyncEqlSearchUserCannotAccessNonRestrictedIndices() { + for (String action : Arrays.asList(GetAction.NAME, DeleteAction.NAME, SearchAction.NAME, IndexAction.NAME)) { + Predicate predicate = AsyncEqlSearchUser.ROLE.indices().allowedIndicesMatcher(action); + String index = randomAlphaOfLengthBetween(3, 12); + if (false == RestrictedIndicesNames.isRestricted(index)) { + assertThat(predicate.test(index), Matchers.is(false)); + } + index = "." + randomAlphaOfLengthBetween(3, 12); + if (false == RestrictedIndicesNames.isRestricted(index)) { + assertThat(predicate.test(index), Matchers.is(false)); + } + } + } + + public void testAsyncEqlSearchUserCanAccessOnlyAsyncEqlSearchRestrictedIndices() { + for (String action : Arrays.asList(GetAction.NAME, DeleteAction.NAME, SearchAction.NAME, IndexAction.NAME)) { + final Predicate predicate = AsyncEqlSearchUser.ROLE.indices().allowedIndicesMatcher(action); + for (String index : RestrictedIndicesNames.RESTRICTED_NAMES) { + assertThat(predicate.test(index), Matchers.is(false)); + } + assertThat(predicate.test(RestrictedIndicesNames.ASYNC_EQL_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 3)), + Matchers.is(true)); + assertThat(predicate.test(RestrictedIndicesNames.ASYNC_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 3)), + Matchers.is(false)); + } + } + + public void testAsyncEqlSearchUserHasNoClusterPrivileges() { + for (String action : Arrays.asList(ClusterStateAction.NAME, GetWatchAction.NAME, ClusterStatsAction.NAME, NodesStatsAction.NAME)) { + assertThat(AsyncEqlSearchUser.ROLE.cluster().check(action, mock(TransportRequest.class), mock(Authentication.class)), + Matchers.is(false)); + } + } +} diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/user/AsyncSearchUserTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/user/AsyncSearchUserTests.java index 38722cf8a422a..315d95a3317e0 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/user/AsyncSearchUserTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/user/AsyncSearchUserTests.java @@ -48,6 +48,8 @@ public void testAsyncSearchUserCanAccessOnlyAsyncSearchRestrictedIndices() { assertThat(predicate.test(index), Matchers.is(false)); } assertThat(predicate.test(RestrictedIndicesNames.ASYNC_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 3)), Matchers.is(true)); + assertThat(predicate.test(RestrictedIndicesNames.ASYNC_EQL_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 3)), + Matchers.is(false)); } } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/user/XPackUserTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/user/XPackUserTests.java index dc34ffad8d65f..f2a773d4d1436 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/user/XPackUserTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/user/XPackUserTests.java @@ -44,6 +44,8 @@ public void testXPackUserCannotAccessRestrictedIndices() { assertThat(predicate.test(index), Matchers.is(false)); } assertThat(predicate.test(RestrictedIndicesNames.ASYNC_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)), Matchers.is(false)); + assertThat(predicate.test(RestrictedIndicesNames.ASYNC_EQL_SEARCH_PREFIX + randomAlphaOfLengthBetween(0, 2)), + Matchers.is(false)); } } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/eql.search.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/eql.search.json index b9ba460d6a997..c371851deeb53 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/api/eql.search.json +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/eql.search.json @@ -22,7 +22,22 @@ } ] }, - "params":{}, + "params":{ + "wait_for_completion_timeout":{ + "type":"time", + "description":"Specify the time that the request should block waiting for the final response" + }, + "keep_on_completion":{ + "type":"boolean", + "description":"Control whether the response should be stored in the cluster if it completed within the provided [wait_for_completion] time (default: false)", + "default":false + }, + "keep_alive": { + "type": "time", + "description": "Update the time interval in which the results (partial or final) for this search will be available", + "default": "5d" + } + }, "body":{ "description":"Eql request body. Use the `query` to limit the query scope.", "required":true