diff --git a/extensions/virtual-threads/deployment/src/main/java/io/quarkus/virtual/threads/VirtualThreadsProcessor.java b/extensions/virtual-threads/deployment/src/main/java/io/quarkus/virtual/threads/VirtualThreadsProcessor.java index 136bfa1f2bff8..762a777c536fc 100644 --- a/extensions/virtual-threads/deployment/src/main/java/io/quarkus/virtual/threads/VirtualThreadsProcessor.java +++ b/extensions/virtual-threads/deployment/src/main/java/io/quarkus/virtual/threads/VirtualThreadsProcessor.java @@ -1,10 +1,17 @@ package io.quarkus.virtual.threads; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; + +import io.quarkus.arc.deployment.SyntheticBeanBuildItem; +import io.quarkus.arc.processor.BuiltinScope; +import io.quarkus.deployment.annotations.BuildProducer; import io.quarkus.deployment.annotations.BuildStep; import io.quarkus.deployment.annotations.ExecutionTime; import io.quarkus.deployment.annotations.Record; import io.quarkus.deployment.builditem.LaunchModeBuildItem; import io.quarkus.deployment.builditem.ShutdownContextBuildItem; +import io.smallrye.common.annotation.Identifier; public class VirtualThreadsProcessor { @@ -12,8 +19,25 @@ public class VirtualThreadsProcessor { @Record(ExecutionTime.STATIC_INIT) public void setup(VirtualThreadsConfig config, VirtualThreadsRecorder recorder, ShutdownContextBuildItem shutdownContextBuildItem, - LaunchModeBuildItem launchModeBuildItem) { + LaunchModeBuildItem launchModeBuildItem, + BuildProducer producer) { recorder.setupVirtualThreads(config, shutdownContextBuildItem, launchModeBuildItem.getLaunchMode()); + producer.produce( + SyntheticBeanBuildItem.configure(VirtualThreadsExecutor.class) + .scope(BuiltinScope.APPLICATION.getInfo()) + .unremovable() + .setRuntimeInit() + .supplier(recorder.getCurrentSupplier()) + .done()); + producer.produce( + SyntheticBeanBuildItem.configure(VirtualThreadsExecutor.class) + .types(ExecutorService.class, Executor.class) + .addQualifier().annotation(Identifier.class).addValue("value", "virtual-threads").done() + .scope(BuiltinScope.APPLICATION.getInfo()) + .unremovable() + .setRuntimeInit() + .supplier(recorder.getCurrentSupplier()) + .done()); } } diff --git a/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/ContextPreservingExecutorService.java b/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/ContextPreservingExecutorService.java index 09499bb96f67f..76d165e916bf3 100644 --- a/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/ContextPreservingExecutorService.java +++ b/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/ContextPreservingExecutorService.java @@ -15,7 +15,7 @@ /** * Delegating executor service implementation preserving the Vert.x context on {@link #execute(Runnable)} */ -class ContextPreservingExecutorService implements ExecutorService { +class ContextPreservingExecutorService implements ExecutorService, VirtualThreadsExecutor { private final ExecutorService delegate; ContextPreservingExecutorService(final ExecutorService delegate) { diff --git a/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/FallbackVirtualThreadsExecutorService.java b/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/FallbackVirtualThreadsExecutorService.java new file mode 100644 index 0000000000000..440b65646c4f2 --- /dev/null +++ b/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/FallbackVirtualThreadsExecutorService.java @@ -0,0 +1,54 @@ +package io.quarkus.virtual.threads; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.TimeUnit; + +import io.smallrye.mutiny.infrastructure.Infrastructure; +import io.vertx.core.Vertx; +import io.vertx.core.impl.ContextInternal; + +/** + * Fallback executor service in case the Virtual threads + */ +class FallbackVirtualThreadsExecutorService extends AbstractExecutorService implements VirtualThreadsExecutor { + + @Override + public void execute(Runnable command) { + var context = Vertx.currentContext(); + if (!(context instanceof ContextInternal)) { + Infrastructure.getDefaultWorkerPool().execute(command); + } else { + context.executeBlocking(() -> { + command.run(); + return null; + }, false); + } + } + + @Override + public void shutdown() { + // no-op + } + + @Override + public List shutdownNow() { + return Collections.EMPTY_LIST; + } + + @Override + public boolean isShutdown() { + return false; + } + + @Override + public boolean isTerminated() { + return false; + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) { + return false; + } +} diff --git a/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/VirtualThreadsExecutor.java b/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/VirtualThreadsExecutor.java new file mode 100644 index 0000000000000..e23538a7be7d9 --- /dev/null +++ b/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/VirtualThreadsExecutor.java @@ -0,0 +1,9 @@ +package io.quarkus.virtual.threads; + +import java.util.concurrent.ExecutorService; + +/** + * Marker interface for Quarkus managed VirtualThreadPerTaskExecutor + */ +public interface VirtualThreadsExecutor extends ExecutorService { +} diff --git a/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/VirtualThreadsRecorder.java b/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/VirtualThreadsRecorder.java index 80fef790f3b1e..1146b9e009e19 100644 --- a/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/VirtualThreadsRecorder.java +++ b/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/VirtualThreadsRecorder.java @@ -9,15 +9,13 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; +import java.util.function.Supplier; import org.jboss.logging.Logger; import io.quarkus.runtime.LaunchMode; import io.quarkus.runtime.ShutdownContext; import io.quarkus.runtime.annotations.Recorder; -import io.smallrye.mutiny.infrastructure.Infrastructure; -import io.vertx.core.Vertx; -import io.vertx.core.impl.ContextInternal; @Recorder public class VirtualThreadsRecorder { @@ -26,9 +24,16 @@ public class VirtualThreadsRecorder { static VirtualThreadsConfig config = new VirtualThreadsConfig(); - private static volatile Executor current; + private static volatile VirtualThreadsExecutor current; private static final Object lock = new Object(); + public static Supplier VIRTUAL_THREADS_EXECUTOR_SUPPLIER = new Supplier() { + @Override + public VirtualThreadsExecutor get() { + return VirtualThreadsRecorder.getCurrent(); + } + }; + public void setupVirtualThreads(VirtualThreadsConfig c, ShutdownContext shutdownContext, LaunchMode launchMode) { config = c; if (config.enabled) { @@ -82,8 +87,12 @@ public void run() { } } - public static Executor getCurrent() { - Executor executor = current; + public Supplier getCurrentSupplier() { + return VIRTUAL_THREADS_EXECUTOR_SUPPLIER; + } + + public static VirtualThreadsExecutor getCurrent() { + VirtualThreadsExecutor executor = current; if (executor != null) { return executor; } @@ -134,7 +143,7 @@ static ExecutorService newVirtualThreadExecutor() * Since we try to load the "Loom-preview" classes/methods at runtime, the application can even be compiled * using java 11 and executed with a loom-compliant JDK. */ - private static Executor createExecutor() { + private static VirtualThreadsExecutor createExecutor() { if (config.enabled) { try { return new ContextPreservingExecutorService(newVirtualThreadExecutor()); @@ -149,19 +158,6 @@ private static Executor createExecutor() { } } // Fallback to regular worker threads - return new Executor() { - @Override - public void execute(Runnable command) { - var context = Vertx.currentContext(); - if (!(context instanceof ContextInternal)) { - Infrastructure.getDefaultWorkerPool().execute(command); - } else { - context.executeBlocking(() -> { - command.run(); - return null; - }, false); - } - } - }; + return new FallbackVirtualThreadsExecutorService(); } } diff --git a/integration-tests/virtual-threads/rest-client-reactive-virtual-threads/src/main/java/io/quarkus/virtual/rest/RestClientResource.java b/integration-tests/virtual-threads/rest-client-reactive-virtual-threads/src/main/java/io/quarkus/virtual/rest/RestClientResource.java index 04658da72be13..2075b15511fcc 100644 --- a/integration-tests/virtual-threads/rest-client-reactive-virtual-threads/src/main/java/io/quarkus/virtual/rest/RestClientResource.java +++ b/integration-tests/virtual-threads/rest-client-reactive-virtual-threads/src/main/java/io/quarkus/virtual/rest/RestClientResource.java @@ -1,10 +1,12 @@ package io.quarkus.virtual.rest; +import jakarta.inject.Inject; import jakarta.ws.rs.GET; import jakarta.ws.rs.Path; import org.eclipse.microprofile.rest.client.inject.RestClient; +import io.quarkus.virtual.threads.VirtualThreadsExecutor; import io.smallrye.common.annotation.RunOnVirtualThread; @Path("/") @@ -14,9 +16,13 @@ public class RestClientResource { @RestClient ServiceClient client; + @Inject + VirtualThreadsExecutor executor; + @GET public Greeting test() { AssertHelper.assertEverything(); + assert executor != null; return client.hello(); }