Skip to content

Commit

Permalink
Added synthetic beans for the managed ExecutorService backed by virtu…
Browse files Browse the repository at this point in the history
…al threads
  • Loading branch information
ozangunalp committed Oct 2, 2023
1 parent 9fc4b2a commit 3044b38
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -1,19 +1,43 @@
package io.quarkus.virtual.threads;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;

import io.quarkus.arc.deployment.SyntheticBeanBuildItem;
import io.quarkus.arc.processor.BuiltinScope;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.annotations.ExecutionTime;
import io.quarkus.deployment.annotations.Record;
import io.quarkus.deployment.builditem.LaunchModeBuildItem;
import io.quarkus.deployment.builditem.ShutdownContextBuildItem;
import io.smallrye.common.annotation.Identifier;

public class VirtualThreadsProcessor {

@BuildStep
@Record(ExecutionTime.STATIC_INIT)
public void setup(VirtualThreadsConfig config, VirtualThreadsRecorder recorder,
ShutdownContextBuildItem shutdownContextBuildItem,
LaunchModeBuildItem launchModeBuildItem) {
LaunchModeBuildItem launchModeBuildItem,
BuildProducer<SyntheticBeanBuildItem> producer) {
recorder.setupVirtualThreads(config, shutdownContextBuildItem, launchModeBuildItem.getLaunchMode());
producer.produce(
SyntheticBeanBuildItem.configure(VirtualThreadsExecutor.class)
.scope(BuiltinScope.APPLICATION.getInfo())
.unremovable()
.setRuntimeInit()
.supplier(recorder.getCurrentSupplier())
.done());
producer.produce(
SyntheticBeanBuildItem.configure(VirtualThreadsExecutor.class)
.types(ExecutorService.class, Executor.class)
.addQualifier().annotation(Identifier.class).addValue("value", "virtual-threads").done()
.scope(BuiltinScope.APPLICATION.getInfo())
.unremovable()
.setRuntimeInit()
.supplier(recorder.getCurrentSupplier())
.done());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
/**
* Delegating executor service implementation preserving the Vert.x context on {@link #execute(Runnable)}
*/
class ContextPreservingExecutorService implements ExecutorService {
class ContextPreservingExecutorService implements ExecutorService, VirtualThreadsExecutor {
private final ExecutorService delegate;

ContextPreservingExecutorService(final ExecutorService delegate) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package io.quarkus.virtual.threads;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.TimeUnit;

import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;

/**
* Fallback executor service in case the Virtual threads
*/
class FallbackVirtualThreadsExecutorService extends AbstractExecutorService implements VirtualThreadsExecutor {

@Override
public void execute(Runnable command) {
var context = Vertx.currentContext();
if (!(context instanceof ContextInternal)) {
Infrastructure.getDefaultWorkerPool().execute(command);
} else {
context.executeBlocking(() -> {
command.run();
return null;
}, false);
}
}

@Override
public void shutdown() {
// no-op
}

@Override
public List<Runnable> shutdownNow() {
return Collections.EMPTY_LIST;
}

@Override
public boolean isShutdown() {
return false;
}

@Override
public boolean isTerminated() {
return false;
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) {
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package io.quarkus.virtual.threads;

import java.util.concurrent.ExecutorService;

/**
* Marker interface for Quarkus managed VirtualThreadPerTaskExecutor
*/
public interface VirtualThreadsExecutor extends ExecutorService {
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,13 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.function.Supplier;

import org.jboss.logging.Logger;

import io.quarkus.runtime.LaunchMode;
import io.quarkus.runtime.ShutdownContext;
import io.quarkus.runtime.annotations.Recorder;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;

@Recorder
public class VirtualThreadsRecorder {
Expand All @@ -26,9 +24,16 @@ public class VirtualThreadsRecorder {

static VirtualThreadsConfig config = new VirtualThreadsConfig();

private static volatile Executor current;
private static volatile VirtualThreadsExecutor current;
private static final Object lock = new Object();

public static Supplier<VirtualThreadsExecutor> VIRTUAL_THREADS_EXECUTOR_SUPPLIER = new Supplier<VirtualThreadsExecutor>() {
@Override
public VirtualThreadsExecutor get() {
return VirtualThreadsRecorder.getCurrent();
}
};

public void setupVirtualThreads(VirtualThreadsConfig c, ShutdownContext shutdownContext, LaunchMode launchMode) {
config = c;
if (config.enabled) {
Expand Down Expand Up @@ -82,8 +87,12 @@ public void run() {
}
}

public static Executor getCurrent() {
Executor executor = current;
public Supplier<VirtualThreadsExecutor> getCurrentSupplier() {
return VIRTUAL_THREADS_EXECUTOR_SUPPLIER;
}

public static VirtualThreadsExecutor getCurrent() {
VirtualThreadsExecutor executor = current;
if (executor != null) {
return executor;
}
Expand Down Expand Up @@ -134,7 +143,7 @@ static ExecutorService newVirtualThreadExecutor()
* Since we try to load the "Loom-preview" classes/methods at runtime, the application can even be compiled
* using java 11 and executed with a loom-compliant JDK.
*/
private static Executor createExecutor() {
private static VirtualThreadsExecutor createExecutor() {
if (config.enabled) {
try {
return new ContextPreservingExecutorService(newVirtualThreadExecutor());
Expand All @@ -149,19 +158,6 @@ private static Executor createExecutor() {
}
}
// Fallback to regular worker threads
return new Executor() {
@Override
public void execute(Runnable command) {
var context = Vertx.currentContext();
if (!(context instanceof ContextInternal)) {
Infrastructure.getDefaultWorkerPool().execute(command);
} else {
context.executeBlocking(() -> {
command.run();
return null;
}, false);
}
}
};
return new FallbackVirtualThreadsExecutorService();
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package io.quarkus.virtual.rest;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;

import org.eclipse.microprofile.rest.client.inject.RestClient;

import io.quarkus.virtual.threads.VirtualThreadsExecutor;
import io.smallrye.common.annotation.RunOnVirtualThread;

@Path("/")
Expand All @@ -14,9 +16,13 @@ public class RestClientResource {
@RestClient
ServiceClient client;

@Inject
VirtualThreadsExecutor executor;

@GET
public Greeting test() {
AssertHelper.assertEverything();
assert executor != null;
return client.hello();
}

Expand Down

0 comments on commit 3044b38

Please sign in to comment.