Skip to content

Commit

Permalink
Reactive messaging virtual threads support
Browse files Browse the repository at this point in the history
  • Loading branch information
ozangunalp committed Jul 17, 2023
1 parent 1c770a8 commit de2052d
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.KOTLIN_UNIT;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.MERGE;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.OUTGOING;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.RUN_ON_VIRTUAL_THREAD;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.SMALLRYE_BLOCKING;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.TRANSACTIONAL;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.VOID_CLASS;
Expand All @@ -33,6 +34,7 @@
import io.quarkus.runtime.configuration.ConfigurationException;
import io.quarkus.smallrye.reactivemessaging.runtime.QuarkusMediatorConfiguration;
import io.quarkus.smallrye.reactivemessaging.runtime.QuarkusParameterDescriptor;
import io.quarkus.smallrye.reactivemessaging.runtime.QuarkusWorkerPoolRegistry;
import io.quarkus.smallrye.reactivemessaging.runtime.TypeInfo;
import io.smallrye.reactive.messaging.Shape;
import io.smallrye.reactive.messaging.annotations.Blocking;
Expand Down Expand Up @@ -178,17 +180,32 @@ public Integer get() {
AnnotationInstance blockingAnnotation = methodInfo.annotation(BLOCKING);
AnnotationInstance smallryeBlockingAnnotation = methodInfo.annotation(SMALLRYE_BLOCKING);
AnnotationInstance transactionalAnnotation = methodInfo.annotation(TRANSACTIONAL);
if (blockingAnnotation != null || smallryeBlockingAnnotation != null || transactionalAnnotation != null) {
AnnotationInstance runOnVirtualThreadAnnotation = methodInfo.annotation(RUN_ON_VIRTUAL_THREAD);
if (blockingAnnotation != null || smallryeBlockingAnnotation != null || transactionalAnnotation != null
|| runOnVirtualThreadAnnotation != null) {
mediatorConfigurationSupport.validateBlocking(validationOutput);
configuration.setBlocking(true);
if (blockingAnnotation != null) {
AnnotationValue ordered = blockingAnnotation.value("ordered");
configuration.setBlockingExecutionOrdered(ordered == null || ordered.asBoolean());
if (runOnVirtualThreadAnnotation != null) {
if (ordered != null && ordered.asBoolean()) {
throw new ConfigurationException(
"The method `" + methodInfo.name()
+ "` is using `@RunOnVirtualThread` but explicitly set as `@Blocking(ordered = true)`");
}
configuration.setBlockingExecutionOrdered(false);
configuration.setWorkerPoolName(QuarkusWorkerPoolRegistry.DEFAULT_VIRTUAL_THREAD_WORKER);
} else {
configuration.setBlockingExecutionOrdered(ordered == null || ordered.asBoolean());
}
String poolName;
if (blockingAnnotation.value() != null &&
!(poolName = blockingAnnotation.value().asString()).equals(Blocking.DEFAULT_WORKER_POOL)) {
configuration.setWorkerPoolName(poolName);
}
} else if (runOnVirtualThreadAnnotation != null) {
configuration.setBlockingExecutionOrdered(false);
configuration.setWorkerPoolName(QuarkusWorkerPoolRegistry.DEFAULT_VIRTUAL_THREAD_WORKER);
} else {
configuration.setBlockingExecutionOrdered(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.eclipse.microprofile.reactive.messaging.spi.OutgoingConnectorFactory;
import org.jboss.jandex.DotName;

import io.smallrye.common.annotation.RunOnVirtualThread;
import io.smallrye.reactive.messaging.MessageConverter;
import io.smallrye.reactive.messaging.MutinyEmitter;
import io.smallrye.reactive.messaging.annotations.Blocking;
Expand Down Expand Up @@ -89,6 +90,7 @@ public final class ReactiveMessagingDotNames {
.createSimple("io.quarkus.smallrye.reactivemessaging.runtime.kotlin.AbstractSubscribingCoroutineInvoker");

static final DotName TRANSACTIONAL = DotName.createSimple("jakarta.transaction.Transactional");
static final DotName RUN_ON_VIRTUAL_THREAD = DotName.createSimple(RunOnVirtualThread.class.getName());

private ReactiveMessagingDotNames() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static io.quarkus.deployment.annotations.ExecutionTime.STATIC_INIT;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.BLOCKING;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.RUN_ON_VIRTUAL_THREAD;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.SMALLRYE_BLOCKING;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.TRANSACTIONAL;

Expand Down Expand Up @@ -49,6 +50,7 @@
import io.quarkus.deployment.builditem.CombinedIndexBuildItem;
import io.quarkus.deployment.builditem.FeatureBuildItem;
import io.quarkus.deployment.builditem.GeneratedClassBuildItem;
import io.quarkus.deployment.builditem.RunTimeConfigurationDefaultBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
import io.quarkus.deployment.metrics.MetricsCapabilityBuildItem;
import io.quarkus.deployment.recording.RecorderContext;
Expand Down Expand Up @@ -221,6 +223,7 @@ public void build(SmallRyeReactiveMessagingRecorder recorder, RecorderContext re
List<InjectedChannelBuildItem> channelFields,
BuildProducer<GeneratedClassBuildItem> generatedClass,
BuildProducer<ReflectiveClassBuildItem> reflectiveClass,
BuildProducer<RunTimeConfigurationDefaultBuildItem> defaultConfig,
ReactiveMessagingConfiguration conf) {

ClassOutput classOutput = new GeneratedClassGizmoAdaptor(generatedClass, true);
Expand All @@ -240,17 +243,24 @@ public void build(SmallRyeReactiveMessagingRecorder recorder, RecorderContext re
BeanInfo bean = mediatorMethod.getBean();

if (methodInfo.hasAnnotation(BLOCKING) || methodInfo.hasAnnotation(SMALLRYE_BLOCKING)
|| methodInfo.hasAnnotation(RUN_ON_VIRTUAL_THREAD)
|| methodInfo.hasAnnotation(TRANSACTIONAL)) {
// Just in case both annotation are used, use @Blocking value.
String poolName = Blocking.DEFAULT_WORKER_POOL;
String poolName = methodInfo.hasAnnotation(RUN_ON_VIRTUAL_THREAD)
? QuarkusWorkerPoolRegistry.DEFAULT_VIRTUAL_THREAD_WORKER
: Blocking.DEFAULT_WORKER_POOL;

// If the method is annotated with the SmallRye Reactive Messaging @Blocking, extract the worker pool name if any
if (methodInfo.hasAnnotation(ReactiveMessagingDotNames.BLOCKING)) {
AnnotationInstance blocking = methodInfo.annotation(ReactiveMessagingDotNames.BLOCKING);
poolName = blocking.value() == null ? Blocking.DEFAULT_WORKER_POOL : blocking.value().asString();
}
if (methodInfo.hasAnnotation(RUN_ON_VIRTUAL_THREAD)) {
defaultConfig.produce(new RunTimeConfigurationDefaultBuildItem(
"smallrye.messaging.worker." + poolName + ".max-concurrency", "1000"));
}
workerConfigurations.add(new WorkerConfiguration(methodInfo.declaringClass().toString(),
methodInfo.name(), poolName));
methodInfo.name(), poolName, methodInfo.hasAnnotation(RUN_ON_VIRTUAL_THREAD)));
}

try {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package io.quarkus.smallrye.reactivemessaging.runtime;

import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
Expand All @@ -15,13 +20,18 @@
import jakarta.inject.Inject;

import org.eclipse.microprofile.config.ConfigProvider;
import org.jboss.logging.Logger;
import org.slf4j.LoggerFactory;

import io.quarkus.runtime.ExecutorRecorder;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.annotations.Blocking;
import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder;
import io.smallrye.reactive.messaging.providers.connectors.WorkerPoolRegistry;
import io.smallrye.reactive.messaging.providers.helpers.Validation;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ConcurrentHashSet;
import io.vertx.core.impl.ContextInternal;
import io.vertx.mutiny.core.Context;
import io.vertx.mutiny.core.WorkerExecutor;

Expand All @@ -30,14 +40,73 @@
@ApplicationScoped
// TODO: create a different entry for WorkerPoolRegistry than `analyzeWorker` and drop this class
public class QuarkusWorkerPoolRegistry extends WorkerPoolRegistry {

private static final Logger logger = Logger.getLogger(QuarkusWorkerPoolRegistry.class);
private static final String WORKER_CONFIG_PREFIX = "smallrye.messaging.worker";
private static final String WORKER_CONCURRENCY = "max-concurrency";
public static final String DEFAULT_VIRTUAL_THREAD_WORKER = "<virtual-thread>";

@Inject
ExecutionHolder executionHolder;

private final Map<String, Integer> workerConcurrency = new HashMap<>();
private final Map<String, WorkerExecutor> workerExecutors = new ConcurrentHashMap<>();
private final Set<String> virtualThreadWorkers = initVirtualThreadWorkers();

private static Set<String> initVirtualThreadWorkers() {
Set<String> set = new ConcurrentHashSet<>();
set.add(DEFAULT_VIRTUAL_THREAD_WORKER);
return set;
}

public static final Supplier<Executor> VIRTUAL_EXECUTOR_SUPPLIER = new Supplier<Executor>() {
Executor current = null;

/**
* This method uses reflection in order to allow developers to quickly test quarkus-loom without needing to
* change --release, --source, --target flags and to enable previews.
* Since we try to load the "Loom-preview" classes/methods at runtime, the application can even be compiled
* using java 11 and executed with a loom-compliant JDK.
*/
@Override
public Executor get() {
if (current == null) {
try {
var virtual = (Executor) Executors.class.getMethod("newVirtualThreadPerTaskExecutor")
.invoke(this);
current = new Executor() {
@Override
public void execute(Runnable command) {
var context = Vertx.currentContext();
if (!(context instanceof ContextInternal)) {
virtual.execute(command);
} else {
virtual.execute(new Runnable() {
@Override
public void run() {
final var previousContext = ((ContextInternal) context).beginDispatch();
try {
command.run();
} finally {
((ContextInternal) context).endDispatch(previousContext);
}
}
});
}
}
};
} catch (InvocationTargetException | IllegalAccessException | NoSuchMethodException e) {
//quite ugly but works
logger.warnf(e, "You weren't able to create an executor that spawns virtual threads, the default" +
" blocking executor will be used, please check that your JDK is compatible with " +
"virtual threads");
//if for some reason a class/method can't be loaded or invoked we return the traditional EXECUTOR
current = ExecutorRecorder.getCurrent();
}
}
return current;
}
};

public void terminate(
@Observes(notifyObserver = Reception.IF_EXISTS) @Priority(100) @BeforeDestroyed(ApplicationScoped.class) Object event) {
Expand All @@ -56,6 +125,8 @@ public <T> Uni<T> executeWork(Context currentContext, Uni<T> uni, String workerN
return currentContext.executeBlocking(Uni.createFrom().deferred(() -> uni), ordered);
}
return executionHolder.vertx().executeBlocking(uni, ordered);
} else if (virtualThreadWorkers.contains(workerName)) {
return runOnVirtualThread(currentContext, uni);
} else {
if (currentContext != null) {
return getWorker(workerName).executeBlocking(uni, ordered)
Expand All @@ -73,6 +144,19 @@ public <T> Uni<T> executeWork(Context currentContext, Uni<T> uni, String workerN
}
}

private <T> Uni<T> runOnVirtualThread(Context currentContext, Uni<T> uni) {
return uni.runSubscriptionOn(VIRTUAL_EXECUTOR_SUPPLIER.get())
.onItemOrFailure().transformToUni((item, failure) -> {
return Uni.createFrom().emitter(emitter -> {
if (failure != null) {
currentContext.runOnContext(() -> emitter.fail(failure));
} else {
currentContext.runOnContext(() -> emitter.complete(item));
}
});
});
}

public WorkerExecutor getWorker(String workerName) {
Objects.requireNonNull(workerName, "Worker Name not specified");

Expand Down Expand Up @@ -102,12 +186,16 @@ public WorkerExecutor getWorker(String workerName) {
}

// Shouldn't get here
throw new IllegalArgumentException("@Blocking referred to invalid worker name.");
throw new IllegalArgumentException("@Blocking referred to invalid worker name. " + workerName);
}

public void defineWorker(String className, String method, String poolName) {
public void defineWorker(String className, String method, String poolName, boolean virtualThread) {
Objects.requireNonNull(className, "className was empty");
Objects.requireNonNull(method, "Method was empty");
if (virtualThread) {
virtualThreadWorkers.add(poolName);
return;
}

if (!poolName.equals(Blocking.DEFAULT_WORKER_POOL)) {
// Validate @Blocking value is not empty, if set
Expand All @@ -118,7 +206,7 @@ public void defineWorker(String className, String method, String poolName) {
// Validate @Blocking worker pool has configuration to define concurrency
String workerConfigKey = WORKER_CONFIG_PREFIX + "." + poolName + "." + WORKER_CONCURRENCY;
Optional<Integer> concurrency = ConfigProvider.getConfig().getOptionalValue(workerConfigKey, Integer.class);
if (!concurrency.isPresent()) {
if (concurrency.isEmpty()) {
throw getBlockingError(className, method, workerConfigKey + " was not defined");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ void onStaticInit(@Observes @Initialized(ApplicationScoped.class) Object event,
QuarkusWorkerPoolRegistry workerPoolRegistry) {
mediatorManager.addAnalyzed(context.getMediatorConfigurations());
for (WorkerConfiguration worker : context.getWorkerConfigurations()) {
workerPoolRegistry.defineWorker(worker.getClassName(), worker.getMethodName(), worker.getPoolName());
workerPoolRegistry.defineWorker(worker.getClassName(), worker.getMethodName(), worker.getPoolName(),
worker.isVirtualThread());
}
for (EmitterConfiguration emitter : context.getEmitterConfigurations()) {
mediatorManager.addEmitter(emitter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@ public class WorkerConfiguration {

private String poolName;

private boolean virtualThread;

public WorkerConfiguration() {
}

public WorkerConfiguration(String className, String name, String poolName) {
public WorkerConfiguration(String className, String name, String poolName, boolean virtualThread) {
this.className = className;
this.methodName = name;
this.poolName = poolName;
this.virtualThread = virtualThread;
}

public String getClassName() {
Expand All @@ -41,4 +44,11 @@ public void setPoolName(String poolName) {
this.poolName = poolName;
}

public boolean isVirtualThread() {
return virtualThread;
}

public void setVirtualThread(boolean virtualThread) {
this.virtualThread = virtualThread;
}
}

0 comments on commit de2052d

Please sign in to comment.