+
+# end::documentation[]
diff --git a/jetty-core/jetty-server/src/main/config/modules/threadpool-virtual.mod b/jetty-core/jetty-server/src/main/config/modules/threadpool-virtual.mod
index 54a2912b4f13..88ab2a0c2749 100644
--- a/jetty-core/jetty-server/src/main/config/modules/threadpool-virtual.mod
+++ b/jetty-core/jetty-server/src/main/config/modules/threadpool-virtual.mod
@@ -1,5 +1,6 @@
[description]
-Enables and configures the Server ThreadPool with support for virtual threads in Java 21 or later.
+Enables and configures the Server ThreadPool with support for virtual threads to be used for blocking tasks.
+Only supported in Java 21 or later.
[depends]
logging
diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/VirtualThreads.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/VirtualThreads.java
index 8218e6174ba3..08ffd1f000a3 100644
--- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/VirtualThreads.java
+++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/VirtualThreads.java
@@ -32,21 +32,9 @@
public class VirtualThreads
{
private static final Logger LOG = LoggerFactory.getLogger(VirtualThreads.class);
- private static final Executor executor = probeVirtualThreadExecutor();
+ private static final Executor executor = getNamedVirtualThreadsExecutor(null);
private static final Method isVirtualThread = probeIsVirtualThread();
- private static Executor probeVirtualThreadExecutor()
- {
- try
- {
- return (Executor)Executors.class.getMethod("newVirtualThreadPerTaskExecutor").invoke(null);
- }
- catch (Throwable x)
- {
- return null;
- }
- }
-
private static Method probeIsVirtualThread()
{
try
@@ -131,7 +119,8 @@ public static Executor getNamedVirtualThreadsExecutor(String namePrefix)
{
Class> builderClass = Class.forName("java.lang.Thread$Builder");
Object threadBuilder = Thread.class.getMethod("ofVirtual").invoke(null);
- threadBuilder = builderClass.getMethod("name", String.class, long.class).invoke(threadBuilder, namePrefix, 0L);
+ if (StringUtil.isNotBlank(namePrefix))
+ threadBuilder = builderClass.getMethod("name", String.class, long.class).invoke(threadBuilder, namePrefix, 0L);
ThreadFactory factory = (ThreadFactory)builderClass.getMethod("factory").invoke(threadBuilder);
return (Executor)Executors.class.getMethod("newThreadPerTaskExecutor", ThreadFactory.class).invoke(null, factory);
}
diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java
index 2078d92609ea..71ebd2534c50 100644
--- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java
+++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java
@@ -449,7 +449,7 @@ public void setReservedThreads(int reservedThreads)
}
/**
- * @return the name of the this thread pool
+ * @return the name of this thread pool
*/
@ManagedAttribute("name of the thread pool")
public String getName()
@@ -460,7 +460,7 @@ public String getName()
/**
* Sets the name of this thread pool, used as a prefix for the thread names.
*
- * @param name the name of the this thread pool
+ * @param name the name of this thread pool
*/
public void setName(String name)
{
@@ -835,7 +835,7 @@ public void join() throws InterruptedException
while (isStopping())
{
- Thread.sleep(1);
+ Thread.onSpinWait();
}
}
diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/TrackingExecutor.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/TrackingExecutor.java
new file mode 100644
index 000000000000..9714d5490741
--- /dev/null
+++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/TrackingExecutor.java
@@ -0,0 +1,102 @@
+//
+// ========================================================================
+// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
+//
+// This program and the accompanying materials are made available under the
+// terms of the Eclipse Public License v. 2.0 which is available at
+// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+// which is available at https://www.apache.org/licenses/LICENSE-2.0.
+//
+// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+// ========================================================================
+//
+
+package org.eclipse.jetty.util.thread;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+
+import org.eclipse.jetty.util.annotation.ManagedAttribute;
+import org.eclipse.jetty.util.annotation.ManagedObject;
+import org.eclipse.jetty.util.component.Dumpable;
+
+@ManagedObject("Tracking Executor wrapper")
+public class TrackingExecutor implements Executor, Dumpable
+{
+ private final Executor _threadFactoryExecutor;
+ private final Set _threads = ConcurrentHashMap.newKeySet();
+ private boolean _detailed;
+
+ public TrackingExecutor(Executor executor, boolean detailed)
+ {
+ _threadFactoryExecutor = executor;
+ _detailed = detailed;
+ }
+
+ @Override
+ public void execute(Runnable task)
+ {
+ _threadFactoryExecutor.execute(() ->
+ {
+ Thread thread = Thread.currentThread();
+ try
+ {
+ _threads.add(thread);
+ task.run();
+ }
+ finally
+ {
+ _threads.remove(thread);
+ }
+ });
+ }
+
+ @Override
+ public void dump(Appendable out, String indent) throws IOException
+ {
+ Object[] threads = _threads.stream().map(DumpableThread::new).toArray();
+ Dumpable.dumpObjects(out, indent, _threadFactoryExecutor.toString() + " size=" + threads.length, threads);
+ }
+
+ public void setDetailedDump(boolean detailedDump)
+ {
+ _detailed = detailedDump;
+ }
+
+ @ManagedAttribute("reports additional details in the dump")
+ public boolean isDetailedDump()
+ {
+ return _detailed;
+ }
+
+ public int size()
+ {
+ return _threads.size();
+ }
+
+ private class DumpableThread implements Dumpable
+ {
+ private final Thread _thread;
+
+ private DumpableThread(Thread thread)
+ {
+ _thread = thread;
+ }
+
+ @Override
+ public void dump(Appendable out, String indent) throws IOException
+ {
+ if (_detailed)
+ {
+ Object[] stack = _thread.getStackTrace();
+ Dumpable.dumpObjects(out, indent, _thread.toString(), stack);
+ }
+ else
+ {
+ Dumpable.dumpObject(out, _thread);
+ }
+ }
+ }
+}
diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/VirtualThreadPool.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/VirtualThreadPool.java
new file mode 100644
index 000000000000..bd90805b98da
--- /dev/null
+++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/VirtualThreadPool.java
@@ -0,0 +1,226 @@
+//
+// ========================================================================
+// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
+//
+// This program and the accompanying materials are made available under the
+// terms of the Eclipse Public License v. 2.0 which is available at
+// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+// which is available at https://www.apache.org/licenses/LICENSE-2.0.
+//
+// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+// ========================================================================
+//
+
+package org.eclipse.jetty.util.thread;
+
+import java.util.Objects;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+
+import org.eclipse.jetty.util.StringUtil;
+import org.eclipse.jetty.util.VirtualThreads;
+import org.eclipse.jetty.util.annotation.ManagedAttribute;
+import org.eclipse.jetty.util.annotation.ManagedObject;
+import org.eclipse.jetty.util.component.ContainerLifeCycle;
+import org.eclipse.jetty.util.component.Dumpable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An implementation of {@link ThreadPool} interface that does not pool, but instead uses {@link VirtualThreads}.
+ */
+@ManagedObject("A thread non-pool for virtual threads")
+public class VirtualThreadPool extends ContainerLifeCycle implements ThreadPool, Dumpable, TryExecutor, VirtualThreads.Configurable
+{
+ private static final Logger LOG = LoggerFactory.getLogger(VirtualThreadPool.class);
+
+ private final AutoLock.WithCondition _joinLock = new AutoLock.WithCondition();
+ private String _name = null;
+ private Executor _virtualExecutor;
+ private Thread _main;
+ private boolean _externalExecutor;
+ private boolean _tracking;
+ private boolean _detailedDump;
+
+ public VirtualThreadPool()
+ {
+ if (!VirtualThreads.areSupported())
+ throw new IllegalStateException("Virtual Threads not supported");
+ }
+
+ /**
+ * @return the name of this thread pool
+ */
+ @ManagedAttribute("name of the thread pool")
+ public String getName()
+ {
+ return _name;
+ }
+
+ /**
+ * Sets the name of this thread pool, used as a prefix for the thread names.
+ *
+ * @param name the name of this thread pool
+ */
+ public void setName(String name)
+ {
+ if (isRunning())
+ throw new IllegalStateException(getState());
+ if (StringUtil.isBlank(name) && name != null)
+ throw new IllegalArgumentException("Blank name");
+ _name = name;
+ }
+
+ /**
+ * Get if this pool is tracking virtual threads.
+ * @return {@code true} if the virtual threads will be tracked.
+ * @see TrackingExecutor
+ */
+ @ManagedAttribute("virtual threads are tracked")
+ public boolean isTracking()
+ {
+ return _tracking;
+ }
+
+ public void setTracking(boolean tracking)
+ {
+ if (isRunning())
+ throw new IllegalStateException(getState());
+ _tracking = tracking;
+ }
+
+ @ManagedAttribute("reports additional details in the dump")
+ public boolean isDetailedDump()
+ {
+ return _detailedDump;
+ }
+
+ public void setDetailedDump(boolean detailedDump)
+ {
+ _detailedDump = detailedDump;
+ if (_virtualExecutor instanceof TrackingExecutor trackingExecutor)
+ trackingExecutor.setDetailedDump(detailedDump);
+ }
+
+ @Override
+ protected void doStart() throws Exception
+ {
+ _main = new Thread("jetty-virtual-thread-pool-keepalive")
+ {
+ @Override
+ public void run()
+ {
+ try (AutoLock.WithCondition l = _joinLock.lock())
+ {
+ while (isRunning())
+ {
+ l.await();
+ }
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ _main.start();
+
+ if (_virtualExecutor == null)
+ {
+ _externalExecutor = false;
+ _virtualExecutor = Objects.requireNonNull(StringUtil.isBlank(_name)
+ ? VirtualThreads.getDefaultVirtualThreadsExecutor()
+ : VirtualThreads.getNamedVirtualThreadsExecutor(_name));
+ }
+ if (_tracking && !(_virtualExecutor instanceof TrackingExecutor))
+ _virtualExecutor = new TrackingExecutor(_virtualExecutor, _detailedDump);
+ addBean(_virtualExecutor);
+ super.doStart();
+ }
+
+ @Override
+ protected void doStop() throws Exception
+ {
+ super.doStop();
+ removeBean(_virtualExecutor);
+ if (!_externalExecutor)
+ _virtualExecutor = null;
+ _main = null;
+
+ try (AutoLock.WithCondition l = _joinLock.lock())
+ {
+ l.signalAll();
+ }
+ }
+
+ @Override
+ public Executor getVirtualThreadsExecutor()
+ {
+ return _virtualExecutor;
+ }
+
+ @Override
+ public void setVirtualThreadsExecutor(Executor executor)
+ {
+ if (isRunning())
+ throw new IllegalStateException(getState());
+ _externalExecutor = executor != null;
+ _virtualExecutor = executor;
+ }
+
+ @Override
+ public void join() throws InterruptedException
+ {
+ try (AutoLock.WithCondition l = _joinLock.lock())
+ {
+ while (isRunning())
+ {
+ l.await();
+ }
+ }
+
+ while (isStopping())
+ {
+ Thread.onSpinWait();
+ }
+ }
+
+ @Override
+ public int getThreads()
+ {
+ return _virtualExecutor instanceof TrackingExecutor tracking ? tracking.size() : -1;
+ }
+
+ @Override
+ public int getIdleThreads()
+ {
+ return 0;
+ }
+
+ @Override
+ public boolean isLowOnThreads()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean tryExecute(Runnable task)
+ {
+ try
+ {
+ _virtualExecutor.execute(task);
+ return true;
+ }
+ catch (RejectedExecutionException e)
+ {
+ LOG.warn("tryExecute {} failed", _name, e);
+ }
+ return false;
+ }
+
+ @Override
+ public void execute(Runnable task)
+ {
+ _virtualExecutor.execute(task);
+ }
+}
diff --git a/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/thread/VirtualThreadPoolTest.java b/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/thread/VirtualThreadPoolTest.java
new file mode 100644
index 000000000000..1f9ef396a77f
--- /dev/null
+++ b/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/thread/VirtualThreadPoolTest.java
@@ -0,0 +1,195 @@
+//
+// ========================================================================
+// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
+//
+// This program and the accompanying materials are made available under the
+// terms of the Eclipse Public License v. 2.0 which is available at
+// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+// which is available at https://www.apache.org/licenses/LICENSE-2.0.
+//
+// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+// ========================================================================
+//
+
+package org.eclipse.jetty.util.thread;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.eclipse.jetty.util.StringUtil;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledForJreRange;
+import org.junit.jupiter.api.condition.JRE;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.startsWith;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@DisabledForJreRange(max = JRE.JAVA_20)
+public class VirtualThreadPoolTest
+{
+ @Test
+ public void testNamed() throws Exception
+ {
+ VirtualThreadPool vtp = new VirtualThreadPool();
+ vtp.setName("namedV");
+ vtp.start();
+
+ CompletableFuture name = new CompletableFuture<>();
+ vtp.execute(() -> name.complete(Thread.currentThread().getName()));
+
+ assertThat(name.get(5, TimeUnit.SECONDS), startsWith("namedV"));
+
+ vtp.stop();
+ }
+
+ @Test
+ public void testJoin() throws Exception
+ {
+ VirtualThreadPool vtp = new VirtualThreadPool();
+ vtp.start();
+
+ CountDownLatch running = new CountDownLatch(1);
+ CountDownLatch joined = new CountDownLatch(1);
+
+ vtp.execute(() ->
+ {
+ try
+ {
+ running.countDown();
+ vtp.join();
+ joined.countDown();
+ }
+ catch (Throwable t)
+ {
+ throw new RuntimeException(t);
+ }
+ });
+
+ assertTrue(running.await(5, TimeUnit.SECONDS));
+ assertThat(joined.getCount(), is(1L));
+ vtp.stop();
+ assertTrue(joined.await(5, TimeUnit.SECONDS));
+ }
+
+ @Test
+ public void testExecute() throws Exception
+ {
+ VirtualThreadPool vtp = new VirtualThreadPool();
+ vtp.start();
+
+ CountDownLatch ran = new CountDownLatch(1);
+ vtp.execute(ran::countDown);
+ assertTrue(ran.await(5, TimeUnit.SECONDS));
+ vtp.stop();
+ }
+
+ @Test
+ public void testTry() throws Exception
+ {
+ VirtualThreadPool vtp = new VirtualThreadPool();
+ vtp.start();
+
+ CountDownLatch ran = new CountDownLatch(1);
+ assertTrue(vtp.tryExecute(ran::countDown));
+ assertTrue(ran.await(5, TimeUnit.SECONDS));
+ vtp.stop();
+ }
+
+ @Test
+ public void testTrackingDump() throws Exception
+ {
+ VirtualThreadPool vtp = new VirtualThreadPool();
+ vtp.setTracking(true);
+ vtp.start();
+
+ assertThat(vtp.getVirtualThreadsExecutor(), instanceOf(TrackingExecutor.class));
+ TrackingExecutor trackingExecutor = (TrackingExecutor)vtp.getVirtualThreadsExecutor();
+ assertThat(trackingExecutor.size(), is(0));
+
+ CountDownLatch running = new CountDownLatch(4);
+ Waiter waiter = new Waiter(running, false);
+ Waiter spinner = new Waiter(running, true);
+ try
+ {
+ vtp.execute(waiter);
+ vtp.execute(spinner);
+ vtp.execute(waiter);
+ vtp.execute(spinner);
+
+ assertTrue(running.await(5, TimeUnit.SECONDS));
+ assertThat(trackingExecutor.size(), is(4));
+
+ vtp.setDetailedDump(false);
+ String dump = vtp.dump();
+ assertThat(count(dump, "VirtualThread[#"), is(4));
+ assertThat(count(dump, "/runnable@"), is(2));
+ assertThat(count(dump, "waiting"), is(2));
+ assertThat(count(dump, "VirtualThreadPoolTest.java"), is(0));
+
+ vtp.setDetailedDump(true);
+ dump = vtp.dump();
+ assertThat(count(dump, "VirtualThread[#"), is(4));
+ assertThat(count(dump, "/runnable@"), is(2));
+ assertThat(count(dump, "waiting"), is(2));
+ assertThat(count(dump, "VirtualThreadPoolTest.java"), is(4));
+ assertThat(count(dump, "CountDownLatch.await("), is(2));
+ }
+ finally
+ {
+ waiter.countDown();
+ spinner.countDown();
+ vtp.stop();
+ }
+ }
+
+ public static int count(String str, String subStr)
+ {
+ if (StringUtil.isEmpty(str))
+ return 0;
+
+ int count = 0;
+ int idx = 0;
+
+ while ((idx = str.indexOf(subStr, idx)) != -1)
+ {
+ count++;
+ idx += subStr.length();
+ }
+
+ return count;
+ }
+
+ private static class Waiter extends CountDownLatch implements Runnable
+ {
+ private final CountDownLatch _running;
+ private final boolean _spin;
+
+ public Waiter(CountDownLatch running, boolean spin)
+ {
+ super(1);
+ _running = running;
+ _spin = spin;
+ }
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ _running.countDown();
+ while (_spin && getCount() > 0)
+ Thread.onSpinWait();
+ if (!await(10, TimeUnit.SECONDS))
+ throw new IllegalStateException();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
diff --git a/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/thread/AdaptiveExecutionStrategyTest.java b/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategyTest.java
similarity index 93%
rename from jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/thread/AdaptiveExecutionStrategyTest.java
rename to jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategyTest.java
index f8874ae2de1b..636622b55468 100644
--- a/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/thread/AdaptiveExecutionStrategyTest.java
+++ b/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategyTest.java
@@ -11,7 +11,7 @@
// ========================================================================
//
-package org.eclipse.jetty.util.thread;
+package org.eclipse.jetty.util.thread.strategy;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
@@ -20,7 +20,10 @@
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.logging.StacklessLogging;
-import org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy;
+import org.eclipse.jetty.util.thread.ExecutionStrategy;
+import org.eclipse.jetty.util.thread.Invocable;
+import org.eclipse.jetty.util.thread.QueuedThreadPool;
+import org.eclipse.jetty.util.thread.ReservedThreadExecutor;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
diff --git a/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/ExecutionStrategyTest.java b/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/ExecutionStrategyTest.java
index a12c1f6527df..86f0b1cc726e 100644
--- a/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/ExecutionStrategyTest.java
+++ b/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/ExecutionStrategyTest.java
@@ -15,6 +15,7 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
@@ -23,10 +24,15 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
+import org.eclipse.jetty.util.VirtualThreads;
+import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.ExecutionStrategy.Producer;
+import org.eclipse.jetty.util.thread.ExecutorThreadPool;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
+import org.eclipse.jetty.util.thread.ThreadPool;
+import org.eclipse.jetty.util.thread.VirtualThreadPool;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
@@ -39,22 +45,27 @@
public class ExecutionStrategyTest
{
- public static Stream strategies()
+ public static Stream pooledStrategies()
{
return Stream.of(
+ QueuedThreadPool.class,
+ ExecutorThreadPool.class,
+ VirtualThreads.getDefaultVirtualThreadsExecutor() == null ? null : VirtualThreadPool.class)
+ .filter(Objects::nonNull)
+ .flatMap(tp -> Stream.of(
ProduceExecuteConsume.class,
ExecuteProduceConsume.class,
- AdaptiveExecutionStrategy.class
- ).map(Arguments::of);
+ AdaptiveExecutionStrategy.class)
+ .map(s -> Arguments.of(tp, s)));
}
- QueuedThreadPool _threads = new QueuedThreadPool(20);
- List strategies = new ArrayList<>();
+ List