diff --git a/examples/webserver/threads/README.md b/examples/webserver/threads/README.md new file mode 100644 index 00000000000..7c3b5355c93 --- /dev/null +++ b/examples/webserver/threads/README.md @@ -0,0 +1,98 @@ +# Helidon SE Threading Example + +Helidon's adoption of virtual threads has eliminated a lot of the headaches +of thread pools and thread pool tuning. But there are still cases where using +application specific executors is desirable. This example illustrates two +such cases: + +1. Using a virtual thread executor to execute multiple tasks in parallel. +2. Using a platform thread executor to execute long-running CPU intensive operations. + +## Build and run + +```bash +mvn package +java -jar target/helidon-examples-webserver-threads.jar +``` + +## Exercise the application + +__Compute:__ +``` +curl -X GET http://localhost:8080/thread/compute/5 +``` +The `compute` endpoint runs a costly floating point computation using a platform thread. +Increase the number to make the computation more costly (and take longer). + +The request returns the results of the computation (not important!). + +__Fanout:__ +``` +curl -X GET http://localhost:8080/thread/fanout/5 +``` +The `fanout` endpoint simulates a fanout of remote calls that are run in parallel using +virtual threads. Each remote call invokes the server's `sleep` endpoint sleeping anywhere from +0 to 4 seconds. Since the remote requests are executed in parallel the curl request should not +take longer than 4 seconds to return. Increase the number to have more remote calls made +in parallel. + +The request returns a list of numbers showing the sleep value of each remote client call. + +__Sleep:__ +``` +curl -X GET http://localhost:8080/thread/sleep/4 +``` +This is a simple endpoint that just sleeps for the specified number of seconds. It is +used by the `fanout` endpoint. + +The request returns the number of seconds requested to sleep. + +## Further Discussion + +### Use Case 1: Virtual Threads: Executing Tasks in Parallel + +Sometimes an endpoint needs to perform multiple blocking operations in parallel: +querying a database, calling another service, etc. Virtual threads are a +good fit for this because they are lightweight and do not consume platform +threads when performing blocking operations (like network I/O). + +The `fanout` endpoint in this example demonstrates this use case. You pass the endpoint +the number of parallel tasks to execute and it simulates remote client calls by using +the Helidon WebClient to call the `sleep` endpoint on the server. + +### Use Case 2: Platform Threads: Executing a CPU Intensive Task + +If you have an endpoint that performs an in-memory, CPU intensive task, then +platform threads might be a better match. This is because a virtual thread would be pinned to +a platform thread throughout the computation -- potentially causing unbounded consumption +of platform threads. Instead, the example uses a small, bounded pool of platform +threads to perform computations. Bounded meaning that the number of threads and the +size of the work queue are both limited and will reject work when they fill up. +This gives the application tight control over the resources allocated to these CPU intensive tasks. + +The `compute` endpoint in this example demonstrates this use case. You pass the endpoint +the number of times you want to make the computation, and it uses a small bounded pool +of platform threads to execute the task. + +### Use of Helidon's ThreadPoolSupplier and Configuration + +This example uses `io.helidon.common.configurable.ThreadPoolSupplier` to create the +two executors used in the example. This provides a few benefits: + +1. ThreadPoolSupplier supports a number of tuning parameters that enable us to configure a small, bounded threadpool. +2. You can drive the thread pool configuration via Helidon config -- see this example's `application.yaml` +3. You get propagation of Helidon's Context which supports Helidon's features as well as direct use by the application. + +### Logging + +In `logging.properties` the log level for `io.helidon.common.configurable.ThreadPool` +is increased so that you can see the values used to configure the platform thread pool. +When you start the application you will see a line like +``` +ThreadPool 'application-platform-executor-thread-pool-1' {corePoolSize=1, maxPoolSize=2, + queueCapacity=10, growthThreshold=1000, growthRate=0%, averageQueueSize=0.00, peakQueueSize=0, averageActiveThreads=0.00, peakPoolSize=0, currentPoolSize=0, completedTasks=0, failedTasks=0, rejectedTasks=0} +``` +This reflects the configuration of the platform thread pool created by the application +and used by the `compute` endpoint. At most the thread pool will consume two platform +threads for computations. The work queue is limited to 10 entries to allow for small +bursts of requests. diff --git a/examples/webserver/threads/pom.xml b/examples/webserver/threads/pom.xml new file mode 100644 index 00000000000..3fc7bbf87e5 --- /dev/null +++ b/examples/webserver/threads/pom.xml @@ -0,0 +1,85 @@ + + + + 4.0.0 + + io.helidon.applications + helidon-se + 4.0.0-SNAPSHOT + ../../../applications/se/pom.xml + + io.helidon.examples.webserver + helidon-examples-webserver-threads + 4.0.0-SNAPSHOT + + + io.helidon.examples.webserver.threads.Main + + + + + io.helidon.webserver + helidon-webserver + + + io.helidon.webclient + helidon-webclient + + + io.helidon.config + helidon-config-yaml + + + io.helidon.logging + helidon-logging-jul + runtime + + + org.junit.jupiter + junit-jupiter-api + test + + + org.hamcrest + hamcrest-all + test + + + io.helidon.webserver.testing.junit5 + helidon-webserver-testing-junit5 + test + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + copy-libs + + + + + + diff --git a/examples/webserver/threads/src/main/java/io/helidon/examples/webserver/threads/Main.java b/examples/webserver/threads/src/main/java/io/helidon/examples/webserver/threads/Main.java new file mode 100644 index 00000000000..41e008633c1 --- /dev/null +++ b/examples/webserver/threads/src/main/java/io/helidon/examples/webserver/threads/Main.java @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.examples.webserver.threads; + +import io.helidon.logging.common.LogConfig; +import io.helidon.config.Config; +import io.helidon.webclient.api.WebClient; +import io.helidon.webserver.WebServer; +import io.helidon.webserver.http.HttpRouting; + +/** + * The application main class. + */ +public class Main { + + static WebClient webclient; + + /** + * Cannot be instantiated. + */ + private Main() { + } + + /** + * Application main entry point. + * @param args command line arguments. + */ + public static void main(String[] args) { + + // load logging configuration + LogConfig.configureRuntime(); + + // initialize global config from default configuration + Config config = Config.create(); + Config.global(config); + + WebServer webserver = WebServer.builder() + .config(config.get("server")) + .routing(Main::routing) + .build() + .start(); + + // Construct webclient here using port of running server + webclient = WebClient.builder() + .baseUri("http://localhost:" + webserver.port() + "/thread") + .build(); + + System.out.println("WEB server is up! http://localhost:" + webserver.port() + "/thread"); + } + + + /** + * Updates HTTP Routing. + */ + static void routing(HttpRouting.Builder routing) { + routing + .register("/thread", new ThreadService()); + } +} \ No newline at end of file diff --git a/examples/webserver/threads/src/main/java/io/helidon/examples/webserver/threads/ThreadService.java b/examples/webserver/threads/src/main/java/io/helidon/examples/webserver/threads/ThreadService.java new file mode 100644 index 00000000000..60ece4b13b3 --- /dev/null +++ b/examples/webserver/threads/src/main/java/io/helidon/examples/webserver/threads/ThreadService.java @@ -0,0 +1,208 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.examples.webserver.threads; + +import java.lang.System.Logger.Level; +import java.util.ArrayList; +import java.util.Random; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; + +import io.helidon.common.configurable.ThreadPoolSupplier; +import io.helidon.config.Config; +import io.helidon.http.Status; +import io.helidon.webclient.api.ClientResponseTyped; +import io.helidon.webclient.api.WebClient; +import io.helidon.webserver.http.HttpRules; +import io.helidon.webserver.http.HttpService; +import io.helidon.webserver.http.ServerRequest; +import io.helidon.webserver.http.ServerResponse; + +class ThreadService implements HttpService { + + private static final System.Logger LOGGER = System.getLogger(ThreadService.class.getName()); + private static final Random rand = new Random(System.currentTimeMillis()); + + // ThreadPool of platform threads. + private static ExecutorService platformExecutorService; + // Executor of virtual threads. + private static ExecutorService virtualExecutorService; + + /** + * The config value for the key {@code greeting}. + */ + + ThreadService() { + this(Config.global().get("app")); + } + + ThreadService(Config appConfig) { + /* + * We create two executor services. One is a thread pool of platform threads. + * The second is a virtual thread executor service. + * See `application.yaml` for configuration of each of these. + */ + ThreadPoolSupplier platformThreadSupplier = ThreadPoolSupplier.builder() + .config(appConfig.get("application-platform-executor")) + .build(); + platformExecutorService = platformThreadSupplier.get(); + + ThreadPoolSupplier virtualThreadSupplier = ThreadPoolSupplier.builder() + .config(appConfig.get("application-virtual-executor")) + .build(); + virtualExecutorService = virtualThreadSupplier.get(); + } + + @Override + public void routing(HttpRules rules) { + rules + .get("/compute", this::computeHandler) + .get("/compute/{iterations}", this::computeHandler) + .get("/fanout", this::fanOutHandler) + .get("/fanout/{count}", this::fanOutHandler) + .get("/sleep", this::sleepHandler) + .get("/sleep/{seconds}", this::sleepHandler); + } + + /** + * Perform a CPU intensive operation. + * The optional path parameter controls the number of iterations of the computation. The more + * iterations the longer it will take. + * + * @param request server request + * @param response server response + */ + private void computeHandler(ServerRequest request, ServerResponse response) { + int iterations = request.path().pathParameters().first("iterations").asInt().orElse(1); + try { + // We execute the computation on a platform thread. This prevents unbounded obstruction of virtual + // threads, plus provides us the ability to limit the number of concurrent computation requests + // we handle by limiting the thread pool work queue length (as defined in application.yaml) + Future future = platformExecutorService.submit(() -> compute(iterations)); + response.send(future.get().toString()); + } catch (RejectedExecutionException e) { + // Work queue is full! We reject the request + LOGGER.log(Level.WARNING, e); + response.status(Status.SERVICE_UNAVAILABLE_503).send("Server busy"); + } catch (ExecutionException | InterruptedException e) { + LOGGER.log(Level.ERROR, e); + response.status(Status.INTERNAL_SERVER_ERROR_500).send(); + } + } + + /** + * Sleep for a specified number of seconds. + * The optional path parameter controls the number of seconds to sleep. Defaults to 1 + * + * @param request server request + * @param response server response + */ + private void sleepHandler(ServerRequest request, ServerResponse response) { + int seconds = request.path().pathParameters().first("seconds").asInt().orElse(1); + response.send(String.valueOf(sleep(seconds))); + } + + /** + * Fan out a number of remote requests in parallel. + * The optional path parameter controls the number of parallel requests to make. + * + * @param request server request + * @param response server response + */ + private void fanOutHandler(ServerRequest request, ServerResponse response) { + int count = request.path().pathParameters().first("count").asInt().orElse(1); + LOGGER.log(Level.INFO, "Fanning out " + count + " parallel requests"); + // We simulate multiple client requests running in parallel by calling our sleep endpoint. + try { + // For this we use our virtual thread based executor. We submit the work and save the Futures + var futures = new ArrayList>(); + for (int i = 0; i < count; i++) { + futures.add(virtualExecutorService.submit(() -> callRemote(rand.nextInt(5)))); + } + + // After work has been submitted we loop through the future and block getting the results. + // We aggregate the results in a list of Strings + var responses = new ArrayList(); + for (var future : futures) { + try { + responses.add(future.get()); + } catch (InterruptedException e) { + responses.add(e.getMessage()); + } + } + + // All parallel calls are complete! + response.send(String.join(":", responses)); + } catch (ExecutionException e) { + LOGGER.log(Level.ERROR, e); + response.status(Status.INTERNAL_SERVER_ERROR_500).send(); + } + } + + /** + * Simulate a remote client call by calling this server's sleep endpoint + * + * @param seconds number of seconds the endpoint should sleep. + * @return string response from client + */ + private String callRemote(int seconds) { + LOGGER.log(Level.INFO, Thread.currentThread() + ": Calling remote sleep for " + seconds + "s"); + WebClient client = Main.webclient; + ClientResponseTyped response = client.get("/sleep/" + seconds).request(String.class); + if (response.status().equals(Status.OK_200)) { + return response.entity(); + } + return response.status().toString(); + } + + /** + * Sleep current thread + * + * @param seconds number of seconds to sleep + * @return number of seconds requested to sleep + */ + private int sleep(int seconds) { + try { + Thread.sleep(seconds * 1_000L); + } catch (InterruptedException e) { + LOGGER.log(Level.WARNING, e); + } + return seconds; + } + + /** + * Perform a CPU intensive computation + * + * @param iterations: number of times to perform computation + * @return result of computation + */ + private double compute(int iterations) { + LOGGER.log(Level.INFO, Thread.currentThread() + ": Computing with " + iterations + " iterations"); + double d = 123456789.123456789 * rand.nextInt(100); + for (int i = 0; i < iterations; i++) { + for (int n = 0; n < 1_000_000; n++) { + for (int j = 0; j < 5; j++) { + d = Math.tan(d); + d = Math.atan(d); + } + } + } + return d; + } +} diff --git a/examples/webserver/threads/src/main/java/io/helidon/examples/webserver/threads/package-info.java b/examples/webserver/threads/src/main/java/io/helidon/examples/webserver/threads/package-info.java new file mode 100644 index 00000000000..f7dfc7e8a4e --- /dev/null +++ b/examples/webserver/threads/src/main/java/io/helidon/examples/webserver/threads/package-info.java @@ -0,0 +1,17 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.examples.webserver.threads; diff --git a/examples/webserver/threads/src/main/resources/application.yaml b/examples/webserver/threads/src/main/resources/application.yaml new file mode 100644 index 00000000000..05ee0f283e5 --- /dev/null +++ b/examples/webserver/threads/src/main/resources/application.yaml @@ -0,0 +1,29 @@ +# +# Copyright (c) 2024 Oracle and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +server: + port: 8080 + host: 0.0.0.0 + +app: + application-platform-executor: + thread-name-prefix: "application-platform-executor-" + core-pool-size: 1 + max-pool-size: 2 + queue-capacity: 10 + application-virtual-executor: + thread-name-prefix: "application-virtual-executor-" + virtual-threads: true diff --git a/examples/webserver/threads/src/main/resources/logging.properties b/examples/webserver/threads/src/main/resources/logging.properties new file mode 100644 index 00000000000..5fddb7f3d50 --- /dev/null +++ b/examples/webserver/threads/src/main/resources/logging.properties @@ -0,0 +1,22 @@ +# +# Copyright (c) 2024 Oracle and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +handlers=io.helidon.logging.jul.HelidonConsoleHandler +java.util.logging.SimpleFormatter.format=%1$tY.%1$tm.%1$td %1$tH:%1$tM:%1$tS.%1$tL %5$s%6$s%n +# Global logging level. Can be overridden by specific loggers +.level=INFO + +io.helidon.common.configurable.ThreadPool.level=ALL diff --git a/examples/webserver/threads/src/test/java/io/helidon/examples/webserver/threads/MainTest.java b/examples/webserver/threads/src/test/java/io/helidon/examples/webserver/threads/MainTest.java new file mode 100644 index 00000000000..bbe78c10f9a --- /dev/null +++ b/examples/webserver/threads/src/test/java/io/helidon/examples/webserver/threads/MainTest.java @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.examples.webserver.threads; + +import io.helidon.http.Status; +import io.helidon.webclient.api.HttpClientResponse; +import io.helidon.webclient.api.WebClient; +import io.helidon.webserver.http.HttpRouting; +import io.helidon.webserver.testing.junit5.ServerTest; +import io.helidon.webserver.testing.junit5.SetUpRoute; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +@ServerTest +class MainTest { + private final WebClient client; + + protected MainTest(WebClient client) { + this.client = client; + Main.webclient = this.client; // Needed for ThreadService to make calls + } + + @SetUpRoute + static void routing(HttpRouting.Builder builder) { + Main.routing(builder); + } + + @Test + void testFanOut() { + try (HttpClientResponse response = client.get("/thread/fanout/2").request()) { + assertThat(response.status(), is(Status.OK_200)); + } + } + + @Test + void testCompute() { + try (HttpClientResponse response = client.get("/thread/compute").request()) { + assertThat(response.status(), is(Status.OK_200)); + } + } +} \ No newline at end of file