Skip to content

Commit

Permalink
Merge pull request #36248 from ozangunalp/vthreads_executor_beans
Browse files Browse the repository at this point in the history
Added synthetic beans for the managed ExecutorService backed by virtual threads
  • Loading branch information
cescoffier authored Oct 17, 2023
2 parents adb9cbf + d5ba035 commit f62e45b
Show file tree
Hide file tree
Showing 8 changed files with 290 additions and 55 deletions.
44 changes: 44 additions & 0 deletions docs/src/main/asciidoc/virtual-threads.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,50 @@ quarkus.virtual-threads.name-prefix=
----

== Inject the virtual thread executor

In order to run tasks on virtual threads Quarkus manages an internal `ThreadPerTaskExecutor`.
In rare instances where you'd need to access this executor directly you can inject it using the `@VirtualThreads` CDI qualifier:

IMPORTANT: Injecting the Virtual Thread ExecutorService is experimental and may change in future versions.

[source,java]
----
package org.acme;
import org.acme.fortune.repository.FortuneRepository;
import java.util.concurrent.ExecutorService;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import jakarta.transaction.Transactional;
import io.quarkus.logging.Log;
import io.quarkus.runtime.StartupEvent;
import io.quarkus.virtual.threads.VirtualThreads;
public class MyApplication {
@Inject
FortuneRepository repository;
@Inject
@VirtualThreads
ExecutorService vThreads;
void onEvent(@Observes StartupEvent event) {
vThreads.execute(this::findAll);
}
@Transactional
void findAll() {
Log.info(repository.findAllBlocking());
}
}
----

== Testing virtual thread applications

As mentioned above, virtual threads have a few limitations that can drastically affect your application performance and memory usage.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
package io.quarkus.virtual.threads;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;

import org.jboss.jandex.AnnotationInstance;

import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
import io.quarkus.arc.deployment.SyntheticBeanBuildItem;
import io.quarkus.arc.processor.BuiltinScope;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.annotations.ExecutionTime;
import io.quarkus.deployment.annotations.Record;
Expand All @@ -12,8 +21,19 @@ public class VirtualThreadsProcessor {
@Record(ExecutionTime.STATIC_INIT)
public void setup(VirtualThreadsConfig config, VirtualThreadsRecorder recorder,
ShutdownContextBuildItem shutdownContextBuildItem,
LaunchModeBuildItem launchModeBuildItem) {
LaunchModeBuildItem launchModeBuildItem,
BuildProducer<AdditionalBeanBuildItem> beans,
BuildProducer<SyntheticBeanBuildItem> producer) {
beans.produce(new AdditionalBeanBuildItem(VirtualThreads.class));
recorder.setupVirtualThreads(config, shutdownContextBuildItem, launchModeBuildItem.getLaunchMode());
producer.produce(
SyntheticBeanBuildItem.configure(ExecutorService.class)
.addType(Executor.class)
.addQualifier(AnnotationInstance.builder(VirtualThreads.class).build())
.scope(BuiltinScope.APPLICATION.getInfo())
.setRuntimeInit()
.supplier(recorder.getCurrentSupplier())
.done());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package io.quarkus.virtual.threads;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* An implementation of {@code ExecutorService} that delegates to the real executor, while disallowing termination.
*/
class DelegatingExecutorService implements ExecutorService {
private final ExecutorService delegate;

DelegatingExecutorService(final ExecutorService delegate) {
this.delegate = delegate;
}

public void execute(final Runnable command) {
delegate.execute(command);
}

public boolean isShutdown() {
// container managed executors are never shut down from the application's perspective
return false;
}

public boolean isTerminated() {
// container managed executors are never shut down from the application's perspective
return false;
}

public boolean awaitTermination(final long timeout, final TimeUnit unit) {
return false;
}

public void shutdown() {
throw new UnsupportedOperationException("shutdown not allowed on managed executor service");
}

public List<Runnable> shutdownNow() {
throw new UnsupportedOperationException("shutdownNow not allowed on managed executor service");
}

@Override
public <T> Future<T> submit(Callable<T> task) {
return delegate.submit(task);
}

@Override
public <T> Future<T> submit(Runnable task, T result) {
return delegate.submit(task, result);
}

@Override
public Future<?> submit(Runnable task) {
return delegate.submit(task);
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
return delegate.invokeAll(tasks);
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException {
return delegate.invokeAll(tasks, timeout, unit);
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
return delegate.invokeAny(tasks);
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return delegate.invokeAny(tasks, timeout, unit);
}

public String toString() {
return delegate.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package io.quarkus.virtual.threads;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.TimeUnit;

import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;

/**
* Fallback executor service implementation in case the virtual threads are disabled or not available on the current platform.
* <p>
* Executes tasks on the current Vert.x context worker pool, or when not available, on the Mutiny Infrastructure default worker
* pool
* Shutdown methods are no-op as the executor service is a wrapper around these previous execute methods.
*/
class FallbackVirtualThreadsExecutorService extends AbstractExecutorService {

@Override
public void execute(Runnable command) {
var context = Vertx.currentContext();
if (!(context instanceof ContextInternal)) {
Infrastructure.getDefaultWorkerPool().execute(command);
} else {
context.executeBlocking(() -> {
command.run();
return null;
}, false);
}
}

@Override
public void shutdown() {
// no-op
}

@Override
public List<Runnable> shutdownNow() {
return Collections.EMPTY_LIST;
}

@Override
public boolean isShutdown() {
return false;
}

@Override
public boolean isTerminated() {
return false;
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) {
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.quarkus.virtual.threads;

import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import jakarta.enterprise.util.AnnotationLiteral;
import jakarta.inject.Qualifier;

/**
* Qualifies an injected virtual threads executor service.
*/
@Qualifier
@Target({ FIELD, METHOD, PARAMETER })
@Retention(RUNTIME)
public @interface VirtualThreads {

final class Literal extends AnnotationLiteral<VirtualThreads> implements VirtualThreads {

public static final Literal INSTANCE = new Literal();

private static final long serialVersionUID = 1L;

}
}
Loading

0 comments on commit f62e45b

Please sign in to comment.