Skip to content

Commit

Permalink
Added synthetic bean with @VirtualThreads qualifier for the managed E…
Browse files Browse the repository at this point in the history
…xecutorService backed by virtual threads
  • Loading branch information
ozangunalp committed Oct 3, 2023
1 parent 9fc4b2a commit fa2e4d7
Show file tree
Hide file tree
Showing 6 changed files with 238 additions and 29 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
package io.quarkus.virtual.threads;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;

import org.jboss.jandex.AnnotationInstance;

import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
import io.quarkus.arc.deployment.SyntheticBeanBuildItem;
import io.quarkus.arc.processor.BuiltinScope;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.annotations.ExecutionTime;
import io.quarkus.deployment.annotations.Record;
Expand All @@ -12,8 +21,20 @@ public class VirtualThreadsProcessor {
@Record(ExecutionTime.STATIC_INIT)
public void setup(VirtualThreadsConfig config, VirtualThreadsRecorder recorder,
ShutdownContextBuildItem shutdownContextBuildItem,
LaunchModeBuildItem launchModeBuildItem) {
LaunchModeBuildItem launchModeBuildItem,
BuildProducer<AdditionalBeanBuildItem> beans,
BuildProducer<SyntheticBeanBuildItem> producer) {
beans.produce(new AdditionalBeanBuildItem(VirtualThreads.class));
recorder.setupVirtualThreads(config, shutdownContextBuildItem, launchModeBuildItem.getLaunchMode());
producer.produce(
SyntheticBeanBuildItem.configure(ExecutorService.class)
.addType(Executor.class)
.addQualifier(AnnotationInstance.builder(VirtualThreads.class).build())
.scope(BuiltinScope.APPLICATION.getInfo())
.unremovable()
.setRuntimeInit()
.supplier(recorder.getCurrentSupplier())
.done());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package io.quarkus.virtual.threads;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* An implementation of {@code ExecutorService} that delegates to the real executor, while disallowing termination.
*/
class DelegatingExecutorService implements ExecutorService {
private final ExecutorService delegate;

DelegatingExecutorService(final ExecutorService delegate) {
this.delegate = delegate;
}

public void execute(final Runnable command) {
delegate.execute(command);
}

public boolean isShutdown() {
// container managed executors are never shut down from the application's perspective
return false;
}

public boolean isTerminated() {
// container managed executors are never shut down from the application's perspective
return false;
}

public boolean awaitTermination(final long timeout, final TimeUnit unit) {
return false;
}

public void shutdown() {
throw new RuntimeException("shutdown not allowed on managed executor service");
}

public List<Runnable> shutdownNow() {
throw new RuntimeException("shutdownNow not allowed on managed executor service");
}

@Override
public <T> Future<T> submit(Callable<T> task) {
return delegate.submit(task);
}

@Override
public <T> Future<T> submit(Runnable task, T result) {
return delegate.submit(task, result);
}

@Override
public Future<?> submit(Runnable task) {
return delegate.submit(task);
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
return delegate.invokeAll(tasks);
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException {
return delegate.invokeAll(tasks, timeout, unit);
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
return delegate.invokeAny(tasks);
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return delegate.invokeAny(tasks, timeout, unit);
}

public String toString() {
return delegate.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package io.quarkus.virtual.threads;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.TimeUnit;

import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;

/**
* Fallback executor service implementation in case the virtual threads are disabled or not available on the current platform.
* <p>
* Executes tasks on the current Vert.x context worker pool. or when not available on the Mutiny Infrastructure default worker
* pool
* Shutdown methods are no-op as the executor service is a wrapper around these previous execute methods.
*/
class FallbackVirtualThreadsExecutorService extends AbstractExecutorService {

@Override
public void execute(Runnable command) {
var context = Vertx.currentContext();
if (!(context instanceof ContextInternal)) {
Infrastructure.getDefaultWorkerPool().execute(command);
} else {
context.executeBlocking(() -> {
command.run();
return null;
}, false);
}
}

@Override
public void shutdown() {
// no-op
}

@Override
public List<Runnable> shutdownNow() {
return Collections.EMPTY_LIST;
}

@Override
public boolean isShutdown() {
return false;
}

@Override
public boolean isTerminated() {
return false;
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) {
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package io.quarkus.virtual.threads;

import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import jakarta.enterprise.util.AnnotationLiteral;
import jakarta.inject.Qualifier;

/**
* Qualifies an injected virtual threads executor service.
*/
@Qualifier
@Target({ FIELD, METHOD, PARAMETER })
@Retention(RUNTIME)
public @interface VirtualThreads {

final class Literal extends AnnotationLiteral<VirtualThreads> implements VirtualThreads {

private static final long serialVersionUID = 1L;

/**
* Creates a new instance of {@link Literal}.
*
* @return the literal instance.
*/
public static Literal of() {
return new Literal();
}

private Literal() {
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,16 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.function.Supplier;

import org.jboss.logging.Logger;

import io.quarkus.runtime.LaunchMode;
import io.quarkus.runtime.ShutdownContext;
import io.quarkus.runtime.annotations.Recorder;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;

@Recorder
public class VirtualThreadsRecorder {
Expand All @@ -26,19 +23,26 @@ public class VirtualThreadsRecorder {

static VirtualThreadsConfig config = new VirtualThreadsConfig();

private static volatile Executor current;
private static volatile ExecutorService current;
private static final Object lock = new Object();

public static Supplier<ExecutorService> VIRTUAL_THREADS_EXECUTOR_SUPPLIER = new Supplier<ExecutorService>() {
@Override
public ExecutorService get() {
return new DelegatingExecutorService(VirtualThreadsRecorder.getCurrent());
}
};

public void setupVirtualThreads(VirtualThreadsConfig c, ShutdownContext shutdownContext, LaunchMode launchMode) {
config = c;
if (config.enabled) {
if (launchMode == LaunchMode.DEVELOPMENT) {
shutdownContext.addLastShutdownTask(new Runnable() {
@Override
public void run() {
Executor executor = current;
if (executor instanceof ExecutorService) {
((ExecutorService) executor).shutdownNow();
ExecutorService service = current;
if (service != null) {
service.shutdownNow();
}
current = null;
}
Expand All @@ -47,10 +51,9 @@ public void run() {
shutdownContext.addLastShutdownTask(new Runnable() {
@Override
public void run() {
Executor executor = current;
ExecutorService service = current;
current = null;
if (executor instanceof ExecutorService) {
ExecutorService service = (ExecutorService) executor;
if (service != null) {
service.shutdown();

final long timeout = config.shutdownTimeout.toNanos();
Expand Down Expand Up @@ -82,8 +85,12 @@ public void run() {
}
}

public static Executor getCurrent() {
Executor executor = current;
public Supplier<ExecutorService> getCurrentSupplier() {
return VIRTUAL_THREADS_EXECUTOR_SUPPLIER;
}

public static ExecutorService getCurrent() {
ExecutorService executor = current;
if (executor != null) {
return executor;
}
Expand Down Expand Up @@ -134,7 +141,7 @@ static ExecutorService newVirtualThreadExecutor()
* Since we try to load the "Loom-preview" classes/methods at runtime, the application can even be compiled
* using java 11 and executed with a loom-compliant JDK.
*/
private static Executor createExecutor() {
private static ExecutorService createExecutor() {
if (config.enabled) {
try {
return new ContextPreservingExecutorService(newVirtualThreadExecutor());
Expand All @@ -149,19 +156,6 @@ private static Executor createExecutor() {
}
}
// Fallback to regular worker threads
return new Executor() {
@Override
public void execute(Runnable command) {
var context = Vertx.currentContext();
if (!(context instanceof ContextInternal)) {
Infrastructure.getDefaultWorkerPool().execute(command);
} else {
context.executeBlocking(() -> {
command.run();
return null;
}, false);
}
}
};
return new FallbackVirtualThreadsExecutorService();
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package io.quarkus.virtual.rest;

import java.util.concurrent.ExecutorService;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;

import org.eclipse.microprofile.rest.client.inject.RestClient;

import io.quarkus.virtual.threads.VirtualThreads;
import io.smallrye.common.annotation.RunOnVirtualThread;

@Path("/")
Expand All @@ -14,9 +18,14 @@ public class RestClientResource {
@RestClient
ServiceClient client;

@Inject
@VirtualThreads
ExecutorService executor;

@GET
public Greeting test() {
AssertHelper.assertEverything();
assert executor != null;
return client.hello();
}

Expand Down

0 comments on commit fa2e4d7

Please sign in to comment.