From 0ba8a867a8fdbe783c60cfd27838102cc8f5e724 Mon Sep 17 00:00:00 2001 From: Santiago Pericasgeertsen Date: Tue, 6 Sep 2022 11:04:38 -0400 Subject: [PATCH 1/4] - Ported combined test (FaultToleranceTest) to Nima - Enhanced Async to support executors and a builder - Minor fixes to TimeoutImpl --- .../io/helidon/nima/faulttolerance/Async.java | 53 +++++++++++++++ .../nima/faulttolerance/AsyncImpl.java | 23 ++++++- .../nima/faulttolerance/TimeoutImpl.java | 9 +-- .../nima/faulttolerance/AsyncTest.java | 64 +++++++++++++++++++ .../faulttolerance/FaultToleranceTest.java | 46 ++++++------- 5 files changed, 162 insertions(+), 33 deletions(-) create mode 100644 nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/AsyncTest.java diff --git a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/Async.java b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/Async.java index fa15939996c..c60378b8d55 100644 --- a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/Async.java +++ b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/Async.java @@ -16,9 +16,13 @@ package io.helidon.nima.faulttolerance; +import java.util.Objects; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; import java.util.function.Supplier; +import io.helidon.common.LazyValue; + /** * Runs synchronous suppliers asynchronously using virtual threads. Includes * convenient static method to avoid creating instances of this class. @@ -55,4 +59,53 @@ static Async create() { static CompletableFuture invokeStatic(Supplier supplier) { return create().invoke(supplier); } + + /** + * A new builder to build a customized {@link Async} instance. + * @return a new builder + */ + static Builder builder() { + return new Builder(); + } + + /** + * Fluent API Builder for {@link Async}. + */ + class Builder implements io.helidon.common.Builder { + private LazyValue executor; + + private Builder() { + } + + @Override + public Async build() { + return new AsyncImpl(this); + } + + /** + * Configure executor service to use for executing tasks asynchronously. + * + * @param executor executor service supplier + * @return updated builder instance + */ + public Builder executor(Supplier executor) { + this.executor = LazyValue.create(Objects.requireNonNull(executor)); + return this; + } + + /** + * Configure executor service to use for executing tasks asynchronously. + * + * @param executor executor service + * @return updated builder instance + */ + public Builder executor(ExecutorService executor) { + this.executor = LazyValue.create(Objects.requireNonNull(executor)); + return this; + } + + LazyValue executor() { + return executor; + } + } } diff --git a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/AsyncImpl.java b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/AsyncImpl.java index 365927b2163..16e282138f1 100644 --- a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/AsyncImpl.java +++ b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/AsyncImpl.java @@ -17,17 +17,33 @@ package io.helidon.nima.faulttolerance; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.function.Supplier; +import io.helidon.common.LazyValue; + +/** + * Implementation of {@code Async}. If no executor specified in builder, then it will + * use {@link Executors#newVirtualThreadPerTaskExecutor}. Note that this default executor + * is not configurable using Helidon's config. + */ class AsyncImpl implements Async { + private final LazyValue executor; + + AsyncImpl() { + this.executor = LazyValue.create(Executors.newVirtualThreadPerTaskExecutor()); + } - private AsyncImpl() { + AsyncImpl(Builder builder) { + this.executor = builder.executor() != null ? builder.executor() + : LazyValue.create(Executors.newVirtualThreadPerTaskExecutor()); } @Override public CompletableFuture invoke(Supplier supplier) { CompletableFuture result = new CompletableFuture<>(); - Thread.ofVirtual().start(() -> { + executor.get().submit(() -> { try { T t = supplier.get(); result.complete(t); @@ -38,6 +54,9 @@ public CompletableFuture invoke(Supplier supplier) { return result; } + /** + * Default {@code Async} instance that uses {@link Executors#newVirtualThreadPerTaskExecutor}. + */ static final class DefaultAsyncInstance { private static final Async INSTANCE = new AsyncImpl(); diff --git a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/TimeoutImpl.java b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/TimeoutImpl.java index 56da4938852..2b5a5516719 100644 --- a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/TimeoutImpl.java +++ b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/TimeoutImpl.java @@ -16,7 +16,6 @@ package io.helidon.nima.faulttolerance; -import java.time.Duration; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledExecutorService; @@ -35,10 +34,8 @@ class TimeoutImpl implements Timeout { private final LazyValue executor; private final boolean currentThread; private final String name; - private final Duration timeout; TimeoutImpl(Builder builder) { - this.timeout = builder.timeout(); this.timeoutMillis = builder.timeout().toMillis(); this.executor = builder.executor(); this.currentThread = builder.currentThread(); @@ -60,7 +57,11 @@ public T invoke(Supplier supplier) { } catch (InterruptedException e) { throw new TimeoutException("Call interrupted", e); } catch (ExecutionException e) { - throw new TimeoutException("Asynchronous execution error", e.getCause()); + // Map java.util.concurrent.TimeoutException to Nima's TimeoutException + if (e.getCause() instanceof java.util.concurrent.TimeoutException) { + throw new TimeoutException("Timeout reached", e.getCause().getCause()); + } + throw new RuntimeException("Asynchronous execution error", e.getCause()); } } else { Thread thisThread = Thread.currentThread(); diff --git a/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/AsyncTest.java b/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/AsyncTest.java new file mode 100644 index 00000000000..29fc77fb818 --- /dev/null +++ b/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/AsyncTest.java @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2022 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.nima.faulttolerance; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.CoreMatchers.is; + +class AsyncTest { + private static final long WAIT_TIMEOUT_MILLIS = 2000; + + @Test + void testDefaultExecutorCreate() { + Thread thread = testAsync(Async.create()); + assertThat(thread.isVirtual(), is(true)); + } + + @Test + void testDefaultExecutorBuilder(){ + Async async = Async.builder().build(); + Thread thread = testAsync(async); + assertThat(thread.isVirtual(), is(true)); + } + + @Test + void testCustomExecutorBuilder() { + Async async = Async.builder() + .executor(FaultTolerance.executor()) // platform thread executor + .build(); + Thread thread = testAsync(async); + assertThat(thread.isVirtual(), is(false)); + } + + private Thread testAsync(Async async) { + try { + CompletableFuture cf = new CompletableFuture<>(); + async.invoke(() -> { + cf.complete(Thread.currentThread()); + return null; + }); + return cf.get(WAIT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/FaultToleranceTest.java b/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/FaultToleranceTest.java index c9aa0e3f55b..60a9595744b 100644 --- a/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/FaultToleranceTest.java +++ b/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/FaultToleranceTest.java @@ -18,23 +18,16 @@ import java.time.Duration; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.jupiter.api.Assertions.assertThrows; class FaultToleranceTest { @Test - @Disabled - // TODO fix - void testCustomCombination() throws ExecutionException, InterruptedException, TimeoutException { + void testCustomCombination() { CircuitBreaker breaker = CircuitBreaker.builder() .build(); @@ -46,34 +39,32 @@ void testCustomCombination() throws ExecutionException, InterruptedException, Ti FtHandlerTyped faultTolerance = FaultTolerance.builder() .addBreaker(breaker) .addBulkhead(bulkhead) - .addTimeout(Timeout.builder().timeout(Duration.ofMillis(100)).build()) + .addTimeout(Timeout.builder().timeout(Duration.ofMillis(1000)).build()) .addFallback(Fallback.builder() - .fallback(this::fallback) - .build()) + .fallback(this::fallback) + .build()) .build(); + // First call should not open breaker and execute call back String result = faultTolerance.invoke(this::primary); - assertThat(result, is(MyException.class.getName())); + assertThat(result, is(MyException.class.getName())); // callback called + // Manually open breaker breaker.state(CircuitBreaker.State.OPEN); + assertThat(breaker.state(), is(CircuitBreaker.State.OPEN)); + + // Next call should fail on breaker but still execute fallback result = faultTolerance.invoke(this::primary); assertThat(result, is(CircuitBreakerOpenException.class.getName())); + // Manually close breaker breaker.state(CircuitBreaker.State.CLOSED); + assertThat(breaker.state(), is(CircuitBreaker.State.CLOSED)); + // Second call forces timeout by calling a supplier that blocks indefinitely Manual m = new Manual(); - CompletableFuture mFuture = CompletableFuture.supplyAsync(() -> bulkhead.invoke(m::call)); - - assertThrows(BulkheadException.class, () -> faultTolerance.invoke(this::primary)); - - m.future.complete("result"); - mFuture.get(1, TimeUnit.SECONDS); - - // m = new Manual(); - // result = faultTolerance.invoke(m::call); - // assertThat(result.await(1, TimeUnit.SECONDS), is(TimeoutException.class.getName())); - // - // m.future.complete("hu"); + result = faultTolerance.invoke(m::call); + assertThat(result, is(TimeoutException.class.getName())); // callback called } private String primary() { @@ -81,6 +72,9 @@ private String primary() { } private String fallback(Throwable throwable) { + if (throwable instanceof RuntimeException && throwable.getCause() != null) { + throwable = throwable.getCause(); + } return throwable.getClass().getName(); } @@ -89,7 +83,7 @@ private static class Manual { private String call() { try { - return future.get(); + return future.get(); // blocks indefinitely } catch (Throwable e) { throw new RuntimeException(e); } @@ -97,7 +91,5 @@ private String call() { } private static class MyException extends RuntimeException { - } - } From 47e851df8db71475b67abb3cc66b3db91dc871e7 Mon Sep 17 00:00:00 2001 From: Santiago Pericasgeertsen Date: Thu, 8 Sep 2022 09:01:35 -0400 Subject: [PATCH 2/4] Increase wait time in test for pipeline runs. Signed-off-by: Santiago Pericasgeertsen --- .../io/helidon/nima/faulttolerance/CircuitBreakerTest.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/CircuitBreakerTest.java b/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/CircuitBreakerTest.java index 51a788528aa..c65bae11f43 100644 --- a/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/CircuitBreakerTest.java +++ b/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/CircuitBreakerTest.java @@ -25,6 +25,9 @@ import static org.junit.jupiter.api.Assertions.assertThrows; class CircuitBreakerTest { + + private static final long BUSY_WAIT_SLEEP = 100; + @Test void testCircuitBreaker() throws InterruptedException { CircuitBreaker breaker = CircuitBreaker.builder() @@ -52,7 +55,7 @@ void testCircuitBreaker() throws InterruptedException { // need to wait until half open int count = 0; while (count++ < 10) { - Thread.sleep(50); + Thread.sleep(BUSY_WAIT_SLEEP); if (breaker.state() == CircuitBreaker.State.HALF_OPEN) { break; } From 4648d49b80e7a8fd87fffc48bd1d75350ba2ca55 Mon Sep 17 00:00:00 2001 From: Santiago Pericasgeertsen Date: Mon, 12 Sep 2022 10:02:40 -0400 Subject: [PATCH 3/4] Improved testing by introducing a listener for Bulkhead queues. Listeners are called right before blocking on the semaphore and before supplier is ready execute after dequeuing. --- .../helidon/nima/faulttolerance/Bulkhead.java | 55 ++++++++++++------- .../nima/faulttolerance/BulkheadImpl.java | 9 ++- .../nima/faulttolerance/BulkheadTest.java | 16 ++++-- 3 files changed, 55 insertions(+), 25 deletions(-) diff --git a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/Bulkhead.java b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/Bulkhead.java index 6b00287b3bf..e6d6cd86585 100644 --- a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/Bulkhead.java +++ b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/Bulkhead.java @@ -16,12 +16,10 @@ package io.helidon.nima.faulttolerance; -import java.util.Objects; -import java.util.concurrent.ExecutorService; +import java.util.ArrayList; +import java.util.List; import java.util.function.Supplier; -import io.helidon.common.LazyValue; - /** * Bulkhead protects a resource that cannot serve unlimited parallel * requests. @@ -83,6 +81,30 @@ interface Stats { long waitingQueueSize(); } + /** + * A Bulkhead listener for queueing operations. + */ + interface QueueListener { + + /** + * Called right before blocking on the internal semaphore's queue. + * + * @param supplier the supplier to be enqueued + * @param type of value returned by supplier + */ + default void enqueueing(Supplier supplier) { + } + + /** + * Called after semaphore is acquired and before supplier is called. + * + * @param supplier the supplier to execute + * @param type of value returned by supplier + */ + default void dequeued(Supplier supplier) { + } + } + /** * Fluent API builder for {@link io.helidon.nima.faulttolerance.Bulkhead}. */ @@ -90,10 +112,10 @@ class Builder implements io.helidon.common.Builder { private static final int DEFAULT_LIMIT = 10; private static final int DEFAULT_QUEUE_LENGTH = 10; - private LazyValue executor = FaultTolerance.executor(); private int limit = DEFAULT_LIMIT; private int queueLength = DEFAULT_QUEUE_LENGTH; private String name = "Bulkhead-" + System.identityHashCode(this); + private List listeners = new ArrayList<>(); private Builder() { } @@ -103,16 +125,6 @@ public Bulkhead build() { return new BulkheadImpl(this); } - /** - * Configure executor service to use for executing tasks asynchronously. - * - * @param executor executor service supplier - * @return updated builder instance - */ - public Builder executor(Supplier executor) { - this.executor = LazyValue.create(Objects.requireNonNull(executor)); - return this; - } /** * Maximal number of parallel requests going through this bulkhead. @@ -150,6 +162,11 @@ public Builder name(String name) { return this; } + public Builder addQueueListener(QueueListener listener) { + listeners.add(listener); + return this; + } + int limit() { return limit; } @@ -158,12 +175,12 @@ int queueLength() { return queueLength; } - LazyValue executor() { - return executor; - } - String name() { return name; } + + List queueListeners() { + return listeners; + } } } diff --git a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/BulkheadImpl.java b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/BulkheadImpl.java index 78556d93bb5..359f65d680f 100644 --- a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/BulkheadImpl.java +++ b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/BulkheadImpl.java @@ -17,6 +17,7 @@ package io.helidon.nima.faulttolerance; import java.lang.System.Logger.Level; +import java.util.List; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -32,11 +33,13 @@ class BulkheadImpl implements Bulkhead { private final AtomicLong callsAccepted = new AtomicLong(0L); private final AtomicLong callsRejected = new AtomicLong(0L); private final AtomicInteger enqueued = new AtomicInteger(); + private final List listeners; BulkheadImpl(Builder builder) { this.inProgress = new Semaphore(builder.limit(), true); this.name = builder.name(); this.maxQueue = builder.queueLength(); + this.listeners = builder.queueListeners(); } @Override @@ -46,7 +49,7 @@ public String name() { @Override public T invoke(Supplier supplier) { - + // execute immediately if semaphore can be acquired if (inProgress.tryAcquire()) { if (LOGGER.isLoggable(Level.DEBUG)) { LOGGER.log(Level.DEBUG, name + " invoke immediate " + supplier); @@ -62,7 +65,11 @@ public T invoke(Supplier supplier) { } try { // block current thread until permit available + listeners.forEach(l -> l.enqueueing(supplier)); inProgress.acquire(); + + // unblocked so we can proceed with execution + listeners.forEach(l -> l.dequeued(supplier)); enqueued.decrementAndGet(); if (LOGGER.isLoggable(Level.DEBUG)) { LOGGER.log(Level.DEBUG, name + " invoking " + supplier); diff --git a/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/BulkheadTest.java b/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/BulkheadTest.java index 2f2705f9d7f..09d35891dfc 100644 --- a/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/BulkheadTest.java +++ b/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/BulkheadTest.java @@ -20,6 +20,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import io.helidon.common.LogConfig; @@ -41,6 +42,8 @@ class BulkheadTest { private static final long WAIT_TIMEOUT_MILLIS = 2000; + private final CountDownLatch enqueuedSubmitted = new CountDownLatch(1); + @BeforeAll static void setupTest() { LogConfig.configureRuntime(); @@ -54,6 +57,12 @@ void testBulkhead() throws InterruptedException, ExecutionException, java.util.c .limit(1) .queueLength(1) .name(name) + .addQueueListener(new Bulkhead.QueueListener() { + @Override + public void enqueueing(Supplier supplier) { + enqueuedSubmitted.countDown(); + } + }) .build(); // Submit first inProgress task @@ -68,11 +77,8 @@ void testBulkhead() throws InterruptedException, ExecutionException, java.util.c // Submit new task that should be queued Task enqueued = new Task(1); - CountDownLatch enqueuedSubmitted = new CountDownLatch(1); - CompletableFuture enqueuedResult = Async.invokeStatic(() -> { - enqueuedSubmitted.countDown(); - return bulkhead.invoke(enqueued::run); - }); + CompletableFuture enqueuedResult = Async.invokeStatic( + () -> bulkhead.invoke(enqueued::run)); // Wait until previous task is "likely" queued if (!enqueuedSubmitted.await(WAIT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { From 62a6780906ea33b14022ce2c65bccff86fe0b5e3 Mon Sep 17 00:00:00 2001 From: Santiago Pericasgeertsen Date: Mon, 12 Sep 2022 13:45:22 -0400 Subject: [PATCH 4/4] Fixed Javadoc problem. Signed-off-by: Santiago Pericasgeertsen --- .../main/java/io/helidon/nima/faulttolerance/Bulkhead.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/Bulkhead.java b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/Bulkhead.java index e6d6cd86585..a4c033d9d83 100644 --- a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/Bulkhead.java +++ b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/Bulkhead.java @@ -162,6 +162,12 @@ public Builder name(String name) { return this; } + /** + * Add a queue listener to this bulkhead. + * + * @param listener a queue listener + * @return updated builder instance + */ public Builder addQueueListener(QueueListener listener) { listeners.add(listener); return this;