Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed a few problems with Fallback and Multi's in SE #4157

Merged
merged 1 commit into from
May 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates.
* Copyright (c) 2020, 2022 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -68,9 +68,9 @@ static <T> Fallback<T> create(Function<Throwable, ? extends CompletionStage<T>>
* @param <T> type of the result
* @return a new fallback
*/
static <T> Fallback<T> createMulti(Function<Throwable, ? extends CompletionStage<T>> fallback) {
static <T> Fallback<T> createMulti(Function<Throwable, ? extends Flow.Publisher<T>> fallback) {
Builder<T> builder = builder();
return builder.fallback(fallback).build();
return builder.fallbackMulti(fallback).build();
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates.
* Copyright (c) 2020, 2022 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,7 @@
package io.helidon.faulttolerance;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.function.Function;
Expand Down Expand Up @@ -48,7 +49,12 @@ public Multi<T> invokeMulti(Supplier<? extends Flow.Publisher<T>> supplier) {
if (delayedTask.hadData() || errorChecker.shouldSkip(cause)) {
return Multi.error(cause);
} else {
return Multi.create(fallbackMulti.apply(cause));
return Multi.create(fallbackMulti.apply(cause))
.onErrorResumeWith(t2 -> {
// Copy exception structure of Single<T> case
t2.addSuppressed(new CompletionException(throwable));
return Multi.error(t2);
});
}
});
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates.
* Copyright (c) 2020, 2022 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,13 +17,14 @@
package io.helidon.faulttolerance;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import io.helidon.common.reactive.Multi;
import io.helidon.common.reactive.Single;
import io.helidon.config.ConfigException;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

Expand All @@ -43,7 +44,8 @@ void reset() {

@Test
void testFallback() {
String result = Fallback.create(this::fallback).invoke(this::primary)
String result = Fallback.create(this::fallback)
.invoke(this::primary)
.await(1, TimeUnit.SECONDS);

assertThat(result, is("fallback"));
Expand All @@ -62,6 +64,29 @@ void testFallbackFails() {
assertThat(suppressedCause, instanceOf(IllegalArgumentException.class));
}

@Test
void testFallbackMulti() {
List<String> result = Fallback.createMulti(this::fallbackMulti)
.invokeMulti(this::primaryMulti)
.collectList()
.await(1, TimeUnit.SECONDS);

assertThat(result, is(List.of("1", "2", "3")));
assertThat(primaryCounter.get(), is(1));
assertThat(fallbackCounter.get(), is(1));
}

@Test
void testFallbackMultiFails() {
Multi<String> result = Fallback.createMulti(this::fallbackMultiFail).invokeMulti(this::primaryMulti);
ConfigException exception = FaultToleranceTest.completionException(Single.create(result), ConfigException.class);
Throwable[] suppressed = exception.getSuppressed();
assertThat("Should have a suppressed exception: " + Arrays.toString(suppressed), suppressed.length, is(1));
assertThat(suppressed[0], instanceOf(CompletionException.class));
Throwable suppressedCause = suppressed[0].getCause();
assertThat(suppressedCause, instanceOf(IllegalArgumentException.class));
}

private Single<String> primary() {
primaryCounter.incrementAndGet();
return Single.error(new IllegalArgumentException("Intentional failure"));
Expand All @@ -76,4 +101,19 @@ private Single<String> fallbackFail(Throwable throwable) {
fallbackCounter.incrementAndGet();
return Single.error(new ConfigException("Intentional failure"));
}

private Multi<String> primaryMulti() {
primaryCounter.incrementAndGet();
return Multi.error(new IllegalArgumentException("Intentional failure"));
}

private Multi<String> fallbackMulti(Throwable throwable) {
fallbackCounter.incrementAndGet();
return Multi.create(List.of("1", "2", "3"));
}

private Multi<String> fallbackMultiFail(Throwable throwable) {
fallbackCounter.incrementAndGet();
return Multi.error(new ConfigException("Intentional failure"));
}
}