diff --git a/modules/kibana/src/internalClusterTest/java/org/elasticsearch/kibana/KibanaThreadPoolIT.java b/modules/kibana/src/internalClusterTest/java/org/elasticsearch/kibana/KibanaThreadPoolIT.java index 0a13f002bb649..fa529fe07673e 100644 --- a/modules/kibana/src/internalClusterTest/java/org/elasticsearch/kibana/KibanaThreadPoolIT.java +++ b/modules/kibana/src/internalClusterTest/java/org/elasticsearch/kibana/KibanaThreadPoolIT.java @@ -23,6 +23,7 @@ import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.ThreadPoolStats; import java.util.Arrays; import java.util.Collection; @@ -121,8 +122,32 @@ private void assertThreadPoolsBlocked() { () -> client().prepareIndex(USER_INDEX).setSource(Map.of("foo", "bar")).get() ); assertThat(e1.getMessage(), startsWith("rejected execution of TimedRunnable")); - var e2 = expectThrows(EsRejectedExecutionException.class, () -> client().prepareGet(USER_INDEX, "id").get()); - assertThat(e2.getMessage(), startsWith("rejected execution of ActionRunnable")); + + final var getFuture = client().prepareGet(USER_INDEX, "id").execute(); + // response handling is force-executed on GET pool, so we must + // (a) wait for that task to be enqueued, expanding the queue beyond its configured limit, and + // (b) check for the exception in the background + + try { + assertTrue(waitUntil(() -> { + if (getFuture.isDone()) { + return true; + } + for (ThreadPool threadPool : internalCluster().getInstances(ThreadPool.class)) { + for (ThreadPoolStats.Stats stats : threadPool.stats().stats()) { + if (stats.name().equals(ThreadPool.Names.GET) && stats.queue() > 1) { + return true; + } + } + } + return false; + })); + } catch (Exception e) { + fail(e); + } + + new Thread(() -> expectThrows(EsRejectedExecutionException.class, () -> getFuture.actionGet(SAFE_AWAIT_TIMEOUT))).start(); + // intentionally commented out this test until https://github.com/elastic/elasticsearch/issues/97916 is fixed // var e3 = expectThrows( // SearchPhaseExecutionException.class, diff --git a/server/src/internalClusterTest/java/org/elasticsearch/get/GetActionIT.java b/server/src/internalClusterTest/java/org/elasticsearch/get/GetActionIT.java index d14d302fab495..6c7754932af68 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/get/GetActionIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/get/GetActionIT.java @@ -10,6 +10,8 @@ package org.elasticsearch.get; import org.apache.lucene.index.DirectoryReader; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.bulk.BulkRequestBuilder; @@ -37,6 +39,7 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalSettingsPlugin; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xcontent.XContentFactory; import org.elasticsearch.xcontent.XContentType; import org.junit.Before; @@ -50,6 +53,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.function.UnaryOperator; import static java.util.Collections.singleton; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; @@ -104,25 +109,27 @@ public void testSimpleGet() { ); ensureGreen(); - GetResponse response = client().prepareGet(indexOrAlias(), "1").get(); + final Function, GetResponse> docGetter = op -> getDocument(indexOrAlias(), "1", op); + + GetResponse response = docGetter.apply(UnaryOperator.identity()); assertThat(response.isExists(), equalTo(false)); logger.info("--> index doc 1"); prepareIndex("test").setId("1").setSource("field1", "value1", "field2", "value2").get(); logger.info("--> non realtime get 1"); - response = client().prepareGet(indexOrAlias(), "1").setRealtime(false).get(); + response = docGetter.apply(r -> r.setRealtime(false)); assertThat(response.isExists(), equalTo(false)); logger.info("--> realtime get 1"); - response = client().prepareGet(indexOrAlias(), "1").get(); + response = docGetter.apply(UnaryOperator.identity()); assertThat(response.isExists(), equalTo(true)); assertThat(response.getIndex(), equalTo("test")); assertThat(response.getSourceAsMap().get("field1").toString(), equalTo("value1")); assertThat(response.getSourceAsMap().get("field2").toString(), equalTo("value2")); logger.info("--> realtime get 1 (no source, implicit)"); - response = client().prepareGet(indexOrAlias(), "1").setStoredFields(Strings.EMPTY_ARRAY).get(); + response = docGetter.apply(r -> r.setStoredFields(Strings.EMPTY_ARRAY)); assertThat(response.isExists(), equalTo(true)); assertThat(response.getIndex(), equalTo("test")); Set fields = new HashSet<>(response.getFields().keySet()); @@ -130,7 +137,7 @@ public void testSimpleGet() { assertThat(response.getSourceAsBytesRef(), nullValue()); logger.info("--> realtime get 1 (no source, explicit)"); - response = client().prepareGet(indexOrAlias(), "1").setFetchSource(false).get(); + response = docGetter.apply(r -> r.setFetchSource(false)); assertThat(response.isExists(), equalTo(true)); assertThat(response.getIndex(), equalTo("test")); fields = new HashSet<>(response.getFields().keySet()); @@ -138,14 +145,14 @@ public void testSimpleGet() { assertThat(response.getSourceAsBytesRef(), nullValue()); logger.info("--> realtime get 1 (no type)"); - response = client().prepareGet(indexOrAlias(), "1").get(); + response = docGetter.apply(UnaryOperator.identity()); assertThat(response.isExists(), equalTo(true)); assertThat(response.getIndex(), equalTo("test")); assertThat(response.getSourceAsMap().get("field1").toString(), equalTo("value1")); assertThat(response.getSourceAsMap().get("field2").toString(), equalTo("value2")); logger.info("--> realtime fetch of field"); - response = client().prepareGet(indexOrAlias(), "1").setStoredFields("field1").get(); + response = docGetter.apply(r -> r.setStoredFields("field1")); assertThat(response.isExists(), equalTo(true)); assertThat(response.getIndex(), equalTo("test")); assertThat(response.getSourceAsBytesRef(), nullValue()); @@ -153,7 +160,7 @@ public void testSimpleGet() { assertThat(response.getField("field2"), nullValue()); logger.info("--> realtime fetch of field & source"); - response = client().prepareGet(indexOrAlias(), "1").setStoredFields("field1").setFetchSource("field1", null).get(); + response = docGetter.apply(r -> r.setStoredFields("field1").setFetchSource("field1", null)); assertThat(response.isExists(), equalTo(true)); assertThat(response.getIndex(), equalTo("test")); assertThat(response.getSourceAsMap(), hasKey("field1")); @@ -162,7 +169,7 @@ public void testSimpleGet() { assertThat(response.getField("field2"), nullValue()); logger.info("--> realtime get 1"); - response = client().prepareGet(indexOrAlias(), "1").get(); + response = docGetter.apply(UnaryOperator.identity()); assertThat(response.isExists(), equalTo(true)); assertThat(response.getIndex(), equalTo("test")); assertThat(response.getSourceAsMap().get("field1").toString(), equalTo("value1")); @@ -172,14 +179,14 @@ public void testSimpleGet() { refresh(); logger.info("--> non realtime get 1 (loaded from index)"); - response = client().prepareGet(indexOrAlias(), "1").setRealtime(false).get(); + response = docGetter.apply(r -> r.setRealtime(false)); assertThat(response.isExists(), equalTo(true)); assertThat(response.getIndex(), equalTo("test")); assertThat(response.getSourceAsMap().get("field1").toString(), equalTo("value1")); assertThat(response.getSourceAsMap().get("field2").toString(), equalTo("value2")); logger.info("--> realtime fetch of field (loaded from index)"); - response = client().prepareGet(indexOrAlias(), "1").setStoredFields("field1").get(); + response = docGetter.apply(r -> r.setStoredFields("field1")); assertThat(response.isExists(), equalTo(true)); assertThat(response.getIndex(), equalTo("test")); assertThat(response.getSourceAsBytesRef(), nullValue()); @@ -187,7 +194,7 @@ public void testSimpleGet() { assertThat(response.getField("field2"), nullValue()); logger.info("--> realtime fetch of field & source (loaded from index)"); - response = client().prepareGet(indexOrAlias(), "1").setStoredFields("field1").setFetchSource(true).get(); + response = docGetter.apply(r -> r.setStoredFields("field1").setFetchSource(true)); assertThat(response.isExists(), equalTo(true)); assertThat(response.getIndex(), equalTo("test")); assertThat(response.getSourceAsBytesRef(), not(nullValue())); @@ -198,7 +205,7 @@ public void testSimpleGet() { prepareIndex("test").setId("1").setSource("field1", "value1_1", "field2", "value2_1").get(); logger.info("--> realtime get 1"); - response = client().prepareGet(indexOrAlias(), "1").get(); + response = docGetter.apply(UnaryOperator.identity()); assertThat(response.isExists(), equalTo(true)); assertThat(response.getIndex(), equalTo("test")); assertThat(response.getSourceAsMap().get("field1").toString(), equalTo("value1_1")); @@ -207,7 +214,7 @@ public void testSimpleGet() { logger.info("--> update doc 1 again"); prepareIndex("test").setId("1").setSource("field1", "value1_2", "field2", "value2_2").get(); - response = client().prepareGet(indexOrAlias(), "1").get(); + response = docGetter.apply(UnaryOperator.identity()); assertThat(response.isExists(), equalTo(true)); assertThat(response.getIndex(), equalTo("test")); assertThat(response.getSourceAsMap().get("field1").toString(), equalTo("value1_2")); @@ -216,7 +223,7 @@ public void testSimpleGet() { DeleteResponse deleteResponse = client().prepareDelete("test", "1").get(); assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult()); - response = client().prepareGet(indexOrAlias(), "1").get(); + response = docGetter.apply(UnaryOperator.identity()); assertThat(response.isExists(), equalTo(false)); } @@ -232,14 +239,34 @@ public void testGetWithAliasPointingToMultipleIndices() { DocWriteResponse indexResponse = prepareIndex("index1").setId("id").setSource(Collections.singletonMap("foo", "bar")).get(); assertThat(indexResponse.status().getStatus(), equalTo(RestStatus.CREATED.getStatus())); - IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, client().prepareGet("alias1", "_alias_id")); - assertThat(exception.getMessage(), endsWith("can't execute a single index op")); + assertThat( + asInstanceOf(IllegalArgumentException.class, getDocumentFailure("alias1", "_alias_id", r -> r)).getMessage(), + endsWith("can't execute a single index op") + ); } static String indexOrAlias() { return randomBoolean() ? "test" : "alias"; } + private static GetResponse getDocument(String index, String id, UnaryOperator requestOperator) { + return safeAwait(l -> getDocumentAsync(index, id, requestOperator, l)); + } + + private static Throwable getDocumentFailure(String index, String id, UnaryOperator requestOperator) { + return ExceptionsHelper.unwrapCause(safeAwaitFailure(GetResponse.class, l -> getDocumentAsync(index, id, requestOperator, l))); + } + + private static void getDocumentAsync( + String index, + String id, + UnaryOperator requestOperator, + ActionListener listener + ) { + requestOperator.apply(client().prepareGet(index, id)) + .execute(ActionListener.runBefore(listener, () -> ThreadPool.assertCurrentThreadPool(ThreadPool.Names.GET))); + } + public void testSimpleMultiGet() throws Exception { assertAcked( prepareCreate("test").addAlias(new Alias("alias").writeIndex(randomFrom(true, false, null))) @@ -311,13 +338,14 @@ public void testGetDocWithMultivaluedFields() throws Exception { assertAcked(prepareCreate("test").setMapping(mapping1)); ensureGreen(); - GetResponse response = client().prepareGet("test", "1").get(); - assertThat(response.isExists(), equalTo(false)); + final Function, GetResponse> docGetter = op -> getDocument("test", "1", op); + + GetResponse response = docGetter.apply(UnaryOperator.identity()); assertThat(response.isExists(), equalTo(false)); prepareIndex("test").setId("1").setSource(jsonBuilder().startObject().array("field", "1", "2").endObject()).get(); - response = client().prepareGet("test", "1").setStoredFields("field").get(); + response = docGetter.apply(r -> r.setStoredFields("field")); assertThat(response.isExists(), equalTo(true)); assertThat(response.getId(), equalTo("1")); Set fields = new HashSet<>(response.getFields().keySet()); @@ -328,7 +356,7 @@ public void testGetDocWithMultivaluedFields() throws Exception { // Now test values being fetched from stored fields. refresh(); - response = client().prepareGet("test", "1").setStoredFields("field").get(); + response = docGetter.apply(r -> r.setStoredFields("field")); assertThat(response.isExists(), equalTo(true)); assertThat(response.getId(), equalTo("1")); fields = new HashSet<>(response.getFields().keySet()); @@ -342,7 +370,9 @@ public void testGetWithVersion() { assertAcked(prepareCreate("test").addAlias(new Alias("alias")).setSettings(Settings.builder().put("index.refresh_interval", -1))); ensureGreen(); - GetResponse response = client().prepareGet("test", "1").get(); + final Function, GetResponse> docGetter = op -> getDocument(indexOrAlias(), "1", op); + + GetResponse response = docGetter.apply(UnaryOperator.identity()); assertThat(response.isExists(), equalTo(false)); logger.info("--> index doc 1"); @@ -350,64 +380,52 @@ public void testGetWithVersion() { // From translog: - response = client().prepareGet(indexOrAlias(), "1").setVersion(Versions.MATCH_ANY).get(); + response = docGetter.apply(r -> r.setVersion(Versions.MATCH_ANY)); assertThat(response.isExists(), equalTo(true)); assertThat(response.getId(), equalTo("1")); assertThat(response.getVersion(), equalTo(1L)); - response = client().prepareGet(indexOrAlias(), "1").setVersion(1).get(); + response = docGetter.apply(r -> r.setVersion(1)); assertThat(response.isExists(), equalTo(true)); assertThat(response.getId(), equalTo("1")); assertThat(response.getVersion(), equalTo(1L)); - try { - client().prepareGet(indexOrAlias(), "1").setVersion(2).get(); - fail(); - } catch (VersionConflictEngineException e) { - // all good - } + assertThat(getDocumentFailure(indexOrAlias(), "1", r -> r.setVersion(2)), instanceOf(VersionConflictEngineException.class)); // From Lucene index: refresh(); - response = client().prepareGet(indexOrAlias(), "1").setVersion(Versions.MATCH_ANY).setRealtime(false).get(); + response = docGetter.apply(r -> r.setVersion(Versions.MATCH_ANY).setRealtime(false)); assertThat(response.isExists(), equalTo(true)); assertThat(response.getId(), equalTo("1")); assertThat(response.getIndex(), equalTo("test")); assertThat(response.getVersion(), equalTo(1L)); - response = client().prepareGet(indexOrAlias(), "1").setVersion(1).setRealtime(false).get(); + response = docGetter.apply(r -> r.setVersion(1).setRealtime(false)); assertThat(response.isExists(), equalTo(true)); assertThat(response.getId(), equalTo("1")); assertThat(response.getIndex(), equalTo("test")); assertThat(response.getVersion(), equalTo(1L)); - try { - client().prepareGet(indexOrAlias(), "1").setVersion(2).setRealtime(false).get(); - fail(); - } catch (VersionConflictEngineException e) { - // all good - } + assertThat( + getDocumentFailure(indexOrAlias(), "1", r -> r.setVersion(2).setRealtime(false)), + instanceOf(VersionConflictEngineException.class) + ); logger.info("--> index doc 1 again, so increasing the version"); prepareIndex("test").setId("1").setSource("field1", "value1", "field2", "value2").get(); // From translog: - response = client().prepareGet(indexOrAlias(), "1").setVersion(Versions.MATCH_ANY).get(); + response = docGetter.apply(r -> r.setVersion(Versions.MATCH_ANY)); assertThat(response.isExists(), equalTo(true)); assertThat(response.getId(), equalTo("1")); assertThat(response.getIndex(), equalTo("test")); assertThat(response.getVersion(), equalTo(2L)); - try { - client().prepareGet(indexOrAlias(), "1").setVersion(1).get(); - fail(); - } catch (VersionConflictEngineException e) { - // all good - } + assertThat(getDocumentFailure(indexOrAlias(), "1", r -> r.setVersion(1)), instanceOf(VersionConflictEngineException.class)); - response = client().prepareGet(indexOrAlias(), "1").setVersion(2).get(); + response = docGetter.apply(r -> r.setVersion(2)); assertThat(response.isExists(), equalTo(true)); assertThat(response.getId(), equalTo("1")); assertThat(response.getIndex(), equalTo("test")); @@ -416,20 +434,18 @@ public void testGetWithVersion() { // From Lucene index: refresh(); - response = client().prepareGet(indexOrAlias(), "1").setVersion(Versions.MATCH_ANY).setRealtime(false).get(); + response = docGetter.apply(r -> r.setVersion(Versions.MATCH_ANY).setRealtime(false)); assertThat(response.isExists(), equalTo(true)); assertThat(response.getId(), equalTo("1")); assertThat(response.getIndex(), equalTo("test")); assertThat(response.getVersion(), equalTo(2L)); - try { - client().prepareGet(indexOrAlias(), "1").setVersion(1).setRealtime(false).get(); - fail(); - } catch (VersionConflictEngineException e) { - // all good - } + assertThat( + getDocumentFailure(indexOrAlias(), "1", r -> r.setVersion(1).setRealtime(false)), + instanceOf(VersionConflictEngineException.class) + ); - response = client().prepareGet(indexOrAlias(), "1").setVersion(2).setRealtime(false).get(); + response = docGetter.apply(r -> r.setVersion(2).setRealtime(false)); assertThat(response.isExists(), equalTo(true)); assertThat(response.getId(), equalTo("1")); assertThat(response.getIndex(), equalTo("test")); @@ -572,15 +588,15 @@ public void testGetFieldsNonLeafField() throws Exception { .setSource(jsonBuilder().startObject().startObject("field1").field("field2", "value1").endObject().endObject()) .get(); - IllegalArgumentException exc = expectThrows( + IllegalArgumentException exc = asInstanceOf( IllegalArgumentException.class, - client().prepareGet(indexOrAlias(), "1").setStoredFields("field1") + getDocumentFailure(indexOrAlias(), "1", r -> r.setStoredFields("field1")) ); assertThat(exc.getMessage(), equalTo("field [field1] isn't a leaf field")); flush(); - exc = expectThrows(IllegalArgumentException.class, client().prepareGet(indexOrAlias(), "1").setStoredFields("field1")); + exc = asInstanceOf(IllegalArgumentException.class, getDocumentFailure(indexOrAlias(), "1", r -> r.setStoredFields("field1"))); assertThat(exc.getMessage(), equalTo("field [field1] isn't a leaf field")); } @@ -650,13 +666,13 @@ public void testGetFieldsComplexField() throws Exception { logger.info("checking real time retrieval"); String field = "field1.field2.field3.field4"; - GetResponse getResponse = client().prepareGet("my-index", "1").setStoredFields(field).get(); + GetResponse getResponse = getDocument("my-index", "1", r -> r.setStoredFields(field)); assertThat(getResponse.isExists(), equalTo(true)); assertThat(getResponse.getField(field).getValues().size(), equalTo(2)); assertThat(getResponse.getField(field).getValues().get(0).toString(), equalTo("value1")); assertThat(getResponse.getField(field).getValues().get(1).toString(), equalTo("value2")); - getResponse = client().prepareGet("my-index", "1").setStoredFields(field).get(); + getResponse = getDocument("my-index", "1", r -> r.setStoredFields(field)); assertThat(getResponse.isExists(), equalTo(true)); assertThat(getResponse.getField(field).getValues().size(), equalTo(2)); assertThat(getResponse.getField(field).getValues().get(0).toString(), equalTo("value1")); @@ -681,7 +697,7 @@ public void testGetFieldsComplexField() throws Exception { logger.info("checking post-flush retrieval"); - getResponse = client().prepareGet("my-index", "1").setStoredFields(field).get(); + getResponse = getDocument("my-index", "1", r -> r.setStoredFields(field)); assertThat(getResponse.isExists(), equalTo(true)); assertThat(getResponse.getField(field).getValues().size(), equalTo(2)); assertThat(getResponse.getField(field).getValues().get(0).toString(), equalTo("value1")); @@ -854,7 +870,7 @@ public void testAvoidWrappingSearcherInMultiGet() { // start tracking translog locations in the live version map { index("test", "0", Map.of("f", "empty")); - client().prepareGet("test", "0").setRealtime(true).get(); + getDocument("test", "0", r -> r.setRealtime(true)); refresh("test"); } Map indexedDocs = new HashMap<>(); @@ -909,7 +925,7 @@ public void testAvoidWrappingSearcherInMultiGet() { } public void testGetRemoteIndex() { - IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, client().prepareGet("cluster:index", "id")); + IllegalArgumentException iae = asInstanceOf(IllegalArgumentException.class, getDocumentFailure("cluster:index", "id", r -> r)); assertEquals( "Cross-cluster calls are not supported in this context but remote indices were requested: [cluster:index]", iae.getMessage() @@ -984,10 +1000,12 @@ private GetResponse multiGetDocument(String index, String docId, String field, @ } private GetResponse getDocument(String index, String docId, String field, @Nullable String routing) { - GetRequestBuilder getRequestBuilder = client().prepareGet().setIndex(index).setId(docId).setStoredFields(field); - if (routing != null) { - getRequestBuilder.setRouting(routing); - } - return getRequestBuilder.get(); + return getDocument(index, docId, r -> { + r.setStoredFields(field); + if (routing != null) { + r.setRouting(routing); + } + return r; + }); } } diff --git a/server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java b/server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java index c82efa9a422aa..1f9b1b4a95cab 100644 --- a/server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java @@ -37,7 +37,6 @@ import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequestHandler; -import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; import java.io.IOException; @@ -183,7 +182,7 @@ public void start() { clusterService.localNode(), transportShardAction, internalRequest.request(), - new ActionListenerResponseHandler<>(listener, reader, TransportResponseHandler.TRANSPORT_WORKER) + new ActionListenerResponseHandler<>(listener, reader, executor) ); } else { perform(null); @@ -236,7 +235,7 @@ private void perform(@Nullable final Exception currentFailure) { node, transportShardAction, internalRequest.request(), - new ActionListenerResponseHandler<>(listener, reader, TransportResponseHandler.TRANSPORT_WORKER) { + new ActionListenerResponseHandler<>(listener, reader, executor) { @Override public void handleException(TransportException exp) { onFailure(shardRouting, exp);