diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnErrorNext.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnErrorNext.java index dd1198d0a9..8a36aad3ce 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnErrorNext.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnErrorNext.java @@ -18,6 +18,7 @@ import io.reactivex.*; import io.reactivex.exceptions.*; import io.reactivex.functions.Function; +import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.internal.subscriptions.SubscriptionArbiter; import io.reactivex.plugins.RxJavaPlugins; @@ -35,30 +36,36 @@ public FlowableOnErrorNext(Flowable source, @Override protected void subscribeActual(Subscriber s) { OnErrorNextSubscriber parent = new OnErrorNextSubscriber(s, nextSupplier, allowFatal); - s.onSubscribe(parent.arbiter); + s.onSubscribe(parent); source.subscribe(parent); } - static final class OnErrorNextSubscriber implements FlowableSubscriber { + static final class OnErrorNextSubscriber + extends SubscriptionArbiter + implements FlowableSubscriber { + private static final long serialVersionUID = 4063763155303814625L; + final Subscriber actual; + final Function> nextSupplier; + final boolean allowFatal; - final SubscriptionArbiter arbiter; boolean once; boolean done; + long produced; + OnErrorNextSubscriber(Subscriber actual, Function> nextSupplier, boolean allowFatal) { this.actual = actual; this.nextSupplier = nextSupplier; this.allowFatal = allowFatal; - this.arbiter = new SubscriptionArbiter(); } @Override public void onSubscribe(Subscription s) { - arbiter.setSubscription(s); + setSubscription(s); } @Override @@ -66,10 +73,10 @@ public void onNext(T t) { if (done) { return; } - actual.onNext(t); if (!once) { - arbiter.produced(1L); + produced++; } + actual.onNext(t); } @Override @@ -92,18 +99,16 @@ public void onError(Throwable t) { Publisher p; try { - p = nextSupplier.apply(t); + p = ObjectHelper.requireNonNull(nextSupplier.apply(t), "The nextSupplier returned a null Publisher"); } catch (Throwable e) { Exceptions.throwIfFatal(e); actual.onError(new CompositeException(t, e)); return; } - if (p == null) { - NullPointerException npe = new NullPointerException("Publisher is null"); - npe.initCause(t); - actual.onError(npe); - return; + long mainProduced = produced; + if (mainProduced != 0L) { + produced(mainProduced); } p.subscribe(this); diff --git a/src/test/java/io/reactivex/flowable/FlowableNullTests.java b/src/test/java/io/reactivex/flowable/FlowableNullTests.java index 12c7a9b5c1..80f70a75ad 100644 --- a/src/test/java/io/reactivex/flowable/FlowableNullTests.java +++ b/src/test/java/io/reactivex/flowable/FlowableNullTests.java @@ -1648,14 +1648,22 @@ public void onErrorResumeNextFunctionNull() { just1.onErrorResumeNext((Function>)null); } - @Test(expected = NullPointerException.class) + @Test public void onErrorResumeNextFunctionReturnsNull() { - Flowable.error(new TestException()).onErrorResumeNext(new Function>() { - @Override - public Publisher apply(Throwable e) { - return null; - } - }).blockingSubscribe(); + try { + Flowable.error(new TestException()).onErrorResumeNext(new Function>() { + @Override + public Publisher apply(Throwable e) { + return null; + } + }).blockingSubscribe(); + fail("Should have thrown"); + } catch (CompositeException ex) { + List errors = ex.getExceptions(); + TestHelper.assertError(errors, 0, TestException.class); + TestHelper.assertError(errors, 1, NullPointerException.class); + assertEquals(2, errors.size()); + } } @Test(expected = NullPointerException.class)