Skip to content

Commit

Permalink
Dispatch TransportSingleShardAction response handling (elastic#113630)
Browse files Browse the repository at this point in the history
We shouldn't be handling these responses on the transport worker.

Closes elastic#110408
  • Loading branch information
DaveCTurner authored Oct 2, 2024
1 parent 6e06b9d commit baef138
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPoolStats;

import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -121,8 +122,32 @@ private void assertThreadPoolsBlocked() {
() -> client().prepareIndex(USER_INDEX).setSource(Map.of("foo", "bar")).get()
);
assertThat(e1.getMessage(), startsWith("rejected execution of TimedRunnable"));
var e2 = expectThrows(EsRejectedExecutionException.class, () -> client().prepareGet(USER_INDEX, "id").get());
assertThat(e2.getMessage(), startsWith("rejected execution of ActionRunnable"));

final var getFuture = client().prepareGet(USER_INDEX, "id").execute();
// response handling is force-executed on GET pool, so we must
// (a) wait for that task to be enqueued, expanding the queue beyond its configured limit, and
// (b) check for the exception in the background

try {
assertTrue(waitUntil(() -> {
if (getFuture.isDone()) {
return true;
}
for (ThreadPool threadPool : internalCluster().getInstances(ThreadPool.class)) {
for (ThreadPoolStats.Stats stats : threadPool.stats().stats()) {
if (stats.name().equals(ThreadPool.Names.GET) && stats.queue() > 1) {
return true;
}
}
}
return false;
}));
} catch (Exception e) {
fail(e);
}

new Thread(() -> expectThrows(EsRejectedExecutionException.class, () -> getFuture.actionGet(SAFE_AWAIT_TIMEOUT))).start();

// intentionally commented out this test until https://github.com/elastic/elasticsearch/issues/97916 is fixed
// var e3 = expectThrows(
// SearchPhaseExecutionException.class,
Expand Down
Loading

0 comments on commit baef138

Please sign in to comment.