diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableAmb.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableAmb.java index 5af4200aa9..3523fbcd29 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableAmb.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableAmb.java @@ -15,6 +15,7 @@ import java.util.concurrent.atomic.*; +import io.reactivex.exceptions.Exceptions; import org.reactivestreams.*; import io.reactivex.Flowable; @@ -38,13 +39,23 @@ public void subscribeActual(Subscriber s) { int count = 0; if (sources == null) { sources = new Publisher[8]; - for (Publisher p : sourcesIterable) { - if (count == sources.length) { - Publisher[] b = new Publisher[count + (count >> 2)]; - System.arraycopy(sources, 0, b, 0, count); - sources = b; + try { + for (Publisher p : sourcesIterable) { + if (p == null) { + EmptySubscription.error(new NullPointerException("One of the sources is null"), s); + return; + } + if (count == sources.length) { + Publisher[] b = new Publisher[count + (count >> 2)]; + System.arraycopy(sources, 0, b, 0, count); + sources = b; + } + sources[count++] = p; } - sources[count++] = p; + } catch (Throwable e) { + Exceptions.throwIfFatal(e); + EmptySubscription.error(e, s); + return; } } else { count = sources.length; diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableAmb.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableAmb.java index e4c4ef2c2e..078268e5f6 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableAmb.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableAmb.java @@ -17,6 +17,7 @@ import io.reactivex.*; import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.Exceptions; import io.reactivex.internal.disposables.*; import io.reactivex.plugins.RxJavaPlugins; @@ -36,13 +37,23 @@ public void subscribeActual(Observer s) { int count = 0; if (sources == null) { sources = new Observable[8]; - for (ObservableSource p : sourcesIterable) { - if (count == sources.length) { - ObservableSource[] b = new ObservableSource[count + (count >> 2)]; - System.arraycopy(sources, 0, b, 0, count); - sources = b; + try { + for (ObservableSource p : sourcesIterable) { + if (p == null) { + EmptyDisposable.error(new NullPointerException("One of the sources is null"), s); + return; + } + if (count == sources.length) { + ObservableSource[] b = new ObservableSource[count + (count >> 2)]; + System.arraycopy(sources, 0, b, 0, count); + sources = b; + } + sources[count++] = p; } - sources[count++] = p; + } catch (Throwable e) { + Exceptions.throwIfFatal(e); + EmptyDisposable.error(e, s); + return; } } else { count = sources.length; diff --git a/src/test/java/io/reactivex/flowable/FlowableNullTests.java b/src/test/java/io/reactivex/flowable/FlowableNullTests.java index af2f83f67c..b4191f6cd0 100644 --- a/src/test/java/io/reactivex/flowable/FlowableNullTests.java +++ b/src/test/java/io/reactivex/flowable/FlowableNullTests.java @@ -55,20 +55,22 @@ public void ambIterableNull() { Flowable.amb((Iterable>)null); } - @Test(expected = NullPointerException.class) + @Test public void ambIterableIteratorNull() { Flowable.amb(new Iterable>() { @Override public Iterator> iterator() { return null; } - }).blockingLast(); + }).test().assertError(NullPointerException.class); } @SuppressWarnings("unchecked") - @Test(expected = NullPointerException.class) + @Test public void ambIterableOneIsNull() { - Flowable.amb(Arrays.asList(Flowable.never(), null)).blockingLast(); + Flowable.amb(Arrays.asList(Flowable.never(), null)) + .test() + .assertError(NullPointerException.class); } @Test(expected = NullPointerException.class) diff --git a/src/test/java/io/reactivex/observable/ObservableNullTests.java b/src/test/java/io/reactivex/observable/ObservableNullTests.java index 2f43c85452..2dea75f9db 100644 --- a/src/test/java/io/reactivex/observable/ObservableNullTests.java +++ b/src/test/java/io/reactivex/observable/ObservableNullTests.java @@ -56,20 +56,22 @@ public void ambIterableNull() { Observable.amb((Iterable>)null); } - @Test(expected = NullPointerException.class) + @Test public void ambIterableIteratorNull() { Observable.amb(new Iterable>() { @Override public Iterator> iterator() { return null; } - }).blockingLast(); + }).test().assertError(NullPointerException.class); } @SuppressWarnings("unchecked") - @Test(expected = NullPointerException.class) + @Test public void ambIterableOneIsNull() { - Observable.amb(Arrays.asList(Observable.never(), null)).blockingLast(); + Observable.amb(Arrays.asList(Observable.never(), null)) + .test() + .assertError(NullPointerException.class); } @Test(expected = NullPointerException.class)