Skip to content

Commit

Permalink
[Reactive] Apply bugfixes from helidon-io#1511
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Mar 19, 2020
1 parent 9b10b52 commit b69d89c
Show file tree
Hide file tree
Showing 11 changed files with 66 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ static <T> Multi<T> never() {
* @param <T> item type
* @return Multi
*/
static <T> Multi<T> concat(Multi<T> firstMulti, Multi<T> secondMulti) {
static <T> Multi<T> concat(Flow.Publisher<T> firstMulti, Flow.Publisher<T> secondMulti) {
return ConcatPublisher.create(firstMulti, secondMulti);
}

Expand All @@ -484,23 +484,23 @@ default Multi<T> onTerminate(Runnable onTerminate) {
/**
* Executes given {@link java.lang.Runnable} when onComplete signal is received.
*
* @param onTerminate {@link java.lang.Runnable} to be executed.
* @param onComplete {@link java.lang.Runnable} to be executed.
* @return Multi
*/
default Multi<T> onComplete(Runnable onTerminate) {
default Multi<T> onComplete(Runnable onComplete) {
return new MultiTappedPublisher<>(this,
null,
null,
null,
onTerminate,
onComplete,
null,
null);
}

/**
* Executes given {@link java.lang.Runnable} when onError signal is received.
*
* @param onErrorConsumer {@link java.lang.Runnable} to be executed.
* @param onErrorConsumer {@link java.util.function.Consumer} to be executed.
* @return Multi
*/
default Multi<T> onError(Consumer<Throwable> onErrorConsumer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ void drain() {
hasNext = iterator.hasNext();
} catch (Throwable ex) {
canceled = true;
upstream.cancel();
downstream.onError(ex);
continue;
}
Expand Down Expand Up @@ -222,6 +223,7 @@ void drain() {
"The iterator returned a null item");
} catch (Throwable ex) {
canceled = true;
upstream.cancel();
downstream.onError(ex);
continue outer;
}
Expand All @@ -242,6 +244,7 @@ void drain() {
hasNext = iterator.hasNext();
} catch (Throwable ex) {
canceled = true;
upstream.cancel();
downstream.onError(ex);
continue outer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,11 +407,10 @@ void drainLoop() {
continue;
}
}

emitted = e;
}
}

emitted = e;
missed = addAndGet(-missed);
if (missed == 0) {
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ public void onError(Throwable throwable) {
publisher = Objects.requireNonNull(fallbackFunction.apply(throwable),
"The fallback function returned a null Flow.Publisher");
} catch (Throwable ex) {
ex.addSuppressed(throwable);
if (ex != throwable) {
ex.addSuppressed(throwable);
}
downstream.onError(ex);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,23 +313,23 @@ default Single<T> onTerminate(Runnable onTerminate) {
/**
* Executes given {@link java.lang.Runnable} when onComplete signal is received.
*
* @param onTerminate {@link java.lang.Runnable} to be executed.
* @param onComplete {@link java.lang.Runnable} to be executed.
* @return Single
*/
default Single<T> onComplete(Runnable onTerminate) {
default Single<T> onComplete(Runnable onComplete) {
return new SingleTappedPublisher<>(this,
null,
null,
null,
onTerminate,
onComplete,
null,
null);
}

/**
* Executes given {@link java.lang.Runnable} when onError signal is received.
*
* @param onErrorConsumer {@link java.lang.Runnable} to be executed.
* @param onErrorConsumer {@link java.util.function.Consumer} to be executed.
* @return Single
*/
default Single<T> onError(Consumer<Throwable> onErrorConsumer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ public void onError(Throwable throwable) {
fallback = Objects.requireNonNull(fallbackFunction.apply(throwable),
"The fallback function returned a null item");
} catch (Throwable ex) {
ex.addSuppressed(throwable);
if (ex != throwable) {
ex.addSuppressed(throwable);
}
error(ex);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ public void onError(Throwable throwable) {
fallback = Objects.requireNonNull(fallbackFunction.apply(throwable),
"The fallback function returned a null Single");
} catch (Throwable ex) {
ex.addSuppressed(throwable);
if (ex != throwable) {
ex.addSuppressed(throwable);
}
error(ex);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,15 @@ public void failAfterItemsBackpressure() {
.request(1)
.assertResult(1, 2, 3, 4);
}

@Test
public void noSelfSuppressionFailure() {
TestSubscriber<Integer> ts = new TestSubscriber<>();

Multi.<Integer>error(new IllegalArgumentException())
.onErrorResume(e -> { throw (IllegalArgumentException)e; })
.subscribe(ts);

ts.assertFailure(IllegalArgumentException.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,15 @@ public void failAfterItemsBackpressure() {
.request(2)
.assertResult(1, 2, 3, 4, 5);
}

@Test
public void noSelfSuppressionFailure() {
TestSubscriber<Integer> ts = new TestSubscriber<>();

Multi.<Integer>error(new IllegalArgumentException())
.onErrorResumeWith(e -> { throw (IllegalArgumentException)e; })
.subscribe(ts);

ts.assertFailure(IllegalArgumentException.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,15 @@ public void emptySource() {

ts.assertResult();
}

@Test
public void noSelfSuppressionFailure() {
TestSubscriber<Integer> ts = new TestSubscriber<>();

Single.<Integer>error(new IllegalArgumentException())
.onErrorResume(e -> { throw (IllegalArgumentException)e; })
.subscribe(ts);

ts.assertFailure(IllegalArgumentException.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,15 @@ public void emptyFallback() {

ts.assertResult();
}

@Test
public void noSelfSuppressionFailure() {
TestSubscriber<Integer> ts = new TestSubscriber<>();

Single.<Integer>error(new IllegalArgumentException())
.onErrorResumeWith(e -> { throw (IllegalArgumentException)e; })
.subscribe(ts);

ts.assertFailure(IllegalArgumentException.class);
}
}

0 comments on commit b69d89c

Please sign in to comment.