From e98891bb17b2e1bd082478bb477c815b85f6bcad Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Mon, 23 Sep 2024 22:17:39 +0200 Subject: [PATCH] Implements per-executor queue limit configuration --- .../jboss/threads/EnhancedQueueExecutor.java | 60 +++++++++--- .../threads/EnhancedQueueExecutorTest.java | 96 ++++++++++++++++++- 2 files changed, 140 insertions(+), 16 deletions(-) diff --git a/src/main/java/org/jboss/threads/EnhancedQueueExecutor.java b/src/main/java/org/jboss/threads/EnhancedQueueExecutor.java index dd80d76..70eb941 100644 --- a/src/main/java/org/jboss/threads/EnhancedQueueExecutor.java +++ b/src/main/java/org/jboss/threads/EnhancedQueueExecutor.java @@ -292,6 +292,12 @@ public final class EnhancedQueueExecutor extends AbstractExecutorService impleme @SuppressWarnings("unused") // used by field updater volatile int peakQueueSize; + // ======================================================= + // Size and queue limit tracking + // ======================================================= + + private final boolean queueLimited; + private final LongAdder submittedTaskCounter = new LongAdder(); private final LongAdder completedTaskCounter = new LongAdder(); private final LongAdder rejectedTaskCounter = new LongAdder(); @@ -423,7 +429,9 @@ private static final class RuntimeFields { // thread stat setThreadStatusPlain(withCoreSize(withMaxSize(withAllowCoreTimeout(0L, builder.allowsCoreThreadTimeOut()), maxSize), coreSize)); timeoutNanos = TimeUtil.clampedPositiveNanos(keepAliveTime); - setQueueSizePlain(withMaxQueueSize(withCurrentQueueSize(0L, 0), builder.getMaximumQueueSize())); + this.queueLimited = builder.getQueueLimited(); + final int maximumQueueSize = getQueueLimited() ? builder.getMaximumQueueSize() : Integer.MAX_VALUE; + setQueueSizePlain(withMaxQueueSize(withCurrentQueueSize(0L, 0), maximumQueueSize)); mxBean = new MXBeanImpl(); if (! DISABLE_MBEAN && builder.isRegisterMBean()) { this.acc = getContext(); @@ -436,6 +444,10 @@ private static final class RuntimeFields { } } + private boolean getQueueLimited() { + return !NO_QUEUE_LIMIT && queueLimited; + } + static final class MBeanRegisterAction implements PrivilegedAction { private final String finalName; private final MXBeanImpl mxBean; @@ -479,6 +491,7 @@ public static final class Builder { private boolean registerMBean = REGISTER_MBEAN; private String mBeanName; private ContextHandler contextHandler = ContextHandler.NONE; + private boolean queueLimited = true; /** * Construct a new instance. @@ -702,7 +715,8 @@ public int getMaximumQueueSize() { /** * Set the maximum queue size. - * This has no impact when {@code jboss.threads.eqe.unlimited-queue} is set. + * This has no impact when {@code jboss.threads.eqe.unlimited-queue} is set or + * {@link #setQueueLimited(boolean)} is set to {@code false}. * * @param maxQueueSize the maximum queue size (must be ≥ 0) * @return this builder @@ -715,6 +729,24 @@ public Builder setMaximumQueueSize(final int maxQueueSize) { return this; } + /** + * @return {@code false} if the queue limit and size tracking are suppressed for performance, {@code true} otherwise + */ + public boolean getQueueLimited() { + return queueLimited; + } + + /** + * It set to {@code false} suppress queue limit and size tracking for performance.
+ * It has no effects if {@code jboss.threads.eqe.unlimited-queue} is set. + * + * @return this builder + */ + public Builder setQueueLimited(final boolean limit) { + this.queueLimited = limit; + return this; + } + /** * Get the handoff executor. * @@ -896,7 +928,7 @@ public List shutdownNow() { if (compareAndSetHead(head, taskNode)) { // save from GC nepotism head.setNextOrdered(head); - if (! NO_QUEUE_LIMIT) decreaseQueueSize(); + if (getQueueLimited()) decreaseQueueSize(); head = taskNode; list.add(taskNode.task.handoff()); } @@ -1366,7 +1398,7 @@ public int getMaximumQueueSize() { public void setMaximumQueueSize(final int maxQueueSize) { Assert.checkMinimumParameter("maxQueueSize", 0, maxQueueSize); Assert.checkMaximumParameter("maxQueueSize", Integer.MAX_VALUE, maxQueueSize); - if (NO_QUEUE_LIMIT) return; + if (!getQueueLimited()) return; long oldVal; do { oldVal = getQueueSizeVolatile(); @@ -1427,10 +1459,11 @@ public void setTerminationTask(final Runnable terminationTask) { /** * Get an estimate of the current queue size. * - * @return an estimate of the current queue size or -1 when {@code jboss.threads.eqe.unlimited-queue} is enabled + * @return an estimate of the current queue size or -1 when {@code jboss.threads.eqe.unlimited-queue} is enabled or + * {@link Builder#setQueueLimited(boolean)} is set to {@code false} */ public int getQueueSize() { - return NO_QUEUE_LIMIT ? -1 : currentQueueSizeOf(getQueueSizeVolatile()); + return !getQueueLimited() ? -1 : currentQueueSizeOf(getQueueSizeVolatile()); } /** @@ -1455,10 +1488,11 @@ public int getActiveCount() { * Get an estimate of the peak size of the queue. * * return an estimate of the peak size of the queue or -1 when {@code jboss.threads.eqe.statistics} - * is disabled or {@code jboss.threads.eqe.unlimited-queue} is enabled + * is disabled or {@code jboss.threads.eqe.unlimited-queue} is enabled or {@link Builder#setQueueLimited(boolean)} + * is set to {@code false} */ public int getLargestQueueSize() { - return UPDATE_STATISTICS && !NO_QUEUE_LIMIT ? peakQueueSize : -1; + return UPDATE_STATISTICS && getQueueLimited() ? peakQueueSize : -1; } /** @@ -1660,7 +1694,7 @@ private QNode getOrAddNode(PoolThreadNode nextPoolThreadNode) { // to save dragging head::next into the old generation. // Clean-up cannot just null out next head.setNextOrdered(head); - if (!NO_QUEUE_LIMIT) decreaseQueueSize(); + if (getQueueLimited()) decreaseQueueSize(); return taskNode; } } else if (headNext instanceof PoolThreadNode || headNext == null) { @@ -1841,7 +1875,7 @@ private int tryExecute(final Task runnable) { } assert tr == AT_NO; // no; try to enqueue - if (!NO_QUEUE_LIMIT && !increaseQueueSize()) { + if (getQueueLimited() && !increaseQueueSize()) { // queue is full // OK last effort to create a thread, disregarding growth limit tr = tryAllocateThread(0.0f); @@ -1875,7 +1909,7 @@ private int tryExecute(final Task runnable) { return EXE_OK; } // we failed; we have to drop the queue size back down again to compensate before we can retry - if (!NO_QUEUE_LIMIT) decreaseQueueSize(); + if (getQueueLimited()) decreaseQueueSize(); } else if (tailNext instanceof PoolThreadNode) { final QNode tailNextNext = tailNext.getNext(); // state change ex1: @@ -2567,11 +2601,11 @@ public int getLargestQueueSize() { } public boolean isQueueBounded() { - return ! NO_QUEUE_LIMIT; + return getQueueLimited(); } public boolean isQueueSizeModifiable() { - return ! NO_QUEUE_LIMIT; + return getQueueLimited(); } public boolean isShutdown() { diff --git a/src/test/java/org/jboss/threads/EnhancedQueueExecutorTest.java b/src/test/java/org/jboss/threads/EnhancedQueueExecutorTest.java index 562c25d..56778c8 100644 --- a/src/test/java/org/jboss/threads/EnhancedQueueExecutorTest.java +++ b/src/test/java/org/jboss/threads/EnhancedQueueExecutorTest.java @@ -1,12 +1,10 @@ package org.jboss.threads; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; - import java.time.Duration; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; @@ -14,6 +12,11 @@ import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + public class EnhancedQueueExecutorTest { private int coreSize = 3; private int maxSize = coreSize * 2; @@ -41,6 +44,93 @@ public void run() { } } + @Test + public void testMaximumQueueSize() throws InterruptedException { + var builder = (new EnhancedQueueExecutor.Builder()) + .setMaximumQueueSize(1) + .setCorePoolSize(1) + .setMaximumPoolSize(1); + assertTrue(builder.getQueueLimited()); + var executor = builder.build(); + CountDownLatch executeEnqueuedTask = new CountDownLatch(1); + AtomicInteger count = new AtomicInteger(); + CountDownLatch enqueuedTask = new CountDownLatch(1); + CountDownLatch executedEnqueuedTask = new CountDownLatch(1); + executor.execute(() -> { + count.incrementAndGet(); + try { + enqueuedTask.countDown(); + executeEnqueuedTask.await(); + } catch (InterruptedException ignored) { + } + }); + enqueuedTask.await(); + assertEquals(1, count.get()); + assertEquals(0, executor.getQueueSize()); + // this is going to cause the queue size to be == 1 + executor.execute(() -> { + count.incrementAndGet(); + executedEnqueuedTask.countDown(); + }); + assertEquals(1, count.get()); + assertEquals(1, executor.getQueueSize()); + try { + executor.execute(count::incrementAndGet); + fail("Expected RejectedExecutionException"); + } catch (RejectedExecutionException e) { + // expected + } + assertEquals(1, count.get()); + assertEquals(1, executor.getQueueSize()); + executeEnqueuedTask.countDown(); + executedEnqueuedTask.await(); + assertEquals(2, count.get()); + assertEquals(0, executor.getQueueSize()); + executor.shutdown(); + } + + @Test + public void testNoQueueLimit() throws InterruptedException { + var builder = (new EnhancedQueueExecutor.Builder()) + .setQueueLimited(false) + .setMaximumQueueSize(1) + .setCorePoolSize(1) + .setMaximumPoolSize(1); + assertFalse(builder.getQueueLimited()); + var executor = builder.build(); + assertEquals(Integer.MAX_VALUE, executor.getMaximumQueueSize()); + CountDownLatch executeEnqueuedTasks = new CountDownLatch(1); + AtomicInteger count = new AtomicInteger(); + CountDownLatch enqueuedTask = new CountDownLatch(1); + CountDownLatch executedEnqueuedTasks = new CountDownLatch(2); + executor.execute(() -> { + count.incrementAndGet(); + try { + enqueuedTask.countDown(); + executeEnqueuedTasks.await(); + } catch (InterruptedException ignored) { + } + }); + enqueuedTask.await(); + executor.execute(() -> { + count.incrementAndGet(); + executedEnqueuedTasks.countDown(); + }); + assertEquals(1, count.get()); + assertEquals(-1, executor.getQueueSize()); + executor.execute(() -> { + count.incrementAndGet(); + executedEnqueuedTasks.countDown(); + }); + assertEquals(1, count.get()); + assertEquals(-1, executor.getQueueSize()); + executeEnqueuedTasks.countDown(); + executedEnqueuedTasks.await(); + assertEquals(3, count.get()); + assertEquals(-1, executor.getQueueSize()); + executor.shutdown(); + } + /** * Test that unused threads are being reused. Scenario: *