diff --git a/src/main/java/org/jboss/threads/EnhancedQueueExecutor.java b/src/main/java/org/jboss/threads/EnhancedQueueExecutor.java index 1fc4d60..6fee98e 100644 --- a/src/main/java/org/jboss/threads/EnhancedQueueExecutor.java +++ b/src/main/java/org/jboss/threads/EnhancedQueueExecutor.java @@ -1581,133 +1581,134 @@ final class ThreadBody implements Runnable { * exit it is removed. */ public void run() { - final Thread currentThread = Thread.currentThread(); - final LongAdder spinMisses = EnhancedQueueExecutor.this.spinMisses; - runningThreads.add(currentThread); + runningThreads.add(currentThread()); // run the initial task - nullToNop(getAndClearInitialTask()).run(); - - // Eagerly allocate a PoolThreadNode for the next time it's needed - PoolThreadNode nextPoolThreadNode = new PoolThreadNode(currentThread); - // main loop - processingQueue: for (;;) { - TaskNode head; - QNode headNext; - for (;;) { - head = getHead(); - headNext = head.getNext(); - // headNext == head can happen if another consumer has already consumed head: - // retry with a fresh head - if (headNext != head) { - if (headNext == null || headNext instanceof PoolThreadNode) { - nextPoolThreadNode.setNextRelaxed(headNext); - if (head.compareAndSetNext(headNext, nextPoolThreadNode)) { - // pool thread node was added - final PoolThreadNode newNode = nextPoolThreadNode; - // at this point, we are registered into the queue - long start = System.nanoTime(); - long elapsed = 0L; - waitingForTask: for (;;) { - Runnable task = newNode.getTask(); - assert task != ACCEPTED && task != GAVE_UP && task != null; - if (task != WAITING && task != EXIT) { - if (newNode.compareAndSetTask(task, ACCEPTED)) { - // we have a task to run, so run it and then abandon the node - task.run(); - // nextPoolThreadNode has been added to the queue, a new node is required for next time. - nextPoolThreadNode = new PoolThreadNode(currentThread); - // rerun outer - continue processingQueue; - } - // we had a task to run, but we failed to CAS it for some reason, so retry - if (UPDATE_STATISTICS) spinMisses.increment(); - continue waitingForTask; - } else { - final long timeoutNanos = EnhancedQueueExecutor.this.timeoutNanos; - long oldVal = getThreadStatus(); - if (elapsed >= timeoutNanos || task == EXIT || currentSizeOf(oldVal) > maxSizeOf(oldVal)) { - // try to exit this thread, if we are allowed - if (task == EXIT || - isShutdownRequested(oldVal) || - isAllowCoreTimeout(oldVal) || - currentSizeOf(oldVal) > coreSizeOf(oldVal) - ) { - if (newNode.compareAndSetTask(task, GAVE_UP)) { - for (;;) { - if (tryDeallocateThread(oldVal)) { - // clear to exit. - runningThreads.remove(currentThread); - return; - } - if (UPDATE_STATISTICS) spinMisses.increment(); - oldVal = getThreadStatus(); + Runnable initial = initialTask; + if (initial != null) { + this.initialTask = null; + initial.run(); + } + + runThreadBody(); + } + } + + private void runThreadBody() { + final LongAdder spinMisses = this.spinMisses; + // Eagerly allocate a PoolThreadNode for the next time it's needed + PoolThreadNode nextPoolThreadNode = new PoolThreadNode(currentThread()); + // main loop + processingQueue: for (;;) { + TaskNode head; + QNode headNext; + for (;;) { + head = getHead(); + headNext = head.getNext(); + // headNext == head can happen if another consumer has already consumed head: + // retry with a fresh head + if (headNext != head) { + if (headNext == null || headNext instanceof PoolThreadNode) { + nextPoolThreadNode.setNextRelaxed(headNext); + if (head.compareAndSetNext(headNext, nextPoolThreadNode)) { + // pool thread node was added + final PoolThreadNode newNode = nextPoolThreadNode; + // at this point, we are registered into the queue + long start = System.nanoTime(); + long elapsed = 0L; + waitingForTask: for (;;) { + Runnable task = newNode.getTask(); + assert task != ACCEPTED && task != GAVE_UP && task != null; + if (task != WAITING && task != EXIT) { + if (newNode.compareAndSetTask(task, ACCEPTED)) { + // we have a task to run, so run it and then abandon the node + task.run(); + // nextPoolThreadNode has been added to the queue, a new node is required for next time. + nextPoolThreadNode = new PoolThreadNode(currentThread()); + // rerun outer + continue processingQueue; + } + // we had a task to run, but we failed to CAS it for some reason, so retry + if (UPDATE_STATISTICS) spinMisses.increment(); + continue waitingForTask; + } else { + final long timeoutNanos = EnhancedQueueExecutor.this.timeoutNanos; + long oldVal = getThreadStatus(); + if (elapsed >= timeoutNanos || task == EXIT || currentSizeOf(oldVal) > maxSizeOf(oldVal)) { + // try to exit this thread, if we are allowed + if (task == EXIT || + isShutdownRequested(oldVal) || + isAllowCoreTimeout(oldVal) || + currentSizeOf(oldVal) > coreSizeOf(oldVal) + ) { + if (newNode.compareAndSetTask(task, GAVE_UP)) { + for (;;) { + if (tryDeallocateThread(oldVal)) { + // clear to exit. + runningThreads.remove(currentThread()); + return; } - //throw Assert.unreachableCode(); + if (UPDATE_STATISTICS) spinMisses.increment(); + oldVal = getThreadStatus(); } - continue waitingForTask; - } else { - if (elapsed >= timeoutNanos) { - newNode.park(EnhancedQueueExecutor.this); - } else { - newNode.park(EnhancedQueueExecutor.this, timeoutNanos - elapsed); - } - Thread.interrupted(); - elapsed = System.nanoTime() - start; - // retry inner - continue waitingForTask; + //throw Assert.unreachableCode(); } - //throw Assert.unreachableCode(); + continue waitingForTask; } else { - assert task == WAITING; - newNode.park(EnhancedQueueExecutor.this, timeoutNanos - elapsed); + if (elapsed >= timeoutNanos) { + newNode.park(EnhancedQueueExecutor.this); + } else { + newNode.park(EnhancedQueueExecutor.this, timeoutNanos - elapsed); + } Thread.interrupted(); elapsed = System.nanoTime() - start; // retry inner continue waitingForTask; } //throw Assert.unreachableCode(); + } else { + assert task == WAITING; + newNode.park(EnhancedQueueExecutor.this, timeoutNanos - elapsed); + Thread.interrupted(); + elapsed = System.nanoTime() - start; + // retry inner + continue waitingForTask; } //throw Assert.unreachableCode(); - } // :waitingForTask + } //throw Assert.unreachableCode(); - } else if (headNext != null) { - // GC Nepotism: - // save dragging headNext into old generation - // (although being a PoolThreadNode it won't make a big difference) - nextPoolThreadNode.setNextRelaxed(null); - } - } else if (headNext instanceof TaskNode taskNode) { - if (compareAndSetHead(head, taskNode)) { - // save from GC Nepotism: generational GCs don't like - // cross-generational references, so better to "clean-up" head::next - // to save dragging head::next into the old generation. - // Clean-up cannot just null out next - head.setNextOrdered(head); - if (getQueueLimited()) decreaseQueueSize(); - // task node was removed - taskNode.getAndClearTask().run(); - continue; - } - } else { - assert headNext instanceof TerminateWaiterNode; - // we're shutting down! - runningThreads.remove(currentThread); - deallocateThread(); - return; + } // :waitingForTask + //throw Assert.unreachableCode(); + } else if (headNext != null) { + // GC Nepotism: + // save dragging headNext into old generation + // (although being a PoolThreadNode it won't make a big difference) + nextPoolThreadNode.setNextRelaxed(null); } + } else if (headNext instanceof TaskNode taskNode) { + if (compareAndSetHead(head, taskNode)) { + // save from GC Nepotism: generational GCs don't like + // cross-generational references, so better to "clean-up" head::next + // to save dragging head::next into the old generation. + // Clean-up cannot just null out next + head.setNextOrdered(head); + if (getQueueLimited()) decreaseQueueSize(); + // task node was removed + taskNode.getAndClearTask().run(); + continue; + } + } else { + assert headNext instanceof TerminateWaiterNode; + // we're shutting down! + runningThreads.remove(currentThread()); + deallocateThread(); + return; } - if (UPDATE_STATISTICS) EnhancedQueueExecutor.this.spinMisses.increment(); } - } // :processingQueue - //throw Assert.unreachableCode(); - } - - private Runnable getAndClearInitialTask() { - Runnable initial = initialTask; - this.initialTask = null; - return initial; - } + if (UPDATE_STATISTICS) EnhancedQueueExecutor.this.spinMisses.increment(); + } + } // :processingQueue + //throw Assert.unreachableCode(); } // ======================================================= @@ -2283,10 +2284,6 @@ void rejectShutdown(final Task task) { } } - static Runnable nullToNop(final Runnable task) { - return task == null ? NullRunnable.getInstance() : task; - } - // ======================================================= // Node classes // =======================================================