From 1c825efeb997a30191611dd8725d18241706c883 Mon Sep 17 00:00:00 2001 From: Clement Escoffier Date: Tue, 18 Jul 2023 13:52:48 +0200 Subject: [PATCH] Implement support for @RunOnVirtualThread for gRPC services. This commit allows gRPC methods to be invoked on virtual threads (while keeping the duplicated context and request scope). A new virtual thread is created for each invocation. Note that methods ingesting streams are not supported for now. This commit also sets up the structure to run integration tests verifying the virtual thread support. --- .github/workflows/ci-actions-incremental.yml | 45 ++- .../asciidoc/grpc-service-implementation.adoc | 4 + .../main/asciidoc/grpc-virtual-threads.adoc | 150 ++++++++++ .../deployment/BindableServiceBuildItem.java | 16 ++ .../quarkus/grpc/deployment/GrpcDotNames.java | 2 + .../grpc/deployment/GrpcServerProcessor.java | 78 +++-- .../deployment/GrpcServerProcessorTest.java | 2 +- .../MutinyServiceBlockingMethodTest.java | 2 +- .../grpc/runtime/GrpcServerRecorder.java | 150 ++++++++-- .../blocking/BlockingServerInterceptor.java | 99 +++++-- .../BlockingServerInterceptorTest.java | 3 +- integration-tests/pom.xml | 14 + .../disable-native-profile | 1 + .../grpc-virtual-threads/pom.xml | 99 +++++++ .../grpc/example/streaming/AssertHelper.java | 53 ++++ .../example/streaming/TestServiceImpl.java | 56 ++++ .../src/main/proto/empty.proto | 45 +++ .../src/main/proto/messages.proto | 178 ++++++++++++ .../src/main/proto/test.proto | 22 ++ .../src/main/resources/application.properties | 15 + .../streaming/VertxVirtualThreadTest.java | 10 + .../example/streaming/VirtualThreadTest.java | 7 + .../streaming/VirtualThreadTestBase.java | 48 ++++ integration-tests/virtual-threads/pom.xml | 266 ++++++++++++++++++ 24 files changed, 1303 insertions(+), 62 deletions(-) create mode 100644 docs/src/main/asciidoc/grpc-virtual-threads.adoc create mode 100644 integration-tests/virtual-threads/grpc-virtual-threads/disable-native-profile create mode 100644 integration-tests/virtual-threads/grpc-virtual-threads/pom.xml create mode 100644 integration-tests/virtual-threads/grpc-virtual-threads/src/main/java/io/quarkus/grpc/example/streaming/AssertHelper.java create mode 100644 integration-tests/virtual-threads/grpc-virtual-threads/src/main/java/io/quarkus/grpc/example/streaming/TestServiceImpl.java create mode 100644 integration-tests/virtual-threads/grpc-virtual-threads/src/main/proto/empty.proto create mode 100644 integration-tests/virtual-threads/grpc-virtual-threads/src/main/proto/messages.proto create mode 100644 integration-tests/virtual-threads/grpc-virtual-threads/src/main/proto/test.proto create mode 100644 integration-tests/virtual-threads/grpc-virtual-threads/src/main/resources/application.properties create mode 100644 integration-tests/virtual-threads/grpc-virtual-threads/src/test/java/io/quarkus/grpc/example/streaming/VertxVirtualThreadTest.java create mode 100644 integration-tests/virtual-threads/grpc-virtual-threads/src/test/java/io/quarkus/grpc/example/streaming/VirtualThreadTest.java create mode 100644 integration-tests/virtual-threads/grpc-virtual-threads/src/test/java/io/quarkus/grpc/example/streaming/VirtualThreadTestBase.java create mode 100644 integration-tests/virtual-threads/pom.xml diff --git a/.github/workflows/ci-actions-incremental.yml b/.github/workflows/ci-actions-incremental.yml index 4b355461d2237..6695c91e5a1fc 100644 --- a/.github/workflows/ci-actions-incremental.yml +++ b/.github/workflows/ci-actions-incremental.yml @@ -602,7 +602,50 @@ jobs: quarkus-quickstarts/target/build-report.json quarkus-quickstarts/LICENSE retention-days: 2 - + virtual-thread-tests: + name: Virtual Thread Support Tests - JDK ${{matrix.java.name}} + runs-on: ${{matrix.java.os-name}} + needs: [build-jdk11, calculate-test-jobs] + # Skip main in forks + if: "needs.calculate-test-jobs.outputs.run_quickstarts == 'true' && (github.repository == 'quarkusio/quarkus' || !endsWith(github.ref, '/main'))" + timeout-minutes: 90 + strategy: + fail-fast: false + matrix: + java: + - { + name: "20", + java-version: 20, + os-name: "ubuntu-latest", + extra-args: "--enable-preview" + } + steps: + - uses: actions/checkout@v3 + - name: Download Maven Repo + uses: actions/download-artifact@v3 + with: + name: maven-repo + path: . + - name: Extract Maven Repo + shell: bash + run: tar -xzf maven-repo.tgz -C ~ + - name: Set up JDK ${{ matrix.java.java-version }} + uses: actions/setup-java@v3 + with: + distribution: temurin + java-version: ${{ matrix.java.java-version }} + - name: Run tests + run: | + export LANG=en_US && ./mvnw -e -B -fae --settings .github/mvn-settings.xml -f integration-tests/virtual-threads clean verify -Dextra-args=${{matrix.java.extra-args}} + - name: Upload build reports (if build failed) + uses: actions/upload-artifact@v3 + if: ${{ failure() || cancelled() }} + with: + name: "build-reports-Virtual Thread Support - JDK ${{matrix.java.name}}" + path: | + integration-tests/virtual-threads/**/target/*-reports/TEST-*.xml + integration-tests/virtual-threads/target/build-report.json + retention-days: 2 tcks-test: name: MicroProfile TCKs Tests needs: [build-jdk11, calculate-test-jobs] diff --git a/docs/src/main/asciidoc/grpc-service-implementation.adoc b/docs/src/main/asciidoc/grpc-service-implementation.adoc index 0b909e6b38f9b..f49e1161aead9 100644 --- a/docs/src/main/asciidoc/grpc-service-implementation.adoc +++ b/docs/src/main/asciidoc/grpc-service-implementation.adoc @@ -421,3 +421,7 @@ To disable the gRPC server metrics when `quarkus-micrometer` is used, add the fo ---- quarkus.micrometer.binder.grpc-server.enabled=false ---- + +=== Use virtual threads + +To use virtual threads in your gRPC service implementation, check the dedicated xref:./grpc-virtual-threads.adoc[guide]. diff --git a/docs/src/main/asciidoc/grpc-virtual-threads.adoc b/docs/src/main/asciidoc/grpc-virtual-threads.adoc new file mode 100644 index 0000000000000..eb027e7b7d375 --- /dev/null +++ b/docs/src/main/asciidoc/grpc-virtual-threads.adoc @@ -0,0 +1,150 @@ += Quarkus Virtual Thread support for gRPC services + +include::_attributes.adoc[] +:runonvthread: https://javadoc.io/doc/io.smallrye.common/smallrye-common-annotation/latest/io/smallrye/common/annotation/RunOnVirtualThread.html +:blocking_annotation: https://javadoc.io/doc/io.smallrye.reactive/smallrye-reactive-messaging-api/latest/io/smallrye/reactive/messaging/annotations/Blocking.html + +This guide explains how to benefit from Java virtual threads when implementing a gRPC service. + +[TIP] +==== +This guide focuses on using virtual threads with the gRPC extensions. +Please refer to xref:virtual-threads.adoc[Writing simpler reactive REST services with Quarkus Virtual Thread support] +to read more about Java virtual threads in general and the Quarkus Virtual Thread support. +==== + +By default, the Quarkus gRPC extension invokes service methods on an event-loop thread. +See the xref:quarkus-reactive-architecture.adoc[Quarkus Reactive Architecture documentation] for further details on this topic. +But, you can also use the link:{blocking_annotation}[@Blocking] annotation to indicate that the service is _blocking_ and should be run on a worker thread. + +The idea behind Quarkus Virtual Thread support for gRPC services is to offload the service method invocation on virtual threads, instead of running it on an event-loop thread or a worker thread. + +To enable virtual thread support on a service method, simply add the link:{runonvthread}[@RunOnVirtualThread] annotation to the method. +If the JDK is compatible (Java 19 or later versions) then the invocation will be offloaded to a new virtual thread. +It will then be possible to perform blocking operations without blocking the platform thread upon which the virtual thread is mounted. + +== Configuring gRPC services to use virtual threads + +Let's see an example of how to implement a gRPC service using virtual threads. +First, make sure to have the gRPC extension dependency in your build file: + +[source,xml,role="primary asciidoc-tabs-target-sync-cli asciidoc-tabs-target-sync-maven"] +.pom.xml +---- + + io.quarkus + quarkus-grpc + +---- + +[source,gradle,role="secondary asciidoc-tabs-target-sync-gradle"] +.build.gradle +---- +implementation("io.quarkus:quarkus-grpc") +---- + +You also need to make sure that you are using Java 19 or later, this can be enforced in your `pom.xml` file with the following: + +[source,xml,role="primary asciidoc-tabs-target-sync-cli asciidoc-tabs-target-sync-maven"] +.pom.xml +---- + + 19 + 19 + +---- + +Virtual threads are still a preview feature, so you need to start your application with the `--enable-preview` flag: + +[source, bash] +---- +java --enable-preview -jar target/quarkus-app/quarkus-run.jar +---- + +or to use the Quarkus Dev mode, insert the following to the `quarkus-maven-plugin` configuration: + +[source,xml,role="primary asciidoc-tabs-target-sync-cli asciidoc-tabs-target-sync-maven"] +.pom.xml +---- + + io.quarkus + quarkus-maven-plugin + ${quarkus.version} + + + + build + generate-code + generate-code-tests + + + + + 19 + 19 + + --enable-preview + + + +---- + +NOTE: The `--enable-preview` flag is not necessary with Java 21+. + +Then you can start using the annotation `@RunOnVirtualThread` in your service implementation: + +[source, java] +---- +package io.quarkus.grpc.example.streaming; + +import com.google.protobuf.ByteString; +import com.google.protobuf.EmptyProtos; + +import io.grpc.testing.integration.Messages; +import io.grpc.testing.integration.TestService; +import io.quarkus.grpc.GrpcService; +import io.smallrye.common.annotation.RunOnVirtualThread; +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; + +@GrpcService +public class TestServiceImpl implements TestService { + + @RunOnVirtualThread + @Override + public Uni emptyCall(EmptyProtos.Empty request) { + return Uni.createFrom().item(EmptyProtos.Empty.newBuilder().build()); + } + + @RunOnVirtualThread + @Override + public Uni unaryCall(Messages.SimpleRequest request) { + var value = request.getPayload().getBody().toStringUtf8(); + var resp = Messages.SimpleResponse.newBuilder() + .setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFromUtf8(value.toUpperCase())).build()) + .build(); + return Uni.createFrom().item(resp); + } + + @Override + @RunOnVirtualThread + public Multi streamingOutputCall(Messages.StreamingOutputCallRequest request) { + var value = request.getPayload().getBody().toStringUtf8(); + return Multi.createFrom(). emitter(emitter -> { + emitter.emit(value.toUpperCase()); + emitter.emit(value.toUpperCase()); + emitter.emit(value.toUpperCase()); + emitter.complete(); + }).map(v -> Messages.StreamingOutputCallResponse.newBuilder() + .setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFromUtf8(v)).build()) + .build()); + } +} + +---- + +[IMPORTANT] +.Limitations +==== +The gRPC methods receiving _streams_, such as a `Multi` cannot use `@RunOnVirtualThread`, as the method must not be blocking and produce its result (`Multi` or `Uni`) immediately. +==== \ No newline at end of file diff --git a/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/BindableServiceBuildItem.java b/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/BindableServiceBuildItem.java index 0019faa6ca4bd..be68a67bb892c 100644 --- a/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/BindableServiceBuildItem.java +++ b/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/BindableServiceBuildItem.java @@ -11,6 +11,7 @@ public final class BindableServiceBuildItem extends MultiBuildItem { final DotName serviceClass; final List blockingMethods = new ArrayList<>(); + final List virtualMethods = new ArrayList<>(); public BindableServiceBuildItem(DotName serviceClass) { this.serviceClass = serviceClass; @@ -27,10 +28,25 @@ public void registerBlockingMethod(String method) { blockingMethods.add(method); } + /** + * A method from {@code serviceClass} is annotated with {@link io.smallrye.common.annotation.RunOnVirtualThread}. + * Stores the method name so the runtime interceptor can recognize it. + * Note: gRPC method have unique names - overloading is not permitted. + * + * @param method the method name + */ + public void registerVirtualMethod(String method) { + virtualMethods.add(method); + } + public boolean hasBlockingMethods() { return !blockingMethods.isEmpty(); } + public boolean hasVirtualMethods() { + return !virtualMethods.isEmpty(); + } + public DotName getServiceClass() { return serviceClass; } diff --git a/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcDotNames.java b/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcDotNames.java index 8a59a56b0fb95..08cf2daab02b9 100644 --- a/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcDotNames.java +++ b/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcDotNames.java @@ -28,6 +28,7 @@ import io.quarkus.grpc.runtime.supports.GrpcClientConfigProvider; import io.smallrye.common.annotation.Blocking; import io.smallrye.common.annotation.NonBlocking; +import io.smallrye.common.annotation.RunOnVirtualThread; public class GrpcDotNames { @@ -38,6 +39,7 @@ public class GrpcDotNames { public static final DotName BLOCKING = DotName.createSimple(Blocking.class.getName()); public static final DotName NON_BLOCKING = DotName.createSimple(NonBlocking.class.getName()); + public static final DotName RUN_ON_VIRTUAL_THREAD = DotName.createSimple(RunOnVirtualThread.class.getName()); public static final DotName TRANSACTIONAL = DotName.createSimple("jakarta.transaction.Transactional"); public static final DotName ABSTRACT_BLOCKING_STUB = DotName.createSimple(AbstractBlockingStub.class.getName()); diff --git a/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcServerProcessor.java b/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcServerProcessor.java index 2b7888e326cc7..af4a5d85e947e 100644 --- a/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcServerProcessor.java +++ b/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcServerProcessor.java @@ -5,6 +5,7 @@ import static io.quarkus.grpc.deployment.GrpcDotNames.BLOCKING; import static io.quarkus.grpc.deployment.GrpcDotNames.MUTINY_SERVICE; import static io.quarkus.grpc.deployment.GrpcDotNames.NON_BLOCKING; +import static io.quarkus.grpc.deployment.GrpcDotNames.RUN_ON_VIRTUAL_THREAD; import static io.quarkus.grpc.deployment.GrpcDotNames.TRANSACTIONAL; import static io.quarkus.grpc.deployment.GrpcInterceptors.MICROMETER_INTERCEPTORS; import static java.util.Arrays.asList; @@ -118,6 +119,8 @@ void processGeneratedBeans(CombinedIndexBuildItem index, BuildProducer blocking methods Map> generatedBeans = new HashMap<>(); + // generated bean class -> virtual methods + Map> virtuals = new HashMap<>(); String[] excludedPackages = { "grpc.health.v1", "io.grpc.reflection" }; // We need to transform the generated bean and register a bindable service if: @@ -169,20 +172,32 @@ void processGeneratedBeans(CombinedIndexBuildItem index, BuildProducer blockingMethods = gatherBlockingMethodNames(userDefinedBean, index.getIndex()); - + Set blockingMethods = gatherBlockingOrVirtualMethodNames(userDefinedBean, index.getIndex(), false); + Set virtualMethods = gatherBlockingOrVirtualMethodNames(userDefinedBean, index.getIndex(), true); generatedBeans.put(generatedBean.name(), blockingMethods); + virtuals.put(generatedBean.name(), virtualMethods); } } - if (!generatedBeans.isEmpty()) { + if (!generatedBeans.isEmpty() || !virtuals.isEmpty()) { // For every suitable bean we must: // (a) add @Singleton and @GrpcService // (b) register a BindableServiceBuildItem, incl. all blocking methods (derived from the user-defined impl) - for (Entry> entry : generatedBeans.entrySet()) { - BindableServiceBuildItem bindableService = new BindableServiceBuildItem(entry.getKey()); - for (String blockingMethod : entry.getValue()) { - bindableService.registerBlockingMethod(blockingMethod); + Set names = new HashSet<>(generatedBeans.keySet()); + names.addAll(virtuals.keySet()); + for (DotName name : names) { + BindableServiceBuildItem bindableService = new BindableServiceBuildItem(name); + var blocking = generatedBeans.get(name); + var rovt = virtuals.get(name); + if (blocking != null) { + for (String blockingMethod : blocking) { + bindableService.registerBlockingMethod(blockingMethod); + } + } + if (rovt != null) { + for (String virtualMethod : rovt) { + bindableService.registerVirtualMethod(virtualMethod); + } } bindables.produce(bindableService); } @@ -220,10 +235,14 @@ void discoverBindableServices(BuildProducer bindables, continue; } BindableServiceBuildItem item = new BindableServiceBuildItem(service.name()); - Set blockingMethods = gatherBlockingMethodNames(service, index); + Set blockingMethods = gatherBlockingOrVirtualMethodNames(service, index, false); + Set virtualMethods = gatherBlockingOrVirtualMethodNames(service, index, true); for (String method : blockingMethods) { item.registerBlockingMethod(method); } + for (String method : virtualMethods) { + item.registerVirtualMethod(method); + } bindables.produce(item); } } @@ -255,6 +274,7 @@ private static List classHierarchy(ClassInfo service, IndexView index private enum BlockingMode { UNDEFINED(false), BLOCKING(true), + VIRTUAL_THREAD(true), NON_BLOCKING(false), // @Transactional on a method IMPLICIT(true); @@ -270,17 +290,24 @@ private static BlockingMode nonInheritedBlockingMode(Predicate checker, Supplier exceptionMsgSupplier) { boolean blocking = checker.test(BLOCKING); boolean nonBlocking = checker.test(NON_BLOCKING); + boolean vt = checker.test(RUN_ON_VIRTUAL_THREAD); if (blocking && nonBlocking) { throw new DeploymentException(exceptionMsgSupplier.get()); } - if (blocking) { + if (nonBlocking && vt) { + throw new DeploymentException(exceptionMsgSupplier.get()); + } + if (blocking && !vt) { return BlockingMode.BLOCKING; } + if (vt) { + return BlockingMode.VIRTUAL_THREAD; + } if (nonBlocking) { return BlockingMode.NON_BLOCKING; } boolean transactional = checker.test(TRANSACTIONAL); - if (transactional) { + if (transactional) { // Cannot be on a virtual thread here. return BlockingMode.IMPLICIT; } return BlockingMode.UNDEFINED; @@ -294,7 +321,8 @@ private static BlockingMode nonInheritedBlockingMode(Predicate checker, private static BlockingMode inheritedBlockingMode(Predicate checker, BlockingMode currentlyKnownMode) { boolean transactional = checker.test(TRANSACTIONAL); - if (transactional) { + boolean vt = checker.test(RUN_ON_VIRTUAL_THREAD); + if (transactional && !vt) { return BlockingMode.IMPLICIT; } return currentlyKnownMode; @@ -310,7 +338,7 @@ private static BlockingMode inheritedBlockingMode(Predicate checker, *

* Otherwise returns the "topmost" "non-explicit" annotation (aka {@link jakarta.transaction.Transactional}). */ - private static boolean methodIsBlocking(List classes, String methodName, Type[] methodArgs) { + private static BlockingMode getMethodBlockingMode(List classes, String methodName, Type[] methodArgs) { BlockingMode classModeInherited = BlockingMode.UNDEFINED; BlockingMode methodMode = BlockingMode.UNDEFINED; for (int i = 0; i < classes.size(); i++) { @@ -327,10 +355,11 @@ private static boolean methodIsBlocking(List classes, String methodNa }; methodMode = nonInheritedBlockingMode(annotationOnMethod, () -> "Method '" + method.declaringClass().name() + "#" + method.name() + - "' contains both @Blocking and @NonBlocking annotations."); + "' contains both @Blocking and @NonBlocking or both @NonBlocking and @RunOnVirtualThread annotations."); if (methodMode == BlockingMode.UNDEFINED) { methodMode = nonInheritedBlockingMode(annotationOnClass, - () -> "Class '" + ci.name() + "' contains both @Blocking and @NonBlocking annotations."); + () -> "Class '" + ci.name() + + "' contains both @Blocking and @NonBlocking or both @NonBlocking and @RunOnVirtualThread annotations."); } // Handles the case when a method's overridden without an explicit annotation and @Transactional is defined on a superclass @@ -346,9 +375,9 @@ private static boolean methodIsBlocking(List classes, String methodNa } } if (methodMode != BlockingMode.UNDEFINED) { - return methodMode.blocking; + return methodMode; } - return classModeInherited.blocking; + return classModeInherited; } /** @@ -374,7 +403,7 @@ private static boolean methodIsBlocking(List classes, String methodNa *

  • Else: non-blocking.
  • * */ - static Set gatherBlockingMethodNames(ClassInfo service, IndexView index) { + static Set gatherBlockingOrVirtualMethodNames(ClassInfo service, IndexView index, boolean virtual) { Set result = new HashSet<>(); @@ -418,7 +447,11 @@ static Set gatherBlockingMethodNames(ClassInfo service, IndexView index) } // Find the annotations for the current method. - if (methodIsBlocking(classes, methodName, implBaseMethod.parameterTypes().toArray(new Type[0]))) { + BlockingMode blocking = getMethodBlockingMode(classes, methodName, + implBaseMethod.parameterTypes().toArray(new Type[0])); + if (virtual && blocking == BlockingMode.VIRTUAL_THREAD) { + result.add(methodName); + } else if (blocking.blocking) { result.add(methodName); } } @@ -563,6 +596,7 @@ void gatherGrpcInterceptors(BeanArchiveIndexBuildItem indexBuildItem, List additionalGlobalInterceptors, List delegatingGrpcBeans, BuildProducer syntheticBeans, + RecorderContext recorderContext, GrpcServerRecorder recorder) { @@ -659,13 +693,19 @@ ServiceStartBuildItem initializeServer(GrpcServerRecorder recorder, blocking.put(bindable.serviceClass.toString(), bindable.blockingMethods); } } + Map> virtuals = new HashMap<>(); + for (BindableServiceBuildItem bindable : bindables) { + if (bindable.hasVirtualMethods()) { + virtuals.put(bindable.serviceClass.toString(), bindable.virtualMethods); + } + } if (!bindables.isEmpty() || (LaunchMode.current() == LaunchMode.DEVELOPMENT && buildTimeConfig.devMode.forceServerStart)) { //Uses mainrouter when the 'quarkus.http.root-path' is not '/' recorder.initializeGrpcServer(vertx.getVertx(), routerBuildItem.getMainRouter() != null ? routerBuildItem.getMainRouter() : routerBuildItem.getHttpRouter(), - config, shutdown, blocking, launchModeBuildItem.getLaunchMode()); + config, shutdown, blocking, virtuals, launchModeBuildItem.getLaunchMode()); return new ServiceStartBuildItem(GRPC_SERVER); } return null; diff --git a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/deployment/GrpcServerProcessorTest.java b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/deployment/GrpcServerProcessorTest.java index 6a0896add0cb0..fafe4961b97a9 100644 --- a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/deployment/GrpcServerProcessorTest.java +++ b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/deployment/GrpcServerProcessorTest.java @@ -66,7 +66,7 @@ public void blockingAnnotations(Class clazz, Set expectedBlocking) th ClassInfo classInfo = index.getClassByName(className); - assertThat(GrpcServerProcessor.gatherBlockingMethodNames(classInfo, index)) + assertThat(GrpcServerProcessor.gatherBlockingOrVirtualMethodNames(classInfo, index, false)) .containsExactlyInAnyOrderElementsOf(expectedBlocking); } diff --git a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/deployment/MutinyServiceBlockingMethodTest.java b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/deployment/MutinyServiceBlockingMethodTest.java index 53ca300bed45c..df1086813b58b 100644 --- a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/deployment/MutinyServiceBlockingMethodTest.java +++ b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/deployment/MutinyServiceBlockingMethodTest.java @@ -28,7 +28,7 @@ public void testBlocking() throws Exception { ClassInfo classInfo = index.getClassByName(className); - assertThat(GrpcServerProcessor.gatherBlockingMethodNames(classInfo, index)) + assertThat(GrpcServerProcessor.gatherBlockingOrVirtualMethodNames(classInfo, index, false)) .containsExactlyInAnyOrderElementsOf(List.of("sayHello", "wEIRD")); } diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/GrpcServerRecorder.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/GrpcServerRecorder.java index 7217ae4c5fec5..de8af94ba457d 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/GrpcServerRecorder.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/GrpcServerRecorder.java @@ -6,6 +6,7 @@ import java.io.File; import java.io.IOException; import java.io.UncheckedIOException; +import java.lang.reflect.InvocationTargetException; import java.net.BindException; import java.time.Duration; import java.util.AbstractMap; @@ -19,10 +20,13 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import java.util.regex.Pattern; import jakarta.enterprise.inject.Instance; @@ -59,6 +63,7 @@ import io.quarkus.runtime.ShutdownContext; import io.quarkus.runtime.annotations.Recorder; import io.quarkus.vertx.http.runtime.PortSystemProperties; +import io.smallrye.mutiny.infrastructure.Infrastructure; import io.vertx.core.AbstractVerticle; import io.vertx.core.AsyncResult; import io.vertx.core.Context; @@ -66,6 +71,7 @@ import io.vertx.core.Handler; import io.vertx.core.Promise; import io.vertx.core.Vertx; +import io.vertx.core.impl.ContextInternal; import io.vertx.ext.web.Route; import io.vertx.ext.web.Router; import io.vertx.ext.web.RoutingContext; @@ -79,12 +85,13 @@ public class GrpcServerRecorder { private static final Logger LOGGER = Logger.getLogger(GrpcServerRecorder.class.getName()); private static final AtomicInteger grpcVerticleCount = new AtomicInteger(0); - private static volatile DevModeWrapper devModeWrapper; private static volatile List services = Collections.emptyList(); private static final Pattern GRPC_CONTENT_TYPE = Pattern.compile("^application/grpc.*"); + private static final Logger logger = Logger.getLogger(GrpcServerRecorder.class); + public static List getServices() { return services; } @@ -93,7 +100,9 @@ public void initializeGrpcServer(RuntimeValue vertxSupplier, RuntimeValue routerSupplier, GrpcConfiguration cfg, ShutdownContext shutdown, - Map> blockingMethodsPerService, LaunchMode launchMode) { + Map> blockingMethodsPerService, + Map> virtualMethodsPerService, + LaunchMode launchMode) { GrpcContainer grpcContainer = Arc.container().instance(GrpcContainer.class).get(); if (grpcContainer == null) { throw new IllegalStateException("gRPC not initialized, GrpcContainer not found"); @@ -118,16 +127,20 @@ public void initializeGrpcServer(RuntimeValue vertxSupplier, // start single server, not in a verticle, regardless of the configuration.instances // for reason unknown to me, verticles occasionally get undeployed on dev mode reload if (GrpcServerReloader.getServer() != null || (provider != null && provider.serverAlreadyExists())) { - devModeReload(grpcContainer, vertx, configuration, provider, blockingMethodsPerService, shutdown); + devModeReload(grpcContainer, vertx, configuration, provider, blockingMethodsPerService, + virtualMethodsPerService, shutdown); } else { - devModeStart(grpcContainer, vertx, configuration, provider, blockingMethodsPerService, shutdown, + devModeStart(grpcContainer, vertx, configuration, provider, blockingMethodsPerService, + virtualMethodsPerService, shutdown, launchMode); } } else { - prodStart(grpcContainer, vertx, configuration, provider, blockingMethodsPerService, launchMode); + prodStart(grpcContainer, vertx, configuration, provider, blockingMethodsPerService, virtualMethodsPerService, + launchMode); } } else { - buildGrpcServer(vertx, configuration, routerSupplier, shutdown, blockingMethodsPerService, grpcContainer, + buildGrpcServer(vertx, configuration, routerSupplier, shutdown, blockingMethodsPerService, virtualMethodsPerService, + grpcContainer, launchMode); } } @@ -135,6 +148,7 @@ public void initializeGrpcServer(RuntimeValue vertxSupplier, // TODO -- handle XDS private void buildGrpcServer(Vertx vertx, GrpcServerConfiguration configuration, RuntimeValue routerSupplier, ShutdownContext shutdown, Map> blockingMethodsPerService, + Map> virtualMethodsPerService, GrpcContainer grpcContainer, LaunchMode launchMode) { GrpcServer server = GrpcServer.server(vertx); @@ -153,7 +167,7 @@ private void buildGrpcServer(Vertx vertx, GrpcServerConfiguration configuration, for (GrpcServiceDefinition service : toBeRegistered) { ServerServiceDefinition defWithInterceptors = serviceWithInterceptors( - vertx, grpcContainer, blockingMethodsPerService, compressionInterceptor, service, + vertx, grpcContainer, blockingMethodsPerService, virtualMethodsPerService, compressionInterceptor, service, launchMode == LaunchMode.DEVELOPMENT); LOGGER.debugf("Registered gRPC service '%s'", service.definition.getServiceDescriptor().getName()); ServerServiceDefinition serviceDefinition = ServerInterceptors.intercept(defWithInterceptors, globalInterceptors); @@ -211,11 +225,14 @@ private static boolean isGrpc(RoutingContext rc) { } private void prodStart(GrpcContainer grpcContainer, Vertx vertx, GrpcServerConfiguration configuration, - GrpcBuilderProvider provider, Map> blockingMethodsPerService, LaunchMode launchMode) { + GrpcBuilderProvider provider, Map> blockingMethodsPerService, + Map> virtualMethodsPerService, + LaunchMode launchMode) { CompletableFuture startResult = new CompletableFuture<>(); vertx.deployVerticle( - () -> new GrpcServerVerticle(configuration, grpcContainer, provider, launchMode, blockingMethodsPerService), + () -> new GrpcServerVerticle(configuration, grpcContainer, provider, launchMode, blockingMethodsPerService, + virtualMethodsPerService), new DeploymentOptions().setInstances(configuration.instances), result -> { if (result.failed()) { @@ -265,11 +282,13 @@ private void initHealthStorage() { } private void devModeStart(GrpcContainer grpcContainer, Vertx vertx, GrpcServerConfiguration configuration, - GrpcBuilderProvider provider, Map> blockingMethodsPerService, ShutdownContext shutdown, + GrpcBuilderProvider provider, Map> blockingMethodsPerService, + Map> virtualMethodsPerService, + ShutdownContext shutdown, LaunchMode launchMode) { Map.Entry portToServer = buildServer(vertx, configuration, provider, - blockingMethodsPerService, grpcContainer, launchMode); + blockingMethodsPerService, virtualMethodsPerService, grpcContainer, launchMode); Server server = portToServer.getValue(); if (provider == null) { @@ -402,7 +421,8 @@ public static String getImplementationClassName(BindableService service) { } private void devModeReload(GrpcContainer grpcContainer, Vertx vertx, GrpcServerConfiguration configuration, - GrpcBuilderProvider provider, Map> blockingMethodsPerService, ShutdownContext shutdown) { + GrpcBuilderProvider provider, Map> blockingMethodsPerService, + Map> virtualMethodsPerService, ShutdownContext shutdown) { List services = collectServiceDefinitions(grpcContainer.getServices()); List definitions = new ArrayList<>(); @@ -420,7 +440,7 @@ private void devModeReload(GrpcContainer grpcContainer, Vertx vertx, GrpcServerC CompressionInterceptor compressionInterceptor = prepareCompressionInterceptor(configuration); for (GrpcServiceDefinition service : services) { servicesWithInterceptors.add( - serviceWithInterceptors(vertx, grpcContainer, blockingMethodsPerService, + serviceWithInterceptors(vertx, grpcContainer, blockingMethodsPerService, virtualMethodsPerService, compressionInterceptor, service, true)); } @@ -458,6 +478,7 @@ public RuntimeValue initServerInterceptorStorage( @SuppressWarnings("rawtypes") private Map.Entry buildServer(Vertx vertx, GrpcServerConfiguration configuration, GrpcBuilderProvider provider, Map> blockingMethodsPerService, + Map> virtualMethodsPerService, GrpcContainer grpcContainer, LaunchMode launchMode) { int port = launchMode == LaunchMode.TEST ? configuration.testPort : configuration.port; @@ -509,6 +530,7 @@ private Map.Entry buildServer(Vertx vertx, GrpcServerConfigurat for (GrpcServiceDefinition service : toBeRegistered) { builder.addService( serviceWithInterceptors(vertx, grpcContainer, blockingMethodsPerService, + virtualMethodsPerService, compressionInterceptor, service, launchMode == LaunchMode.DEVELOPMENT)); LOGGER.debugf("Registered gRPC service '%s'", service.definition.getServiceDescriptor().getName()); definitions.add(service.definition); @@ -549,7 +571,9 @@ private CompressionInterceptor prepareCompressionInterceptor(GrpcServerConfigura } private ServerServiceDefinition serviceWithInterceptors(Vertx vertx, GrpcContainer grpcContainer, - Map> blockingMethodsPerService, CompressionInterceptor compressionInterceptor, + Map> blockingMethodsPerService, + Map> virtualMethodsPerService, + CompressionInterceptor compressionInterceptor, GrpcServiceDefinition service, boolean devMode) { List interceptors = new ArrayList<>(); if (compressionInterceptor != null) { @@ -558,11 +582,13 @@ private ServerServiceDefinition serviceWithInterceptors(Vertx vertx, GrpcContain interceptors.addAll(grpcContainer.getSortedPerServiceInterceptors(service.getImplementationClassName())); - // We only register the blocking interceptor if needed by at least one method of the service. + // We only register the blocking interceptor if needed by at least one method of the service (either blocking or runOnVirtualThread) if (!blockingMethodsPerService.isEmpty()) { List list = blockingMethodsPerService.get(service.getImplementationClassName()); - if (list != null) { - interceptors.add(new BlockingServerInterceptor(vertx, list, devMode)); + List virtuals = virtualMethodsPerService.get(service.getImplementationClassName()); + if (list != null || virtuals != null) { + interceptors + .add(new BlockingServerInterceptor(vertx, list, virtuals, VIRTUAL_EXECUTOR_SUPPLIER.get(), devMode)); } } return ServerInterceptors.intercept(service.definition, interceptors); @@ -574,18 +600,21 @@ private class GrpcServerVerticle extends AbstractVerticle { private final GrpcBuilderProvider provider; private final LaunchMode launchMode; private final Map> blockingMethodsPerService; + private final Map> virtualMethodsPerService; private volatile PortSystemProperties portSystemProperties; private Server grpcServer; GrpcServerVerticle(GrpcServerConfiguration configuration, GrpcContainer grpcContainer, GrpcBuilderProvider provider, LaunchMode launchMode, - Map> blockingMethodsPerService) { + Map> blockingMethodsPerService, + Map> virtualMethodsPerService) { this.configuration = configuration; this.grpcContainer = grpcContainer; this.provider = provider; this.launchMode = launchMode; this.blockingMethodsPerService = blockingMethodsPerService; + this.virtualMethodsPerService = virtualMethodsPerService; } @Override @@ -596,7 +625,7 @@ public void start(Promise startPromise) { return; } Map.Entry portToServer = buildServer(getVertx(), configuration, provider, - blockingMethodsPerService, grpcContainer, launchMode); + blockingMethodsPerService, virtualMethodsPerService, grpcContainer, launchMode); grpcServer = portToServer.getValue(); if (grpcServer instanceof VertxServer) { @@ -611,10 +640,14 @@ public void start(Promise startPromise) { } startPromise.fail(effectiveCause); } else { - int actualPort = grpcServer.getPort(); - if (actualPort != portToServer.getKey()) { - portSystemProperties = new PortSystemProperties(); - portSystemProperties.set("grpc.server", actualPort, launchMode); + try { + int actualPort = grpcServer.getPort(); + if (actualPort != portToServer.getKey()) { + portSystemProperties = new PortSystemProperties(); + portSystemProperties.set("grpc.server", actualPort, launchMode); + } + } catch (Exception e) { + // Ignore, port reused. } startPromise.complete(); grpcVerticleCount.incrementAndGet(); @@ -694,4 +727,75 @@ public void run(Runnable command) { } } } + + public static final Supplier VIRTUAL_EXECUTOR_SUPPLIER = new Supplier<>() { + Executor current = null; + + /** + * This method uses reflection in order to allow developers to quickly test quarkus-loom without needing to + * change --release, --source, --target flags and to enable previews. + * Since we try to load the "Loom-preview" classes/methods at runtime, the application can even be compiled + * using java 11 and executed with a loom-compliant JDK. + *

    + * IMPORTANT: we still need to use a duplicated context to have all the propagation working. + * Thus, the context is captured and applied/terminated in the virtual thread. + */ + @Override + public Executor get() { + if (current == null) { + try { + var virtual = (Executor) Executors.class.getMethod("newVirtualThreadPerTaskExecutor") + .invoke(this); + current = new Executor() { + @Override + public void execute(Runnable command) { + var context = Vertx.currentContext(); + if (!(context instanceof ContextInternal)) { + virtual.execute(command); + } else { + virtual.execute(new Runnable() { + @Override + public void run() { + final var previousContext = ((ContextInternal) context).beginDispatch(); + try { + command.run(); + } finally { + ((ContextInternal) context).endDispatch(previousContext); + } + } + }); + } + } + }; + } catch (InvocationTargetException | IllegalAccessException | NoSuchMethodException e) { + logger.debug("Unable to invoke java.util.concurrent.Executors#newVirtualThreadPerTaskExecutor", e); + //quite ugly but works + logger.warnf("You weren't able to create an executor that spawns virtual threads, the default" + + " blocking executor will be used, please check that your JDK is compatible with " + + "virtual threads"); + //if for some reason a class/method can't be loaded or invoked we return the traditional executor, + // wrapping executeBlocking. + current = new Executor() { + @Override + public void execute(Runnable command) { + var context = Vertx.currentContext(); + if (!(context instanceof ContextInternal)) { + Infrastructure.getDefaultWorkerPool().execute(command); + } else { + context.executeBlocking(fut -> { + try { + command.run(); + fut.complete(null); + } catch (Exception e) { + fut.fail(e); + } + }); + } + } + }; + } + } + return current; + } + }; } diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/blocking/BlockingServerInterceptor.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/blocking/BlockingServerInterceptor.java index 69e99da2654c6..74c943c11fa14 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/blocking/BlockingServerInterceptor.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/blocking/BlockingServerInterceptor.java @@ -7,6 +7,7 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; import java.util.function.Consumer; import java.util.function.Function; @@ -27,23 +28,34 @@ /** * gRPC Server interceptor offloading the execution of the gRPC method on a worker thread if the method is annotated * with {@link io.smallrye.common.annotation.Blocking}. - * + *

    * For non-annotated methods, the interceptor acts as a pass-through. */ public class BlockingServerInterceptor implements ServerInterceptor, Function { private final Vertx vertx; private final Set blockingMethods; - private final Map cache = new ConcurrentHashMap<>(); + private final Set virtualMethods; + private final Map blockingCache = new ConcurrentHashMap<>(); + private final Map virtualCache = new ConcurrentHashMap<>(); private final boolean devMode; + private final Executor virtualThreadExecutor; - public BlockingServerInterceptor(Vertx vertx, List blockingMethods, boolean devMode) { + public BlockingServerInterceptor(Vertx vertx, List blockingMethods, List virtualMethods, + Executor virtualThreadExecutor, boolean devMode) { this.vertx = vertx; this.blockingMethods = new HashSet<>(); + this.virtualMethods = new HashSet<>(); this.devMode = devMode; for (String method : blockingMethods) { this.blockingMethods.add(method.toLowerCase()); } + if (virtualMethods != null) { + for (String method : virtualMethods) { + this.virtualMethods.add(method.toLowerCase()); + } + } + this.virtualThreadExecutor = virtualThreadExecutor; } @Override @@ -52,6 +64,11 @@ public Boolean apply(String name) { return blockingMethods.contains(methodName.toLowerCase()); } + public Boolean applyVirtual(String name) { + String methodName = name.substring(name.lastIndexOf("/") + 1); + return virtualMethods.contains(methodName.toLowerCase()); + } + @Override public ServerCall.Listener interceptCall(ServerCall call, Metadata headers, @@ -66,15 +83,34 @@ public ServerCall.Listener interceptCall(ServerCall replay = new ReplayListener<>(state, true); + virtualThreadExecutor.execute(() -> { + ServerCall.Listener listener; + try { + requestContext.activate(state); + listener = next.startCall(call, headers); + } finally { + requestContext.deactivate(); + } + replay.setDelegate(listener); + }); + return replay; + } else if (isBlocking) { final ManagedContext requestContext = getRequestContext(); // context should always be active here // it is initialized by io.quarkus.grpc.runtime.supports.context.GrpcRequestContextGrpcInterceptor // that should always be called before this interceptor ContextState state = requestContext.getState(); - ReplayListener replay = new ReplayListener<>(state); + ReplayListener replay = new ReplayListener<>(state, false); vertx.executeBlocking(f -> { ServerCall.Listener listener; try { @@ -96,20 +132,22 @@ public ServerCall.Listener interceptCall(ServerCall * Note that event must be executed in order, explaining why incomingEvents * are executed sequentially */ private class ReplayListener extends ServerCall.Listener { private final InjectableContext.ContextState requestContextState; + private final boolean virtual; // exclusive to event loop context private ServerCall.Listener delegate; private final Queue>> incomingEvents = new LinkedList<>(); private boolean isConsumingFromIncomingEvents = false; - private ReplayListener(InjectableContext.ContextState requestContextState) { + private ReplayListener(InjectableContext.ContextState requestContextState, boolean virtual) { this.requestContextState = requestContextState; + this.virtual = virtual; } /** @@ -123,14 +161,22 @@ void setDelegate(ServerCall.Listener delegate) { if (!this.isConsumingFromIncomingEvents) { Consumer> consumer = incomingEvents.poll(); if (consumer != null) { - executeBlockingWithRequestContext(consumer); + if (virtual) { + executeVirtualWithRequestContext(consumer); + } else { + executeBlockingWithRequestContext(consumer); + } } } } - private void executeOnContextOrEnqueue(Consumer> consumer) { + private void scheduleOrEnqueue(Consumer> consumer) { if (this.delegate != null && !this.isConsumingFromIncomingEvents) { - executeBlockingWithRequestContext(consumer); + if (virtual) { + executeVirtualWithRequestContext(consumer); + } else { + executeBlockingWithRequestContext(consumer); + } } else { incomingEvents.add(consumer); } @@ -162,29 +208,50 @@ private void executeBlockingWithRequestContext(Consumer> consumer) { + final Context grpcContext = Context.current(); + Handler> blockingHandler = new BlockingExecutionHandler<>(consumer, grpcContext, delegate, + requestContextState, getRequestContext(), this); + if (devMode) { + blockingHandler = new DevModeBlockingExecutionHandler(Thread.currentThread().getContextClassLoader(), + blockingHandler); + } + this.isConsumingFromIncomingEvents = true; + Handler> finalBlockingHandler = blockingHandler; + virtualThreadExecutor.execute(() -> { + finalBlockingHandler.handle(Promise.promise()); + Consumer> next = incomingEvents.poll(); + if (next != null) { + executeVirtualWithRequestContext(next); + } else { + this.isConsumingFromIncomingEvents = false; + } + }); + } + @Override public void onMessage(ReqT message) { - executeOnContextOrEnqueue(t -> t.onMessage(message)); + scheduleOrEnqueue(t -> t.onMessage(message)); } @Override public void onHalfClose() { - executeOnContextOrEnqueue(ServerCall.Listener::onHalfClose); + scheduleOrEnqueue(ServerCall.Listener::onHalfClose); } @Override public void onCancel() { - executeOnContextOrEnqueue(ServerCall.Listener::onCancel); + scheduleOrEnqueue(ServerCall.Listener::onCancel); } @Override public void onComplete() { - executeOnContextOrEnqueue(ServerCall.Listener::onComplete); + scheduleOrEnqueue(ServerCall.Listener::onComplete); } @Override public void onReady() { - executeOnContextOrEnqueue(ServerCall.Listener::onReady); + scheduleOrEnqueue(ServerCall.Listener::onReady); } } diff --git a/extensions/grpc/runtime/src/test/java/io/quarkus/grpc/runtime/supports/BlockingServerInterceptorTest.java b/extensions/grpc/runtime/src/test/java/io/quarkus/grpc/runtime/supports/BlockingServerInterceptorTest.java index 96a27e392e6eb..2ad16022d73bb 100644 --- a/extensions/grpc/runtime/src/test/java/io/quarkus/grpc/runtime/supports/BlockingServerInterceptorTest.java +++ b/extensions/grpc/runtime/src/test/java/io/quarkus/grpc/runtime/supports/BlockingServerInterceptorTest.java @@ -34,7 +34,8 @@ void setup() { InjectableContext.ContextState contextState = mock(InjectableContext.ContextState.class); ManagedContext requestContext = mock(ManagedContext.class); when(requestContext.getState()).thenReturn(contextState); - blockingServerInterceptor = new BlockingServerInterceptor(vertx, Collections.singletonList("blocking"), false) { + blockingServerInterceptor = new BlockingServerInterceptor(vertx, Collections.singletonList("blocking"), + Collections.emptyList(), null, false) { @Override protected ManagedContext getRequestContext() { return requestContext; diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index 7f2e5546f937f..f68c3e71e8a54 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -404,6 +404,20 @@ + + + java-20 + + + !no-test-modules + + [20,) + + + virtual-threads + + + + test + + + + + io.quarkus + quarkus-grpc-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-resteasy-reactive-jackson-deployment + ${project.version} + pom + test + + + * + * + + + + + + + + + io.quarkus + quarkus-maven-plugin + + + + generate-code + build + + + + + + + + diff --git a/integration-tests/virtual-threads/grpc-virtual-threads/src/main/java/io/quarkus/grpc/example/streaming/AssertHelper.java b/integration-tests/virtual-threads/grpc-virtual-threads/src/main/java/io/quarkus/grpc/example/streaming/AssertHelper.java new file mode 100644 index 0000000000000..4b45668d60c24 --- /dev/null +++ b/integration-tests/virtual-threads/grpc-virtual-threads/src/main/java/io/quarkus/grpc/example/streaming/AssertHelper.java @@ -0,0 +1,53 @@ +package io.quarkus.grpc.example.streaming; + +import java.lang.reflect.Method; + +import io.quarkus.arc.Arc; +import io.smallrye.common.vertx.VertxContext; +import io.vertx.core.Vertx; + +public class AssertHelper { + + /** + * Asserts that the current method: + * - runs on a duplicated context + * - runs on a virtual thread + * - has the request scope activated + */ + public static void assertEverything() { + assertThatTheRequestScopeIsActive(); + assertThatItRunsOnVirtualThread(); + assertThatItRunsOnADuplicatedContext(); + } + + public static void assertThatTheRequestScopeIsActive() { + if (!Arc.container().requestContext().isActive()) { + throw new AssertionError(("Expected the request scope to be active")); + } + } + + public static void assertThatItRunsOnADuplicatedContext() { + var context = Vertx.currentContext(); + if (context == null) { + throw new AssertionError("The method does not run on a Vert.x context"); + } + if (!VertxContext.isOnDuplicatedContext()) { + throw new AssertionError("The method does not run on a Vert.x **duplicated** context"); + } + } + + public static void assertThatItRunsOnVirtualThread() { + // We cannot depend on a Java 20. + try { + Method isVirtual = Thread.class.getMethod("isVirtual"); + isVirtual.setAccessible(true); + boolean virtual = (Boolean) isVirtual.invoke(Thread.currentThread()); + if (!virtual) { + throw new AssertionError("Thread " + Thread.currentThread() + " is not a virtual thread"); + } + } catch (Exception e) { + throw new AssertionError( + "Thread " + Thread.currentThread() + " is not a virtual thread - cannot invoke Thread.isVirtual()", e); + } + } +} diff --git a/integration-tests/virtual-threads/grpc-virtual-threads/src/main/java/io/quarkus/grpc/example/streaming/TestServiceImpl.java b/integration-tests/virtual-threads/grpc-virtual-threads/src/main/java/io/quarkus/grpc/example/streaming/TestServiceImpl.java new file mode 100644 index 0000000000000..40f3f2fcdb14d --- /dev/null +++ b/integration-tests/virtual-threads/grpc-virtual-threads/src/main/java/io/quarkus/grpc/example/streaming/TestServiceImpl.java @@ -0,0 +1,56 @@ +package io.quarkus.grpc.example.streaming; + +import static io.quarkus.grpc.example.streaming.AssertHelper.assertEverything; + +import com.google.protobuf.ByteString; +import com.google.protobuf.EmptyProtos; + +import io.grpc.testing.integration.Messages; +import io.grpc.testing.integration.TestService; +import io.quarkus.grpc.GrpcService; +import io.smallrye.common.annotation.RunOnVirtualThread; +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; + +@GrpcService +public class TestServiceImpl implements TestService { + + @RunOnVirtualThread + @Override + public Uni emptyCall(EmptyProtos.Empty request) { + assertEverything(); + return Uni.createFrom().item(EmptyProtos.Empty.newBuilder().build()) + .invoke(AssertHelper::assertEverything); + } + + @RunOnVirtualThread + @Override + public Uni unaryCall(Messages.SimpleRequest request) { + assertEverything(); + var value = request.getPayload().getBody().toStringUtf8(); + var resp = Messages.SimpleResponse.newBuilder() + .setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFromUtf8(value.toUpperCase())).build()) + .build(); + return Uni.createFrom().item(resp) + .invoke(AssertHelper::assertEverything); + } + + @Override + @RunOnVirtualThread + public Multi streamingOutputCall(Messages.StreamingOutputCallRequest request) { + var value = request.getPayload().getBody().toStringUtf8(); + assertEverything(); + return Multi.createFrom(). emitter(emitter -> { + assertEverything(); + emitter.emit(value.toUpperCase()); + emitter.emit(value.toUpperCase()); + emitter.emit(value.toUpperCase()); + emitter.complete(); + }).map(v -> Messages.StreamingOutputCallResponse.newBuilder() + .setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFromUtf8(v)).build()) + .build()) + .invoke(AssertHelper::assertEverything) + .onTermination().invoke(AssertHelper::assertEverything); + } + +} diff --git a/integration-tests/virtual-threads/grpc-virtual-threads/src/main/proto/empty.proto b/integration-tests/virtual-threads/grpc-virtual-threads/src/main/proto/empty.proto new file mode 100644 index 0000000000000..af5591b6654c0 --- /dev/null +++ b/integration-tests/virtual-threads/grpc-virtual-threads/src/main/proto/empty.proto @@ -0,0 +1,45 @@ +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +syntax = "proto2"; + +package grpc.testing; + +option java_package = "com.google.protobuf"; +option java_outer_classname = "EmptyProtos"; + +// An empty message that you can re-use to avoid defining duplicated empty +// messages in your project. A typical example is to use it as argument or the +// return value of a service API. For instance: +// +// service Foo { +// rpc Bar (grpc.testing.Empty) returns (grpc.testing.Empty) { }; +// }; +// +message Empty {} diff --git a/integration-tests/virtual-threads/grpc-virtual-threads/src/main/proto/messages.proto b/integration-tests/virtual-threads/grpc-virtual-threads/src/main/proto/messages.proto new file mode 100644 index 0000000000000..5110719e822b8 --- /dev/null +++ b/integration-tests/virtual-threads/grpc-virtual-threads/src/main/proto/messages.proto @@ -0,0 +1,178 @@ +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +// Message definitions to be used by integration test service definitions. + +syntax = "proto3"; + +package grpc.testing; + +option java_package = "io.grpc.testing.integration"; + +// The type of payload that should be returned. +enum PayloadType { + // Compressable text format. + COMPRESSABLE = 0; + + // Uncompressable binary format. + UNCOMPRESSABLE = 1; + + // Randomly chosen from all other formats defined in this enum. + RANDOM = 2; +} + +// Compression algorithms +enum CompressionType { + // No compression + NONE = 0; + GZIP = 1; + DEFLATE = 2; +} + +// A block of data, to simply increase gRPC message size. +message Payload { + // The type of data in body. + PayloadType type = 1; + // Primary contents of payload. + bytes body = 2; +} + +// A protobuf representation for grpc status. This is used by test +// clients to specify a status that the server should attempt to return. +message EchoStatus { + int32 code = 1; + string message = 2; +} + +// Unary request. +message SimpleRequest { + // Desired payload type in the response from the server. + // If response_type is RANDOM, server randomly chooses one from other formats. + PayloadType response_type = 1; + + // Desired payload size in the response from the server. + // If response_type is COMPRESSABLE, this denotes the size before compression. + int32 response_size = 2; + + // Optional input payload sent along with the request. + Payload payload = 3; + + // Whether SimpleResponse should include username. + bool fill_username = 4; + + // Whether SimpleResponse should include OAuth scope. + bool fill_oauth_scope = 5; + + // Compression algorithm to be used by the server for the response (stream) + CompressionType response_compression = 6; + + // Whether server should return a given status + EchoStatus response_status = 7; +} + +// Unary response, as configured by the request. +message SimpleResponse { + // Payload to increase message size. + Payload payload = 1; + // The user the request came from, for verifying authentication was + // successful when the client expected it. + string username = 2; + // OAuth scope. + string oauth_scope = 3; +} + +message SimpleContext { + string value = 1; +} + +// Client-streaming request. +message StreamingInputCallRequest { + // Optional input payload sent along with the request. + Payload payload = 1; + + // Not expecting any payload from the response. +} + +// Client-streaming response. +message StreamingInputCallResponse { + // Aggregated size of payloads received from the client. + int32 aggregated_payload_size = 1; +} + +// Configuration for a particular response. +message ResponseParameters { + // Desired payload sizes in responses from the server. + // If response_type is COMPRESSABLE, this denotes the size before compression. + int32 size = 1; + + // Desired interval between consecutive responses in the response stream in + // microseconds. + int32 interval_us = 2; +} + +// Server-streaming request. +message StreamingOutputCallRequest { + // Desired payload type in the response from the server. + // If response_type is RANDOM, the payload from each response in the stream + // might be of different types. This is to simulate a mixed type of payload + // stream. + PayloadType response_type = 1; + + // Configuration for each expected response message. + repeated ResponseParameters response_parameters = 2; + + // Optional input payload sent along with the request. + Payload payload = 3; + + // Compression algorithm to be used by the server for the response (stream) + CompressionType response_compression = 6; + + // Whether server should return a given status + EchoStatus response_status = 7; +} + +// Server-streaming response, as configured by the request and parameters. +message StreamingOutputCallResponse { + // Payload to increase response size. + Payload payload = 1; +} + +// For reconnect interop test only. +// Client tells server what reconnection parameters it used. +message ReconnectParams { + int32 max_reconnect_backoff_ms = 1; +} + +// For reconnect interop test only. +// Server tells client whether its reconnects are following the spec and the +// reconnect backoffs it saw. +message ReconnectInfo { + bool passed = 1; + repeated int32 backoff_ms = 2; +} diff --git a/integration-tests/virtual-threads/grpc-virtual-threads/src/main/proto/test.proto b/integration-tests/virtual-threads/grpc-virtual-threads/src/main/proto/test.proto new file mode 100644 index 0000000000000..ea83e36b7d44b --- /dev/null +++ b/integration-tests/virtual-threads/grpc-virtual-threads/src/main/proto/test.proto @@ -0,0 +1,22 @@ +syntax = "proto3"; + +import "empty.proto"; +import "messages.proto"; + +package grpc.testing; + +option java_package = "io.grpc.testing.integration"; + +service TestService { + // One empty request followed by one empty response. + rpc EmptyCall(grpc.testing.Empty) returns (grpc.testing.Empty); + + // One request followed by one response. + rpc UnaryCall(SimpleRequest) returns (SimpleResponse); + + // One request followed by a sequence of responses (streamed download). + // The server returns the payload with client desired type and sizes. + rpc StreamingOutputCall(StreamingOutputCallRequest) + returns (stream StreamingOutputCallResponse); +} + diff --git a/integration-tests/virtual-threads/grpc-virtual-threads/src/main/resources/application.properties b/integration-tests/virtual-threads/grpc-virtual-threads/src/main/resources/application.properties new file mode 100644 index 0000000000000..011cd24e1144d --- /dev/null +++ b/integration-tests/virtual-threads/grpc-virtual-threads/src/main/resources/application.properties @@ -0,0 +1,15 @@ +quarkus.grpc.clients.streaming.host=localhost +quarkus.grpc.clients.streaming.port=9001 + +%vertx.quarkus.grpc.clients.streaming.port=8081 +%vertx.quarkus.grpc.clients.streaming.use-quarkus-grpc-client=true +%vertx.quarkus.grpc.server.use-separate-server=false + +%n2o.quarkus.grpc.server.use-separate-server=true +%o2n.quarkus.grpc.server.use-separate-server=false + +%n2o.quarkus.grpc.clients.streaming.port=9001 +%n2o.quarkus.grpc.clients.streaming.use-quarkus-grpc-client=true + +%o2n.quarkus.grpc.clients.streaming.port=8081 +%o2n.quarkus.grpc.clients.streaming.use-quarkus-grpc-client=false diff --git a/integration-tests/virtual-threads/grpc-virtual-threads/src/test/java/io/quarkus/grpc/example/streaming/VertxVirtualThreadTest.java b/integration-tests/virtual-threads/grpc-virtual-threads/src/test/java/io/quarkus/grpc/example/streaming/VertxVirtualThreadTest.java new file mode 100644 index 0000000000000..c2799cc70f2f8 --- /dev/null +++ b/integration-tests/virtual-threads/grpc-virtual-threads/src/test/java/io/quarkus/grpc/example/streaming/VertxVirtualThreadTest.java @@ -0,0 +1,10 @@ +package io.quarkus.grpc.example.streaming; + +import io.quarkus.grpc.test.utils.VertxGRPCTestProfile; +import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.junit.TestProfile; + +@QuarkusTest +@TestProfile(VertxGRPCTestProfile.class) +public class VertxVirtualThreadTest extends VirtualThreadTestBase { +} diff --git a/integration-tests/virtual-threads/grpc-virtual-threads/src/test/java/io/quarkus/grpc/example/streaming/VirtualThreadTest.java b/integration-tests/virtual-threads/grpc-virtual-threads/src/test/java/io/quarkus/grpc/example/streaming/VirtualThreadTest.java new file mode 100644 index 0000000000000..a8f5412fe7e71 --- /dev/null +++ b/integration-tests/virtual-threads/grpc-virtual-threads/src/test/java/io/quarkus/grpc/example/streaming/VirtualThreadTest.java @@ -0,0 +1,7 @@ +package io.quarkus.grpc.example.streaming; + +import io.quarkus.test.junit.QuarkusTest; + +@QuarkusTest +public class VirtualThreadTest extends VirtualThreadTestBase { +} diff --git a/integration-tests/virtual-threads/grpc-virtual-threads/src/test/java/io/quarkus/grpc/example/streaming/VirtualThreadTestBase.java b/integration-tests/virtual-threads/grpc-virtual-threads/src/test/java/io/quarkus/grpc/example/streaming/VirtualThreadTestBase.java new file mode 100644 index 0000000000000..461cfa4441ba9 --- /dev/null +++ b/integration-tests/virtual-threads/grpc-virtual-threads/src/test/java/io/quarkus/grpc/example/streaming/VirtualThreadTestBase.java @@ -0,0 +1,48 @@ +package io.quarkus.grpc.example.streaming; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.jupiter.api.Test; + +import com.google.protobuf.ByteString; +import com.google.protobuf.EmptyProtos; + +import io.grpc.testing.integration.Messages; +import io.grpc.testing.integration.TestServiceGrpc; +import io.quarkus.grpc.GrpcClient; + +@SuppressWarnings("NewClassNamingConvention") +public class VirtualThreadTestBase { + @GrpcClient + TestServiceGrpc.TestServiceBlockingStub service; + + @Test + void testEmpty() { + assertThat(service.emptyCall(EmptyProtos.Empty.newBuilder().build())).isEqualTo(EmptyProtos.Empty.newBuilder().build()); + } + + @Test + void testUnary() { + var req = Messages.SimpleRequest.newBuilder() + .setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFromUtf8("hello")).build()) + .build(); + assertThat(service.unaryCall(req).getPayload().getBody().toStringUtf8()).isEqualTo("HELLO"); + } + + @Test + void testStreamingOutputCall() { + var req = Messages.StreamingOutputCallRequest.newBuilder() + .setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFromUtf8("hello")).build()) + .build(); + + AtomicInteger count = new AtomicInteger(); + service.streamingOutputCall(req).forEachRemaining(r -> { + count.incrementAndGet(); + assertThat(r.getPayload().getBody().toStringUtf8()).isEqualTo("HELLO"); + }); + assertThat(count).hasValue(3); + } + +} diff --git a/integration-tests/virtual-threads/pom.xml b/integration-tests/virtual-threads/pom.xml new file mode 100644 index 0000000000000..f478d14d2bf77 --- /dev/null +++ b/integration-tests/virtual-threads/pom.xml @@ -0,0 +1,266 @@ + + + + quarkus-build-parent + io.quarkus + 999-SNAPSHOT + ../../build-parent/pom.xml + + 4.0.0 + + quarkus-virtual-threads-integration-tests-parent + Quarkus - Virtual Threads Integration Tests + + pom + + + true + true + ${skipTests} + ${skipTests} + true + + + + + + + + org.codehaus.mojo + properties-maven-plugin + + + org.apache.maven.plugins + maven-enforcer-plugin + + + enforce-deployment-deps + + enforce + + + + + + + + + + + io.quarkus + quarkus-enforcer-rules + ${project.version} + + + + + org.sonatype.plugins + nexus-staging-maven-plugin + ${nexus-staging-maven-plugin.version} + + true + + + + + + + io.quarkus + quarkus-maven-plugin + ${project.version} + + true + ${quarkus.build.skip} + + + + org.apache.maven.plugins + maven-surefire-plugin + + + + org.junit.jupiter.api.ClassOrderer$OrderAnnotation + + + + + + ${quarkus.platform.group-id} + quarkus-maven-plugin + ${quarkus.platform.version} + true + + + + build + generate-code + generate-code-tests + + + + + + org.apache.maven.plugins + maven-compiler-plugin + ${version.compiler.plugin} + + + --enable-preview + -parameters + + + + + org.apache.maven.plugins + maven-surefire-plugin + ${version.surefire.plugin} + + + org.jboss.logmanager.LogManager + ${maven.home} + + --enable-preview -Djdk.tracePinnedThreads + ${skipTests} + + + + maven-failsafe-plugin + ${version.surefire.plugin} + + + + integration-test + verify + + + + ${project.build.directory}/${project.build.finalName}-runner + + org.jboss.logmanager.LogManager + + ${maven.home} + + + + + + + + + + + + + + io.quarkus + quarkus-bom-test + ${project.version} + pom + import + + + + io.quarkus + quarkus-integration-test-class-transformer-deployment + ${project.version} + + + io.quarkus + quarkus-integration-test-class-transformer + ${project.version} + + + + + + + io.quarkus + quarkus-bom-test + ${project.version} + pom + test + + + * + * + + + + + + + + skip-test-on-jdk-17 + + [, 20) + + + true + + + + run-virtual-thread-tests + + [20,) + + + + test-modules + + + !no-test-modules + + + + grpc-virtual-threads + + + + + + unbind-executions + + + ${basedir}/disable-unbind-executions + + + + + + org.apache.maven.plugins + maven-source-plugin + + + attach-sources + none + + + + + org.apache.maven.plugins + maven-jar-plugin + + + default-jar + none + + + + + org.apache.maven.plugins + maven-install-plugin + + + default-install + none + + + + + + + +