Skip to content

Commit

Permalink
EQL: Protocol support for async mode
Browse files Browse the repository at this point in the history
Adds a protocol support for async mode. The actual implementation
will added when more details of query handling are clear.

Relates to elastic#49581
  • Loading branch information
imotov committed Jan 7, 2020
1 parent 3777324 commit f3c73ea
Show file tree
Hide file tree
Showing 8 changed files with 148 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,18 @@ setup:
- match: {took: 0}
- match: {hits.total.value: 0}

---
# Testing async execution
"Execute some EQL async":
- do:
eql.search:
index: eql_test
wait_for_completion: false
body:
rule: "process where user = 'SYSTEM'"

- match: {task: '/.+:\d+/'}
- set: {task: task}


# TODO: Add get task when task result is stored
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
private int fetchSize = 50;
private SearchAfterBuilder searchAfterBuilder;
private String rule;
private boolean waitForCompletion;

static final String KEY_QUERY = "query";
static final String KEY_TIMESTAMP_FIELD = "timestamp_field";
Expand Down Expand Up @@ -75,6 +76,7 @@ public EqlSearchRequest(StreamInput in) throws IOException {
fetchSize = in.readVInt();
searchAfterBuilder = in.readOptionalWriteable(SearchAfterBuilder::new);
rule = in.readString();
waitForCompletion = in.readBoolean();
}

@Override
Expand Down Expand Up @@ -224,6 +226,15 @@ public EqlSearchRequest rule(String rule) {
return this;
}

public boolean waitForCompletion() {
return this.waitForCompletion;
}

public EqlSearchRequest waitForCompletion(boolean waitForCompletion) {
this.waitForCompletion = waitForCompletion;
return this;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand All @@ -236,6 +247,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(fetchSize);
out.writeOptionalWriteable(searchAfterBuilder);
out.writeString(rule);
out.writeBoolean(waitForCompletion);
}

@Override
Expand All @@ -256,7 +268,8 @@ public boolean equals(Object o) {
Objects.equals(eventTypeField, that.eventTypeField) &&
Objects.equals(implicitJoinKeyField, that.implicitJoinKeyField) &&
Objects.equals(searchAfterBuilder, that.searchAfterBuilder) &&
Objects.equals(rule, that.rule);
Objects.equals(rule, that.rule) &&
Objects.equals(waitForCompletion, that.waitForCompletion);
}

@Override
Expand All @@ -270,7 +283,8 @@ public int hashCode() {
eventTypeField,
implicitJoinKeyField,
searchAfterBuilder,
rule);
rule,
waitForCompletion);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.tasks.TaskId;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -79,48 +81,83 @@
*/
public class EqlSearchResponse extends ActionResponse implements ToXContentObject {

private Hits hits;
private long tookInMillis;
private boolean isTimeout;
private final TaskId taskId;
private final Hits hits;
private final long tookInMillis;
private final boolean isTimeout;

private static final class Fields {
static final String TOOK = "took";
static final String TIMED_OUT = "timed_out";
static final String HITS = "hits";
static final String TASK = "task";
}

private static final ParseField TOOK = new ParseField(Fields.TOOK);
private static final ParseField TIMED_OUT = new ParseField(Fields.TIMED_OUT);
private static final ParseField HITS = new ParseField(Fields.HITS);
private static final ParseField TASK = new ParseField(Fields.TASK);

private static final ConstructingObjectParser<EqlSearchResponse, Void> PARSER =
new ConstructingObjectParser<>("eql/search_response", false,
args -> {
if (args[0] != null) {
if (args[1] != null || args[2] != null || args[3] != null) {
throw new IllegalArgumentException("Unexpected elements in async response");
}
return new EqlSearchResponse(new TaskId((String)args[0]));
} else {
if (args[1] == null) {
throw new IllegalArgumentException("Missing hits");
}
if (args[2] == null) {
throw new IllegalArgumentException("Missing took time");
}
if (args[3] == null) {
throw new IllegalArgumentException("Missing timeout");
}
return new EqlSearchResponse((Hits)args[1], (long)args[2], (boolean)args[3]);
}
});

private static final ObjectParser<EqlSearchResponse, Void> PARSER = objectParser(EqlSearchResponse::new);

private static <R extends EqlSearchResponse> ObjectParser<R, Void> objectParser(Supplier<R> supplier) {
ObjectParser<R, Void> parser = new ObjectParser<>("eql/search_response", false, supplier);
parser.declareLong(EqlSearchResponse::took, TOOK);
parser.declareBoolean(EqlSearchResponse::isTimeout, TIMED_OUT);
parser.declareObject(EqlSearchResponse::hits,
(p, c) -> Hits.fromXContent(p), HITS);
return parser;
static {
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), TASK);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> Hits.fromXContent(p), HITS);
PARSER.declareLong(ConstructingObjectParser. optionalConstructorArg(), TOOK);
PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), TIMED_OUT);
}

// Constructor for parser from json
protected EqlSearchResponse() {
super();
public EqlSearchResponse(TaskId taskId) {
this.taskId = taskId;
this.hits = null;
this.tookInMillis = 0;
this.isTimeout = false;
}

public EqlSearchResponse(Hits hits, long tookInMillis, boolean isTimeout) {
super();
this.hits(hits);
if (hits == null) {
this.hits = new Hits((Events)null, null);
} else {
this.hits = hits;
}
this.taskId = null;
this.tookInMillis = tookInMillis;
this.isTimeout = isTimeout;
}

public EqlSearchResponse(StreamInput in) throws IOException {
super(in);
tookInMillis = in.readVLong();
isTimeout = in.readBoolean();
hits = new Hits(in);
if (in.readBoolean()) {
taskId = TaskId.readFromStream(in);
this.hits = null;
this.tookInMillis = 0;
this.isTimeout = false;
} else {
taskId = null;
tookInMillis = in.readVLong();
isTimeout = in.readBoolean();
hits = new Hits(in);
}
}

public static EqlSearchResponse fromXContent(XContentParser parser) {
Expand All @@ -129,53 +166,46 @@ public static EqlSearchResponse fromXContent(XContentParser parser) {

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(tookInMillis);
out.writeBoolean(isTimeout);
hits.writeTo(out);
if (taskId != null) {
out.writeBoolean(true);
taskId.writeTo(out);
} else {
out.writeBoolean(false);
out.writeVLong(tookInMillis);
out.writeBoolean(isTimeout);
hits.writeTo(out);
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
innerToXContent(builder, params);
if (taskId != null) {
builder.field("task", taskId.toString());
} else {
builder.field(TOOK.getPreferredName(), tookInMillis);
builder.field(TIMED_OUT.getPreferredName(), isTimeout);
hits.toXContent(builder, params);
}
return builder.endObject();
}

private XContentBuilder innerToXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(TOOK.getPreferredName(), tookInMillis);
builder.field(TIMED_OUT.getPreferredName(), isTimeout);
hits.toXContent(builder, params);
return builder;
public TaskId taskId() {
return taskId;
}

public long took() {
return tookInMillis;
}

public void took(long tookInMillis) {
this.tookInMillis = tookInMillis;
}

public boolean isTimeout() {
return isTimeout;
}

public void isTimeout(boolean isTimeout) {
this.isTimeout = isTimeout;
}

public Hits hits() {
return hits;
}

public void hits(Hits hits) {
if (hits == null) {
this.hits = new Hits((Events)null, null);
} else {
this.hits = hits;
}
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -185,14 +215,15 @@ public boolean equals(Object o) {
return false;
}
EqlSearchResponse that = (EqlSearchResponse) o;
return Objects.equals(hits, that.hits)
return Objects.equals(taskId, that.taskId) &&
Objects.equals(hits, that.hits)
&& Objects.equals(tookInMillis, that.tookInMillis)
&& Objects.equals(isTimeout, that.isTimeout);
}

@Override
public int hashCode() {
return Objects.hash(hits, tookInMillis, isTimeout);
return Objects.hash(taskId, hits, tookInMillis, isTimeout);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,15 @@
*/
package org.elasticsearch.xpack.eql.plugin;

import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;

import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.action.RestResponseListener;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.eql.action.EqlSearchAction;
import org.elasticsearch.xpack.eql.action.EqlSearchRequest;
import org.elasticsearch.xpack.eql.action.EqlSearchResponse;

import java.io.IOException;

Expand All @@ -43,16 +36,10 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
try (XContentParser parser = request.contentOrSourceParamParser()) {
eqlRequest = EqlSearchRequest.fromXContent(parser);
eqlRequest.indices(Strings.splitStringByCommaToArray(request.param("index")));
eqlRequest.waitForCompletion(request.paramAsBoolean("wait_for_completion", true));
}

return channel -> client.execute(EqlSearchAction.INSTANCE, eqlRequest, new RestResponseListener<>(channel) {
@Override
public RestResponse buildResponse(EqlSearchResponse response) throws Exception {
XContentBuilder builder = channel.newBuilder(request.getXContentType(), XContentType.JSON, true);
response.toXContent(builder, request);
return new BytesRestResponse(RestStatus.OK, builder);
}
});
return channel -> client.execute(EqlSearchAction.INSTANCE, eqlRequest, new RestToXContentListener<>(channel));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.XPackSettings;
Expand All @@ -25,6 +27,7 @@
public class TransportEqlSearchAction extends HandledTransportAction<EqlSearchRequest, EqlSearchResponse> {
private final SecurityContext securityContext;
private final ClusterService clusterService;
private final TaskManager taskManager;

@Inject
public TransportEqlSearchAction(Settings settings, ClusterService clusterService, TransportService transportService,
Expand All @@ -34,11 +37,17 @@ public TransportEqlSearchAction(Settings settings, ClusterService clusterService
this.securityContext = XPackSettings.SECURITY_ENABLED.get(settings) ?
new SecurityContext(settings, threadPool.getThreadContext()) : null;
this.clusterService = clusterService;
this.taskManager = transportService.getTaskManager();
}

@Override
protected void doExecute(Task task, EqlSearchRequest request, ActionListener<EqlSearchResponse> listener) {
operation(request, listener);
if (request.waitForCompletion()) {
operation(request, listener);
} else {
// TODO: This is for the test only - we need to create and return another task here during execution
listener.onResponse(new EqlSearchResponse(new TaskId(clusterService.localNode().getId(), task.getId())));
}
}

public static void operation(EqlSearchRequest request, ActionListener<EqlSearchResponse> listener) {
Expand Down
Loading

0 comments on commit f3c73ea

Please sign in to comment.