Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EQL: Adds an ability to start an asynchronous EQL search #56147

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Validatable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
Expand All @@ -44,13 +45,21 @@ public class EqlSearchRequest implements Validatable, ToXContentObject {
private SearchAfterBuilder searchAfterBuilder;
private String query;

// Async settings
private TimeValue waitForCompletionTimeout;
private boolean keepOnCompletion;
private TimeValue keepAlive;

static final String KEY_FILTER = "filter";
static final String KEY_TIMESTAMP_FIELD = "timestamp_field";
static final String KEY_EVENT_CATEGORY_FIELD = "event_category_field";
static final String KEY_IMPLICIT_JOIN_KEY_FIELD = "implicit_join_key_field";
static final String KEY_SIZE = "size";
static final String KEY_SEARCH_AFTER = "search_after";
static final String KEY_QUERY = "query";
static final String KEY_WAIT_FOR_COMPLETION_TIMEOUT = "wait_for_completion_timeout";
static final String KEY_KEEP_ALIVE = "keep_alive";
static final String KEY_KEEP_ON_COMPLETION = "keep_on_completion";

public EqlSearchRequest(String indices, String query) {
indices(indices);
Expand All @@ -75,6 +84,13 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par
}

builder.field(KEY_QUERY, query);
if (waitForCompletionTimeout != null) {
builder.field(KEY_WAIT_FOR_COMPLETION_TIMEOUT, waitForCompletionTimeout);
}
if (keepAlive != null) {
builder.field(KEY_KEEP_ALIVE, keepAlive);
}
builder.field(KEY_KEEP_ON_COMPLETION, keepOnCompletion);
builder.endObject();
return builder;
}
Expand Down Expand Up @@ -166,6 +182,32 @@ public EqlSearchRequest query(String query) {
return this;
}

public TimeValue waitForCompletionTimeout() {
return waitForCompletionTimeout;
}

public EqlSearchRequest waitForCompletionTimeout(TimeValue waitForCompletionTimeout) {
this.waitForCompletionTimeout = waitForCompletionTimeout;
return this;
}

public Boolean keepOnCompletion() {
return keepOnCompletion;
}

public void keepOnCompletion(Boolean keepOnCompletion) {
this.keepOnCompletion = keepOnCompletion;
}

public TimeValue keepAlive() {
return keepAlive;
}

public EqlSearchRequest keepAlive(TimeValue keepAlive) {
this.keepAlive = keepAlive;
return this;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -183,7 +225,10 @@ public boolean equals(Object o) {
Objects.equals(eventCategoryField, that.eventCategoryField) &&
Objects.equals(implicitJoinKeyField, that.implicitJoinKeyField) &&
Objects.equals(searchAfterBuilder, that.searchAfterBuilder) &&
Objects.equals(query, that.query);
Objects.equals(query, that.query) &&
Objects.equals(waitForCompletionTimeout, that.waitForCompletionTimeout) &&
Objects.equals(keepAlive, that.keepAlive) &&
Objects.equals(keepOnCompletion, that.keepOnCompletion);
}

@Override
Expand All @@ -197,7 +242,10 @@ public int hashCode() {
eventCategoryField,
implicitJoinKeyField,
searchAfterBuilder,
query);
query,
waitForCompletionTimeout,
keepAlive,
keepOnCompletion);
}

public String[] indices() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.InstantiatingObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
Expand All @@ -32,43 +33,56 @@
import java.util.List;
import java.util.Objects;

import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;

public class EqlSearchResponse {

private final Hits hits;
private final long tookInMillis;
private final boolean isTimeout;
private final String asyncExecutionId;
private final boolean isRunning;
private final boolean isPartial;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is is_partial needed in eql given that no partial results are returned? We are regretting to have added this flag in the response of async search, and we will likely remove it, so it would be good to make sure that it's is needed in eql.


private static final class Fields {
static final String TOOK = "took";
static final String TIMED_OUT = "timed_out";
static final String HITS = "hits";
static final String ID = "id";
static final String IS_RUNNING = "is_running";
static final String IS_PARTIAL = "is_partial";
}

private static final ParseField TOOK = new ParseField(Fields.TOOK);
private static final ParseField TIMED_OUT = new ParseField(Fields.TIMED_OUT);
private static final ParseField HITS = new ParseField(Fields.HITS);
private static final ParseField ID = new ParseField(Fields.ID);
private static final ParseField IS_RUNNING = new ParseField(Fields.IS_RUNNING);
private static final ParseField IS_PARTIAL = new ParseField(Fields.IS_PARTIAL);

private static final ConstructingObjectParser<EqlSearchResponse, Void> PARSER =
new ConstructingObjectParser<>("eql/search_response", true,
args -> {
int i = 0;
Hits hits = (Hits) args[i++];
Long took = (Long) args[i++];
Boolean timeout = (Boolean) args[i];
return new EqlSearchResponse(hits, took, timeout);
});

private static final InstantiatingObjectParser<EqlSearchResponse, Void> PARSER;
static {
PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> Hits.fromXContent(p), HITS);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOOK);
PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), TIMED_OUT);
InstantiatingObjectParser.Builder<EqlSearchResponse, Void> parser =
InstantiatingObjectParser.builder("eql/search_response", true, EqlSearchResponse.class);
parser.declareObject(constructorArg(), (p, c) -> Hits.fromXContent(p), HITS);
parser.declareLong(constructorArg(), TOOK);
parser.declareBoolean(constructorArg(), TIMED_OUT);
parser.declareString(optionalConstructorArg(), ID);
parser.declareBoolean(constructorArg(), IS_RUNNING);
parser.declareBoolean(constructorArg(), IS_PARTIAL);
PARSER = parser.build();
}

public EqlSearchResponse(Hits hits, long tookInMillis, boolean isTimeout) {
public EqlSearchResponse(Hits hits, long tookInMillis, boolean isTimeout, String asyncExecutionId,
boolean isRunning, boolean isPartial) {
super();
this.hits = hits == null ? Hits.EMPTY : hits;
this.tookInMillis = tookInMillis;
this.isTimeout = isTimeout;
this.asyncExecutionId = asyncExecutionId;
this.isRunning = isRunning;
this.isPartial = isPartial;
}

public static EqlSearchResponse fromXContent(XContentParser parser) {
Expand All @@ -87,6 +101,18 @@ public Hits hits() {
return hits;
}

public String id() {
return asyncExecutionId;
}

public boolean isRunning() {
return isRunning;
}

public boolean isPartial() {
return isPartial;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,12 @@ public static org.elasticsearch.xpack.eql.action.EqlSearchResponse createRandomE
if (randomBoolean()) {
hits = new org.elasticsearch.xpack.eql.action.EqlSearchResponse.Hits(randomEvents(), null, null, totalHits);
}
return new org.elasticsearch.xpack.eql.action.EqlSearchResponse(hits, randomIntBetween(0, 1001), randomBoolean());
if (randomBoolean()) {
return new org.elasticsearch.xpack.eql.action.EqlSearchResponse(hits, randomIntBetween(0, 1001), randomBoolean());
} else {
return new org.elasticsearch.xpack.eql.action.EqlSearchResponse(hits, randomIntBetween(0, 1001), randomBoolean(),
randomAlphaOfLength(10), randomBoolean(), randomBoolean());
}
}

public static org.elasticsearch.xpack.eql.action.EqlSearchResponse createRandomSequencesResponse(TotalHits totalHits) {
Expand All @@ -77,7 +82,12 @@ public static org.elasticsearch.xpack.eql.action.EqlSearchResponse createRandomS
if (randomBoolean()) {
hits = new org.elasticsearch.xpack.eql.action.EqlSearchResponse.Hits(null, seq, null, totalHits);
}
return new org.elasticsearch.xpack.eql.action.EqlSearchResponse(hits, randomIntBetween(0, 1001), randomBoolean());
if (randomBoolean()) {
return new org.elasticsearch.xpack.eql.action.EqlSearchResponse(hits, randomIntBetween(0, 1001), randomBoolean());
} else {
return new org.elasticsearch.xpack.eql.action.EqlSearchResponse(hits, randomIntBetween(0, 1001), randomBoolean(),
randomAlphaOfLength(10), randomBoolean(), randomBoolean());
}
}

public static org.elasticsearch.xpack.eql.action.EqlSearchResponse createRandomCountResponse(TotalHits totalHits) {
Expand All @@ -97,7 +107,12 @@ public static org.elasticsearch.xpack.eql.action.EqlSearchResponse createRandomC
if (randomBoolean()) {
hits = new org.elasticsearch.xpack.eql.action.EqlSearchResponse.Hits(null, null, cn, totalHits);
}
return new org.elasticsearch.xpack.eql.action.EqlSearchResponse(hits, randomIntBetween(0, 1001), randomBoolean());
if (randomBoolean()) {
return new org.elasticsearch.xpack.eql.action.EqlSearchResponse(hits, randomIntBetween(0, 1001), randomBoolean());
} else {
return new org.elasticsearch.xpack.eql.action.EqlSearchResponse(hits, randomIntBetween(0, 1001), randomBoolean(),
randomAlphaOfLength(10), randomBoolean(), randomBoolean());
}
}

public static org.elasticsearch.xpack.eql.action.EqlSearchResponse createRandomInstance(TotalHits totalHits) {
Expand Down
6 changes: 4 additions & 2 deletions docs/reference/eql/eql-search-api.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,9 @@ the https://en.wikipedia.org/wiki/Unix_time[Unix epoch], in ascending order.
]
}
]
}
},
"is_partial": false,
"is_running": false
}
----
// TESTRESPONSE[s/"took": 6/"took": $body.took/]
// TESTRESPONSE[s/"took": 6/"took": $body.took/]
4 changes: 3 additions & 1 deletion docs/reference/eql/search.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,9 @@ https://en.wikipedia.org/wiki/Unix_time[Unix epoch], in ascending order.
]
}
]
}
},
"is_partial": false,
"is_running": false
}
----
// TESTRESPONSE[s/"took": 60/"took": $body.took/]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public void onResponse(AsyncSearchResponse searchResponse) {
// creates the fallback response if the node crashes/restarts in the middle of the request
// TODO: store intermediate results ?
AsyncSearchResponse initialResp = searchResponse.clone(searchResponse.getId());
store.storeInitialResponse(docId, searchTask.getOriginHeaders(), initialResp,
store.createResponse(docId, searchTask.getOriginHeaders(), initialResp,
new ActionListener<>() {
@Override
public void onResponse(IndexResponse r) {
Expand Down Expand Up @@ -191,7 +191,7 @@ private void onFinalResponse(CancellableTask submitTask,
}

try {
store.storeFinalResponse(searchTask.getExecutionId().getDocId(), threadContext.getResponseHeaders(),response,
store.updateResponse(searchTask.getExecutionId().getDocId(), threadContext.getResponseHeaders(),response,
ActionListener.wrap(resp -> unregisterTaskAndMoveOn(searchTask, nextAction),
exc -> {
Throwable cause = ExceptionsHelper.unwrapCause(exc);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public final class ClientHelper {
public static final String ENRICH_ORIGIN = "enrich";
public static final String TRANSFORM_ORIGIN = "transform";
public static final String ASYNC_SEARCH_ORIGIN = "async_search";
public static final String ASYNC_EQL_SEARCH_ORIGIN = "async_eql_search";
public static final String IDP_ORIGIN = "idp";

private ClientHelper() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,10 @@ void createIndexIfNecessary(ActionListener<Void> listener) {
* Stores the initial response with the original headers of the authenticated user
* and the expected expiration time.
*/
public void storeInitialResponse(String docId,
Map<String, String> headers,
R response,
ActionListener<IndexResponse> listener) throws IOException {
public void createResponse(String docId,
Map<String, String> headers,
R response,
ActionListener<IndexResponse> listener) throws IOException {
Map<String, Object> source = new HashMap<>();
source.put(HEADERS_FIELD, headers);
source.put(EXPIRATION_TIME_FIELD, response.getExpirationTime());
Expand All @@ -181,10 +181,10 @@ public void storeInitialResponse(String docId,
/**
* Stores the final response if the place-holder document is still present (update).
*/
public void storeFinalResponse(String docId,
Map<String, List<String>> responseHeaders,
R response,
ActionListener<UpdateResponse> listener) throws IOException {
public void updateResponse(String docId,
Map<String, List<String>> responseHeaders,
R response,
ActionListener<UpdateResponse> listener) throws IOException {
Map<String, Object> source = new HashMap<>();
source.put(RESPONSE_HEADERS_FIELD, responseHeaders);
source.put(RESULT_FIELD, encodeResponse(response));
Expand Down
Loading