From e82c1bcecac590e44a51ed351fb461c8aa2b3a27 Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Wed, 13 May 2020 11:27:59 -0400 Subject: [PATCH] EQL: Switch to RestCancellableNodeClient in EQL search Switches to RestCancellableNodeClient wrapper for eql search operation in order to detect clients closing the connection and cancelling the operation. Relates to #49638 --- x-pack/plugin/eql/build.gradle | 2 + .../AbstractEqlBlockingIntegTestCase.java | 21 ++- .../eql/action/RestEqlCancellationIT.java | 147 ++++++++++++++++++ .../xpack/eql/plugin/RestEqlSearchAction.java | 20 ++- 4 files changed, 181 insertions(+), 9 deletions(-) create mode 100644 x-pack/plugin/eql/src/internalClusterTest/java/org/elasticsearch/xpack/eql/action/RestEqlCancellationIT.java diff --git a/x-pack/plugin/eql/build.gradle b/x-pack/plugin/eql/build.gradle index abdd75c13d555..53582002b653f 100644 --- a/x-pack/plugin/eql/build.gradle +++ b/x-pack/plugin/eql/build.gradle @@ -41,6 +41,8 @@ dependencies { testCompile project(path: ':modules:reindex', configuration: 'runtime') testCompile project(path: ':modules:parent-join', configuration: 'runtime') testCompile project(path: ':modules:analysis-common', configuration: 'runtime') + testCompile project(path: ':modules:transport-netty4', configuration: 'runtime') // for http in RestEqlCancellationIT + testCompile project(path: ':plugins:transport-nio', configuration: 'runtime') // for http in RestEqlCancellationIT } diff --git a/x-pack/plugin/eql/src/internalClusterTest/java/org/elasticsearch/xpack/eql/action/AbstractEqlBlockingIntegTestCase.java b/x-pack/plugin/eql/src/internalClusterTest/java/org/elasticsearch/xpack/eql/action/AbstractEqlBlockingIntegTestCase.java index e8e9f7aa5ebac..e35c4738b4b36 100644 --- a/x-pack/plugin/eql/src/internalClusterTest/java/org/elasticsearch/xpack/eql/action/AbstractEqlBlockingIntegTestCase.java +++ b/x-pack/plugin/eql/src/internalClusterTest/java/org/elasticsearch/xpack/eql/action/AbstractEqlBlockingIntegTestCase.java @@ -64,8 +64,18 @@ protected List initBlockFactory(boolean searchBlock, boolean } protected void disableBlocks(List plugins) { + disableFieldCapBlocks(plugins); + disableSearchBlocks(plugins); + } + + protected void disableSearchBlocks(List plugins) { for (SearchBlockPlugin plugin : plugins) { plugin.disableSearchBlock(); + } + } + + protected void disableFieldCapBlocks(List plugins) { + for (SearchBlockPlugin plugin : plugins) { plugin.disableFieldCapBlock(); } } @@ -198,10 +208,19 @@ protected Collection> nodePlugins() { } protected TaskId findTaskWithXOpaqueId(String id, String action) { + TaskInfo taskInfo = getTaskInfoWithXOpaqueId(id, action); + if (taskInfo != null) { + return taskInfo.getTaskId(); + } else { + return null; + } + } + + protected TaskInfo getTaskInfoWithXOpaqueId(String id, String action) { ListTasksResponse tasks = client().admin().cluster().prepareListTasks().setActions(action).get(); for (TaskInfo task : tasks.getTasks()) { if (id.equals(task.getHeaders().get(Task.X_OPAQUE_ID))) { - return task.getTaskId(); + return task; } } return null; diff --git a/x-pack/plugin/eql/src/internalClusterTest/java/org/elasticsearch/xpack/eql/action/RestEqlCancellationIT.java b/x-pack/plugin/eql/src/internalClusterTest/java/org/elasticsearch/xpack/eql/action/RestEqlCancellationIT.java new file mode 100644 index 0000000000000..dab26dfa2dbd0 --- /dev/null +++ b/x-pack/plugin/eql/src/internalClusterTest/java/org/elasticsearch/xpack/eql/action/RestEqlCancellationIT.java @@ -0,0 +1,147 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.eql.action; + +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.client.Cancellable; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseListener; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.network.NetworkModule; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.transport.Netty4Plugin; +import org.elasticsearch.transport.nio.NioTransportPlugin; +import org.junit.BeforeClass; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +public class RestEqlCancellationIT extends AbstractEqlBlockingIntegTestCase { + + private static String nodeHttpTypeKey; + + @SuppressWarnings("unchecked") + @BeforeClass + public static void setUpTransport() { + nodeHttpTypeKey = getHttpTypeKey(randomFrom(Netty4Plugin.class, NioTransportPlugin.class)); + } + + @Override + protected boolean addMockHttpTransport() { + return false; // enable http + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(NetworkModule.HTTP_TYPE_KEY, nodeHttpTypeKey).build(); + } + + private static String getHttpTypeKey(Class clazz) { + if (clazz.equals(NioTransportPlugin.class)) { + return NioTransportPlugin.NIO_HTTP_TRANSPORT_NAME; + } else { + assert clazz.equals(Netty4Plugin.class); + return Netty4Plugin.NETTY_HTTP_TRANSPORT_NAME; + } + } + + @Override + protected Collection> nodePlugins() { + List> plugins = new ArrayList<>(super.nodePlugins()); + plugins.add(getTestTransportPlugin()); + plugins.add(Netty4Plugin.class); + plugins.add(NioTransportPlugin.class); + return plugins; + } + + public void testRestCancellation() throws Exception { + assertAcked(client().admin().indices().prepareCreate("test") + .setMapping("val", "type=integer", "event_type", "type=keyword", "@timestamp", "type=date") + .get()); + createIndex("idx_unmapped"); + + int numDocs = randomIntBetween(6, 20); + + List builders = new ArrayList<>(); + + for (int i = 0; i < numDocs; i++) { + int fieldValue = randomIntBetween(0, 10); + builders.add(client().prepareIndex("test").setSource( + jsonBuilder().startObject() + .field("val", fieldValue).field("event_type", "my_event").field("@timestamp", "2020-04-09T12:35:48Z") + .endObject())); + } + + indexRandom(true, builders); + + // We are cancelling during both mapping and searching but we cancel during mapping so we should never reach the second block + List plugins = initBlockFactory(true, true); + org.elasticsearch.client.eql.EqlSearchRequest eqlSearchRequest = + new org.elasticsearch.client.eql.EqlSearchRequest("test", "my_event where val=1").eventCategoryField("event_type"); + String id = randomAlphaOfLength(10); + + Request request = new Request("GET", "/test/_eql/search"); + request.setJsonEntity(Strings.toString(eqlSearchRequest)); + request.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader(Task.X_OPAQUE_ID, id)); + logger.trace("Preparing search"); + + CountDownLatch latch = new CountDownLatch(1); + AtomicReference error = new AtomicReference<>(); + Cancellable cancellable = getRestClient().performRequestAsync(request, new ResponseListener() { + @Override + public void onSuccess(Response response) { + latch.countDown(); + } + + @Override + public void onFailure(Exception exception) { + error.set(exception); + latch.countDown(); + } + }); + + logger.trace("Waiting for block to be established"); + awaitForBlockedFieldCaps(plugins); + logger.trace("Block is established"); + assertThat(getTaskInfoWithXOpaqueId(id, EqlSearchAction.NAME), notNullValue()); + cancellable.cancel(); + logger.trace("Request is cancelled"); + disableFieldCapBlocks(plugins); + // The task should be cancelled before ever reaching search blocks + assertBusy(() -> { + assertThat(getTaskInfoWithXOpaqueId(id, EqlSearchAction.NAME), nullValue()); + }); + // Make sure it didn't reach search blocks + assertThat(getNumberOfContexts(plugins), equalTo(0)); + disableSearchBlocks(plugins); + + latch.await(); + assertThat(error.get(), instanceOf(CancellationException.class)); + } + + @Override + protected boolean ignoreExternalCluster() { + return true; + } +} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/RestEqlSearchAction.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/RestEqlSearchAction.java index dfe2e8feefe5b..19352d4b364cc 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/RestEqlSearchAction.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/RestEqlSearchAction.java @@ -16,6 +16,7 @@ import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.rest.action.RestCancellableNodeClient; import org.elasticsearch.rest.action.RestResponseListener; import org.elasticsearch.xpack.eql.action.EqlSearchAction; import org.elasticsearch.xpack.eql.action.EqlSearchRequest; @@ -56,14 +57,17 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli eqlRequest.keepOnCompletion(request.paramAsBoolean("keep_on_completion", eqlRequest.keepOnCompletion())); } - return channel -> client.execute(EqlSearchAction.INSTANCE, eqlRequest, new RestResponseListener<>(channel) { - @Override - public RestResponse buildResponse(EqlSearchResponse response) throws Exception { - XContentBuilder builder = channel.newBuilder(request.getXContentType(), XContentType.JSON, true); - response.toXContent(builder, request); - return new BytesRestResponse(RestStatus.OK, builder); - } - }); + return channel -> { + RestCancellableNodeClient cancellableClient = new RestCancellableNodeClient(client, request.getHttpChannel()); + cancellableClient.execute(EqlSearchAction.INSTANCE, eqlRequest, new RestResponseListener<>(channel) { + @Override + public RestResponse buildResponse(EqlSearchResponse response) throws Exception { + XContentBuilder builder = channel.newBuilder(request.getXContentType(), XContentType.JSON, true); + response.toXContent(builder, request); + return new BytesRestResponse(RestStatus.OK, builder); + } + }); + }; } @Override