diff --git a/.github/filter-virtual-threads-tests-json.sh b/.github/filter-virtual-threads-tests-json.sh new file mode 100755 index 0000000000000..28c2efee14f6c --- /dev/null +++ b/.github/filter-virtual-threads-tests-json.sh @@ -0,0 +1,51 @@ +#!/bin/bash + +# Purpose: Prints a filtered version of virtual-threads-tests.json, with "test-modules" reduced to the ones passed in as the first argument. +# This first argument is expected to the define one module per line. +# "include" elements that (after filtering) have no "test-modules" anymore are deleted entirely! +# Note: This script is only for CI and does therefore not aim to be compatible with BSD/macOS. + +set -e -u -o pipefail +shopt -s failglob + +# path of this shell script +PRG_PATH=$( cd "$(dirname "$0")" ; pwd -P ) + +JSON=$(cat ${PRG_PATH}/virtual-threads-tests.json) + +# Step 0: print unfiltered json and exit in case the parameter is empty (assumption: full build) +if [ -z "$1" ] +then + echo "${JSON}" + exit 0 +fi + +# Step 1: build an expression for grep that will only extract the given modules from each "test-modules" list, +# including a trailing comma (if exists). Note: mvn doesn't mind something like -pl 'foo,'. +EXPR='((?:(?<=^)|(?<=,)|(?<=, ))(' +while read -r impacted +do + EXPR+="${impacted}|" +done < <(echo -n "$1" | ggrep -Po '(?<=integration-tests/virtual-threads/).+') +EXPR+=')(,|$))+' + +# Step 2: apply the filter expression via grep to each "test-modules" list and replace each original list with the filtered one +while read -r modules +do + # Notes: + # - trailing "|" (after EXPR) avoids grep return code > 0 if nothing matches (which is a valid case) + # - "paste" joins all matches to get a single line + FILTERED=$(echo -n "${modules}" | ggrep -Po "${EXPR}|" | paste -sd " " -) + JSON=$(echo -n "${JSON}" | sed "s|${modules}|${FILTERED}|") +done < <(echo -n "${JSON}" | jq -r '.include[] | ."test-modules"') + +# Step 3: delete entire elements from "include" array that now have an empty "test-modules" list +JSON=$(echo "${JSON}" | jq 'del(.include[] | select(."test-modules" == ""))') + +# Step 4: echo final result, printing only {} in case _all_ elements were removed from "include" array +if [ -z "$(echo "${JSON}" | jq '.include[]')" ] +then + echo -n '{}' +else + echo -n "${JSON}" +fi diff --git a/.github/matrix-jvm-tests.json b/.github/matrix-jvm-tests.json index b20241376fe3d..e4ace13213865 100644 --- a/.github/matrix-jvm-tests.json +++ b/.github/matrix-jvm-tests.json @@ -13,8 +13,8 @@ "os-name": "ubuntu-latest" } , { - "name": "19", - "java-version": 19, + "name": "20", + "java-version": 20, "maven_args": "$JVM_TEST_MAVEN_ARGS", "maven_opts": "-Xmx2g -XX:MaxMetaspaceSize=1g", "os-name": "ubuntu-latest" diff --git a/.github/virtual-threads-tests.json b/.github/virtual-threads-tests.json new file mode 100644 index 0000000000000..22fd2166fa317 --- /dev/null +++ b/.github/virtual-threads-tests.json @@ -0,0 +1,16 @@ +{ + "include": [ + { + "category": "Main", + "timeout": 45, + "test-modules": "grpc-virtual-threads, mailer-virtual-threads, redis-virtual-threads, rest-client-reactive-virtual-threads, resteasy-reactive-virtual-threads", + "os-name": "ubuntu-latest" + }, + { + "category": "Messaging", + "timeout": 45, + "test-modules": "amqp-virtual-threads, jms-virtual-threads, kafka-virtual-threads", + "os-name": "ubuntu-latest" + } + ] +} diff --git a/.github/workflows/ci-actions-incremental.yml b/.github/workflows/ci-actions-incremental.yml index 05122939c3143..f6df078b36e60 100644 --- a/.github/workflows/ci-actions-incremental.yml +++ b/.github/workflows/ci-actions-incremental.yml @@ -205,6 +205,7 @@ jobs: outputs: native_matrix: ${{ steps.calc-native-matrix.outputs.matrix }} jvm_matrix: ${{ steps.calc-jvm-matrix.outputs.matrix }} + virtual_threads_matrix: ${{ steps.calc-virtual_threads-matrix.outputs.matrix }} run_jvm: ${{ steps.calc-run-flags.outputs.run_jvm }} run_devtools: ${{ steps.calc-run-flags.outputs.run_devtools }} run_gradle: ${{ steps.calc-run-flags.outputs.run_gradle }} @@ -226,6 +227,13 @@ jobs: json=$(.github/filter-jvm-tests-json.sh) echo "${json}" echo "matrix=${json}" >> $GITHUB_OUTPUT + - name: Calculate matrix from virtual-threads-tests.json + id: calc-virtual_threads-matrix + run: | + echo "GIB_IMPACTED_MODULES: ${GIB_IMPACTED_MODULES}" + json=$(.github/filter-virtual-threads-tests-json.sh "${GIB_IMPACTED_MODULES}" | tr -d '\n') + echo "${json}" + echo "matrix=${json}" >> $GITHUB_OUTPUT - name: Calculate run flags id: calc-run-flags run: | @@ -602,23 +610,17 @@ jobs: quarkus-quickstarts/target/build-report.json quarkus-quickstarts/LICENSE retention-days: 2 - virtual-thread-tests: - name: Virtual Thread Support Tests - JDK ${{matrix.java.name}} - runs-on: ${{matrix.java.os-name}} + virtual-thread-native-tests: + name: Virtual Thread Support Tests Native - ${{matrix.category}} + runs-on: ${{matrix.os-name}} needs: [build-jdk11, calculate-test-jobs] # Skip main in forks - if: "needs.calculate-test-jobs.outputs.run_quickstarts == 'true' && (github.repository == 'quarkusio/quarkus' || !endsWith(github.ref, '/main'))" - timeout-minutes: 90 + if: "needs.calculate-test-jobs.outputs.virtual_threads_matrix != '{}' && (github.repository == 'quarkusio/quarkus' || !endsWith(github.ref, '/main'))" + timeout-minutes: ${{matrix.timeout}} strategy: + max-parallel: 12 fail-fast: false - matrix: - java: - - { - name: "20", - java-version: 20, - os-name: "ubuntu-latest", - extra-args: "--enable-preview" - } + matrix: ${{ fromJson(needs.calculate-test-jobs.outputs.virtual_threads_matrix) }} steps: - uses: actions/checkout@v3 - name: Download Maven Repo @@ -629,19 +631,28 @@ jobs: - name: Extract Maven Repo shell: bash run: tar -xzf maven-repo.tgz -C ~ - - name: Set up JDK ${{ matrix.java.java-version }} + - name: Set up JDK 20 uses: actions/setup-java@v3 with: distribution: temurin - java-version: ${{ matrix.java.java-version }} - - name: Run tests + java-version: 20 + # We do this so we can get better analytics for the downloaded version of the build images + - name: Update Docker Client User Agent + shell: bash + run: | + cat <<< $(jq '.HttpHeaders += {"User-Agent": "Quarkus-CI-Docker-Client"}' ~/.docker/config.json) > ~/.docker/config.json + - name: Build + shell: bash + env: + TEST_MODULES: ${{matrix.test-modules}} + CONTAINER_BUILD: ${{startsWith(matrix.os-name, 'windows') && 'false' || 'true'}} run: | - export LANG=en_US && ./mvnw -e -B -fae --settings .github/mvn-settings.xml -f integration-tests/virtual-threads clean verify -Dnative -Dextra-args=${{matrix.java.extra-args}} -Dquarkus.native.container-build=true -Dquarkus.native.builder-image=quay.io/quarkus/ubi-quarkus-mandrel-builder-image:jdk-20 + export LANG=en_US && ./mvnw $COMMON_MAVEN_ARGS -f integration-tests/virtual-threads -pl "$TEST_MODULES" $NATIVE_TEST_MAVEN_ARGS -Dextra-args=--enable-preview -Dquarkus.native.container-build=true -Dquarkus.native.builder-image=quay.io/quarkus/ubi-quarkus-mandrel-builder-image:jdk-20 - name: Upload build reports (if build failed) uses: actions/upload-artifact@v3 if: ${{ failure() || cancelled() }} with: - name: "build-reports-Virtual Thread Support - JDK ${{matrix.java.name}}" + name: "build-reports-Virtual Thread Support Tests Native - ${{matrix.category}}" path: | integration-tests/virtual-threads/**/target/*-reports/TEST-*.xml integration-tests/virtual-threads/target/build-report.json diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index 4d581c0c235cc..7ad38528f0246 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -388,6 +388,8 @@ istio management-interface management-interface-auth + + virtual-threads @@ -405,20 +407,6 @@ - - - java-20 - - - !no-test-modules - - [20,) - - - virtual-threads - - - + + io.quarkus + quarkus-smallrye-reactive-messaging-amqp-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-rest-client-reactive-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-resteasy-reactive-jackson-deployment + ${project.version} + pom + test + + + * + * + + + + + + + + + io.quarkus + quarkus-maven-plugin + + + org.apache.maven.plugins + maven-surefire-plugin + + + + + diff --git a/integration-tests/virtual-threads/amqp-virtual-threads/src/main/java/io/quarkus/it/vthreads/amqp/AssertHelper.java b/integration-tests/virtual-threads/amqp-virtual-threads/src/main/java/io/quarkus/it/vthreads/amqp/AssertHelper.java new file mode 100644 index 0000000000000..cf4317de2b273 --- /dev/null +++ b/integration-tests/virtual-threads/amqp-virtual-threads/src/main/java/io/quarkus/it/vthreads/amqp/AssertHelper.java @@ -0,0 +1,68 @@ +package io.quarkus.it.vthreads.amqp; + +import java.lang.reflect.Method; + +import io.quarkus.arc.Arc; +import io.smallrye.common.vertx.VertxContext; +import io.vertx.core.Vertx; + +public class AssertHelper { + + /** + * Asserts that the current method: + * - runs on a duplicated context + * - runs on a virtual thread + * - has the request scope activated + */ + public static void assertEverything() { + assertThatTheRequestScopeIsActive(); + assertThatItRunsOnVirtualThread(); + assertThatItRunsOnADuplicatedContext(); + } + + public static void assertThatTheRequestScopeIsActive() { + if (!Arc.container().requestContext().isActive()) { + throw new AssertionError(("Expected the request scope to be active")); + } + } + + public static void assertThatItRunsOnADuplicatedContext() { + var context = Vertx.currentContext(); + if (context == null) { + throw new AssertionError("The method does not run on a Vert.x context"); + } + if (!VertxContext.isOnDuplicatedContext()) { + throw new AssertionError("The method does not run on a Vert.x **duplicated** context"); + } + } + + public static void assertThatItRunsOnVirtualThread() { + // We cannot depend on a Java 20. + try { + Method isVirtual = Thread.class.getMethod("isVirtual"); + isVirtual.setAccessible(true); + boolean virtual = (Boolean) isVirtual.invoke(Thread.currentThread()); + if (!virtual) { + throw new AssertionError("Thread " + Thread.currentThread() + " is not a virtual thread"); + } + } catch (Exception e) { + throw new AssertionError( + "Thread " + Thread.currentThread() + " is not a virtual thread - cannot invoke Thread.isVirtual()", e); + } + } + + public static void assertThatItDoesNotRunOnVirtualThread() { + // We cannot depend on a Java 20. + try { + Method isVirtual = Thread.class.getMethod("isVirtual"); + isVirtual.setAccessible(true); + boolean virtual = (Boolean) isVirtual.invoke(Thread.currentThread()); + if (virtual) { + throw new AssertionError("Thread " + Thread.currentThread() + " is a virtual thread"); + } + } catch (Exception e) { + throw new AssertionError( + "Thread " + Thread.currentThread() + " is a virtual thread - but cannot invoke Thread.isVirtual()", e); + } + } +} diff --git a/integration-tests/virtual-threads/amqp-virtual-threads/src/main/java/io/quarkus/it/vthreads/amqp/PriceAlertService.java b/integration-tests/virtual-threads/amqp-virtual-threads/src/main/java/io/quarkus/it/vthreads/amqp/PriceAlertService.java new file mode 100644 index 0000000000000..11e9ac1df01bd --- /dev/null +++ b/integration-tests/virtual-threads/amqp-virtual-threads/src/main/java/io/quarkus/it/vthreads/amqp/PriceAlertService.java @@ -0,0 +1,23 @@ +package io.quarkus.it.vthreads.amqp; + +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.core.MediaType; + +import org.eclipse.microprofile.rest.client.inject.RegisterRestClient; + +@Path("price") +@RegisterRestClient(configKey = "price-alert") +public interface PriceAlertService { + + @Path("alert") + @POST + @Consumes(MediaType.TEXT_PLAIN) + String alert(double value); + + @Path("alert-message") + @POST + @Consumes(MediaType.TEXT_PLAIN) + String alertMessage(double value); +} diff --git a/integration-tests/virtual-threads/amqp-virtual-threads/src/main/java/io/quarkus/it/vthreads/amqp/PriceConsumer.java b/integration-tests/virtual-threads/amqp-virtual-threads/src/main/java/io/quarkus/it/vthreads/amqp/PriceConsumer.java new file mode 100644 index 0000000000000..d77507c91ace9 --- /dev/null +++ b/integration-tests/virtual-threads/amqp-virtual-threads/src/main/java/io/quarkus/it/vthreads/amqp/PriceConsumer.java @@ -0,0 +1,61 @@ +package io.quarkus.it.vthreads.amqp; + +import static io.quarkus.it.vthreads.amqp.AssertHelper.assertThatItDoesNotRunOnVirtualThread; +import static io.quarkus.it.vthreads.amqp.AssertHelper.assertThatItRunsOnADuplicatedContext; +import static io.quarkus.it.vthreads.amqp.AssertHelper.assertThatItRunsOnVirtualThread; + +import java.util.Random; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.atomic.AtomicInteger; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.eclipse.microprofile.rest.client.inject.RestClient; + +import io.smallrye.common.annotation.RunOnVirtualThread; + +@ApplicationScoped +public class PriceConsumer { + + @RestClient + PriceAlertService alertService; + + @Incoming("prices") + @RunOnVirtualThread + public CompletionStage consume(Message msg) { + assertThatItRunsOnVirtualThread(); + assertThatItRunsOnADuplicatedContext(); + double price = msg.getPayload(); + if (price > 90.0) { + alertService.alertMessage(price); + } + return msg.ack().thenAccept(x -> { + assertThatItRunsOnADuplicatedContext(); + assertThatItDoesNotRunOnVirtualThread(); + }); + } + + @Incoming("prices") + @RunOnVirtualThread + public void consume(double price) { + assertThatItRunsOnVirtualThread(); + assertThatItRunsOnADuplicatedContext(); + if (price > 90.0) { + alertService.alert(price); + } + } + + Random r = new Random(); + AtomicInteger i = new AtomicInteger(); + + @Outgoing("prices-out") + @RunOnVirtualThread + public Message randomPriceGenerator() { + assertThatItRunsOnVirtualThread(); + return Message.of(r.nextDouble() * 10 * i.incrementAndGet()); + } + +} diff --git a/integration-tests/virtual-threads/amqp-virtual-threads/src/main/resources/application.properties b/integration-tests/virtual-threads/amqp-virtual-threads/src/main/resources/application.properties new file mode 100644 index 0000000000000..f212b575de362 --- /dev/null +++ b/integration-tests/virtual-threads/amqp-virtual-threads/src/main/resources/application.properties @@ -0,0 +1,7 @@ +price-alert/mp-rest/url=${test.url} +mp.messaging.incoming.prices.broadcast=true +mp.messaging.outgoing.prices-out.address=prices + +smallrye.messaging.worker..max-concurrency=10 + +quarkus.native.additional-build-args=--enable-preview diff --git a/integration-tests/virtual-threads/amqp-virtual-threads/src/test/java/io/quarkus/it/vthreads/amqp/NoPinningVerify.java b/integration-tests/virtual-threads/amqp-virtual-threads/src/test/java/io/quarkus/it/vthreads/amqp/NoPinningVerify.java new file mode 100644 index 0000000000000..0db11972e439b --- /dev/null +++ b/integration-tests/virtual-threads/amqp-virtual-threads/src/test/java/io/quarkus/it/vthreads/amqp/NoPinningVerify.java @@ -0,0 +1,76 @@ +package io.quarkus.it.vthreads.amqp; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.w3c.dom.Document; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; +import org.xml.sax.SAXException; + +/** + * An integration test reading the output of the unit test to verify that no tests where pinning the carrier thread. + * It reads the reports generated by surefire. + */ +public class NoPinningVerify { + + @Test + void verify() throws IOException, ParserConfigurationException, SAXException { + var reports = new File("target", "surefire-reports"); + Assertions.assertTrue(reports.isDirectory(), + "Unable to find " + reports.getAbsolutePath() + ", did you run the tests with Maven before?"); + var list = reports.listFiles(new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + return name.startsWith("TEST") && name.endsWith("Test.xml"); + } + }); + Assertions.assertNotNull(list, + "Unable to find " + reports.getAbsolutePath() + ", did you run the tests with Maven before?"); + + for (File report : list) { + Document document = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(report); + var suite = document.getFirstChild(); + var cases = getChildren(suite.getChildNodes(), "testcase"); + for (Node c : cases) { + verify(report, c); + } + } + + } + + private void verify(File file, Node ca) { + var fullname = ca.getAttributes().getNamedItem("classname").getTextContent() + "." + + ca.getAttributes().getNamedItem("name").getTextContent(); + var output = getChildren(ca.getChildNodes(), "system-out"); + if (output.isEmpty()) { + return; + } + var sout = output.get(0).getTextContent(); + if (sout.contains("VThreadContinuation.onPinned")) { + throw new AssertionError("The test case " + fullname + " pinned the carrier thread, check " + file.getAbsolutePath() + + " for details (or the log of the test)"); + } + + } + + private List getChildren(NodeList nodes, String name) { + List list = new ArrayList<>(); + for (int i = 0; i < nodes.getLength(); i++) { + var node = nodes.item(i); + if (node.getNodeName().equalsIgnoreCase(name)) { + list.add(node); + } + } + return list; + } + +} diff --git a/integration-tests/virtual-threads/amqp-virtual-threads/src/test/java/io/quarkus/it/vthreads/amqp/VirtualThreadITCase.java b/integration-tests/virtual-threads/amqp-virtual-threads/src/test/java/io/quarkus/it/vthreads/amqp/VirtualThreadITCase.java new file mode 100644 index 0000000000000..5a2c4ea96e200 --- /dev/null +++ b/integration-tests/virtual-threads/amqp-virtual-threads/src/test/java/io/quarkus/it/vthreads/amqp/VirtualThreadITCase.java @@ -0,0 +1,7 @@ +package io.quarkus.it.vthreads.amqp; + +import io.quarkus.test.junit.QuarkusIntegrationTest; + +@QuarkusIntegrationTest +public class VirtualThreadITCase extends VirtualThreadTest { +} diff --git a/integration-tests/virtual-threads/amqp-virtual-threads/src/test/java/io/quarkus/it/vthreads/amqp/VirtualThreadTest.java b/integration-tests/virtual-threads/amqp-virtual-threads/src/test/java/io/quarkus/it/vthreads/amqp/VirtualThreadTest.java new file mode 100644 index 0000000000000..4d44358a66087 --- /dev/null +++ b/integration-tests/virtual-threads/amqp-virtual-threads/src/test/java/io/quarkus/it/vthreads/amqp/VirtualThreadTest.java @@ -0,0 +1,36 @@ +package io.quarkus.it.vthreads.amqp; + +import static com.github.tomakehurst.wiremock.client.CountMatchingStrategy.GREATER_THAN_OR_EQUAL; +import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo; +import static com.github.tomakehurst.wiremock.http.RequestMethod.POST; +import static com.github.tomakehurst.wiremock.matching.RequestPatternBuilder.newRequestPattern; +import static org.awaitility.Awaitility.await; + +import org.junit.jupiter.api.Test; + +import com.github.tomakehurst.wiremock.WireMockServer; +import com.github.tomakehurst.wiremock.client.CountMatchingStrategy; + +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.junit.QuarkusTest; + +@QuarkusTest +@QuarkusTestResource(WireMockExtension.class) +public class VirtualThreadTest { + + public static final int EXPECTED_CALLS = 10; + WireMockServer mockServer; + + @Test + void testAlert() { + await().untilAsserted(() -> mockServer.verify(new CountMatchingStrategy(GREATER_THAN_OR_EQUAL, EXPECTED_CALLS), + newRequestPattern(POST, urlPathEqualTo("/price/alert")))); + } + + @Test + void testAlertMessage() { + await().untilAsserted(() -> mockServer.verify(new CountMatchingStrategy(GREATER_THAN_OR_EQUAL, EXPECTED_CALLS), + newRequestPattern(POST, urlPathEqualTo("/price/alert-message")))); + } + +} diff --git a/integration-tests/virtual-threads/amqp-virtual-threads/src/test/java/io/quarkus/it/vthreads/amqp/WireMockExtension.java b/integration-tests/virtual-threads/amqp-virtual-threads/src/test/java/io/quarkus/it/vthreads/amqp/WireMockExtension.java new file mode 100644 index 0000000000000..8d3499c6d2856 --- /dev/null +++ b/integration-tests/virtual-threads/amqp-virtual-threads/src/test/java/io/quarkus/it/vthreads/amqp/WireMockExtension.java @@ -0,0 +1,41 @@ +package io.quarkus.it.vthreads.amqp; + +import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; +import static com.github.tomakehurst.wiremock.client.WireMock.post; +import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; + +import java.util.Map; + +import com.github.tomakehurst.wiremock.WireMockServer; + +import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; + +public class WireMockExtension implements QuarkusTestResourceLifecycleManager { + + private WireMockServer wireMockServer; + + @Override + public Map start() { + wireMockServer = new WireMockServer(); + wireMockServer.start(); + + wireMockServer.stubFor(post(urlEqualTo("/price/alert")) + .willReturn(aResponse().withBody("ok"))); + wireMockServer.stubFor(post(urlEqualTo("/price/alert-message")) + .willReturn(aResponse().withBody("ok"))); + + return Map.of("price-alert/mp-rest/url", wireMockServer.baseUrl()); + } + + @Override + public void inject(TestInjector testInjector) { + testInjector.injectIntoFields(wireMockServer, f -> f.getType().isAssignableFrom(WireMockServer.class)); + } + + @Override + public void stop() { + if (null != wireMockServer) { + wireMockServer.stop(); + } + } +} diff --git a/integration-tests/virtual-threads/grpc-virtual-threads/src/test/java/io/quarkus/grpc/example/streaming/NoPinningIT.java b/integration-tests/virtual-threads/grpc-virtual-threads/src/test/java/io/quarkus/grpc/example/streaming/NoPinningVerify.java similarity index 98% rename from integration-tests/virtual-threads/grpc-virtual-threads/src/test/java/io/quarkus/grpc/example/streaming/NoPinningIT.java rename to integration-tests/virtual-threads/grpc-virtual-threads/src/test/java/io/quarkus/grpc/example/streaming/NoPinningVerify.java index 459909827872f..7e3dc8bbea963 100644 --- a/integration-tests/virtual-threads/grpc-virtual-threads/src/test/java/io/quarkus/grpc/example/streaming/NoPinningIT.java +++ b/integration-tests/virtual-threads/grpc-virtual-threads/src/test/java/io/quarkus/grpc/example/streaming/NoPinningVerify.java @@ -20,7 +20,7 @@ * An integration test reading the output of the unit test to verify that no tests where pinning the carrier thread. * It reads the reports generated by surefire. */ -public class NoPinningIT { +public class NoPinningVerify { @Test void verify() throws IOException, ParserConfigurationException, SAXException { diff --git a/integration-tests/virtual-threads/jms-virtual-threads/pom.xml b/integration-tests/virtual-threads/jms-virtual-threads/pom.xml new file mode 100644 index 0000000000000..c9696a8c7a126 --- /dev/null +++ b/integration-tests/virtual-threads/jms-virtual-threads/pom.xml @@ -0,0 +1,151 @@ + + + 4.0.0 + + + quarkus-virtual-threads-integration-tests-parent + io.quarkus + 999-SNAPSHOT + + + quarkus-integration-test-virtual-threads-jms + Quarkus - Integration Tests - Virtual Threads - JMS + + + + + io.quarkiverse.artemis + quarkus-artemis-bom + 3.0.1 + pom + import + + + + + + io.quarkus + quarkus-resteasy-reactive-jackson + + + io.quarkus + quarkus-smallrye-reactive-messaging + + + io.smallrye.reactive + smallrye-reactive-messaging-jms + 4.8.0 + + + io.quarkus + quarkus-rest-client-reactive + + + io.quarkiverse.artemis + quarkus-artemis-jms + + + io.quarkiverse.artemis + quarkus-test-artemis + test + + + + io.quarkus + quarkus-junit5 + test + + + io.rest-assured + rest-assured + test + + + org.awaitility + awaitility + test + + + org.assertj + assertj-core + test + + + io.quarkus + quarkus-test-vertx + test + + + io.quarkus + quarkus-junit5-mockito + test + + + com.github.tomakehurst + wiremock-jre8-standalone + test + + + io.quarkus + quarkus-test-common + test + + + + + io.quarkus + quarkus-smallrye-reactive-messaging-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-rest-client-reactive-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-resteasy-reactive-jackson-deployment + ${project.version} + pom + test + + + * + * + + + + + + + + + io.quarkus + quarkus-maven-plugin + + + org.apache.maven.plugins + maven-surefire-plugin + + + + + diff --git a/integration-tests/virtual-threads/jms-virtual-threads/src/main/java/io/quarkus/it/vthreads/jms/AssertHelper.java b/integration-tests/virtual-threads/jms-virtual-threads/src/main/java/io/quarkus/it/vthreads/jms/AssertHelper.java new file mode 100644 index 0000000000000..1f1c0ad2965dd --- /dev/null +++ b/integration-tests/virtual-threads/jms-virtual-threads/src/main/java/io/quarkus/it/vthreads/jms/AssertHelper.java @@ -0,0 +1,68 @@ +package io.quarkus.it.vthreads.jms; + +import java.lang.reflect.Method; + +import io.quarkus.arc.Arc; +import io.smallrye.common.vertx.VertxContext; +import io.vertx.core.Vertx; + +public class AssertHelper { + + /** + * Asserts that the current method: + * - runs on a duplicated context + * - runs on a virtual thread + * - has the request scope activated + */ + public static void assertEverything() { + assertThatTheRequestScopeIsActive(); + assertThatItRunsOnVirtualThread(); + assertThatItRunsOnADuplicatedContext(); + } + + public static void assertThatTheRequestScopeIsActive() { + if (!Arc.container().requestContext().isActive()) { + throw new AssertionError(("Expected the request scope to be active")); + } + } + + public static void assertThatItRunsOnADuplicatedContext() { + var context = Vertx.currentContext(); + if (context == null) { + throw new AssertionError("The method does not run on a Vert.x context"); + } + if (!VertxContext.isOnDuplicatedContext()) { + throw new AssertionError("The method does not run on a Vert.x **duplicated** context"); + } + } + + public static void assertThatItRunsOnVirtualThread() { + // We cannot depend on a Java 20. + try { + Method isVirtual = Thread.class.getMethod("isVirtual"); + isVirtual.setAccessible(true); + boolean virtual = (Boolean) isVirtual.invoke(Thread.currentThread()); + if (!virtual) { + throw new AssertionError("Thread " + Thread.currentThread() + " is not a virtual thread"); + } + } catch (Exception e) { + throw new AssertionError( + "Thread " + Thread.currentThread() + " is not a virtual thread - cannot invoke Thread.isVirtual()", e); + } + } + + public static void assertThatItDoesNotRunOnVirtualThread() { + // We cannot depend on a Java 20. + try { + Method isVirtual = Thread.class.getMethod("isVirtual"); + isVirtual.setAccessible(true); + boolean virtual = (Boolean) isVirtual.invoke(Thread.currentThread()); + if (virtual) { + throw new AssertionError("Thread " + Thread.currentThread() + " is a virtual thread"); + } + } catch (Exception e) { + throw new AssertionError( + "Thread " + Thread.currentThread() + " is a virtual thread - but cannot invoke Thread.isVirtual()", e); + } + } +} diff --git a/integration-tests/virtual-threads/jms-virtual-threads/src/main/java/io/quarkus/it/vthreads/jms/PriceAlertService.java b/integration-tests/virtual-threads/jms-virtual-threads/src/main/java/io/quarkus/it/vthreads/jms/PriceAlertService.java new file mode 100644 index 0000000000000..9554c0ad931ea --- /dev/null +++ b/integration-tests/virtual-threads/jms-virtual-threads/src/main/java/io/quarkus/it/vthreads/jms/PriceAlertService.java @@ -0,0 +1,23 @@ +package io.quarkus.it.vthreads.jms; + +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.core.MediaType; + +import org.eclipse.microprofile.rest.client.inject.RegisterRestClient; + +@Path("price") +@RegisterRestClient(configKey = "price-alert") +public interface PriceAlertService { + + @Path("alert") + @POST + @Consumes(MediaType.TEXT_PLAIN) + String alert(double value); + + @Path("alert-message") + @POST + @Consumes(MediaType.TEXT_PLAIN) + String alertMessage(double value); +} diff --git a/integration-tests/virtual-threads/jms-virtual-threads/src/main/java/io/quarkus/it/vthreads/jms/PriceConsumer.java b/integration-tests/virtual-threads/jms-virtual-threads/src/main/java/io/quarkus/it/vthreads/jms/PriceConsumer.java new file mode 100644 index 0000000000000..336974a7c66d8 --- /dev/null +++ b/integration-tests/virtual-threads/jms-virtual-threads/src/main/java/io/quarkus/it/vthreads/jms/PriceConsumer.java @@ -0,0 +1,55 @@ +package io.quarkus.it.vthreads.jms; + +import static io.quarkus.it.vthreads.jms.AssertHelper.assertThatItDoesNotRunOnVirtualThread; +import static io.quarkus.it.vthreads.jms.AssertHelper.assertThatItRunsOnVirtualThread; + +import java.util.Random; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.atomic.AtomicInteger; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.eclipse.microprofile.rest.client.inject.RestClient; + +import io.smallrye.common.annotation.RunOnVirtualThread; + +@ApplicationScoped +public class PriceConsumer { + + @RestClient + PriceAlertService alertService; + + @Incoming("prices") + @RunOnVirtualThread + public CompletionStage consume(Message msg) { + assertThatItRunsOnVirtualThread(); + double price = msg.getPayload(); + if (price > 90.0) { + alertService.alertMessage(price); + } + return msg.ack().thenAccept(x -> assertThatItDoesNotRunOnVirtualThread()); + } + + @Incoming("prices") + @RunOnVirtualThread + public void consume(double price) { + assertThatItRunsOnVirtualThread(); + if (price > 90.0) { + alertService.alert(price); + } + } + + Random r = new Random(); + AtomicInteger i = new AtomicInteger(); + + @Outgoing("prices-out") + @RunOnVirtualThread + public Message randomPriceGenerator() { + assertThatItRunsOnVirtualThread(); + return Message.of(r.nextDouble() * 10 * i.incrementAndGet()); + } + +} diff --git a/integration-tests/virtual-threads/jms-virtual-threads/src/main/resources/application.properties b/integration-tests/virtual-threads/jms-virtual-threads/src/main/resources/application.properties new file mode 100644 index 0000000000000..176a3abff2075 --- /dev/null +++ b/integration-tests/virtual-threads/jms-virtual-threads/src/main/resources/application.properties @@ -0,0 +1,10 @@ +price-alert/mp-rest/url=${test.url} +mp.messaging.incoming.prices.broadcast=true +mp.messaging.outgoing.prices-out.destination=prices + +smallrye.messaging.worker..max-concurrency=5 + +quarkus.native.additional-build-args=--enable-preview + +quarkus.artemis.devservices.enabled=true +quarkus.artemis.devservices.image-name=quay.io/artemiscloud/activemq-artemis-broker:1.0.18 diff --git a/integration-tests/virtual-threads/jms-virtual-threads/src/test/java/io/quarkus/it/vthreads/jms/NoPinningVerify.java b/integration-tests/virtual-threads/jms-virtual-threads/src/test/java/io/quarkus/it/vthreads/jms/NoPinningVerify.java new file mode 100644 index 0000000000000..b32c44a8f8a15 --- /dev/null +++ b/integration-tests/virtual-threads/jms-virtual-threads/src/test/java/io/quarkus/it/vthreads/jms/NoPinningVerify.java @@ -0,0 +1,76 @@ +package io.quarkus.it.vthreads.jms; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.w3c.dom.Document; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; +import org.xml.sax.SAXException; + +/** + * An integration test reading the output of the unit test to verify that no tests where pinning the carrier thread. + * It reads the reports generated by surefire. + */ +public class NoPinningVerify { + + @Test + void verify() throws IOException, ParserConfigurationException, SAXException { + var reports = new File("target", "surefire-reports"); + Assertions.assertTrue(reports.isDirectory(), + "Unable to find " + reports.getAbsolutePath() + ", did you run the tests with Maven before?"); + var list = reports.listFiles(new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + return name.startsWith("TEST") && name.endsWith("Test.xml"); + } + }); + Assertions.assertNotNull(list, + "Unable to find " + reports.getAbsolutePath() + ", did you run the tests with Maven before?"); + + for (File report : list) { + Document document = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(report); + var suite = document.getFirstChild(); + var cases = getChildren(suite.getChildNodes(), "testcase"); + for (Node c : cases) { + verify(report, c); + } + } + + } + + private void verify(File file, Node ca) { + var fullname = ca.getAttributes().getNamedItem("classname").getTextContent() + "." + + ca.getAttributes().getNamedItem("name").getTextContent(); + var output = getChildren(ca.getChildNodes(), "system-out"); + if (output.isEmpty()) { + return; + } + var sout = output.get(0).getTextContent(); + if (sout.contains("VThreadContinuation.onPinned")) { + throw new AssertionError("The test case " + fullname + " pinned the carrier thread, check " + file.getAbsolutePath() + + " for details (or the log of the test)"); + } + + } + + private List getChildren(NodeList nodes, String name) { + List list = new ArrayList<>(); + for (int i = 0; i < nodes.getLength(); i++) { + var node = nodes.item(i); + if (node.getNodeName().equalsIgnoreCase(name)) { + list.add(node); + } + } + return list; + } + +} diff --git a/integration-tests/virtual-threads/jms-virtual-threads/src/test/java/io/quarkus/it/vthreads/jms/VirtualThreadITCase.java b/integration-tests/virtual-threads/jms-virtual-threads/src/test/java/io/quarkus/it/vthreads/jms/VirtualThreadITCase.java new file mode 100644 index 0000000000000..a7ab26278f205 --- /dev/null +++ b/integration-tests/virtual-threads/jms-virtual-threads/src/test/java/io/quarkus/it/vthreads/jms/VirtualThreadITCase.java @@ -0,0 +1,7 @@ +package io.quarkus.it.vthreads.jms; + +import io.quarkus.test.junit.QuarkusIntegrationTest; + +@QuarkusIntegrationTest +public class VirtualThreadITCase extends VirtualThreadTest { +} diff --git a/integration-tests/virtual-threads/jms-virtual-threads/src/test/java/io/quarkus/it/vthreads/jms/VirtualThreadTest.java b/integration-tests/virtual-threads/jms-virtual-threads/src/test/java/io/quarkus/it/vthreads/jms/VirtualThreadTest.java new file mode 100644 index 0000000000000..bb115b5ea1d3e --- /dev/null +++ b/integration-tests/virtual-threads/jms-virtual-threads/src/test/java/io/quarkus/it/vthreads/jms/VirtualThreadTest.java @@ -0,0 +1,36 @@ +package io.quarkus.it.vthreads.jms; + +import static com.github.tomakehurst.wiremock.client.CountMatchingStrategy.GREATER_THAN_OR_EQUAL; +import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo; +import static com.github.tomakehurst.wiremock.http.RequestMethod.POST; +import static com.github.tomakehurst.wiremock.matching.RequestPatternBuilder.newRequestPattern; +import static org.awaitility.Awaitility.await; + +import org.junit.jupiter.api.Test; + +import com.github.tomakehurst.wiremock.WireMockServer; +import com.github.tomakehurst.wiremock.client.CountMatchingStrategy; + +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.junit.QuarkusTest; + +@QuarkusTest +@QuarkusTestResource(WireMockExtension.class) +public class VirtualThreadTest { + + public static final int EXPECTED_CALLS = 10; + WireMockServer mockServer; + + @Test + void testAlert() { + await().untilAsserted(() -> mockServer.verify(new CountMatchingStrategy(GREATER_THAN_OR_EQUAL, EXPECTED_CALLS), + newRequestPattern(POST, urlPathEqualTo("/price/alert")))); + } + + @Test + void testAlertMessage() { + await().untilAsserted(() -> mockServer.verify(new CountMatchingStrategy(GREATER_THAN_OR_EQUAL, EXPECTED_CALLS), + newRequestPattern(POST, urlPathEqualTo("/price/alert-message")))); + } + +} diff --git a/integration-tests/virtual-threads/jms-virtual-threads/src/test/java/io/quarkus/it/vthreads/jms/WireMockExtension.java b/integration-tests/virtual-threads/jms-virtual-threads/src/test/java/io/quarkus/it/vthreads/jms/WireMockExtension.java new file mode 100644 index 0000000000000..43584cd30366f --- /dev/null +++ b/integration-tests/virtual-threads/jms-virtual-threads/src/test/java/io/quarkus/it/vthreads/jms/WireMockExtension.java @@ -0,0 +1,41 @@ +package io.quarkus.it.vthreads.jms; + +import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; +import static com.github.tomakehurst.wiremock.client.WireMock.post; +import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; + +import java.util.Map; + +import com.github.tomakehurst.wiremock.WireMockServer; + +import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; + +public class WireMockExtension implements QuarkusTestResourceLifecycleManager { + + private WireMockServer wireMockServer; + + @Override + public Map start() { + wireMockServer = new WireMockServer(); + wireMockServer.start(); + + wireMockServer.stubFor(post(urlEqualTo("/price/alert")) + .willReturn(aResponse().withBody("ok"))); + wireMockServer.stubFor(post(urlEqualTo("/price/alert-message")) + .willReturn(aResponse().withBody("ok"))); + + return Map.of("price-alert/mp-rest/url", wireMockServer.baseUrl()); + } + + @Override + public void inject(TestInjector testInjector) { + testInjector.injectIntoFields(wireMockServer, f -> f.getType().isAssignableFrom(WireMockServer.class)); + } + + @Override + public void stop() { + if (null != wireMockServer) { + wireMockServer.stop(); + } + } +} diff --git a/integration-tests/virtual-threads/kafka-virtual-threads/pom.xml b/integration-tests/virtual-threads/kafka-virtual-threads/pom.xml new file mode 100644 index 0000000000000..a2c0d1f7fc8f2 --- /dev/null +++ b/integration-tests/virtual-threads/kafka-virtual-threads/pom.xml @@ -0,0 +1,125 @@ + + + 4.0.0 + + + quarkus-virtual-threads-integration-tests-parent + io.quarkus + 999-SNAPSHOT + + + quarkus-integration-test-virtual-threads-kafka + Quarkus - Integration Tests - Virtual Threads - Kafka + + + + io.quarkus + quarkus-resteasy-reactive-jackson + + + io.quarkus + quarkus-smallrye-reactive-messaging-kafka + + + io.quarkus + quarkus-rest-client-reactive + + + + io.quarkus + quarkus-junit5 + test + + + io.rest-assured + rest-assured + test + + + org.awaitility + awaitility + test + + + org.assertj + assertj-core + test + + + io.quarkus + quarkus-test-vertx + test + + + io.quarkus + quarkus-junit5-mockito + test + + + com.github.tomakehurst + wiremock-jre8-standalone + test + + + io.quarkus + quarkus-test-common + test + + + + + io.quarkus + quarkus-smallrye-reactive-messaging-kafka-deployment + ${project.version} + pom + test + + + * + * + + + + io.quarkus + quarkus-rest-client-reactive-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-resteasy-reactive-jackson-deployment + ${project.version} + pom + test + + + * + * + + + + + + + + + io.quarkus + quarkus-maven-plugin + + + org.apache.maven.plugins + maven-surefire-plugin + + + + + diff --git a/integration-tests/virtual-threads/kafka-virtual-threads/src/main/java/io/quarkus/it/vthreads/kafka/AssertHelper.java b/integration-tests/virtual-threads/kafka-virtual-threads/src/main/java/io/quarkus/it/vthreads/kafka/AssertHelper.java new file mode 100644 index 0000000000000..35299e5918b7c --- /dev/null +++ b/integration-tests/virtual-threads/kafka-virtual-threads/src/main/java/io/quarkus/it/vthreads/kafka/AssertHelper.java @@ -0,0 +1,68 @@ +package io.quarkus.it.vthreads.kafka; + +import java.lang.reflect.Method; + +import io.quarkus.arc.Arc; +import io.smallrye.common.vertx.VertxContext; +import io.vertx.core.Vertx; + +public class AssertHelper { + + /** + * Asserts that the current method: + * - runs on a duplicated context + * - runs on a virtual thread + * - has the request scope activated + */ + public static void assertEverything() { + assertThatTheRequestScopeIsActive(); + assertThatItRunsOnVirtualThread(); + assertThatItRunsOnADuplicatedContext(); + } + + public static void assertThatTheRequestScopeIsActive() { + if (!Arc.container().requestContext().isActive()) { + throw new AssertionError(("Expected the request scope to be active")); + } + } + + public static void assertThatItRunsOnADuplicatedContext() { + var context = Vertx.currentContext(); + if (context == null) { + throw new AssertionError("The method does not run on a Vert.x context"); + } + if (!VertxContext.isOnDuplicatedContext()) { + throw new AssertionError("The method does not run on a Vert.x **duplicated** context"); + } + } + + public static void assertThatItRunsOnVirtualThread() { + // We cannot depend on a Java 20. + try { + Method isVirtual = Thread.class.getMethod("isVirtual"); + isVirtual.setAccessible(true); + boolean virtual = (Boolean) isVirtual.invoke(Thread.currentThread()); + if (!virtual) { + throw new AssertionError("Thread " + Thread.currentThread() + " is not a virtual thread"); + } + } catch (Exception e) { + throw new AssertionError( + "Thread " + Thread.currentThread() + " is not a virtual thread - cannot invoke Thread.isVirtual()", e); + } + } + + public static void assertThatItDoesNotRunOnVirtualThread() { + // We cannot depend on a Java 20. + try { + Method isVirtual = Thread.class.getMethod("isVirtual"); + isVirtual.setAccessible(true); + boolean virtual = (Boolean) isVirtual.invoke(Thread.currentThread()); + if (virtual) { + throw new AssertionError("Thread " + Thread.currentThread() + " is a virtual thread"); + } + } catch (Exception e) { + throw new AssertionError( + "Thread " + Thread.currentThread() + " is a virtual thread - but cannot invoke Thread.isVirtual()", e); + } + } +} diff --git a/integration-tests/virtual-threads/kafka-virtual-threads/src/main/java/io/quarkus/it/vthreads/kafka/PriceAlertService.java b/integration-tests/virtual-threads/kafka-virtual-threads/src/main/java/io/quarkus/it/vthreads/kafka/PriceAlertService.java new file mode 100644 index 0000000000000..c45867475f199 --- /dev/null +++ b/integration-tests/virtual-threads/kafka-virtual-threads/src/main/java/io/quarkus/it/vthreads/kafka/PriceAlertService.java @@ -0,0 +1,23 @@ +package io.quarkus.it.vthreads.kafka; + +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.core.MediaType; + +import org.eclipse.microprofile.rest.client.inject.RegisterRestClient; + +@Path("price") +@RegisterRestClient(configKey = "price-alert") +public interface PriceAlertService { + + @Path("alert") + @POST + @Consumes(MediaType.TEXT_PLAIN) + String alert(double value); + + @Path("alert-message") + @POST + @Consumes(MediaType.TEXT_PLAIN) + String alertMessage(double value); +} diff --git a/integration-tests/virtual-threads/kafka-virtual-threads/src/main/java/io/quarkus/it/vthreads/kafka/PriceConsumer.java b/integration-tests/virtual-threads/kafka-virtual-threads/src/main/java/io/quarkus/it/vthreads/kafka/PriceConsumer.java new file mode 100644 index 0000000000000..f9256b5065789 --- /dev/null +++ b/integration-tests/virtual-threads/kafka-virtual-threads/src/main/java/io/quarkus/it/vthreads/kafka/PriceConsumer.java @@ -0,0 +1,61 @@ +package io.quarkus.it.vthreads.kafka; + +import static io.quarkus.it.vthreads.kafka.AssertHelper.assertThatItDoesNotRunOnVirtualThread; +import static io.quarkus.it.vthreads.kafka.AssertHelper.assertThatItRunsOnADuplicatedContext; +import static io.quarkus.it.vthreads.kafka.AssertHelper.assertThatItRunsOnVirtualThread; + +import java.util.Random; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.atomic.AtomicInteger; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.eclipse.microprofile.rest.client.inject.RestClient; + +import io.smallrye.common.annotation.RunOnVirtualThread; + +@ApplicationScoped +public class PriceConsumer { + + @RestClient + PriceAlertService alertService; + + @Incoming("prices") + @RunOnVirtualThread + public CompletionStage consume(Message msg) { + assertThatItRunsOnVirtualThread(); + assertThatItRunsOnADuplicatedContext(); + double price = msg.getPayload(); + if (price > 90.0) { + alertService.alertMessage(price); + } + return msg.ack().thenAccept(x -> { + assertThatItRunsOnADuplicatedContext(); + assertThatItDoesNotRunOnVirtualThread(); + }); + } + + @Incoming("prices") + @RunOnVirtualThread + public void consume(double price) { + assertThatItRunsOnVirtualThread(); + assertThatItRunsOnADuplicatedContext(); + if (price > 90.0) { + alertService.alert(price); + } + } + + Random r = new Random(); + AtomicInteger i = new AtomicInteger(); + + @Outgoing("prices-out") + @RunOnVirtualThread + public Message randomPriceGenerator() { + assertThatItRunsOnVirtualThread(); + return Message.of(r.nextDouble() * 10 * i.incrementAndGet()); + } + +} diff --git a/integration-tests/virtual-threads/kafka-virtual-threads/src/main/resources/application.properties b/integration-tests/virtual-threads/kafka-virtual-threads/src/main/resources/application.properties new file mode 100644 index 0000000000000..f4a4265ca9756 --- /dev/null +++ b/integration-tests/virtual-threads/kafka-virtual-threads/src/main/resources/application.properties @@ -0,0 +1,8 @@ +price-alert/mp-rest/url=${test.url} +mp.messaging.incoming.prices.broadcast=true +mp.messaging.incoming.prices.auto.offset.reset=earliest +mp.messaging.outgoing.prices-out.topic=prices + +smallrye.messaging.worker..max-concurrency=10 + +quarkus.native.additional-build-args=--enable-preview diff --git a/integration-tests/virtual-threads/kafka-virtual-threads/src/test/java/io/quarkus/it/vthreads/kafka/NoPinningVerify.java b/integration-tests/virtual-threads/kafka-virtual-threads/src/test/java/io/quarkus/it/vthreads/kafka/NoPinningVerify.java new file mode 100644 index 0000000000000..69ade96c85ab9 --- /dev/null +++ b/integration-tests/virtual-threads/kafka-virtual-threads/src/test/java/io/quarkus/it/vthreads/kafka/NoPinningVerify.java @@ -0,0 +1,76 @@ +package io.quarkus.it.vthreads.kafka; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.w3c.dom.Document; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; +import org.xml.sax.SAXException; + +/** + * An integration test reading the output of the unit test to verify that no tests where pinning the carrier thread. + * It reads the reports generated by surefire. + */ +public class NoPinningVerify { + + @Test + void verify() throws IOException, ParserConfigurationException, SAXException { + var reports = new File("target", "surefire-reports"); + Assertions.assertTrue(reports.isDirectory(), + "Unable to find " + reports.getAbsolutePath() + ", did you run the tests with Maven before?"); + var list = reports.listFiles(new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + return name.startsWith("TEST") && name.endsWith("Test.xml"); + } + }); + Assertions.assertNotNull(list, + "Unable to find " + reports.getAbsolutePath() + ", did you run the tests with Maven before?"); + + for (File report : list) { + Document document = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(report); + var suite = document.getFirstChild(); + var cases = getChildren(suite.getChildNodes(), "testcase"); + for (Node c : cases) { + verify(report, c); + } + } + + } + + private void verify(File file, Node ca) { + var fullname = ca.getAttributes().getNamedItem("classname").getTextContent() + "." + + ca.getAttributes().getNamedItem("name").getTextContent(); + var output = getChildren(ca.getChildNodes(), "system-out"); + if (output.isEmpty()) { + return; + } + var sout = output.get(0).getTextContent(); + if (sout.contains("VThreadContinuation.onPinned")) { + throw new AssertionError("The test case " + fullname + " pinned the carrier thread, check " + file.getAbsolutePath() + + " for details (or the log of the test)"); + } + + } + + private List getChildren(NodeList nodes, String name) { + List list = new ArrayList<>(); + for (int i = 0; i < nodes.getLength(); i++) { + var node = nodes.item(i); + if (node.getNodeName().equalsIgnoreCase(name)) { + list.add(node); + } + } + return list; + } + +} diff --git a/integration-tests/virtual-threads/kafka-virtual-threads/src/test/java/io/quarkus/it/vthreads/kafka/VirtualThreadITCase.java b/integration-tests/virtual-threads/kafka-virtual-threads/src/test/java/io/quarkus/it/vthreads/kafka/VirtualThreadITCase.java new file mode 100644 index 0000000000000..c4fdf775344d7 --- /dev/null +++ b/integration-tests/virtual-threads/kafka-virtual-threads/src/test/java/io/quarkus/it/vthreads/kafka/VirtualThreadITCase.java @@ -0,0 +1,7 @@ +package io.quarkus.it.vthreads.kafka; + +import io.quarkus.test.junit.QuarkusIntegrationTest; + +@QuarkusIntegrationTest +public class VirtualThreadITCase extends VirtualThreadTest { +} diff --git a/integration-tests/virtual-threads/kafka-virtual-threads/src/test/java/io/quarkus/it/vthreads/kafka/VirtualThreadTest.java b/integration-tests/virtual-threads/kafka-virtual-threads/src/test/java/io/quarkus/it/vthreads/kafka/VirtualThreadTest.java new file mode 100644 index 0000000000000..547055b00a1f1 --- /dev/null +++ b/integration-tests/virtual-threads/kafka-virtual-threads/src/test/java/io/quarkus/it/vthreads/kafka/VirtualThreadTest.java @@ -0,0 +1,36 @@ +package io.quarkus.it.vthreads.kafka; + +import static com.github.tomakehurst.wiremock.client.CountMatchingStrategy.GREATER_THAN_OR_EQUAL; +import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo; +import static com.github.tomakehurst.wiremock.http.RequestMethod.POST; +import static com.github.tomakehurst.wiremock.matching.RequestPatternBuilder.newRequestPattern; +import static org.awaitility.Awaitility.await; + +import org.junit.jupiter.api.Test; + +import com.github.tomakehurst.wiremock.WireMockServer; +import com.github.tomakehurst.wiremock.client.CountMatchingStrategy; + +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.junit.QuarkusTest; + +@QuarkusTest +@QuarkusTestResource(WireMockExtension.class) +public class VirtualThreadTest { + + public static final int EXPECTED_CALLS = 10; + WireMockServer mockServer; + + @Test + void testAlert() { + await().untilAsserted(() -> mockServer.verify(new CountMatchingStrategy(GREATER_THAN_OR_EQUAL, EXPECTED_CALLS), + newRequestPattern(POST, urlPathEqualTo("/price/alert")))); + } + + @Test + void testAlertMessage() { + await().untilAsserted(() -> mockServer.verify(new CountMatchingStrategy(GREATER_THAN_OR_EQUAL, EXPECTED_CALLS), + newRequestPattern(POST, urlPathEqualTo("/price/alert-message")))); + } + +} diff --git a/integration-tests/virtual-threads/kafka-virtual-threads/src/test/java/io/quarkus/it/vthreads/kafka/WireMockExtension.java b/integration-tests/virtual-threads/kafka-virtual-threads/src/test/java/io/quarkus/it/vthreads/kafka/WireMockExtension.java new file mode 100644 index 0000000000000..d081410fb3bb9 --- /dev/null +++ b/integration-tests/virtual-threads/kafka-virtual-threads/src/test/java/io/quarkus/it/vthreads/kafka/WireMockExtension.java @@ -0,0 +1,41 @@ +package io.quarkus.it.vthreads.kafka; + +import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; +import static com.github.tomakehurst.wiremock.client.WireMock.post; +import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; + +import java.util.Map; + +import com.github.tomakehurst.wiremock.WireMockServer; + +import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; + +public class WireMockExtension implements QuarkusTestResourceLifecycleManager { + + private WireMockServer wireMockServer; + + @Override + public Map start() { + wireMockServer = new WireMockServer(); + wireMockServer.start(); + + wireMockServer.stubFor(post(urlEqualTo("/price/alert")) + .willReturn(aResponse().withBody("ok"))); + wireMockServer.stubFor(post(urlEqualTo("/price/alert-message")) + .willReturn(aResponse().withBody("ok"))); + + return Map.of("price-alert/mp-rest/url", wireMockServer.baseUrl()); + } + + @Override + public void inject(TestInjector testInjector) { + testInjector.injectIntoFields(wireMockServer, f -> f.getType().isAssignableFrom(WireMockServer.class)); + } + + @Override + public void stop() { + if (null != wireMockServer) { + wireMockServer.stop(); + } + } +} diff --git a/integration-tests/virtual-threads/mailer-virtual-threads/src/test/java/io/quarkus/virtual/mail/NoPinningIT.java b/integration-tests/virtual-threads/mailer-virtual-threads/src/test/java/io/quarkus/virtual/mail/NoPinningVerify.java similarity index 98% rename from integration-tests/virtual-threads/mailer-virtual-threads/src/test/java/io/quarkus/virtual/mail/NoPinningIT.java rename to integration-tests/virtual-threads/mailer-virtual-threads/src/test/java/io/quarkus/virtual/mail/NoPinningVerify.java index 58ac0ddf09f59..99ce0563fdbb4 100644 --- a/integration-tests/virtual-threads/mailer-virtual-threads/src/test/java/io/quarkus/virtual/mail/NoPinningIT.java +++ b/integration-tests/virtual-threads/mailer-virtual-threads/src/test/java/io/quarkus/virtual/mail/NoPinningVerify.java @@ -20,7 +20,7 @@ * An integration test reading the output of the unit test to verify that no tests where pinning the carrier thread. * It reads the reports generated by surefire. */ -public class NoPinningIT { +public class NoPinningVerify { @Test void verify() throws IOException, ParserConfigurationException, SAXException { diff --git a/integration-tests/virtual-threads/pom.xml b/integration-tests/virtual-threads/pom.xml index 7d84384f550bb..d16892a4cd826 100644 --- a/integration-tests/virtual-threads/pom.xml +++ b/integration-tests/virtual-threads/pom.xml @@ -23,7 +23,16 @@ true - + + grpc-virtual-threads + resteasy-reactive-virtual-threads + mailer-virtual-threads + redis-virtual-threads + rest-client-reactive-virtual-threads + kafka-virtual-threads + amqp-virtual-threads + jms-virtual-threads + @@ -90,15 +99,27 @@ org.jboss.logmanager.LogManager ${maven.home} - --enable-preview -Djdk.tracePinnedThreads + + --enable-preview -Djdk.tracePinnedThreads -Dgradle.scan.captureTestLogging=false ${skipTests} + + + pinning-test + + test + + + + NoPinningVerify.java + + + + io.quarkus quarkus-maven-plugin - ${project.version} - true @@ -204,13 +225,6 @@ [20,) - - grpc-virtual-threads - resteasy-reactive-virtual-threads - mailer-virtual-threads - redis-virtual-threads - rest-client-reactive-virtual-threads - 20 true diff --git a/integration-tests/virtual-threads/redis-virtual-threads/src/test/java/io/quarkus/virtual/redis/NoPinningIT.java b/integration-tests/virtual-threads/redis-virtual-threads/src/test/java/io/quarkus/virtual/redis/NoPinningVerify.java similarity index 98% rename from integration-tests/virtual-threads/redis-virtual-threads/src/test/java/io/quarkus/virtual/redis/NoPinningIT.java rename to integration-tests/virtual-threads/redis-virtual-threads/src/test/java/io/quarkus/virtual/redis/NoPinningVerify.java index 420d0200e2c6f..6490e39a4d387 100644 --- a/integration-tests/virtual-threads/redis-virtual-threads/src/test/java/io/quarkus/virtual/redis/NoPinningIT.java +++ b/integration-tests/virtual-threads/redis-virtual-threads/src/test/java/io/quarkus/virtual/redis/NoPinningVerify.java @@ -20,7 +20,7 @@ * An integration test reading the output of the unit test to verify that no tests where pinning the carrier thread. * It reads the reports generated by surefire. */ -public class NoPinningIT { +public class NoPinningVerify { @Test void verify() throws IOException, ParserConfigurationException, SAXException { diff --git a/integration-tests/virtual-threads/rest-client-reactive-virtual-threads/src/test/java/io/quarkus/virtual/rest/NoPinningIT.java b/integration-tests/virtual-threads/rest-client-reactive-virtual-threads/src/test/java/io/quarkus/virtual/rest/NoPinningVerify.java similarity index 98% rename from integration-tests/virtual-threads/rest-client-reactive-virtual-threads/src/test/java/io/quarkus/virtual/rest/NoPinningIT.java rename to integration-tests/virtual-threads/rest-client-reactive-virtual-threads/src/test/java/io/quarkus/virtual/rest/NoPinningVerify.java index 0124c23c7be9b..2eb1d2963110d 100644 --- a/integration-tests/virtual-threads/rest-client-reactive-virtual-threads/src/test/java/io/quarkus/virtual/rest/NoPinningIT.java +++ b/integration-tests/virtual-threads/rest-client-reactive-virtual-threads/src/test/java/io/quarkus/virtual/rest/NoPinningVerify.java @@ -20,7 +20,7 @@ * An integration test reading the output of the unit test to verify that no tests where pinning the carrier thread. * It reads the reports generated by surefire. */ -public class NoPinningIT { +public class NoPinningVerify { @Test void verify() throws IOException, ParserConfigurationException, SAXException { diff --git a/integration-tests/virtual-threads/resteasy-reactive-virtual-threads/src/test/java/io/quarkus/virtual/rr/NoPinningIT.java b/integration-tests/virtual-threads/resteasy-reactive-virtual-threads/src/test/java/io/quarkus/virtual/rr/NoPinningVerify.java similarity index 98% rename from integration-tests/virtual-threads/resteasy-reactive-virtual-threads/src/test/java/io/quarkus/virtual/rr/NoPinningIT.java rename to integration-tests/virtual-threads/resteasy-reactive-virtual-threads/src/test/java/io/quarkus/virtual/rr/NoPinningVerify.java index 4be03272150ff..3dc4cf6e76b2a 100644 --- a/integration-tests/virtual-threads/resteasy-reactive-virtual-threads/src/test/java/io/quarkus/virtual/rr/NoPinningIT.java +++ b/integration-tests/virtual-threads/resteasy-reactive-virtual-threads/src/test/java/io/quarkus/virtual/rr/NoPinningVerify.java @@ -20,7 +20,7 @@ * An integration test reading the output of the unit test to verify that no tests where pinning the carrier thread. * It reads the reports generated by surefire. */ -public class NoPinningIT { +public class NoPinningVerify { @Test void verify() throws IOException, ParserConfigurationException, SAXException {