Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added synthetic beans for the managed ExecutorService backed by virtual threads #36248

Merged
merged 2 commits into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions docs/src/main/asciidoc/virtual-threads.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,50 @@

----

== Inject the virtual thread executor

Check warning on line 450 in docs/src/main/asciidoc/virtual-threads.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.TermsWarnings] Consider using 'to' rather than 'In order to' unless updating existing content that uses the term. Raw Output: {"message": "[Quarkus.TermsWarnings] Consider using 'to' rather than 'In order to' unless updating existing content that uses the term.", "location": {"path": "docs/src/main/asciidoc/virtual-threads.adoc", "range": {"start": {"line": 450, "column": 25}}}, "severity": "WARNING"}

Check warning on line 450 in docs/src/main/asciidoc/virtual-threads.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.Fluff] Depending on the context, consider using 'Be concise: use 'to' rather than' rather than 'In order to'. Raw Output: {"message": "[Quarkus.Fluff] Depending on the context, consider using 'Be concise: use 'to' rather than' rather than 'In order to'.", "location": {"path": "docs/src/main/asciidoc/virtual-threads.adoc", "range": {"start": {"line": 450, "column": 25}}}, "severity": "INFO"}

ozangunalp marked this conversation as resolved.
Show resolved Hide resolved
In order to run tasks on virtual threads Quarkus manages an internal `ThreadPerTaskExecutor`.
In rare instances where you'd need to access this executor directly you can inject it using the `@VirtualThreads` CDI qualifier:

Check warning on line 453 in docs/src/main/asciidoc/virtual-threads.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.Fluff] Depending on the context, consider using 'Rewrite the sentence, or use 'must', instead of' rather than 'need to'. Raw Output: {"message": "[Quarkus.Fluff] Depending on the context, consider using 'Rewrite the sentence, or use 'must', instead of' rather than 'need to'.", "location": {"path": "docs/src/main/asciidoc/virtual-threads.adoc", "range": {"start": {"line": 453, "column": 16}}}, "severity": "INFO"}

Check warning on line 453 in docs/src/main/asciidoc/virtual-threads.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.TermsSuggestions] Depending on the context, consider using 'by using' or 'that uses' rather than 'using'. Raw Output: {"message": "[Quarkus.TermsSuggestions] Depending on the context, consider using 'by using' or 'that uses' rather than 'using'.", "location": {"path": "docs/src/main/asciidoc/virtual-threads.adoc", "range": {"start": {"line": 453, "column": 71}}}, "severity": "INFO"}

IMPORTANT: Injecting the Virtual Thread ExecutorService is experimental and may change in future versions.

Check warning on line 455 in docs/src/main/asciidoc/virtual-threads.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.TermsWarnings] Consider using 'might (for possiblity)' or 'can (for ability)' rather than 'may' unless updating existing content that uses the term. Raw Output: {"message": "[Quarkus.TermsWarnings] Consider using 'might (for possiblity)' or 'can (for ability)' rather than 'may' unless updating existing content that uses the term.", "location": {"path": "docs/src/main/asciidoc/virtual-threads.adoc", "range": {"start": {"line": 455, "column": 77}}}, "severity": "WARNING"}

[source,java]
----
package org.acme;

import org.acme.fortune.repository.FortuneRepository;

import java.util.concurrent.ExecutorService;

import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import jakarta.transaction.Transactional;

import io.quarkus.logging.Log;
import io.quarkus.runtime.StartupEvent;
import io.quarkus.virtual.threads.VirtualThreads;

public class MyApplication {

@Inject
FortuneRepository repository;

@Inject
@VirtualThreads
ExecutorService vThreads;

void onEvent(@Observes StartupEvent event) {
vThreads.execute(this::findAll);
}

@Transactional
void findAll() {
Log.info(repository.findAllBlocking());
}

}
----

== Testing virtual thread applications

As mentioned above, virtual threads have a few limitations that can drastically affect your application performance and memory usage.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
package io.quarkus.virtual.threads;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;

import org.jboss.jandex.AnnotationInstance;

import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
import io.quarkus.arc.deployment.SyntheticBeanBuildItem;
import io.quarkus.arc.processor.BuiltinScope;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.annotations.ExecutionTime;
import io.quarkus.deployment.annotations.Record;
Expand All @@ -12,8 +21,19 @@ public class VirtualThreadsProcessor {
@Record(ExecutionTime.STATIC_INIT)
public void setup(VirtualThreadsConfig config, VirtualThreadsRecorder recorder,
ShutdownContextBuildItem shutdownContextBuildItem,
LaunchModeBuildItem launchModeBuildItem) {
LaunchModeBuildItem launchModeBuildItem,
BuildProducer<AdditionalBeanBuildItem> beans,
BuildProducer<SyntheticBeanBuildItem> producer) {
beans.produce(new AdditionalBeanBuildItem(VirtualThreads.class));
recorder.setupVirtualThreads(config, shutdownContextBuildItem, launchModeBuildItem.getLaunchMode());
producer.produce(
SyntheticBeanBuildItem.configure(ExecutorService.class)
.addType(Executor.class)
.addQualifier(AnnotationInstance.builder(VirtualThreads.class).build())
.scope(BuiltinScope.APPLICATION.getInfo())
.setRuntimeInit()
.supplier(recorder.getCurrentSupplier())
.done());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package io.quarkus.virtual.threads;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* An implementation of {@code ExecutorService} that delegates to the real executor, while disallowing termination.
*/
class DelegatingExecutorService implements ExecutorService {
private final ExecutorService delegate;

DelegatingExecutorService(final ExecutorService delegate) {
this.delegate = delegate;
}

public void execute(final Runnable command) {
delegate.execute(command);
}

public boolean isShutdown() {
// container managed executors are never shut down from the application's perspective
return false;
}

public boolean isTerminated() {
// container managed executors are never shut down from the application's perspective
return false;
}

public boolean awaitTermination(final long timeout, final TimeUnit unit) {
return false;
}

public void shutdown() {
throw new UnsupportedOperationException("shutdown not allowed on managed executor service");
}

public List<Runnable> shutdownNow() {
throw new UnsupportedOperationException("shutdownNow not allowed on managed executor service");
}

@Override
public <T> Future<T> submit(Callable<T> task) {
return delegate.submit(task);
}

@Override
public <T> Future<T> submit(Runnable task, T result) {
return delegate.submit(task, result);
}

@Override
public Future<?> submit(Runnable task) {
return delegate.submit(task);
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
return delegate.invokeAll(tasks);
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException {
return delegate.invokeAll(tasks, timeout, unit);
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
return delegate.invokeAny(tasks);
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return delegate.invokeAny(tasks, timeout, unit);
}

public String toString() {
return delegate.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package io.quarkus.virtual.threads;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.TimeUnit;

import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;

/**
* Fallback executor service implementation in case the virtual threads are disabled or not available on the current platform.
* <p>
* Executes tasks on the current Vert.x context worker pool, or when not available, on the Mutiny Infrastructure default worker
* pool
* Shutdown methods are no-op as the executor service is a wrapper around these previous execute methods.
*/
class FallbackVirtualThreadsExecutorService extends AbstractExecutorService {

@Override
public void execute(Runnable command) {
var context = Vertx.currentContext();
if (!(context instanceof ContextInternal)) {
Infrastructure.getDefaultWorkerPool().execute(command);
} else {
context.executeBlocking(() -> {
command.run();
return null;
}, false);
}
}

@Override
public void shutdown() {
// no-op
}

@Override
public List<Runnable> shutdownNow() {
return Collections.EMPTY_LIST;
}

@Override
public boolean isShutdown() {
return false;
}

@Override
public boolean isTerminated() {
return false;
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) {
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.quarkus.virtual.threads;

import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import jakarta.enterprise.util.AnnotationLiteral;
import jakarta.inject.Qualifier;

/**
* Qualifies an injected virtual threads executor service.
*/
@Qualifier
@Target({ FIELD, METHOD, PARAMETER })
@Retention(RUNTIME)
public @interface VirtualThreads {

final class Literal extends AnnotationLiteral<VirtualThreads> implements VirtualThreads {

public static final Literal INSTANCE = new Literal();

private static final long serialVersionUID = 1L;

}
}
Loading
Loading