From b69d89c95a26af07c94b5c30ebfe161d9a873c69 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Thu, 19 Mar 2020 14:50:23 +0100 Subject: [PATCH] [Reactive] Apply bugfixes from #1511 --- .../main/java/io/helidon/common/reactive/Multi.java | 10 +++++----- .../helidon/common/reactive/MultiFlatMapIterable.java | 3 +++ .../common/reactive/MultiFlatMapPublisher.java | 3 +-- .../common/reactive/MultiOnErrorResumeWith.java | 4 +++- .../main/java/io/helidon/common/reactive/Single.java | 8 ++++---- .../helidon/common/reactive/SingleOnErrorResume.java | 4 +++- .../common/reactive/SingleOnErrorResumeWith.java | 4 +++- .../common/reactive/MultiOnErrorResumeTest.java | 11 +++++++++++ .../common/reactive/MultiOnErrorResumeWithTest.java | 11 +++++++++++ .../common/reactive/SingleOnErrorResumeTest.java | 11 +++++++++++ .../common/reactive/SingleOnErrorResumeWithTest.java | 11 +++++++++++ 11 files changed, 66 insertions(+), 14 deletions(-) diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/Multi.java b/common/reactive/src/main/java/io/helidon/common/reactive/Multi.java index 982eef00902..cdc82edb0d4 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/Multi.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/Multi.java @@ -461,7 +461,7 @@ static Multi never() { * @param item type * @return Multi */ - static Multi concat(Multi firstMulti, Multi secondMulti) { + static Multi concat(Flow.Publisher firstMulti, Flow.Publisher secondMulti) { return ConcatPublisher.create(firstMulti, secondMulti); } @@ -484,15 +484,15 @@ default Multi onTerminate(Runnable onTerminate) { /** * Executes given {@link java.lang.Runnable} when onComplete signal is received. * - * @param onTerminate {@link java.lang.Runnable} to be executed. + * @param onComplete {@link java.lang.Runnable} to be executed. * @return Multi */ - default Multi onComplete(Runnable onTerminate) { + default Multi onComplete(Runnable onComplete) { return new MultiTappedPublisher<>(this, null, null, null, - onTerminate, + onComplete, null, null); } @@ -500,7 +500,7 @@ default Multi onComplete(Runnable onTerminate) { /** * Executes given {@link java.lang.Runnable} when onError signal is received. * - * @param onErrorConsumer {@link java.lang.Runnable} to be executed. + * @param onErrorConsumer {@link java.util.function.Consumer} to be executed. * @return Multi */ default Multi onError(Consumer onErrorConsumer) { diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/MultiFlatMapIterable.java b/common/reactive/src/main/java/io/helidon/common/reactive/MultiFlatMapIterable.java index 20536fe3957..b9f46050d96 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/MultiFlatMapIterable.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/MultiFlatMapIterable.java @@ -194,6 +194,7 @@ void drain() { hasNext = iterator.hasNext(); } catch (Throwable ex) { canceled = true; + upstream.cancel(); downstream.onError(ex); continue; } @@ -222,6 +223,7 @@ void drain() { "The iterator returned a null item"); } catch (Throwable ex) { canceled = true; + upstream.cancel(); downstream.onError(ex); continue outer; } @@ -242,6 +244,7 @@ void drain() { hasNext = iterator.hasNext(); } catch (Throwable ex) { canceled = true; + upstream.cancel(); downstream.onError(ex); continue outer; } diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/MultiFlatMapPublisher.java b/common/reactive/src/main/java/io/helidon/common/reactive/MultiFlatMapPublisher.java index 629fd54fe5e..e10f0f5f16b 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/MultiFlatMapPublisher.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/MultiFlatMapPublisher.java @@ -407,11 +407,10 @@ void drainLoop() { continue; } } - - emitted = e; } } + emitted = e; missed = addAndGet(-missed); if (missed == 0) { break; diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/MultiOnErrorResumeWith.java b/common/reactive/src/main/java/io/helidon/common/reactive/MultiOnErrorResumeWith.java index 4a5dbb5b4e0..17d527c35b9 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/MultiOnErrorResumeWith.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/MultiOnErrorResumeWith.java @@ -93,7 +93,9 @@ public void onError(Throwable throwable) { publisher = Objects.requireNonNull(fallbackFunction.apply(throwable), "The fallback function returned a null Flow.Publisher"); } catch (Throwable ex) { - ex.addSuppressed(throwable); + if (ex != throwable) { + ex.addSuppressed(throwable); + } downstream.onError(ex); return; } diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/Single.java b/common/reactive/src/main/java/io/helidon/common/reactive/Single.java index 51115282b2c..9241351283c 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/Single.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/Single.java @@ -313,15 +313,15 @@ default Single onTerminate(Runnable onTerminate) { /** * Executes given {@link java.lang.Runnable} when onComplete signal is received. * - * @param onTerminate {@link java.lang.Runnable} to be executed. + * @param onComplete {@link java.lang.Runnable} to be executed. * @return Single */ - default Single onComplete(Runnable onTerminate) { + default Single onComplete(Runnable onComplete) { return new SingleTappedPublisher<>(this, null, null, null, - onTerminate, + onComplete, null, null); } @@ -329,7 +329,7 @@ default Single onComplete(Runnable onTerminate) { /** * Executes given {@link java.lang.Runnable} when onError signal is received. * - * @param onErrorConsumer {@link java.lang.Runnable} to be executed. + * @param onErrorConsumer {@link java.util.function.Consumer} to be executed. * @return Single */ default Single onError(Consumer onErrorConsumer) { diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/SingleOnErrorResume.java b/common/reactive/src/main/java/io/helidon/common/reactive/SingleOnErrorResume.java index 77bf3b6c8bd..9c7279df83e 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/SingleOnErrorResume.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/SingleOnErrorResume.java @@ -77,7 +77,9 @@ public void onError(Throwable throwable) { fallback = Objects.requireNonNull(fallbackFunction.apply(throwable), "The fallback function returned a null item"); } catch (Throwable ex) { - ex.addSuppressed(throwable); + if (ex != throwable) { + ex.addSuppressed(throwable); + } error(ex); return; } diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/SingleOnErrorResumeWith.java b/common/reactive/src/main/java/io/helidon/common/reactive/SingleOnErrorResumeWith.java index 092cee1b1b4..65d300a0d86 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/SingleOnErrorResumeWith.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/SingleOnErrorResumeWith.java @@ -82,7 +82,9 @@ public void onError(Throwable throwable) { fallback = Objects.requireNonNull(fallbackFunction.apply(throwable), "The fallback function returned a null Single"); } catch (Throwable ex) { - ex.addSuppressed(throwable); + if (ex != throwable) { + ex.addSuppressed(throwable); + } error(ex); return; } diff --git a/common/reactive/src/test/java/io/helidon/common/reactive/MultiOnErrorResumeTest.java b/common/reactive/src/test/java/io/helidon/common/reactive/MultiOnErrorResumeTest.java index 5658a02a885..fff98cdd603 100644 --- a/common/reactive/src/test/java/io/helidon/common/reactive/MultiOnErrorResumeTest.java +++ b/common/reactive/src/test/java/io/helidon/common/reactive/MultiOnErrorResumeTest.java @@ -76,4 +76,15 @@ public void failAfterItemsBackpressure() { .request(1) .assertResult(1, 2, 3, 4); } + + @Test + public void noSelfSuppressionFailure() { + TestSubscriber ts = new TestSubscriber<>(); + + Multi.error(new IllegalArgumentException()) + .onErrorResume(e -> { throw (IllegalArgumentException)e; }) + .subscribe(ts); + + ts.assertFailure(IllegalArgumentException.class); + } } diff --git a/common/reactive/src/test/java/io/helidon/common/reactive/MultiOnErrorResumeWithTest.java b/common/reactive/src/test/java/io/helidon/common/reactive/MultiOnErrorResumeWithTest.java index 1104019035f..a5f92bc0d9b 100644 --- a/common/reactive/src/test/java/io/helidon/common/reactive/MultiOnErrorResumeWithTest.java +++ b/common/reactive/src/test/java/io/helidon/common/reactive/MultiOnErrorResumeWithTest.java @@ -99,4 +99,15 @@ public void failAfterItemsBackpressure() { .request(2) .assertResult(1, 2, 3, 4, 5); } + + @Test + public void noSelfSuppressionFailure() { + TestSubscriber ts = new TestSubscriber<>(); + + Multi.error(new IllegalArgumentException()) + .onErrorResumeWith(e -> { throw (IllegalArgumentException)e; }) + .subscribe(ts); + + ts.assertFailure(IllegalArgumentException.class); + } } diff --git a/common/reactive/src/test/java/io/helidon/common/reactive/SingleOnErrorResumeTest.java b/common/reactive/src/test/java/io/helidon/common/reactive/SingleOnErrorResumeTest.java index 56df5ab6f0d..7a3deea0983 100644 --- a/common/reactive/src/test/java/io/helidon/common/reactive/SingleOnErrorResumeTest.java +++ b/common/reactive/src/test/java/io/helidon/common/reactive/SingleOnErrorResumeTest.java @@ -55,4 +55,15 @@ public void emptySource() { ts.assertResult(); } + + @Test + public void noSelfSuppressionFailure() { + TestSubscriber ts = new TestSubscriber<>(); + + Single.error(new IllegalArgumentException()) + .onErrorResume(e -> { throw (IllegalArgumentException)e; }) + .subscribe(ts); + + ts.assertFailure(IllegalArgumentException.class); + } } diff --git a/common/reactive/src/test/java/io/helidon/common/reactive/SingleOnErrorResumeWithTest.java b/common/reactive/src/test/java/io/helidon/common/reactive/SingleOnErrorResumeWithTest.java index ebd14413608..0ee63225ee2 100644 --- a/common/reactive/src/test/java/io/helidon/common/reactive/SingleOnErrorResumeWithTest.java +++ b/common/reactive/src/test/java/io/helidon/common/reactive/SingleOnErrorResumeWithTest.java @@ -73,4 +73,15 @@ public void emptyFallback() { ts.assertResult(); } + + @Test + public void noSelfSuppressionFailure() { + TestSubscriber ts = new TestSubscriber<>(); + + Single.error(new IllegalArgumentException()) + .onErrorResumeWith(e -> { throw (IllegalArgumentException)e; }) + .subscribe(ts); + + ts.assertFailure(IllegalArgumentException.class); + } }