Skip to content

Commit

Permalink
Implements per-executor queue limit configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
franz1981 committed Sep 25, 2024
1 parent 9be3116 commit e98891b
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 16 deletions.
60 changes: 47 additions & 13 deletions src/main/java/org/jboss/threads/EnhancedQueueExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,12 @@ public final class EnhancedQueueExecutor extends AbstractExecutorService impleme
@SuppressWarnings("unused") // used by field updater
volatile int peakQueueSize;

// =======================================================
// Size and queue limit tracking
// =======================================================

private final boolean queueLimited;

private final LongAdder submittedTaskCounter = new LongAdder();
private final LongAdder completedTaskCounter = new LongAdder();
private final LongAdder rejectedTaskCounter = new LongAdder();
Expand Down Expand Up @@ -423,7 +429,9 @@ private static final class RuntimeFields {
// thread stat
setThreadStatusPlain(withCoreSize(withMaxSize(withAllowCoreTimeout(0L, builder.allowsCoreThreadTimeOut()), maxSize), coreSize));
timeoutNanos = TimeUtil.clampedPositiveNanos(keepAliveTime);
setQueueSizePlain(withMaxQueueSize(withCurrentQueueSize(0L, 0), builder.getMaximumQueueSize()));
this.queueLimited = builder.getQueueLimited();
final int maximumQueueSize = getQueueLimited() ? builder.getMaximumQueueSize() : Integer.MAX_VALUE;
setQueueSizePlain(withMaxQueueSize(withCurrentQueueSize(0L, 0), maximumQueueSize));
mxBean = new MXBeanImpl();
if (! DISABLE_MBEAN && builder.isRegisterMBean()) {
this.acc = getContext();
Expand All @@ -436,6 +444,10 @@ private static final class RuntimeFields {
}
}

private boolean getQueueLimited() {
return !NO_QUEUE_LIMIT && queueLimited;
}

static final class MBeanRegisterAction implements PrivilegedAction<ObjectInstance> {
private final String finalName;
private final MXBeanImpl mxBean;
Expand Down Expand Up @@ -479,6 +491,7 @@ public static final class Builder {
private boolean registerMBean = REGISTER_MBEAN;
private String mBeanName;
private ContextHandler<?> contextHandler = ContextHandler.NONE;
private boolean queueLimited = true;

/**
* Construct a new instance.
Expand Down Expand Up @@ -702,7 +715,8 @@ public int getMaximumQueueSize() {

/**
* Set the maximum queue size.
* This has no impact when {@code jboss.threads.eqe.unlimited-queue} is set.
* This has no impact when {@code jboss.threads.eqe.unlimited-queue} is set or
* {@link #setQueueLimited(boolean)} is set to {@code false}.
*
* @param maxQueueSize the maximum queue size (must be ≥ 0)
* @return this builder
Expand All @@ -715,6 +729,24 @@ public Builder setMaximumQueueSize(final int maxQueueSize) {
return this;
}

/**
* @return {@code false} if the queue limit and size tracking are suppressed for performance, {@code true} otherwise
*/
public boolean getQueueLimited() {
return queueLimited;
}

/**
* It set to {@code false} suppress queue limit and size tracking for performance.<br>
* It has no effects if {@code jboss.threads.eqe.unlimited-queue} is set.
*
* @return this builder
*/
public Builder setQueueLimited(final boolean limit) {
this.queueLimited = limit;
return this;
}

/**
* Get the handoff executor.
*
Expand Down Expand Up @@ -896,7 +928,7 @@ public List<Runnable> shutdownNow() {
if (compareAndSetHead(head, taskNode)) {
// save from GC nepotism
head.setNextOrdered(head);
if (! NO_QUEUE_LIMIT) decreaseQueueSize();
if (getQueueLimited()) decreaseQueueSize();
head = taskNode;
list.add(taskNode.task.handoff());
}
Expand Down Expand Up @@ -1366,7 +1398,7 @@ public int getMaximumQueueSize() {
public void setMaximumQueueSize(final int maxQueueSize) {
Assert.checkMinimumParameter("maxQueueSize", 0, maxQueueSize);
Assert.checkMaximumParameter("maxQueueSize", Integer.MAX_VALUE, maxQueueSize);
if (NO_QUEUE_LIMIT) return;
if (!getQueueLimited()) return;
long oldVal;
do {
oldVal = getQueueSizeVolatile();
Expand Down Expand Up @@ -1427,10 +1459,11 @@ public void setTerminationTask(final Runnable terminationTask) {
/**
* Get an estimate of the current queue size.
*
* @return an estimate of the current queue size or -1 when {@code jboss.threads.eqe.unlimited-queue} is enabled
* @return an estimate of the current queue size or -1 when {@code jboss.threads.eqe.unlimited-queue} is enabled or
* {@link Builder#setQueueLimited(boolean)} is set to {@code false}
*/
public int getQueueSize() {
return NO_QUEUE_LIMIT ? -1 : currentQueueSizeOf(getQueueSizeVolatile());
return !getQueueLimited() ? -1 : currentQueueSizeOf(getQueueSizeVolatile());
}

/**
Expand All @@ -1455,10 +1488,11 @@ public int getActiveCount() {
* Get an estimate of the peak size of the queue.
*
* return an estimate of the peak size of the queue or -1 when {@code jboss.threads.eqe.statistics}
* is disabled or {@code jboss.threads.eqe.unlimited-queue} is enabled
* is disabled or {@code jboss.threads.eqe.unlimited-queue} is enabled or {@link Builder#setQueueLimited(boolean)}
* is set to {@code false}
*/
public int getLargestQueueSize() {
return UPDATE_STATISTICS && !NO_QUEUE_LIMIT ? peakQueueSize : -1;
return UPDATE_STATISTICS && getQueueLimited() ? peakQueueSize : -1;
}

/**
Expand Down Expand Up @@ -1660,7 +1694,7 @@ private QNode getOrAddNode(PoolThreadNode nextPoolThreadNode) {
// to save dragging head::next into the old generation.
// Clean-up cannot just null out next
head.setNextOrdered(head);
if (!NO_QUEUE_LIMIT) decreaseQueueSize();
if (getQueueLimited()) decreaseQueueSize();
return taskNode;
}
} else if (headNext instanceof PoolThreadNode || headNext == null) {
Expand Down Expand Up @@ -1841,7 +1875,7 @@ private int tryExecute(final Task runnable) {
}
assert tr == AT_NO;
// no; try to enqueue
if (!NO_QUEUE_LIMIT && !increaseQueueSize()) {
if (getQueueLimited() && !increaseQueueSize()) {
// queue is full
// OK last effort to create a thread, disregarding growth limit
tr = tryAllocateThread(0.0f);
Expand Down Expand Up @@ -1875,7 +1909,7 @@ private int tryExecute(final Task runnable) {
return EXE_OK;
}
// we failed; we have to drop the queue size back down again to compensate before we can retry
if (!NO_QUEUE_LIMIT) decreaseQueueSize();
if (getQueueLimited()) decreaseQueueSize();
} else if (tailNext instanceof PoolThreadNode) {
final QNode tailNextNext = tailNext.getNext();
// state change ex1:
Expand Down Expand Up @@ -2567,11 +2601,11 @@ public int getLargestQueueSize() {
}

public boolean isQueueBounded() {
return ! NO_QUEUE_LIMIT;
return getQueueLimited();
}

public boolean isQueueSizeModifiable() {
return ! NO_QUEUE_LIMIT;
return getQueueLimited();
}

public boolean isShutdown() {
Expand Down
96 changes: 93 additions & 3 deletions src/test/java/org/jboss/threads/EnhancedQueueExecutorTest.java
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
package org.jboss.threads;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

public class EnhancedQueueExecutorTest {
private int coreSize = 3;
private int maxSize = coreSize * 2;
Expand Down Expand Up @@ -41,6 +44,93 @@ public void run() {
}
}

@Test
public void testMaximumQueueSize() throws InterruptedException {
var builder = (new EnhancedQueueExecutor.Builder())
.setMaximumQueueSize(1)
.setCorePoolSize(1)
.setMaximumPoolSize(1);
assertTrue(builder.getQueueLimited());
var executor = builder.build();
CountDownLatch executeEnqueuedTask = new CountDownLatch(1);
AtomicInteger count = new AtomicInteger();
CountDownLatch enqueuedTask = new CountDownLatch(1);
CountDownLatch executedEnqueuedTask = new CountDownLatch(1);
executor.execute(() -> {
count.incrementAndGet();
try {
enqueuedTask.countDown();
executeEnqueuedTask.await();
} catch (InterruptedException ignored) {
}
});
enqueuedTask.await();
assertEquals(1, count.get());
assertEquals(0, executor.getQueueSize());
// this is going to cause the queue size to be == 1
executor.execute(() -> {
count.incrementAndGet();
executedEnqueuedTask.countDown();
});
assertEquals(1, count.get());
assertEquals(1, executor.getQueueSize());
try {
executor.execute(count::incrementAndGet);
fail("Expected RejectedExecutionException");
} catch (RejectedExecutionException e) {
// expected
}
assertEquals(1, count.get());
assertEquals(1, executor.getQueueSize());
executeEnqueuedTask.countDown();
executedEnqueuedTask.await();
assertEquals(2, count.get());
assertEquals(0, executor.getQueueSize());
executor.shutdown();
}

@Test
public void testNoQueueLimit() throws InterruptedException {
var builder = (new EnhancedQueueExecutor.Builder())
.setQueueLimited(false)
.setMaximumQueueSize(1)
.setCorePoolSize(1)
.setMaximumPoolSize(1);
assertFalse(builder.getQueueLimited());
var executor = builder.build();
assertEquals(Integer.MAX_VALUE, executor.getMaximumQueueSize());
CountDownLatch executeEnqueuedTasks = new CountDownLatch(1);
AtomicInteger count = new AtomicInteger();
CountDownLatch enqueuedTask = new CountDownLatch(1);
CountDownLatch executedEnqueuedTasks = new CountDownLatch(2);
executor.execute(() -> {
count.incrementAndGet();
try {
enqueuedTask.countDown();
executeEnqueuedTasks.await();
} catch (InterruptedException ignored) {
}
});
enqueuedTask.await();
executor.execute(() -> {
count.incrementAndGet();
executedEnqueuedTasks.countDown();
});
assertEquals(1, count.get());
assertEquals(-1, executor.getQueueSize());
executor.execute(() -> {
count.incrementAndGet();
executedEnqueuedTasks.countDown();
});
assertEquals(1, count.get());
assertEquals(-1, executor.getQueueSize());
executeEnqueuedTasks.countDown();
executedEnqueuedTasks.await();
assertEquals(3, count.get());
assertEquals(-1, executor.getQueueSize());
executor.shutdown();
}

/**
* Test that unused threads are being reused. Scenario:
* <ul>
Expand Down

0 comments on commit e98891b

Please sign in to comment.