From 205675dbcdcb1e6ae3ddeab533022943a9ac8345 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 2 Dec 2024 12:52:49 +0100 Subject: [PATCH] Fix race in AbstractSearchAsyncAction request throttling (#116264) (#117638) We had a race here where the non-blocking pending executions could be starved of executing threads. This happened when all current holders of permits from the semaphore released their permits after a producer thread had failed to acquire a permit and had then enqueued its task, so no thread was left to pick up the queued task. To avoid this scenario, we need to peek the queue again after releasing the permit and try to acquire a new permit if there is work left to be done. --- .../search/AbstractSearchAsyncAction.java | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index d62f16079088f..f48194a6fa78d 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -49,8 +49,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; -import java.util.concurrent.LinkedTransferQueue; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -792,7 +792,7 @@ protected final ShardSearchRequest buildShardSearchRequest(SearchShardIterator s private static final class PendingExecutions { private final Semaphore semaphore; - private final LinkedTransferQueue> queue = new LinkedTransferQueue<>(); + private final ConcurrentLinkedQueue> queue = new ConcurrentLinkedQueue<>(); PendingExecutions(int permits) { assert permits > 0 : "not enough permits: " + permits; @@ -811,11 +811,10 
@@ void submit(Consumer task) { } } } - } private void executeAndRelease(Consumer task) { - while (task != null) { + do { final SubscribableListener onDone = new SubscribableListener<>(); task.accept(() -> onDone.onResponse(null)); if (onDone.isDone()) { @@ -838,13 +837,21 @@ public void onFailure(Exception e) { }); return; } - } + } while (task != null); } private Consumer pollNextTaskOrReleasePermit() { var task = queue.poll(); if (task == null) { semaphore.release(); + while (queue.peek() != null && semaphore.tryAcquire()) { + task = queue.poll(); + if (task == null) { + semaphore.release(); + } else { + return task; + } + } } return task; }