diff --git a/fault-tolerance/src/main/java/io/helidon/faulttolerance/Fallback.java b/fault-tolerance/src/main/java/io/helidon/faulttolerance/Fallback.java index 730eede8eba..a50771b312e 100644 --- a/fault-tolerance/src/main/java/io/helidon/faulttolerance/Fallback.java +++ b/fault-tolerance/src/main/java/io/helidon/faulttolerance/Fallback.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 Oracle and/or its affiliates. + * Copyright (c) 2020, 2022 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -68,9 +68,9 @@ static Fallback create(Function> * @param type of the result * @return a new fallback */ - static Fallback createMulti(Function> fallback) { + static Fallback createMulti(Function> fallback) { Builder builder = builder(); - return builder.fallback(fallback).build(); + return builder.fallbackMulti(fallback).build(); } /** diff --git a/fault-tolerance/src/main/java/io/helidon/faulttolerance/FallbackImpl.java b/fault-tolerance/src/main/java/io/helidon/faulttolerance/FallbackImpl.java index cc99fec158e..660906865cb 100644 --- a/fault-tolerance/src/main/java/io/helidon/faulttolerance/FallbackImpl.java +++ b/fault-tolerance/src/main/java/io/helidon/faulttolerance/FallbackImpl.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 Oracle and/or its affiliates. + * Copyright (c) 2020, 2022 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ package io.helidon.faulttolerance; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; import java.util.concurrent.Flow; import java.util.function.Function; @@ -48,7 +49,12 @@ public Multi invokeMulti(Supplier> supplier) { if (delayedTask.hadData() || errorChecker.shouldSkip(cause)) { return Multi.error(cause); } else { - return Multi.create(fallbackMulti.apply(cause)); + return Multi.create(fallbackMulti.apply(cause)) + .onErrorResumeWith(t2 -> { + // Copy exception structure of Single case + t2.addSuppressed(new CompletionException(throwable)); + return Multi.error(t2); + }); } }); } diff --git a/fault-tolerance/src/test/java/io/helidon/faulttolerance/FallbackTest.java b/fault-tolerance/src/test/java/io/helidon/faulttolerance/FallbackTest.java index 135bb142e95..e951973e365 100644 --- a/fault-tolerance/src/test/java/io/helidon/faulttolerance/FallbackTest.java +++ b/fault-tolerance/src/test/java/io/helidon/faulttolerance/FallbackTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 Oracle and/or its affiliates. + * Copyright (c) 2020, 2022 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,13 +17,14 @@ package io.helidon.faulttolerance; import java.util.Arrays; +import java.util.List; import java.util.concurrent.CompletionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import io.helidon.common.reactive.Multi; import io.helidon.common.reactive.Single; import io.helidon.config.ConfigException; - import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -43,7 +44,8 @@ void reset() { @Test void testFallback() { - String result = Fallback.create(this::fallback).invoke(this::primary) + String result = Fallback.create(this::fallback) + .invoke(this::primary) .await(1, TimeUnit.SECONDS); assertThat(result, is("fallback")); @@ -62,6 +64,29 @@ void testFallbackFails() { assertThat(suppressedCause, instanceOf(IllegalArgumentException.class)); } + @Test + void testFallbackMulti() { + List result = Fallback.createMulti(this::fallbackMulti) + .invokeMulti(this::primaryMulti) + .collectList() + .await(1, TimeUnit.SECONDS); + + assertThat(result, is(List.of("1", "2", "3"))); + assertThat(primaryCounter.get(), is(1)); + assertThat(fallbackCounter.get(), is(1)); + } + + @Test + void testFallbackMultiFails() { + Multi result = Fallback.createMulti(this::fallbackMultiFail).invokeMulti(this::primaryMulti); + ConfigException exception = FaultToleranceTest.completionException(Single.create(result), ConfigException.class); + Throwable[] suppressed = exception.getSuppressed(); + assertThat("Should have a suppressed exception: " + Arrays.toString(suppressed), suppressed.length, is(1)); + assertThat(suppressed[0], instanceOf(CompletionException.class)); + Throwable suppressedCause = suppressed[0].getCause(); + assertThat(suppressedCause, instanceOf(IllegalArgumentException.class)); + } + private Single primary() { primaryCounter.incrementAndGet(); return Single.error(new IllegalArgumentException("Intentional failure")); @@ -76,4 +101,19 @@ private Single fallbackFail(Throwable throwable) { fallbackCounter.incrementAndGet(); return Single.error(new ConfigException("Intentional failure")); } + + private Multi primaryMulti() { + primaryCounter.incrementAndGet(); + return Multi.error(new IllegalArgumentException("Intentional failure")); + } + + private Multi fallbackMulti(Throwable throwable) { + fallbackCounter.incrementAndGet(); + return Multi.create(List.of("1", "2", "3")); + } + + private Multi fallbackMultiFail(Throwable throwable) { + fallbackCounter.incrementAndGet(); + return Multi.error(new ConfigException("Intentional failure")); + } } \ No newline at end of file