Skip to content

Commit

Permalink
Remove one level of pointer-chasing from thread body
Browse files Browse the repository at this point in the history
  • Loading branch information
dmlloyd committed Sep 26, 2024
1 parent c38841c commit 7099668
Showing 1 changed file with 110 additions and 113 deletions.
223 changes: 110 additions & 113 deletions src/main/java/org/jboss/threads/EnhancedQueueExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -1581,133 +1581,134 @@ final class ThreadBody implements Runnable {
* exit it is removed.
*/
public void run() {
final Thread currentThread = Thread.currentThread();
final LongAdder spinMisses = EnhancedQueueExecutor.this.spinMisses;
runningThreads.add(currentThread);
runningThreads.add(currentThread());

// run the initial task
nullToNop(getAndClearInitialTask()).run();

// Eagerly allocate a PoolThreadNode for the next time it's needed
PoolThreadNode nextPoolThreadNode = new PoolThreadNode(currentThread);
// main loop
processingQueue: for (;;) {
TaskNode head;
QNode headNext;
for (;;) {
head = getHead();
headNext = head.getNext();
// headNext == head can happen if another consumer has already consumed head:
// retry with a fresh head
if (headNext != head) {
if (headNext == null || headNext instanceof PoolThreadNode) {
nextPoolThreadNode.setNextRelaxed(headNext);
if (head.compareAndSetNext(headNext, nextPoolThreadNode)) {
// pool thread node was added
final PoolThreadNode newNode = nextPoolThreadNode;
// at this point, we are registered into the queue
long start = System.nanoTime();
long elapsed = 0L;
waitingForTask: for (;;) {
Runnable task = newNode.getTask();
assert task != ACCEPTED && task != GAVE_UP && task != null;
if (task != WAITING && task != EXIT) {
if (newNode.compareAndSetTask(task, ACCEPTED)) {
// we have a task to run, so run it and then abandon the node
task.run();
// nextPoolThreadNode has been added to the queue, a new node is required for next time.
nextPoolThreadNode = new PoolThreadNode(currentThread);
// rerun outer
continue processingQueue;
}
// we had a task to run, but we failed to CAS it for some reason, so retry
if (UPDATE_STATISTICS) spinMisses.increment();
continue waitingForTask;
} else {
final long timeoutNanos = EnhancedQueueExecutor.this.timeoutNanos;
long oldVal = getThreadStatus();
if (elapsed >= timeoutNanos || task == EXIT || currentSizeOf(oldVal) > maxSizeOf(oldVal)) {
// try to exit this thread, if we are allowed
if (task == EXIT ||
isShutdownRequested(oldVal) ||
isAllowCoreTimeout(oldVal) ||
currentSizeOf(oldVal) > coreSizeOf(oldVal)
) {
if (newNode.compareAndSetTask(task, GAVE_UP)) {
for (;;) {
if (tryDeallocateThread(oldVal)) {
// clear to exit.
runningThreads.remove(currentThread);
return;
}
if (UPDATE_STATISTICS) spinMisses.increment();
oldVal = getThreadStatus();
Runnable initial = initialTask;
if (initial != null) {
this.initialTask = null;
initial.run();
}

runThreadBody();
}
}

private void runThreadBody() {
final LongAdder spinMisses = this.spinMisses;
// Eagerly allocate a PoolThreadNode for the next time it's needed
PoolThreadNode nextPoolThreadNode = new PoolThreadNode(currentThread());
// main loop
processingQueue: for (;;) {
TaskNode head;
QNode headNext;
for (;;) {
head = getHead();
headNext = head.getNext();
// headNext == head can happen if another consumer has already consumed head:
// retry with a fresh head
if (headNext != head) {
if (headNext == null || headNext instanceof PoolThreadNode) {
nextPoolThreadNode.setNextRelaxed(headNext);
if (head.compareAndSetNext(headNext, nextPoolThreadNode)) {
// pool thread node was added
final PoolThreadNode newNode = nextPoolThreadNode;
// at this point, we are registered into the queue
long start = System.nanoTime();
long elapsed = 0L;
waitingForTask: for (;;) {
Runnable task = newNode.getTask();
assert task != ACCEPTED && task != GAVE_UP && task != null;
if (task != WAITING && task != EXIT) {
if (newNode.compareAndSetTask(task, ACCEPTED)) {
// we have a task to run, so run it and then abandon the node
task.run();
// nextPoolThreadNode has been added to the queue, a new node is required for next time.
nextPoolThreadNode = new PoolThreadNode(currentThread());
// rerun outer
continue processingQueue;
}
// we had a task to run, but we failed to CAS it for some reason, so retry
if (UPDATE_STATISTICS) spinMisses.increment();
continue waitingForTask;
} else {
final long timeoutNanos = EnhancedQueueExecutor.this.timeoutNanos;
long oldVal = getThreadStatus();
if (elapsed >= timeoutNanos || task == EXIT || currentSizeOf(oldVal) > maxSizeOf(oldVal)) {
// try to exit this thread, if we are allowed
if (task == EXIT ||
isShutdownRequested(oldVal) ||
isAllowCoreTimeout(oldVal) ||
currentSizeOf(oldVal) > coreSizeOf(oldVal)
) {
if (newNode.compareAndSetTask(task, GAVE_UP)) {
for (;;) {
if (tryDeallocateThread(oldVal)) {
// clear to exit.
runningThreads.remove(currentThread());
return;
}
//throw Assert.unreachableCode();
if (UPDATE_STATISTICS) spinMisses.increment();
oldVal = getThreadStatus();
}
continue waitingForTask;
} else {
if (elapsed >= timeoutNanos) {
newNode.park(EnhancedQueueExecutor.this);
} else {
newNode.park(EnhancedQueueExecutor.this, timeoutNanos - elapsed);
}
Thread.interrupted();
elapsed = System.nanoTime() - start;
// retry inner
continue waitingForTask;
//throw Assert.unreachableCode();
}
//throw Assert.unreachableCode();
continue waitingForTask;
} else {
assert task == WAITING;
newNode.park(EnhancedQueueExecutor.this, timeoutNanos - elapsed);
if (elapsed >= timeoutNanos) {
newNode.park(EnhancedQueueExecutor.this);
} else {
newNode.park(EnhancedQueueExecutor.this, timeoutNanos - elapsed);
}
Thread.interrupted();
elapsed = System.nanoTime() - start;
// retry inner
continue waitingForTask;
}
//throw Assert.unreachableCode();
} else {
assert task == WAITING;
newNode.park(EnhancedQueueExecutor.this, timeoutNanos - elapsed);
Thread.interrupted();
elapsed = System.nanoTime() - start;
// retry inner
continue waitingForTask;
}
//throw Assert.unreachableCode();
} // :waitingForTask
}
//throw Assert.unreachableCode();
} else if (headNext != null) {
// GC Nepotism:
// save dragging headNext into old generation
// (although being a PoolThreadNode it won't make a big difference)
nextPoolThreadNode.setNextRelaxed(null);
}
} else if (headNext instanceof TaskNode taskNode) {
if (compareAndSetHead(head, taskNode)) {
// save from GC Nepotism: generational GCs don't like
// cross-generational references, so better to "clean-up" head::next
// to save dragging head::next into the old generation.
// Clean-up cannot just null out next
head.setNextOrdered(head);
if (getQueueLimited()) decreaseQueueSize();
// task node was removed
taskNode.getAndClearTask().run();
continue;
}
} else {
assert headNext instanceof TerminateWaiterNode;
// we're shutting down!
runningThreads.remove(currentThread);
deallocateThread();
return;
} // :waitingForTask
//throw Assert.unreachableCode();
} else if (headNext != null) {
// GC Nepotism:
// save dragging headNext into old generation
// (although being a PoolThreadNode it won't make a big difference)
nextPoolThreadNode.setNextRelaxed(null);
}
} else if (headNext instanceof TaskNode taskNode) {
if (compareAndSetHead(head, taskNode)) {
// save from GC Nepotism: generational GCs don't like
// cross-generational references, so better to "clean-up" head::next
// to save dragging head::next into the old generation.
// Clean-up cannot just null out next
head.setNextOrdered(head);
if (getQueueLimited()) decreaseQueueSize();
// task node was removed
taskNode.getAndClearTask().run();
continue;
}
} else {
assert headNext instanceof TerminateWaiterNode;
// we're shutting down!
runningThreads.remove(currentThread());
deallocateThread();
return;
}
if (UPDATE_STATISTICS) EnhancedQueueExecutor.this.spinMisses.increment();
}
} // :processingQueue
//throw Assert.unreachableCode();
}

private Runnable getAndClearInitialTask() {
Runnable initial = initialTask;
this.initialTask = null;
return initial;
}
if (UPDATE_STATISTICS) EnhancedQueueExecutor.this.spinMisses.increment();
}
} // :processingQueue
//throw Assert.unreachableCode();
}

// =======================================================
Expand Down Expand Up @@ -2283,10 +2284,6 @@ void rejectShutdown(final Task task) {
}
}

static Runnable nullToNop(final Runnable task) {
return task == null ? NullRunnable.getInstance() : task;
}

// =======================================================
// Node classes
// =======================================================
Expand Down

0 comments on commit 7099668

Please sign in to comment.