From 1c770a8642df25c3e53dd1b6a3f233acaafc777f Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Wed, 5 Jul 2023 15:08:06 +0200 Subject: [PATCH 1/3] Aesh console printstream redirect may cause a deadlock with virtual threads. Replacing the queue implementation with a lock-free variant resolves the issue --- .../main/java/io/quarkus/deployment/console/AeshConsole.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/deployment/src/main/java/io/quarkus/deployment/console/AeshConsole.java b/core/deployment/src/main/java/io/quarkus/deployment/console/AeshConsole.java index c881b28d12b3f..320ee08e30de8 100644 --- a/core/deployment/src/main/java/io/quarkus/deployment/console/AeshConsole.java +++ b/core/deployment/src/main/java/io/quarkus/deployment/console/AeshConsole.java @@ -6,7 +6,7 @@ import java.util.HashMap; import java.util.Map; import java.util.TreeMap; -import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantLock; @@ -56,7 +56,7 @@ public class AeshConsole extends QuarkusConsole { * Because Aesh can log deadlocks are possible on Windows if a write fails, unless care * is taken. */ - private final LinkedBlockingDeque writeQueue = new LinkedBlockingDeque<>(); + private final ConcurrentLinkedQueue writeQueue = new ConcurrentLinkedQueue<>(); private final Lock connectionLock = new ReentrantLock(); private static final ThreadLocal IN_WRITE = new ThreadLocal<>() { @Override From 1b7d0017a9c9a67af9b0f99ea0b09d07b18fb8b7 Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Tue, 4 Jul 2023 21:34:08 +0200 Subject: [PATCH 2/3] Reactive messaging virtual threads support --- .../QuarkusMediatorConfigurationUtil.java | 21 +++- .../deployment/ReactiveMessagingDotNames.java | 2 + .../SmallRyeReactiveMessagingProcessor.java | 23 +++- .../runtime/QuarkusWorkerPoolRegistry.java | 100 +++++++++++++++++- .../SmallRyeReactiveMessagingLifecycle.java | 3 +- .../runtime/WorkerConfiguration.java | 12 ++- 6 files changed, 152 insertions(+), 9 deletions(-) diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/QuarkusMediatorConfigurationUtil.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/QuarkusMediatorConfigurationUtil.java index 118b2b7dfb91d..4149c1e5a9ddd 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/QuarkusMediatorConfigurationUtil.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/QuarkusMediatorConfigurationUtil.java @@ -9,6 +9,7 @@ import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.KOTLIN_UNIT; import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.MERGE; import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.OUTGOING; +import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.RUN_ON_VIRTUAL_THREAD; import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.SMALLRYE_BLOCKING; import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.TRANSACTIONAL; import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.VOID_CLASS; @@ -33,6 +34,7 @@ import io.quarkus.runtime.configuration.ConfigurationException; import io.quarkus.smallrye.reactivemessaging.runtime.QuarkusMediatorConfiguration; import io.quarkus.smallrye.reactivemessaging.runtime.QuarkusParameterDescriptor; +import io.quarkus.smallrye.reactivemessaging.runtime.QuarkusWorkerPoolRegistry; import io.quarkus.smallrye.reactivemessaging.runtime.TypeInfo; import io.smallrye.reactive.messaging.Shape; import io.smallrye.reactive.messaging.annotations.Blocking; @@ -178,17 +180,32 @@ public Integer get() { AnnotationInstance blockingAnnotation = methodInfo.annotation(BLOCKING); AnnotationInstance smallryeBlockingAnnotation = methodInfo.annotation(SMALLRYE_BLOCKING); AnnotationInstance transactionalAnnotation = methodInfo.annotation(TRANSACTIONAL); - if (blockingAnnotation != null || smallryeBlockingAnnotation != null || transactionalAnnotation != null) { + AnnotationInstance runOnVirtualThreadAnnotation = methodInfo.annotation(RUN_ON_VIRTUAL_THREAD); + if (blockingAnnotation != null || smallryeBlockingAnnotation != null || transactionalAnnotation != null + || runOnVirtualThreadAnnotation != null) { mediatorConfigurationSupport.validateBlocking(validationOutput); configuration.setBlocking(true); if (blockingAnnotation != null) { AnnotationValue ordered = blockingAnnotation.value("ordered"); - configuration.setBlockingExecutionOrdered(ordered == null || ordered.asBoolean()); + if (runOnVirtualThreadAnnotation != null) { + if (ordered != null && ordered.asBoolean()) { + throw new ConfigurationException( + "The method `" + methodInfo.name() + + "` is using `@RunOnVirtualThread` but explicitly set as `@Blocking(ordered = true)`"); + } + configuration.setBlockingExecutionOrdered(false); + configuration.setWorkerPoolName(QuarkusWorkerPoolRegistry.DEFAULT_VIRTUAL_THREAD_WORKER); + } else { + configuration.setBlockingExecutionOrdered(ordered == null || ordered.asBoolean()); + } String poolName; if (blockingAnnotation.value() != null && !(poolName = blockingAnnotation.value().asString()).equals(Blocking.DEFAULT_WORKER_POOL)) { configuration.setWorkerPoolName(poolName); } + } else if (runOnVirtualThreadAnnotation != null) { + configuration.setBlockingExecutionOrdered(false); + configuration.setWorkerPoolName(QuarkusWorkerPoolRegistry.DEFAULT_VIRTUAL_THREAD_WORKER); } else { configuration.setBlockingExecutionOrdered(true); } diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingDotNames.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingDotNames.java index 06fa2d2b21c50..384dcd3aee6ba 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingDotNames.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingDotNames.java @@ -10,6 +10,7 @@ import org.eclipse.microprofile.reactive.messaging.spi.OutgoingConnectorFactory; import org.jboss.jandex.DotName; +import io.smallrye.common.annotation.RunOnVirtualThread; import io.smallrye.reactive.messaging.MessageConverter; import io.smallrye.reactive.messaging.MutinyEmitter; import io.smallrye.reactive.messaging.annotations.Blocking; @@ -89,6 +90,7 @@ public final class ReactiveMessagingDotNames { .createSimple("io.quarkus.smallrye.reactivemessaging.runtime.kotlin.AbstractSubscribingCoroutineInvoker"); static final DotName TRANSACTIONAL = DotName.createSimple("jakarta.transaction.Transactional"); + static final DotName RUN_ON_VIRTUAL_THREAD = DotName.createSimple(RunOnVirtualThread.class.getName()); private ReactiveMessagingDotNames() { } diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java index cdef1d9380c9a..a5b0955d7ff07 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java @@ -2,6 +2,7 @@ import static io.quarkus.deployment.annotations.ExecutionTime.STATIC_INIT; import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.BLOCKING; +import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.RUN_ON_VIRTUAL_THREAD; import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.SMALLRYE_BLOCKING; import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.TRANSACTIONAL; @@ -49,7 +50,9 @@ import io.quarkus.deployment.builditem.CombinedIndexBuildItem; import io.quarkus.deployment.builditem.FeatureBuildItem; import io.quarkus.deployment.builditem.GeneratedClassBuildItem; +import io.quarkus.deployment.builditem.RunTimeConfigurationDefaultBuildItem; import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem; +import io.quarkus.deployment.builditem.nativeimage.RuntimeInitializedClassBuildItem; import io.quarkus.deployment.metrics.MetricsCapabilityBuildItem; import io.quarkus.deployment.recording.RecorderContext; import io.quarkus.gizmo.ClassCreator; @@ -88,6 +91,7 @@ public class SmallRyeReactiveMessagingProcessor { private static final Logger LOGGER = Logger .getLogger("io.quarkus.smallrye-reactive-messaging.deployment.processor"); + static final String DEFAULT_VIRTUAL_THREADS_MAX_CONCURRENCY = "1024"; static final String INVOKER_SUFFIX = "_SmallRyeMessagingInvoker"; static String channelPropertyFormat = "mp.messaging.%s.%s.%s"; @@ -110,6 +114,12 @@ AdditionalBeanBuildItem beans() { QuarkusWorkerPoolRegistry.class); } + @BuildStep + void nativeRuntimeInitClasses(BuildProducer runtimeInitClasses) { + runtimeInitClasses.produce(new RuntimeInitializedClassBuildItem( + "io.quarkus.smallrye.reactivemessaging.runtime.QuarkusWorkerPoolRegistry$VirtualExecutorSupplier")); + } + @BuildStep AnnotationsTransformerBuildItem transformBeanScope(BeanArchiveIndexBuildItem index, CustomScopeAnnotationsBuildItem scopes) { @@ -221,6 +231,7 @@ public void build(SmallRyeReactiveMessagingRecorder recorder, RecorderContext re List channelFields, BuildProducer generatedClass, BuildProducer reflectiveClass, + BuildProducer defaultConfig, ReactiveMessagingConfiguration conf) { ClassOutput classOutput = new GeneratedClassGizmoAdaptor(generatedClass, true); @@ -240,17 +251,25 @@ public void build(SmallRyeReactiveMessagingRecorder recorder, RecorderContext re BeanInfo bean = mediatorMethod.getBean(); if (methodInfo.hasAnnotation(BLOCKING) || methodInfo.hasAnnotation(SMALLRYE_BLOCKING) + || methodInfo.hasAnnotation(RUN_ON_VIRTUAL_THREAD) || methodInfo.hasAnnotation(TRANSACTIONAL)) { // Just in case both annotation are used, use @Blocking value. - String poolName = Blocking.DEFAULT_WORKER_POOL; + String poolName = methodInfo.hasAnnotation(RUN_ON_VIRTUAL_THREAD) + ? QuarkusWorkerPoolRegistry.DEFAULT_VIRTUAL_THREAD_WORKER + : Blocking.DEFAULT_WORKER_POOL; // If the method is annotated with the SmallRye Reactive Messaging @Blocking, extract the worker pool name if any if (methodInfo.hasAnnotation(ReactiveMessagingDotNames.BLOCKING)) { AnnotationInstance blocking = methodInfo.annotation(ReactiveMessagingDotNames.BLOCKING); poolName = blocking.value() == null ? Blocking.DEFAULT_WORKER_POOL : blocking.value().asString(); } + if (methodInfo.hasAnnotation(RUN_ON_VIRTUAL_THREAD)) { + defaultConfig.produce(new RunTimeConfigurationDefaultBuildItem( + "smallrye.messaging.worker." + poolName + ".max-concurrency", + DEFAULT_VIRTUAL_THREADS_MAX_CONCURRENCY)); + } workerConfigurations.add(new WorkerConfiguration(methodInfo.declaringClass().toString(), - methodInfo.name(), poolName)); + methodInfo.name(), poolName, methodInfo.hasAnnotation(RUN_ON_VIRTUAL_THREAD))); } try { diff --git a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/QuarkusWorkerPoolRegistry.java b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/QuarkusWorkerPoolRegistry.java index 7a596d10ac02f..4f5149b1a09b5 100644 --- a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/QuarkusWorkerPoolRegistry.java +++ b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/QuarkusWorkerPoolRegistry.java @@ -1,10 +1,15 @@ package io.quarkus.smallrye.reactivemessaging.runtime; +import java.lang.reflect.InvocationTargetException; import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.function.Supplier; import jakarta.annotation.Priority; import jakarta.enterprise.context.ApplicationScoped; @@ -15,13 +20,18 @@ import jakarta.inject.Inject; import org.eclipse.microprofile.config.ConfigProvider; +import org.jboss.logging.Logger; import org.slf4j.LoggerFactory; +import io.quarkus.runtime.ExecutorRecorder; import io.smallrye.mutiny.Uni; import io.smallrye.reactive.messaging.annotations.Blocking; import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder; import io.smallrye.reactive.messaging.providers.connectors.WorkerPoolRegistry; import io.smallrye.reactive.messaging.providers.helpers.Validation; +import io.vertx.core.Vertx; +import io.vertx.core.impl.ConcurrentHashSet; +import io.vertx.core.impl.ContextInternal; import io.vertx.mutiny.core.Context; import io.vertx.mutiny.core.WorkerExecutor; @@ -30,14 +40,79 @@ @ApplicationScoped // TODO: create a different entry for WorkerPoolRegistry than `analyzeWorker` and drop this class public class QuarkusWorkerPoolRegistry extends WorkerPoolRegistry { + + private static final Logger logger = Logger.getLogger(QuarkusWorkerPoolRegistry.class); private static final String WORKER_CONFIG_PREFIX = "smallrye.messaging.worker"; private static final String WORKER_CONCURRENCY = "max-concurrency"; + public static final String DEFAULT_VIRTUAL_THREAD_WORKER = ""; @Inject ExecutionHolder executionHolder; private final Map workerConcurrency = new HashMap<>(); private final Map workerExecutors = new ConcurrentHashMap<>(); + private final Set virtualThreadWorkers = initVirtualThreadWorkers(); + + private static Set initVirtualThreadWorkers() { + Set set = new ConcurrentHashSet<>(); + set.add(DEFAULT_VIRTUAL_THREAD_WORKER); + return set; + } + + private enum VirtualExecutorSupplier implements Supplier { + Instance; + + private final Executor executor; + + /** + * This method uses reflection in order to allow developers to quickly test quarkus-loom without needing to + * change --release, --source, --target flags and to enable previews. + * Since we try to load the "Loom-preview" classes/methods at runtime, the application can even be compiled + * using java 11 and executed with a loom-compliant JDK. + */ + VirtualExecutorSupplier() { + Executor actual; + try { + var virtual = (Executor) Executors.class.getMethod("newVirtualThreadPerTaskExecutor") + .invoke(this); + actual = new Executor() { + @Override + public void execute(Runnable command) { + var context = Vertx.currentContext(); + if (!(context instanceof ContextInternal)) { + virtual.execute(command); + } else { + ContextInternal contextInternal = (ContextInternal) context; + virtual.execute(new Runnable() { + @Override + public void run() { + final var previousContext = contextInternal.beginDispatch(); + try { + command.run(); + } finally { + contextInternal.endDispatch(previousContext); + } + } + }); + } + } + }; + } catch (InvocationTargetException | IllegalAccessException | NoSuchMethodException e) { + //quite ugly but works + logger.warnf(e, "You weren't able to create an executor that spawns virtual threads, the default" + + " blocking executor will be used, please check that your JDK is compatible with " + + "virtual threads"); + //if for some reason a class/method can't be loaded or invoked we return the traditional EXECUTOR + actual = ExecutorRecorder.getCurrent(); + } + this.executor = actual; + } + + @Override + public Executor get() { + return this.executor; + } + } public void terminate( @Observes(notifyObserver = Reception.IF_EXISTS) @Priority(100) @BeforeDestroyed(ApplicationScoped.class) Object event) { @@ -56,6 +131,8 @@ public Uni executeWork(Context currentContext, Uni uni, String workerN return currentContext.executeBlocking(Uni.createFrom().deferred(() -> uni), ordered); } return executionHolder.vertx().executeBlocking(uni, ordered); + } else if (virtualThreadWorkers.contains(workerName)) { + return runOnVirtualThread(currentContext, uni); } else { if (currentContext != null) { return getWorker(workerName).executeBlocking(uni, ordered) @@ -73,6 +150,19 @@ public Uni executeWork(Context currentContext, Uni uni, String workerN } } + private Uni runOnVirtualThread(Context currentContext, Uni uni) { + return uni.runSubscriptionOn(VirtualExecutorSupplier.Instance.get()) + .onItemOrFailure().transformToUni((item, failure) -> { + return Uni.createFrom().emitter(emitter -> { + if (failure != null) { + currentContext.runOnContext(() -> emitter.fail(failure)); + } else { + currentContext.runOnContext(() -> emitter.complete(item)); + } + }); + }); + } + public WorkerExecutor getWorker(String workerName) { Objects.requireNonNull(workerName, "Worker Name not specified"); @@ -102,12 +192,16 @@ public WorkerExecutor getWorker(String workerName) { } // Shouldn't get here - throw new IllegalArgumentException("@Blocking referred to invalid worker name."); + throw new IllegalArgumentException("@Blocking referred to invalid worker name. " + workerName); } - public void defineWorker(String className, String method, String poolName) { + public void defineWorker(String className, String method, String poolName, boolean virtualThread) { Objects.requireNonNull(className, "className was empty"); Objects.requireNonNull(method, "Method was empty"); + if (virtualThread) { + virtualThreadWorkers.add(poolName); + return; + } if (!poolName.equals(Blocking.DEFAULT_WORKER_POOL)) { // Validate @Blocking value is not empty, if set @@ -118,7 +212,7 @@ public void defineWorker(String className, String method, String poolName) { // Validate @Blocking worker pool has configuration to define concurrency String workerConfigKey = WORKER_CONFIG_PREFIX + "." + poolName + "." + WORKER_CONCURRENCY; Optional concurrency = ConfigProvider.getConfig().getOptionalValue(workerConfigKey, Integer.class); - if (!concurrency.isPresent()) { + if (concurrency.isEmpty()) { throw getBlockingError(className, method, workerConfigKey + " was not defined"); } diff --git a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/SmallRyeReactiveMessagingLifecycle.java b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/SmallRyeReactiveMessagingLifecycle.java index 89294f8e41b89..98ab6ec818596 100644 --- a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/SmallRyeReactiveMessagingLifecycle.java +++ b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/SmallRyeReactiveMessagingLifecycle.java @@ -29,7 +29,8 @@ void onStaticInit(@Observes @Initialized(ApplicationScoped.class) Object event, QuarkusWorkerPoolRegistry workerPoolRegistry) { mediatorManager.addAnalyzed(context.getMediatorConfigurations()); for (WorkerConfiguration worker : context.getWorkerConfigurations()) { - workerPoolRegistry.defineWorker(worker.getClassName(), worker.getMethodName(), worker.getPoolName()); + workerPoolRegistry.defineWorker(worker.getClassName(), worker.getMethodName(), worker.getPoolName(), + worker.isVirtualThread()); } for (EmitterConfiguration emitter : context.getEmitterConfigurations()) { mediatorManager.addEmitter(emitter); diff --git a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/WorkerConfiguration.java b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/WorkerConfiguration.java index 854bb1d09c49f..148c5609e8dbe 100644 --- a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/WorkerConfiguration.java +++ b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/WorkerConfiguration.java @@ -8,13 +8,16 @@ public class WorkerConfiguration { private String poolName; + private boolean virtualThread; + public WorkerConfiguration() { } - public WorkerConfiguration(String className, String name, String poolName) { + public WorkerConfiguration(String className, String name, String poolName, boolean virtualThread) { this.className = className; this.methodName = name; this.poolName = poolName; + this.virtualThread = virtualThread; } public String getClassName() { @@ -41,4 +44,11 @@ public void setPoolName(String poolName) { this.poolName = poolName; } + public boolean isVirtualThread() { + return virtualThread; + } + + public void setVirtualThread(boolean virtualThread) { + this.virtualThread = virtualThread; + } } From 1c0b57ce5c4508d544e929548cba93b6f3fb28c6 Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Tue, 18 Jul 2023 12:05:11 +0200 Subject: [PATCH 3/3] Guide for using virtual threads with reactive messaging extensions --- docs/src/main/asciidoc/amqp-reference.adoc | 6 + docs/src/main/asciidoc/kafka.adoc | 6 + .../asciidoc/messaging-virtual-threads.adoc | 167 ++++++++++++++++++ docs/src/main/asciidoc/pulsar.adoc | 59 +++++++ .../src/main/asciidoc/rabbitmq-reference.adoc | 6 + 5 files changed, 244 insertions(+) create mode 100644 docs/src/main/asciidoc/messaging-virtual-threads.adoc diff --git a/docs/src/main/asciidoc/amqp-reference.adoc b/docs/src/main/asciidoc/amqp-reference.adoc index 0b870346c580d..7da796b9f759c 100644 --- a/docs/src/main/asciidoc/amqp-reference.adoc +++ b/docs/src/main/asciidoc/amqp-reference.adoc @@ -411,6 +411,12 @@ The first one provides more fine-grained tuning such as the worker pool to use a The second one, used also with other reactive features of Quarkus, uses the default worker pool and preserves the order. ==== +[TIP] +.@RunOnVirtualThread +==== +For running the blocking processing on Java _virtual threads_, see the xref:messaging-virtual-threads.adoc[Quarkus Virtual Thread support with Reactive Messaging documentation]. +==== + [TIP] .@Transactional ==== diff --git a/docs/src/main/asciidoc/kafka.adoc b/docs/src/main/asciidoc/kafka.adoc index 7dda7c2f445c5..27c695fd9426b 100644 --- a/docs/src/main/asciidoc/kafka.adoc +++ b/docs/src/main/asciidoc/kafka.adoc @@ -284,6 +284,12 @@ The second one, used also with other reactive features of Quarkus, uses the defa Detailed information on the usage of `@Blocking` annotation can be found in https://smallrye.io/smallrye-reactive-messaging/latest/concepts/blocking/[SmallRye Reactive Messaging – Handling blocking execution]. ==== +[TIP] +.@RunOnVirtualThread +==== +For running the blocking processing on Java _virtual threads_, see the xref:messaging-virtual-threads.adoc[Quarkus Virtual Thread support with Reactive Messaging documentation]. +==== + [TIP] .@Transactional ==== diff --git a/docs/src/main/asciidoc/messaging-virtual-threads.adoc b/docs/src/main/asciidoc/messaging-virtual-threads.adoc new file mode 100644 index 0000000000000..f07e9cff31c56 --- /dev/null +++ b/docs/src/main/asciidoc/messaging-virtual-threads.adoc @@ -0,0 +1,167 @@ += Quarkus Virtual Thread support with Reactive Messaging + +include::_attributes.adoc[] +:runonvthread: https://javadoc.io/doc/io.smallrye.common/smallrye-common-annotation/latest/io/smallrye/common/annotation/RunOnVirtualThread.html +:rm_blocking_annotation: https://javadoc.io/doc/io.smallrye.reactive/smallrye-reactive-messaging-api/latest/io/smallrye/reactive/messaging/annotations/Blocking.html +:rm_blocking_docs: http://smallrye.io/smallrye-reactive-messaging/4.8.0/concepts/blocking/ + +This guide explains how to benefit from Java virtual threads when writing message processing applications in Quarkus. + +[TIP] +==== +This guide focuses on using virtual threads with Reactive Messaging extensions. +Please refer to xref:virtual-threads.adoc[Writing simpler reactive REST services with Quarkus Virtual Thread support] +to read more about Java virtual threads in general and the Quarkus Virtual Thread support for REST services. + +For reference guides of specific Reactive Messaging extensions see xref:kafka.adoc[Apache Kafka Reference Guide], +xref:amqp-reference.adoc[Reactive Messaging AMQP 1.0 Connector], xref:rabbitmq-reference.adoc[Reactive Messaging RabbitMQ Connector] or xref:pulsar.adoc[Apache Pulsar Reference Guide]. +==== + +By default, Reactive Messaging invokes message processing methods on an event-loop thread. +See the xref:quarkus-reactive-architecture.adoc[Quarkus Reactive Architecture documentation] for further details on this topic. +But, you sometimes need to combine Reactive Messaging with blocking processing such as calling external services or database operations. +For this, you can use the link:{rm_blocking_annotation}[@Blocking] annotation indicating that the processing is _blocking_ and should be run on a worker thread. +You can read more on the blocking processing in link:{rm_blocking_docs}[SmallRye Reactive Messaging documentation]. + +The idea behind Quarkus Virtual Thread support for Reactive Messaging is to offload the message processing on virtual threads, +instead of running it on an event-loop thread or a worker thread. + +To enable virtual thread support on a message consumer method, simply add the link:{runonvthread}[@RunOnVirtualThread] annotation to the method. +If the JDK is compatible (Java 19 or later versions) then each incoming message will be offloaded to a new virtual thread. +It will then be possible to perform blocking operations without blocking the platform thread upon which the virtual thread is mounted. + +== Example using the Reactive Messaging Kafka extension + +Let's see an example of how to process Kafka records on virtual threads. +First, make sure to have a reactive messaging extension dependency to your build file: + +[source,xml,role="primary asciidoc-tabs-target-sync-cli asciidoc-tabs-target-sync-maven"] +.pom.xml +---- + + io.quarkus + quarkus-smallrye-reactive-messaging-kafka + +---- + +[source,gradle,role="secondary asciidoc-tabs-target-sync-gradle"] +.build.gradle +---- +implementation("io.quarkus:quarkus-smallrye-reactive-messaging-kafka") +---- + +You also need to make sure that you are using Java 19 or later, this can be enforced in your `pom.xml` file with the following: + +[source,xml,role="primary asciidoc-tabs-target-sync-cli asciidoc-tabs-target-sync-maven"] +.pom.xml +---- + + 19 + 19 + +---- + +Virtual threads are still a preview feature, so you need to start your application with the `--enable-preview` flag: + +[source, bash] +---- +java --enable-preview -jar target/quarkus-app/quarkus-run.jar +---- + +or to use the Quarkus Dev mode, insert the following to the `quarkus-maven-plugin` configuration: + +[source,xml,role="primary asciidoc-tabs-target-sync-cli asciidoc-tabs-target-sync-maven"] +.pom.xml +---- + + io.quarkus + quarkus-maven-plugin + ${quarkus.version} + + + + build + + + + + 19 + 19 + + --enable-preview + + --enable-preview --add-opens java.base/java.lang=ALL-UNNAMED + + +---- + +Then you can start using the annotation `@RunOnVirtualThread` on your consumer methods also annotated with `@Incoming`. +In the following example we'll use the xref:rest-client-reactive.adoc[RESTEasy Reactive REST Client] to make a blocking call to a REST endpoint: + +[source, java] +---- +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.eclipse.microprofile.rest.client.inject.RestClient; + +import io.smallrye.common.annotation.RunOnVirtualThread; + +import jakarta.enterprise.context.ApplicationScoped; + +@ApplicationScoped +public class PriceConsumer { + + @RestClient // <2> + PriceAlertService alertService; + + + @Incoming("prices") + @RunOnVirtualThread // <1> + public void consume(double price) { + if (price > 90.0) { + alertService.alert(price); // <3> + } + } + + @Outgoing("prices-out") // <4> + public Multi randomPriceGenerator() { + return Multi.createFrom().generator(Random::new, (r, e) -> { + e.emit(r.nextDouble(100)); + return r; + }); + } + + +} +---- + +<1> `@RunOnVirtualThread` annotation on the `@Incoming` method ensures that the method will be called on a virtual thread. +<2> the REST client stub is injected with the `@RestClient` annotation. +<3> `alert` method blocks the virtual thread until the REST call returns. +<4> This `@Outgoing` method generates random prices and writes them a Kafka topic to be consumed back by the application. + +Note that by default Reactive Messaging message processing happens sequentially, preserving the order of messages. +In the same way, `@Blocking(ordered = false)` annotation changes this behaviour, +using `@RunOnVirtualThread` enforces concurrent message processing without preserving the order. + +In order to leverage the lightweight nature of virtual threads, the default maximum concurrency for methods annotated with `@RunOnVirtualThread` is 1024. +As opposed to platform threads, virtual threads are not pooled and created per message. Therefore the maximum concurrency applies separately to all `@RunOnVirtualThread` methods. + +There are two ways to customize the concurrency level: + +1. The `@RunOnVirtualThread` annotation can be used together with the link:{rm_blocking_annotation}[@Blocking] annotation to specify a worker name. ++ +[source, java] +---- +@Incoming("prices") +@RunOnVirtualThread +@Blocking("my-worker") +public void consume(double price) { + //... +} +---- ++ +Then, for example, to set the maximum concurrency of this method down to 30, set using the config property `smallrye.messaging.worker.my-worker.max-concurrency=30`. ++ +2. For every `@RunOnVirtualThread` method that is not configured with a worker name, you can use the config property `smallrye.messaging.worker..max-concurrency`. + diff --git a/docs/src/main/asciidoc/pulsar.adoc b/docs/src/main/asciidoc/pulsar.adoc index e1be2db74bc35..529684251e09d 100644 --- a/docs/src/main/asciidoc/pulsar.adoc +++ b/docs/src/main/asciidoc/pulsar.adoc @@ -254,6 +254,65 @@ Following types can be injected as channels: As with the previous `Message` example, if your injected channel receives payloads (`Multi`), it acknowledges the message automatically, and support multiple subscribers. If your injected channel receives Message (`Multi>`), you will be responsible for the acknowledgment and broadcasting. +[[blocking-processing]] +=== Blocking processing + +Reactive Messaging invokes your method on an I/O thread. +See the xref:quarkus-reactive-architecture.adoc[Quarkus Reactive Architecture documentation] for further details on this topic. +But, you often need to combine Reactive Messaging with blocking processing such as database interactions. +For this, you need to use the `@Blocking` annotation indicating that the processing is _blocking_ and should not be run on the caller thread. + +For example, The following code illustrates how you can store incoming payloads to a database using Hibernate with Panache: + +[source,java] +---- +import io.smallrye.reactive.messaging.annotations.Blocking; +import org.eclipse.microprofile.reactive.messaging.Incoming; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.transaction.Transactional; + +@ApplicationScoped +public class PriceStorage { + + @Incoming("prices") + @Transactional + public void store(int priceInUsd) { + Price price = new Price(); + price.value = priceInUsd; + price.persist(); + } + +} +---- + +[NOTE] +==== +There are 2 `@Blocking` annotations: + +1. `io.smallrye.reactive.messaging.annotations.Blocking` +2. `io.smallrye.common.annotation.Blocking` + +They have the same effect. +Thus, you can use both. +The first one provides more fine-grained tuning such as the worker pool to use and whether it preserves the order. +The second one, used also with other reactive features of Quarkus, uses the default worker pool and preserves the order. + +Detailed information on the usage of `@Blocking` annotation can be found in https://smallrye.io/smallrye-reactive-messaging/latest/concepts/blocking/[SmallRye Reactive Messaging – Handling blocking execution]. +==== + +[TIP] +.@RunOnVirtualThread +==== +For running the blocking processing on Java _virtual threads_, see the xref:messaging-virtual-threads.adoc[Quarkus Virtual Thread support with Reactive Messaging documentation]. +==== + +[TIP] +.@Transactional +==== +If your method is annotated with `@Transactional`, it will be considered _blocking_ automatically, even if the method is not annotated with `@Blocking`. +==== + === Pulsar Subscription Types Pulsar *subscriptionType* consumer configuration can be used flexibly to achieve different messaging scenarios, such as publish-subscribe or queuing. diff --git a/docs/src/main/asciidoc/rabbitmq-reference.adoc b/docs/src/main/asciidoc/rabbitmq-reference.adoc index e7bb380a417c4..105a674eb4940 100644 --- a/docs/src/main/asciidoc/rabbitmq-reference.adoc +++ b/docs/src/main/asciidoc/rabbitmq-reference.adoc @@ -335,6 +335,12 @@ The first one provides more fine-grained tuning such as the worker pool to use a The second one, used also with other reactive features of Quarkus, uses the default worker pool and preserves the order. ==== +[TIP] +.@RunOnVirtualThread +==== +For running the blocking processing on Java _virtual threads_, see the xref:messaging-virtual-threads.adoc[Quarkus Virtual Thread support with Reactive Messaging documentation]. +==== + == Customizing the underlying RabbitMQ client The connector uses the Vert.x RabbitMQ client underneath.