diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableCombineLatest.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableCombineLatest.java index d332fb5e35..cbdda4a4bc 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableCombineLatest.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableCombineLatest.java @@ -474,7 +474,7 @@ public R poll() throws Exception { return null; } T[] a = (T[])queue.poll(); - R r = combiner.apply(a); + R r = ObjectHelper.requireNonNull(combiner.apply(a), "The combiner returned a null value"); ((CombineLatestInnerSubscriber)e).requestOne(); return r; } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableInternalHelper.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableInternalHelper.java index 4127d9b9a5..d516b02ab0 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableInternalHelper.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableInternalHelper.java @@ -20,7 +20,7 @@ import io.reactivex.*; import io.reactivex.flowables.ConnectableFlowable; import io.reactivex.functions.*; -import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.functions.*; /** * Helper utility class to support Flowable with inner classes. @@ -77,7 +77,8 @@ static final class ItemDelayFunction implements Function> @Override public Publisher apply(final T v) throws Exception { - return new FlowableTakePublisher(itemDelay.apply(v), 1).map(Functions.justFunction(v)).defaultIfEmpty(v); + Publisher p = ObjectHelper.requireNonNull(itemDelay.apply(v), "The itemDelay returned a null Publisher"); + return new FlowableTakePublisher(p, 1).map(Functions.justFunction(v)).defaultIfEmpty(v); } } @@ -164,7 +165,7 @@ static final class FlatMapWithCombinerOuter implements Function apply(final T t) throws Exception { @SuppressWarnings("unchecked") - Publisher u = (Publisher)mapper.apply(t); + Publisher u = (Publisher)ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null Publisher"); return new FlowableMapPublisher(u, new FlatMapWithCombinerInner(combiner, t)); } } @@ -184,7 +185,7 @@ static final class FlatMapIntoIterable implements Function @Override public Publisher apply(T t) throws Exception { - return new FlowableFromIterable(mapper.apply(t)); + return new FlowableFromIterable(ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null Iterable")); } } @@ -317,7 +318,8 @@ static final class ReplayFunction implements Function, Publish @Override public Publisher apply(Flowable t) throws Exception { - return Flowable.fromPublisher(selector.apply(t)).observeOn(scheduler); + Publisher p = ObjectHelper.requireNonNull(selector.apply(t), "The selector returned a null Publisher"); + return Flowable.fromPublisher(p).observeOn(scheduler); } } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableMapNotification.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableMapNotification.java index faca674aec..0f0ec91d68 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableMapNotification.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableMapNotification.java @@ -18,7 +18,7 @@ import org.reactivestreams.Subscriber; import io.reactivex.Flowable; -import io.reactivex.exceptions.Exceptions; +import io.reactivex.exceptions.*; import io.reactivex.functions.Function; import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.internal.subscribers.SinglePostCompleteSubscriber; @@ -87,7 +87,7 @@ public void onError(Throwable t) { p = ObjectHelper.requireNonNull(onErrorMapper.apply(t), "The onError publisher returned is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); - actual.onError(e); + actual.onError(new CompositeException(t, e)); return; } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableUsing.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableUsing.java index 2a50f50b3c..9cc64d860f 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableUsing.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableUsing.java @@ -21,6 +21,7 @@ import io.reactivex.*; import io.reactivex.exceptions.*; import io.reactivex.functions.*; +import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.internal.subscriptions.*; import io.reactivex.plugins.RxJavaPlugins; @@ -54,7 +55,7 @@ public void subscribeActual(Subscriber s) { Publisher source; try { - source = sourceSupplier.apply(resource); + source = ObjectHelper.requireNonNull(sourceSupplier.apply(resource), "The sourceSupplier returned a null Publisher"); } catch (Throwable e) { Exceptions.throwIfFatal(e); try { diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromMany.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromMany.java index 772aebbf5d..2108dadf1a 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromMany.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromMany.java @@ -168,7 +168,7 @@ public void onNext(T t) { R v; try { - v = ObjectHelper.requireNonNull(combiner.apply(objects), "combiner returned a null value"); + v = ObjectHelper.requireNonNull(combiner.apply(objects), "The combiner returned a null value"); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); cancel(); @@ -297,7 +297,7 @@ public void dispose() { final class SingletonArrayFunc implements Function { @Override public R apply(T t) throws Exception { - return combiner.apply(new Object[] { t }); + return ObjectHelper.requireNonNull(combiner.apply(new Object[] { t }), "The combiner returned a null value"); } } } diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeZipArray.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeZipArray.java index 46f0179ddd..eeda31ac19 100644 --- a/src/main/java/io/reactivex/internal/operators/maybe/MaybeZipArray.java +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeZipArray.java @@ -192,7 +192,7 @@ public void onComplete() { final class SingletonArrayFunc implements Function { @Override public R apply(T t) throws Exception { - return zipper.apply(new Object[] { t }); + return ObjectHelper.requireNonNull(zipper.apply(new Object[] { t }), "The zipper returned a null value"); } } } diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeZipIterable.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeZipIterable.java index c6e6596809..7815bab095 100644 --- a/src/main/java/io/reactivex/internal/operators/maybe/MaybeZipIterable.java +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeZipIterable.java @@ -19,6 +19,7 @@ import io.reactivex.exceptions.Exceptions; import io.reactivex.functions.Function; import io.reactivex.internal.disposables.EmptyDisposable; +import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.internal.operators.maybe.MaybeZipArray.ZipCoordinator; public final class MaybeZipIterable extends Maybe { @@ -81,7 +82,7 @@ protected void subscribeActual(MaybeObserver observer) { final class SingletonArrayFunc implements Function { @Override public R apply(T t) throws Exception { - return zipper.apply(new Object[] { t }); + return ObjectHelper.requireNonNull(zipper.apply(new Object[] { t }), "The zipper returned a null value"); } } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java index 02b6c9d845..b5fe5f48b5 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java @@ -77,7 +77,8 @@ static final class ItemDelayFunction implements Function apply(final T v) throws Exception { - return new ObservableTake(itemDelay.apply(v), 1).map(Functions.justFunction(v)).defaultIfEmpty(v); + ObservableSource o = ObjectHelper.requireNonNull(itemDelay.apply(v), "The itemDelay returned a null ObservableSource"); + return new ObservableTake(o, 1).map(Functions.justFunction(v)).defaultIfEmpty(v); } } @@ -165,7 +166,7 @@ static final class FlatMapWithCombinerOuter implements Function apply(final T t) throws Exception { @SuppressWarnings("unchecked") - ObservableSource u = (ObservableSource)mapper.apply(t); + ObservableSource u = (ObservableSource)ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource"); return new ObservableMap(u, new FlatMapWithCombinerInner(combiner, t)); } } @@ -185,7 +186,7 @@ static final class FlatMapIntoIterable implements Function apply(T t) throws Exception { - return new ObservableFromIterable(mapper.apply(t)); + return new ObservableFromIterable(ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null Iterable")); } } @@ -319,7 +320,7 @@ static final class ObservableMapper implements Function> { @Override public Observable apply(T t) throws Exception { return RxJavaPlugins.onAssembly(new SingleToObservable( - ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null value"))); + ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null SingleSource"))); } } @@ -403,7 +404,8 @@ static final class ReplayFunction implements Function, Obser @Override public ObservableSource apply(Observable t) throws Exception { - return Observable.wrap(selector.apply(t)).observeOn(scheduler); + ObservableSource apply = ObjectHelper.requireNonNull(selector.apply(t), "The selector returned a null ObservableSource"); + return Observable.wrap(apply).observeOn(scheduler); } } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableMapNotification.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableMapNotification.java index dd45da1e52..1fb5f94451 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableMapNotification.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableMapNotification.java @@ -13,14 +13,14 @@ package io.reactivex.internal.operators.observable; -import io.reactivex.internal.functions.ObjectHelper; import java.util.concurrent.Callable; import io.reactivex.*; import io.reactivex.disposables.Disposable; -import io.reactivex.exceptions.Exceptions; +import io.reactivex.exceptions.*; import io.reactivex.functions.Function; import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.functions.ObjectHelper; public final class ObservableMapNotification extends AbstractObservableWithUpstream> { @@ -106,7 +106,7 @@ public void onError(Throwable t) { p = ObjectHelper.requireNonNull(onErrorMapper.apply(t), "The onError ObservableSource returned is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); - actual.onError(e); + actual.onError(new CompositeException(t, e)); return; } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java index af25fa9bf2..c48e83b74b 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java @@ -24,6 +24,7 @@ import io.reactivex.exceptions.Exceptions; import io.reactivex.functions.*; import io.reactivex.internal.disposables.*; +import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.internal.fuseable.HasUpstreamObservableSource; import io.reactivex.internal.util.*; import io.reactivex.observables.ConnectableObservable; @@ -1026,8 +1027,8 @@ protected void subscribeActual(Observer child) { ConnectableObservable co; ObservableSource observable; try { - co = connectableFactory.call(); - observable = selector.apply(co); + co = ObjectHelper.requireNonNull(connectableFactory.call(), "The connectableFactory returned a null ConnectableObservable"); + observable = ObjectHelper.requireNonNull(selector.apply(co), "The selector returned a null ObservableSource"); } catch (Throwable e) { Exceptions.throwIfFatal(e); EmptyDisposable.error(e, child); diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableUsing.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableUsing.java index 61deb185e9..e29c3e739e 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableUsing.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableUsing.java @@ -21,6 +21,7 @@ import io.reactivex.exceptions.*; import io.reactivex.functions.*; import io.reactivex.internal.disposables.*; +import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.plugins.RxJavaPlugins; public final class ObservableUsing extends Observable { @@ -53,7 +54,7 @@ public void subscribeActual(Observer s) { ObservableSource source; try { - source = sourceSupplier.apply(resource); + source = ObjectHelper.requireNonNull(sourceSupplier.apply(resource), "The sourceSupplier returned a null ObservableSource"); } catch (Throwable e) { Exceptions.throwIfFatal(e); try { diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableWithLatestFromMany.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableWithLatestFromMany.java index 5b6360741a..1578d0e950 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableWithLatestFromMany.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableWithLatestFromMany.java @@ -286,7 +286,7 @@ public void dispose() { final class SingletonArrayFunc implements Function { @Override public R apply(T t) throws Exception { - return combiner.apply(new Object[] { t }); + return ObjectHelper.requireNonNull(combiner.apply(new Object[] { t }), "The combiner returned a null value"); } } } diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleZipArray.java b/src/main/java/io/reactivex/internal/operators/single/SingleZipArray.java index 8f4e867db4..e98917b511 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleZipArray.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleZipArray.java @@ -181,7 +181,7 @@ public void onError(Throwable e) { final class SingletonArrayFunc implements Function { @Override public R apply(T t) throws Exception { - return zipper.apply(new Object[] { t }); + return ObjectHelper.requireNonNull(zipper.apply(new Object[] { t }), "The zipper returned a null value"); } } } diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleZipIterable.java b/src/main/java/io/reactivex/internal/operators/single/SingleZipIterable.java index 5b5f711aeb..6936642225 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleZipIterable.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleZipIterable.java @@ -19,6 +19,7 @@ import io.reactivex.exceptions.Exceptions; import io.reactivex.functions.Function; import io.reactivex.internal.disposables.EmptyDisposable; +import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.internal.operators.single.SingleZipArray.ZipCoordinator; public final class SingleZipIterable extends Single { @@ -81,7 +82,7 @@ protected void subscribeActual(SingleObserver observer) { final class SingletonArrayFunc implements Function { @Override public R apply(T t) throws Exception { - return zipper.apply(new Object[] { t }); + return ObjectHelper.requireNonNull(zipper.apply(new Object[] { t }), "The zipper returned a null value"); } } } diff --git a/src/main/java/io/reactivex/observers/TestObserver.java b/src/main/java/io/reactivex/observers/TestObserver.java index bb9d1e357a..761414de01 100644 --- a/src/main/java/io/reactivex/observers/TestObserver.java +++ b/src/main/java/io/reactivex/observers/TestObserver.java @@ -142,6 +142,7 @@ public void onNext(T t) { } catch (Throwable ex) { // Exceptions.throwIfFatal(e); TODO add fatal exceptions? errors.add(ex); + qs.dispose(); } return; } diff --git a/src/main/java/io/reactivex/subscribers/TestSubscriber.java b/src/main/java/io/reactivex/subscribers/TestSubscriber.java index 8287efad2a..07a63a4753 100644 --- a/src/main/java/io/reactivex/subscribers/TestSubscriber.java +++ b/src/main/java/io/reactivex/subscribers/TestSubscriber.java @@ -203,6 +203,7 @@ public void onNext(T t) { } catch (Throwable ex) { // Exceptions.throwIfFatal(e); TODO add fatal exceptions? errors.add(ex); + qs.cancel(); } return; } diff --git a/src/test/java/io/reactivex/flowable/FlowableNullTests.java b/src/test/java/io/reactivex/flowable/FlowableNullTests.java index 497abc21ef..7ff326dfbf 100644 --- a/src/test/java/io/reactivex/flowable/FlowableNullTests.java +++ b/src/test/java/io/reactivex/flowable/FlowableNullTests.java @@ -1341,6 +1341,7 @@ public Publisher call() { } @Test(expected = NullPointerException.class) + @Ignore("No longer crashes with NPE but signals it; tested elsewhere.") public void flatMapNotificationOnErrorReturnsNull() { Flowable.error(new TestException()).flatMap(new Function>() { @Override diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableCombineLatestTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableCombineLatestTest.java index 64d6cb39ea..61c87cbf4f 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableCombineLatestTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableCombineLatestTest.java @@ -30,6 +30,7 @@ import io.reactivex.exceptions.*; import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.fuseable.QueueFuseable; import io.reactivex.internal.operators.flowable.FlowableZipTest.ArgsToString; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.PublishProcessor; @@ -1551,4 +1552,21 @@ public Integer apply(Integer t1, Integer t2) throws Exception { pp2.onNext(2); ts.assertResult(3); } + + @Test + public void fusedNullCheck() { + TestSubscriber ts = SubscriberFusion.newTest(QueueFuseable.ASYNC); + + Flowable.combineLatest(Flowable.just(1), Flowable.just(2), new BiFunction() { + @Override + public Integer apply(Integer t1, Integer t2) throws Exception { + return null; + } + }) + .subscribe(ts); + + ts + .assertOf(SubscriberFusion.assertFusionMode(QueueFuseable.ASYNC)) + .assertFailureAndMessage(NullPointerException.class, "The combiner returned a null value"); + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableDelayTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableDelayTest.java index 229d482055..40e6ff58fb 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableDelayTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableDelayTest.java @@ -1018,4 +1018,15 @@ public void onComplete() { } } + @Test + public void itemDelayReturnsNull() { + Flowable.just(1).delay(new Function>() { + @Override + public Publisher apply(Integer t) throws Exception { + return null; + } + }) + .test() + .assertFailureAndMessage(NullPointerException.class, "The itemDelay returned a null Publisher"); + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapTest.java index 9a2d1ea0b1..6a509ad802 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapTest.java @@ -264,7 +264,7 @@ public void testFlatMapTransformsOnErrorFuncThrows() { source.flatMap(just(onNext), funcThrow((Throwable) null, onError), just0(onComplete)).subscribe(o); - verify(o).onError(any(TestException.class)); + verify(o).onError(any(CompositeException.class)); verify(o, never()).onNext(any()); verify(o, never()).onComplete(); } @@ -997,4 +997,40 @@ public void run() { } } } + + @Test + public void iterableMapperFunctionReturnsNull() { + Flowable.just(1) + .flatMapIterable(new Function>() { + @Override + public Iterable apply(Integer v) throws Exception { + return null; + } + }, new BiFunction() { + @Override + public Object apply(Integer v, Object w) throws Exception { + return v; + } + }) + .test() + .assertFailureAndMessage(NullPointerException.class, "The mapper returned a null Iterable"); + } + + @Test + public void combinerMapperFunctionReturnsNull() { + Flowable.just(1) + .flatMap(new Function>() { + @Override + public Publisher apply(Integer v) throws Exception { + return null; + } + }, new BiFunction() { + @Override + public Object apply(Integer v, Object w) throws Exception { + return v; + } + }) + .test() + .assertFailureAndMessage(NullPointerException.class, "The mapper returned a null Publisher"); + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableMapNotificationTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableMapNotificationTest.java index 4a697c3e90..287b83fb59 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableMapNotificationTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableMapNotificationTest.java @@ -16,9 +16,10 @@ import java.util.concurrent.Callable; import org.junit.Test; -import org.reactivestreams.Subscriber; +import org.reactivestreams.*; import io.reactivex.*; +import io.reactivex.exceptions.*; import io.reactivex.functions.Function; import io.reactivex.internal.functions.Functions; import io.reactivex.internal.operators.flowable.FlowableMapNotification.MapNotificationSubscriber; @@ -177,4 +178,22 @@ public Flowable apply(Flowable o) throws Exception { } }); } + + @Test + public void onErrorCrash() { + TestSubscriber ts = Flowable.error(new TestException("Outer")) + .flatMap(Functions.justFunction(Flowable.just(1)), + new Function>() { + @Override + public Publisher apply(Throwable t) throws Exception { + throw new TestException("Inner"); + } + }, + Functions.justCallable(Flowable.just(3))) + .test() + .assertFailure(CompositeException.class); + + TestHelper.assertError(ts, 0, TestException.class, "Outer"); + TestHelper.assertError(ts, 1, TestException.class, "Inner"); + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java index a66a1651f9..a80e9dd8b5 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java @@ -1748,4 +1748,17 @@ public void timedNoOutdatedData() { source.test().assertResult(); } + + @Test + public void replaySelectorReturnsNull() { + Flowable.just(1) + .replay(new Function, Publisher>() { + @Override + public Publisher apply(Flowable v) throws Exception { + return null; + } + }, Schedulers.trampoline()) + .test() + .assertFailureAndMessage(NullPointerException.class, "The selector returned a null Publisher"); + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableUsingTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableUsingTest.java index 94b9efd998..ebb7a9722b 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableUsingTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableUsingTest.java @@ -627,4 +627,14 @@ public void accept(Object e) throws Exception { RxJavaPlugins.reset(); } } + + @Test + public void sourceSupplierReturnsNull() { + Flowable.using(Functions.justCallable(1), + Functions.justFunction((Publisher)null), + Functions.emptyConsumer()) + .test() + .assertFailureAndMessage(NullPointerException.class, "The sourceSupplier returned a null Publisher") + ; + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromTest.java index 67c578b462..b4bb8afcde 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromTest.java @@ -26,6 +26,7 @@ import io.reactivex.*; import io.reactivex.exceptions.TestException; import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.internal.util.CrashingMappedIterable; import io.reactivex.plugins.RxJavaPlugins; @@ -708,4 +709,12 @@ public Object apply(Object[] o) throws Exception { .test() .assertFailure(NullPointerException.class); } + + @Test + public void zeroOtherCombinerReturnsNull() { + Flowable.just(1) + .withLatestFrom(new Flowable[0], Functions.justFunction(null)) + .test() + .assertFailureAndMessage(NullPointerException.class, "The combiner returned a null value"); + } } diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeZipArrayTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeZipArrayTest.java index 504cc971a5..be8bac0b87 100644 --- a/src/test/java/io/reactivex/internal/operators/maybe/MaybeZipArrayTest.java +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeZipArrayTest.java @@ -22,6 +22,7 @@ import io.reactivex.*; import io.reactivex.exceptions.TestException; import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; import io.reactivex.observers.TestObserver; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.PublishProcessor; @@ -161,4 +162,12 @@ public Object apply(Object[] v) { }, Maybe.just(1), null) .blockingGet(); } + + @SuppressWarnings("unchecked") + @Test + public void singleSourceZipperReturnsNull() { + Maybe.zipArray(Functions.justFunction(null), Maybe.just(1)) + .test() + .assertFailureAndMessage(NullPointerException.class, "The zipper returned a null value"); + } } diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeZipIterableTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeZipIterableTest.java index 8c3d8882cf..bd43490bc5 100644 --- a/src/test/java/io/reactivex/internal/operators/maybe/MaybeZipIterableTest.java +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeZipIterableTest.java @@ -22,6 +22,7 @@ import io.reactivex.*; import io.reactivex.exceptions.TestException; import io.reactivex.functions.Function; +import io.reactivex.internal.functions.Functions; import io.reactivex.internal.util.CrashingMappedIterable; import io.reactivex.observers.TestObserver; import io.reactivex.plugins.RxJavaPlugins; @@ -212,4 +213,12 @@ public Object apply(Object[] v) { }) .blockingGet(); } + + @SuppressWarnings("unchecked") + @Test + public void singleSourceZipperReturnsNull() { + Maybe.zipArray(Functions.justFunction(null), Maybe.just(1)) + .test() + .assertFailureAndMessage(NullPointerException.class, "The zipper returned a null value"); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableDelayTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableDelayTest.java index d48617de0a..9073b4ee93 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableDelayTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableDelayTest.java @@ -14,7 +14,7 @@ package io.reactivex.internal.operators.observable; import static org.junit.Assert.*; -import static org.junit.Assert.assertNotEquals; +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; import java.util.*; @@ -965,4 +965,16 @@ public void onComplete() { // expected } } + + @Test + public void itemDelayReturnsNull() { + Observable.just(1).delay(new Function>() { + @Override + public Observable apply(Integer t) throws Exception { + return null; + } + }) + .test() + .assertFailureAndMessage(NullPointerException.class, "The itemDelay returned a null ObservableSource"); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapTest.java index e733c5e3d4..64cb2e7e41 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapTest.java @@ -266,7 +266,7 @@ public void testFlatMapTransformsOnErrorFuncThrows() { source.flatMap(just(onNext), funcThrow((Throwable) null, onError), just0(onComplete)).subscribe(o); - verify(o).onError(any(TestException.class)); + verify(o).onError(any(CompositeException.class)); verify(o, never()).onNext(any()); verify(o, never()).onComplete(); } @@ -858,4 +858,40 @@ public void run() { } } } + + @Test + public void iterableMapperFunctionReturnsNull() { + Observable.just(1) + .flatMapIterable(new Function>() { + @Override + public Iterable apply(Integer v) throws Exception { + return null; + } + }, new BiFunction() { + @Override + public Object apply(Integer v, Object w) throws Exception { + return v; + } + }) + .test() + .assertFailureAndMessage(NullPointerException.class, "The mapper returned a null Iterable"); + } + + @Test + public void combinerMapperFunctionReturnsNull() { + Observable.just(1) + .flatMap(new Function>() { + @Override + public Observable apply(Integer v) throws Exception { + return null; + } + }, new BiFunction() { + @Override + public Object apply(Integer v, Object w) throws Exception { + return v; + } + }) + .test() + .assertFailureAndMessage(NullPointerException.class, "The mapper returned a null ObservableSource"); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableMapNotificationTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableMapNotificationTest.java index e75d454d61..fae7c60afd 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableMapNotificationTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableMapNotificationTest.java @@ -19,6 +19,7 @@ import io.reactivex.*; import io.reactivex.disposables.Disposables; +import io.reactivex.exceptions.*; import io.reactivex.functions.Function; import io.reactivex.internal.functions.Functions; import io.reactivex.internal.operators.observable.ObservableMapNotification.MapNotificationObserver; @@ -85,4 +86,22 @@ public ObservableSource apply(Observable o) throws Exception { } }); } + + @Test + public void onErrorCrash() { + TestObserver ts = Observable.error(new TestException("Outer")) + .flatMap(Functions.justFunction(Observable.just(1)), + new Function>() { + @Override + public Observable apply(Throwable t) throws Exception { + throw new TestException("Inner"); + } + }, + Functions.justCallable(Observable.just(3))) + .test() + .assertFailure(CompositeException.class); + + TestHelper.assertError(ts, 0, TestException.class, "Outer"); + TestHelper.assertError(ts, 1, TestException.class, "Inner"); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java index 2057e33692..d6f2a512c3 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java @@ -1528,4 +1528,37 @@ public void timedNoOutdatedData() { source.test().assertResult(); } + + @Test + public void replaySelectorReturnsNullScheduled() { + Observable.just(1) + .replay(new Function, Observable>() { + @Override + public Observable apply(Observable v) throws Exception { + return null; + } + }, Schedulers.trampoline()) + .test() + .assertFailureAndMessage(NullPointerException.class, "The selector returned a null ObservableSource"); + } + + @Test + public void replaySelectorReturnsNull() { + Observable.just(1) + .replay(new Function, Observable>() { + @Override + public Observable apply(Observable v) throws Exception { + return null; + } + }) + .test() + .assertFailureAndMessage(NullPointerException.class, "The selector returned a null ObservableSource"); + } + + @Test + public void replaySelectorConnectableReturnsNull() { + ObservableReplay.multicastSelector(Functions.justCallable((ConnectableObservable)null), Functions.justFunction(Observable.just(1))) + .test() + .assertFailureAndMessage(NullPointerException.class, "The connectableFactory returned a null ConnectableObservable"); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableUsingTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableUsingTest.java index aa677806bf..2bf61565d9 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableUsingTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableUsingTest.java @@ -557,4 +557,14 @@ public void accept(Object e) throws Exception { RxJavaPlugins.reset(); } } + + @Test + public void sourceSupplierReturnsNull() { + Observable.using(Functions.justCallable(1), + Functions.justFunction((Observable)null), + Functions.emptyConsumer()) + .test() + .assertFailureAndMessage(NullPointerException.class, "The sourceSupplier returned a null ObservableSource") + ; + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableWithLatestFromTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableWithLatestFromTest.java index a27f4ad37a..4cba999e51 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableWithLatestFromTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableWithLatestFromTest.java @@ -27,6 +27,7 @@ import io.reactivex.disposables.Disposables; import io.reactivex.exceptions.TestException; import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; import io.reactivex.internal.util.CrashingMappedIterable; import io.reactivex.observers.TestObserver; import io.reactivex.plugins.RxJavaPlugins; @@ -647,4 +648,12 @@ public Object apply(Object[] o) throws Exception { .test() .assertFailure(NullPointerException.class); } + + @Test + public void zeroOtherCombinerReturnsNull() { + Observable.just(1) + .withLatestFrom(new Observable[0], Functions.justFunction(null)) + .test() + .assertFailureAndMessage(NullPointerException.class, "The combiner returned a null value"); + } } diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleZipArrayTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleZipArrayTest.java index e56f7c0022..5c2b17c368 100644 --- a/src/test/java/io/reactivex/internal/operators/single/SingleZipArrayTest.java +++ b/src/test/java/io/reactivex/internal/operators/single/SingleZipArrayTest.java @@ -22,6 +22,7 @@ import io.reactivex.*; import io.reactivex.exceptions.TestException; import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; import io.reactivex.observers.TestObserver; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.PublishProcessor; @@ -187,4 +188,12 @@ public Object apply(Object[] a) throws Exception { .test() .assertResult(2); } + + @SuppressWarnings("unchecked") + @Test + public void singleSourceZipperReturnsNull() { + Single.zipArray(Functions.justFunction(null), Single.just(1)) + .test() + .assertFailureAndMessage(NullPointerException.class, "The zipper returned a null value"); + } } diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleZipIterableTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleZipIterableTest.java index 31e8bc73d3..415aeb1db4 100644 --- a/src/test/java/io/reactivex/internal/operators/single/SingleZipIterableTest.java +++ b/src/test/java/io/reactivex/internal/operators/single/SingleZipIterableTest.java @@ -22,6 +22,7 @@ import io.reactivex.*; import io.reactivex.exceptions.TestException; import io.reactivex.functions.Function; +import io.reactivex.internal.functions.Functions; import io.reactivex.internal.util.CrashingMappedIterable; import io.reactivex.observers.TestObserver; import io.reactivex.plugins.RxJavaPlugins; @@ -236,4 +237,12 @@ public Object apply(Object[] a) throws Exception { .test() .assertResult(2); } + + @SuppressWarnings("unchecked") + @Test + public void singleSourceZipperReturnsNull() { + Single.zip(Arrays.asList(Single.just(1)), Functions.justFunction(null)) + .test() + .assertFailureAndMessage(NullPointerException.class, "The zipper returned a null value"); + } } diff --git a/src/test/java/io/reactivex/observable/ObservableNullTests.java b/src/test/java/io/reactivex/observable/ObservableNullTests.java index c39205f8bc..7f58c9a057 100644 --- a/src/test/java/io/reactivex/observable/ObservableNullTests.java +++ b/src/test/java/io/reactivex/observable/ObservableNullTests.java @@ -1406,6 +1406,7 @@ public Observable call() { } @Test(expected = NullPointerException.class) + @Ignore("No longer crashes with NPE but signals it; tested elsewhere.") public void flatMapNotificationOnErrorReturnsNull() { Observable.error(new TestException()).flatMap(new Function>() { @Override