From eae433993d39b1b07165fdd6a04bf093766f11a3 Mon Sep 17 00:00:00 2001 From: Victor Albertos Date: Fri, 30 Sep 2016 13:52:11 +0200 Subject: [PATCH] 2.x: Merge AmbArray and AmbIterable into Amb for Single, Maybe and Completable types --- src/main/java/io/reactivex/Completable.java | 4 +- src/main/java/io/reactivex/Maybe.java | 4 +- src/main/java/io/reactivex/Single.java | 4 +- ...tableAmbArray.java => CompletableAmb.java} | 44 ++++- .../completable/CompletableAmbIterable.java | 150 ------------------ .../{MaybeAmbArray.java => MaybeAmb.java} | 45 +++++- .../operators/maybe/MaybeAmbIterable.java | 56 ------- .../{SingleAmbArray.java => SingleAmb.java} | 40 ++++- .../operators/single/SingleAmbIterable.java | 131 --------------- .../completable/CompletableTest.java | 72 ++++----- .../operators/single/SingleAmbTest.java | 48 +++++- .../java/io/reactivex/maybe/MaybeTest.java | 31 ++++ .../io/reactivex/single/SingleNullTests.java | 16 +- 13 files changed, 242 insertions(+), 403 deletions(-) rename src/main/java/io/reactivex/internal/operators/completable/{CompletableAmbArray.java => CompletableAmb.java} (60%) delete mode 100644 src/main/java/io/reactivex/internal/operators/completable/CompletableAmbIterable.java rename src/main/java/io/reactivex/internal/operators/maybe/{MaybeAmbArray.java => MaybeAmb.java} (63%) delete mode 100644 src/main/java/io/reactivex/internal/operators/maybe/MaybeAmbIterable.java rename src/main/java/io/reactivex/internal/operators/single/{SingleAmbArray.java => SingleAmb.java} (58%) delete mode 100644 src/main/java/io/reactivex/internal/operators/single/SingleAmbIterable.java diff --git a/src/main/java/io/reactivex/Completable.java b/src/main/java/io/reactivex/Completable.java index cfcd1c5935..6d64f5f8f3 100644 --- a/src/main/java/io/reactivex/Completable.java +++ b/src/main/java/io/reactivex/Completable.java @@ -61,7 +61,7 @@ public static Completable ambArray(final CompletableSource... sources) { return wrap(sources[0]); } - return RxJavaPlugins.onAssembly(new CompletableAmbArray(sources)); + return RxJavaPlugins.onAssembly(new CompletableAmb(sources, null)); } /** @@ -79,7 +79,7 @@ public static Completable ambArray(final CompletableSource... sources) { public static Completable amb(final Iterable sources) { ObjectHelper.requireNonNull(sources, "sources is null"); - return RxJavaPlugins.onAssembly(new CompletableAmbIterable(sources)); + return RxJavaPlugins.onAssembly(new CompletableAmb(null, sources)); } /** diff --git a/src/main/java/io/reactivex/Maybe.java b/src/main/java/io/reactivex/Maybe.java index 78537d303a..7fbb057c26 100644 --- a/src/main/java/io/reactivex/Maybe.java +++ b/src/main/java/io/reactivex/Maybe.java @@ -57,7 +57,7 @@ public abstract class Maybe implements MaybeSource { @SchedulerSupport(SchedulerSupport.NONE) public static Maybe amb(final Iterable> sources) { ObjectHelper.requireNonNull(sources, "sources is null"); - return RxJavaPlugins.onAssembly(new MaybeAmbIterable(sources)); + return RxJavaPlugins.onAssembly(new MaybeAmb(null, sources)); } /** @@ -80,7 +80,7 @@ public static Maybe ambArray(final MaybeSource... sources) { if (sources.length == 1) { return wrap((MaybeSource)sources[0]); } - return RxJavaPlugins.onAssembly(new MaybeAmbArray(sources)); + return RxJavaPlugins.onAssembly(new MaybeAmb(sources, null)); } /** diff --git a/src/main/java/io/reactivex/Single.java b/src/main/java/io/reactivex/Single.java index 937aafe511..3902167d8b 100644 --- a/src/main/java/io/reactivex/Single.java +++ b/src/main/java/io/reactivex/Single.java @@ -73,7 +73,7 @@ public abstract class Single implements SingleSource { @SchedulerSupport(SchedulerSupport.NONE) public static Single amb(final Iterable> sources) { ObjectHelper.requireNonNull(sources, "sources is null"); - return RxJavaPlugins.onAssembly(new SingleAmbIterable(sources)); + return RxJavaPlugins.onAssembly(new SingleAmb(null, sources)); } /** @@ -97,7 +97,7 @@ public static Single ambArray(final SingleSource... sources) if (sources.length == 1) { return wrap((SingleSource)sources[0]); } - return RxJavaPlugins.onAssembly(new SingleAmbArray(sources)); + return RxJavaPlugins.onAssembly(new SingleAmb(sources, null)); } /** diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableAmbArray.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableAmb.java similarity index 60% rename from src/main/java/io/reactivex/internal/operators/completable/CompletableAmbArray.java rename to src/main/java/io/reactivex/internal/operators/completable/CompletableAmb.java index 39663aa36a..d843d4e0f7 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableAmbArray.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableAmb.java @@ -17,18 +17,47 @@ import io.reactivex.*; import io.reactivex.disposables.*; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.internal.disposables.EmptyDisposable; import io.reactivex.plugins.RxJavaPlugins; -public final class CompletableAmbArray extends Completable { +public final class CompletableAmb extends Completable { + private final CompletableSource[] sources; + private final Iterable sourcesIterable; - final CompletableSource[] sources; - - public CompletableAmbArray(CompletableSource[] sources) { + public CompletableAmb(CompletableSource[] sources, Iterable sourcesIterable) { this.sources = sources; + this.sourcesIterable = sourcesIterable; } @Override public void subscribeActual(final CompletableObserver s) { + CompletableSource[] sources = this.sources; + int count = 0; + if (sources == null) { + sources = new CompletableSource[8]; + try { + for (CompletableSource element : sourcesIterable) { + if (element == null) { + EmptyDisposable.error(new NullPointerException("One of the sources is null"), s); + return; + } + if (count == sources.length) { + CompletableSource[] b = new CompletableSource[count + (count >> 2)]; + System.arraycopy(sources, 0, b, 0, count); + sources = b; + } + sources[count++] = element; + }; + } catch (Throwable e) { + Exceptions.throwIfFatal(e); + EmptyDisposable.error(e, s); + return; + } + } else { + count = sources.length; + } + final CompositeDisposable set = new CompositeDisposable(); s.onSubscribe(set); @@ -60,7 +89,8 @@ public void onSubscribe(Disposable d) { }; - for (CompletableSource c : sources) { + for (int i = 0; i < count; i++) { + CompletableSource c = sources[i]; if (set.isDisposed()) { return; } @@ -81,5 +111,9 @@ public void onSubscribe(Disposable d) { // no need to have separate subscribers because inner is stateless c.subscribe(inner); } + + if (count == 0) { + s.onComplete(); + } } } diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableAmbIterable.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableAmbIterable.java deleted file mode 100644 index 8e121ce46d..0000000000 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableAmbIterable.java +++ /dev/null @@ -1,150 +0,0 @@ -/** - * Copyright 2016 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is - * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See - * the License for the specific language governing permissions and limitations under the License. - */ - -package io.reactivex.internal.operators.completable; - -import java.util.Iterator; -import java.util.concurrent.atomic.AtomicBoolean; - -import io.reactivex.*; -import io.reactivex.disposables.*; -import io.reactivex.exceptions.Exceptions; -import io.reactivex.plugins.RxJavaPlugins; - -public final class CompletableAmbIterable extends Completable { - - final Iterable sources; - - public CompletableAmbIterable(Iterable sources) { - this.sources = sources; - } - - @Override - protected void subscribeActual(final CompletableObserver s) { - final CompositeDisposable set = new CompositeDisposable(); - s.onSubscribe(set); - - Iterator it; - - try { - it = sources.iterator(); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - s.onError(e); - return; - } - - if (it == null) { - s.onError(new NullPointerException("The iterator returned is null")); - return; - } - - boolean empty = true; - - final AtomicBoolean once = new AtomicBoolean(); - - CompletableObserver inner = new CompletableObserver() { - @Override - public void onComplete() { - if (once.compareAndSet(false, true)) { - set.dispose(); - s.onComplete(); - } - } - - @Override - public void onError(Throwable e) { - if (once.compareAndSet(false, true)) { - set.dispose(); - s.onError(e); - } else { - RxJavaPlugins.onError(e); - } - } - - @Override - public void onSubscribe(Disposable d) { - set.add(d); - } - - }; - - for (;;) { - if (once.get() || set.isDisposed()) { - return; - } - - boolean b; - - try { - b = it.hasNext(); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - if (once.compareAndSet(false, true)) { - set.dispose(); - s.onError(e); - } else { - RxJavaPlugins.onError(e); - } - return; - } - - if (!b) { - if (empty) { - s.onComplete(); - } - break; - } - - empty = false; - - if (once.get() || set.isDisposed()) { - return; - } - - CompletableSource c; - - try { - c = it.next(); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - if (once.compareAndSet(false, true)) { - set.dispose(); - s.onError(e); - } else { - RxJavaPlugins.onError(e); - } - return; - } - - if (c == null) { - NullPointerException npe = new NullPointerException("One of the sources is null"); - if (once.compareAndSet(false, true)) { - set.dispose(); - s.onError(npe); - } else { - RxJavaPlugins.onError(npe); - } - return; - } - - if (once.get() || set.isDisposed()) { - return; - } - - // no need to have separate subscribers because inner is stateless - c.subscribe(inner); - } - } - -} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeAmbArray.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeAmb.java similarity index 63% rename from src/main/java/io/reactivex/internal/operators/maybe/MaybeAmbArray.java rename to src/main/java/io/reactivex/internal/operators/maybe/MaybeAmb.java index 5602ac99c0..776cfbadfa 100644 --- a/src/main/java/io/reactivex/internal/operators/maybe/MaybeAmbArray.java +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeAmb.java @@ -17,6 +17,8 @@ import io.reactivex.*; import io.reactivex.disposables.*; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.internal.disposables.EmptyDisposable; import io.reactivex.plugins.RxJavaPlugins; /** @@ -24,21 +26,49 @@ * * @param the value type emitted */ -public final class MaybeAmbArray extends Maybe { +public final class MaybeAmb extends Maybe { + private final MaybeSource[] sources; + private final Iterable> sourcesIterable; - final MaybeSource[] sources; - - public MaybeAmbArray(MaybeSource[] sources) { + public MaybeAmb(MaybeSource[] sources, Iterable> sourcesIterable) { this.sources = sources; + this.sourcesIterable = sourcesIterable; } @Override + @SuppressWarnings("unchecked") protected void subscribeActual(MaybeObserver observer) { + MaybeSource[] sources = this.sources; + int count = 0; + if (sources == null) { + sources = new MaybeSource[8]; + try { + for (MaybeSource element : sourcesIterable) { + if (element == null) { + EmptyDisposable.error(new NullPointerException("One of the sources is null"), observer); + return; + } + if (count == sources.length) { + MaybeSource[] b = new MaybeSource[count + (count >> 2)]; + System.arraycopy(sources, 0, b, 0, count); + sources = b; + } + sources[count++] = element; + } + } catch (Throwable e) { + Exceptions.throwIfFatal(e); + EmptyDisposable.error(e, observer); + return; + } + } else { + count = sources.length; + } AmbMaybeObserver parent = new AmbMaybeObserver(observer); observer.onSubscribe(parent); - for (MaybeSource s : sources) { + for (int i = 0; i < count; i++) { + MaybeSource s = sources[i]; if (parent.isDisposed()) { return; } @@ -50,6 +80,11 @@ protected void subscribeActual(MaybeObserver observer) { s.subscribe(parent); } + + if (count == 0) { + observer.onComplete(); + } + } static final class AmbMaybeObserver diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeAmbIterable.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeAmbIterable.java deleted file mode 100644 index 32fa2c9e68..0000000000 --- a/src/main/java/io/reactivex/internal/operators/maybe/MaybeAmbIterable.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Copyright 2016 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is - * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See - * the License for the specific language governing permissions and limitations under the License. - */ - -package io.reactivex.internal.operators.maybe; - -import io.reactivex.*; -import io.reactivex.internal.operators.maybe.MaybeAmbArray.AmbMaybeObserver; - -/** - * Signals the event of the first MaybeSource that signals. - * - * @param the value type emitted - */ -public final class MaybeAmbIterable extends Maybe { - - final Iterable> sources; - - public MaybeAmbIterable(Iterable> sources) { - this.sources = sources; - } - - @Override - protected void subscribeActual(MaybeObserver observer) { - AmbMaybeObserver parent = new AmbMaybeObserver(observer); - observer.onSubscribe(parent); - - int i = 0; - for (MaybeSource s : sources) { - if (parent.isDisposed()) { - return; - } - - if (s == null) { - parent.onError(new NullPointerException("One of the MaybeSources is null")); - return; - } - - s.subscribe(parent); - i++; - } - - if (i == 0) { - observer.onComplete(); - } - } -} diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleAmbArray.java b/src/main/java/io/reactivex/internal/operators/single/SingleAmb.java similarity index 58% rename from src/main/java/io/reactivex/internal/operators/single/SingleAmbArray.java rename to src/main/java/io/reactivex/internal/operators/single/SingleAmb.java index 1f7eeb8ff9..d6233944f7 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleAmbArray.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleAmb.java @@ -17,24 +17,54 @@ import io.reactivex.*; import io.reactivex.disposables.*; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.internal.disposables.EmptyDisposable; import io.reactivex.plugins.RxJavaPlugins; -public final class SingleAmbArray extends Single { +public final class SingleAmb extends Single { + private final SingleSource[] sources; + private final Iterable> sourcesIterable; - final SingleSource[] sources; - - public SingleAmbArray(SingleSource[] sources) { + public SingleAmb(SingleSource[] sources, Iterable> sourcesIterable) { this.sources = sources; + this.sourcesIterable = sourcesIterable; } @Override + @SuppressWarnings("unchecked") protected void subscribeActual(final SingleObserver s) { + SingleSource[] sources = this.sources; + int count = 0; + if (sources == null) { + sources = new SingleSource[8]; + try { + for (SingleSource element : sourcesIterable) { + if (element == null) { + EmptyDisposable.error(new NullPointerException("One of the sources is null"), s); + return; + } + if (count == sources.length) { + SingleSource[] b = new SingleSource[count + (count >> 2)]; + System.arraycopy(sources, 0, b, 0, count); + sources = b; + } + sources[count++] = element; + } + } catch (Throwable e) { + Exceptions.throwIfFatal(e); + EmptyDisposable.error(e, s); + return; + } + } else { + count = sources.length; + } final AtomicBoolean once = new AtomicBoolean(); final CompositeDisposable set = new CompositeDisposable(); s.onSubscribe(set); - for (SingleSource s1 : sources) { + for (int i = 0; i < count; i++) { + SingleSource s1 = sources[i]; if (once.get()) { return; } diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleAmbIterable.java b/src/main/java/io/reactivex/internal/operators/single/SingleAmbIterable.java deleted file mode 100644 index a4ed7a1da7..0000000000 --- a/src/main/java/io/reactivex/internal/operators/single/SingleAmbIterable.java +++ /dev/null @@ -1,131 +0,0 @@ -/** - * Copyright 2016 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is - * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See - * the License for the specific language governing permissions and limitations under the License. - */ - -package io.reactivex.internal.operators.single; - -import java.util.*; -import java.util.concurrent.atomic.AtomicBoolean; - -import io.reactivex.*; -import io.reactivex.disposables.*; -import io.reactivex.exceptions.Exceptions; -import io.reactivex.plugins.RxJavaPlugins; - -public final class SingleAmbIterable extends Single { - - final Iterable> sources; - - public SingleAmbIterable(Iterable> sources) { - this.sources = sources; - } - - @Override - protected void subscribeActual(final SingleObserver s) { - final CompositeDisposable set = new CompositeDisposable(); - s.onSubscribe(set); - - Iterator> iterator; - - try { - iterator = sources.iterator(); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - s.onError(e); - return; - } - - if (iterator == null) { - s.onError(new NullPointerException("The iterator returned is null")); - return; - } - - final AtomicBoolean once = new AtomicBoolean(); - int c = 0; - - for (;;) { - if (once.get()) { - return; - } - - boolean b; - - try { - b = iterator.hasNext(); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - s.onError(e); - return; - } - - if (once.get()) { - return; - } - - if (!b) { - break; - } - - if (once.get()) { - return; - } - - SingleSource s1; - - try { - s1 = iterator.next(); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - set.dispose(); - s.onError(e); - return; - } - - if (s1 == null) { - set.dispose(); - s.onError(new NullPointerException("The single source returned by the iterator is null")); - return; - } - - s1.subscribe(new SingleObserver() { - - @Override - public void onSubscribe(Disposable d) { - set.add(d); - } - - @Override - public void onSuccess(T value) { - if (once.compareAndSet(false, true)) { - s.onSuccess(value); - } - } - - @Override - public void onError(Throwable e) { - if (once.compareAndSet(false, true)) { - s.onError(e); - } else { - RxJavaPlugins.onError(e); - } - } - - }); - c++; - } - - if (c == 0 && !set.isDisposed()) { - s.onError(new NoSuchElementException()); - } - } - -} diff --git a/src/test/java/io/reactivex/completable/CompletableTest.java b/src/test/java/io/reactivex/completable/CompletableTest.java index 11e842ee6f..66f8ee0f53 100644 --- a/src/test/java/io/reactivex/completable/CompletableTest.java +++ b/src/test/java/io/reactivex/completable/CompletableTest.java @@ -2934,11 +2934,11 @@ public void ambArraySingleNormal() { c.blockingAwait(); } - @Test(timeout = 1000, expected = TestException.class) + @Test(timeout = 1000) public void ambArraySingleError() { - Completable c = Completable.ambArray(error.completable); - - c.blockingAwait(); + Completable.ambArray(error.completable) + .test() + .assertError(TestException.class); } @Test(timeout = 1000) @@ -3065,11 +3065,11 @@ public void accept(Throwable v) { Assert.assertTrue("Not completed", complete.get() instanceof TestException); } - @Test(timeout = 1000, expected = NullPointerException.class) + @Test(timeout = 1000) public void ambMultipleOneIsNull() { - Completable c = Completable.ambArray(null, normal.completable); - - c.blockingAwait(); + Completable.ambArray(null, normal.completable) + .test() + .assertError(NullPointerException.class); } @Test(timeout = 1000) @@ -3084,23 +3084,21 @@ public void ambIterableNull() { Completable.amb((Iterable)null); } - @Test(timeout = 1000, expected = NullPointerException.class) + @Test(timeout = 1000) public void ambIterableIteratorNull() { - Completable c = Completable.amb(new Iterable() { + Completable.amb(new Iterable() { @Override public Iterator iterator() { return null; } - }); - - c.blockingAwait(); + }).test().assertError(NullPointerException.class); } - @Test(timeout = 1000, expected = NullPointerException.class) + @Test(timeout = 1000) public void ambIterableWithNull() { - Completable c = Completable.amb(Arrays.asList(null, normal.completable)); - - c.blockingAwait(); + Completable.amb(Arrays.asList(null, normal.completable)) + .test() + .assertError(NullPointerException.class); } @Test(timeout = 1000) @@ -3121,44 +3119,42 @@ public void ambIterableMany() { normal.assertSubscriptions(1); } - @Test(timeout = 1000, expected = TestException.class) + @Test(timeout = 1000) public void ambIterableOneThrows() { - Completable c = Completable.amb(Collections.singleton(error.completable)); - - c.blockingAwait(); + Completable.amb(Collections.singleton(error.completable)) + .test() + .assertError(TestException.class); } - @Test(timeout = 1000, expected = TestException.class) + @Test(timeout = 1000) public void ambIterableManyOneThrows() { - Completable c = Completable.amb(Arrays.asList(error.completable, normal.completable)); - - c.blockingAwait(); + Completable.amb(Arrays.asList(error.completable, normal.completable)) + .test() + .assertError(TestException.class); } - @Test(expected = TestException.class) + @Test public void ambIterableIterableThrows() { - Completable c = Completable.amb(new Iterable() { + Completable.amb(new Iterable() { @Override public Iterator iterator() { throw new TestException(); } - }); - - c.blockingAwait(); + }).test().assertError(TestException.class); } - @Test(expected = TestException.class) + @Test public void ambIterableIteratorHasNextThrows() { - Completable c = Completable.amb(new IterableIteratorHasNextThrows()); - - c.blockingAwait(); + Completable.amb(new IterableIteratorHasNextThrows()) + .test() + .assertError(TestException.class); } - @Test(expected = TestException.class) + @Test public void ambIterableIteratorNextThrows() { - Completable c = Completable.amb(new IterableIteratorNextThrows()); - - c.blockingAwait(); + Completable.amb(new IterableIteratorNextThrows()) + .test() + .assertError(TestException.class); } @Test(expected = NullPointerException.class) diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleAmbTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleAmbTest.java index 725c3bc1e7..de9a0c9861 100644 --- a/src/test/java/io/reactivex/internal/operators/single/SingleAmbTest.java +++ b/src/test/java/io/reactivex/internal/operators/single/SingleAmbTest.java @@ -15,8 +15,11 @@ import static org.junit.Assert.*; -import java.util.NoSuchElementException; +import java.util.*; +import io.reactivex.Completable; +import io.reactivex.SingleSource; +import io.reactivex.exceptions.TestException; import org.junit.Test; import io.reactivex.Single; @@ -63,6 +66,49 @@ public void ambWithSecondFires() { ts.assertResult(2); } + @SuppressWarnings("unchecked") + @Test(timeout = 1000) + public void ambIterableWithFirstFires() { + PublishProcessor pp1 = PublishProcessor.create(); + PublishProcessor pp2 = PublishProcessor.create(); + + List> singles = Arrays.asList(pp1.single(-99), pp2.single(-99)); + TestObserver ts = Single.amb(singles).test(); + + assertTrue(pp1.hasSubscribers()); + assertTrue(pp2.hasSubscribers()); + + pp1.onNext(1); + pp1.onComplete(); + + assertFalse(pp1.hasSubscribers()); + assertFalse(pp2.hasSubscribers()); + + ts.assertResult(1); + + } + + @SuppressWarnings("unchecked") + @Test(timeout = 1000) + public void ambIterableWithSecondFires() { + PublishProcessor pp1 = PublishProcessor.create(); + PublishProcessor pp2 = PublishProcessor.create(); + + List> singles = Arrays.asList(pp1.single(-99), pp2.single(-99)); + TestObserver ts = Single.amb(singles).test(); + + assertTrue(pp1.hasSubscribers()); + assertTrue(pp2.hasSubscribers()); + + pp2.onNext(2); + pp2.onComplete(); + + assertFalse(pp1.hasSubscribers()); + assertFalse(pp2.hasSubscribers()); + + ts.assertResult(2); + } + @SuppressWarnings("unchecked") @Test public void ambArrayEmpty() { diff --git a/src/test/java/io/reactivex/maybe/MaybeTest.java b/src/test/java/io/reactivex/maybe/MaybeTest.java index 3570d34e76..bb6e2f9928 100644 --- a/src/test/java/io/reactivex/maybe/MaybeTest.java +++ b/src/test/java/io/reactivex/maybe/MaybeTest.java @@ -1509,6 +1509,14 @@ public void maybeToPublisherEnum() { TestHelper.checkEnum(MaybeToPublisher.class); } + @SuppressWarnings("unchecked") + @Test + public void ambArrayOneIsNull() { + Maybe.ambArray(null, Maybe.just(1)) + .test() + .assertError(NullPointerException.class); + } + @SuppressWarnings("unchecked") @Test public void ambArrayEmpty() { @@ -1836,6 +1844,29 @@ public void ambIterable2SignalsComplete() { ts.assertResult(); } + @Test(expected = NullPointerException.class) + public void ambIterableNull() { + Maybe.amb((Iterable>)null); + } + + @Test + public void ambIterableIteratorNull() { + Maybe.amb(new Iterable>() { + @Override + public Iterator> iterator() { + return null; + } + }).test().assertError(NullPointerException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void ambIterableOneIsNull() { + Maybe.amb(Arrays.asList(null, Maybe.just(1))) + .test() + .assertError(NullPointerException.class); + } + @Test public void ambIterableEmpty() { Maybe.amb(Collections.>emptyList()).test().assertResult(); diff --git a/src/test/java/io/reactivex/single/SingleNullTests.java b/src/test/java/io/reactivex/single/SingleNullTests.java index a7e44ff4ff..e1832469c1 100644 --- a/src/test/java/io/reactivex/single/SingleNullTests.java +++ b/src/test/java/io/reactivex/single/SingleNullTests.java @@ -38,20 +38,22 @@ public void ambIterableNull() { Single.amb((Iterable>)null); } - @Test(expected = NullPointerException.class) + @Test public void ambIterableIteratorNull() { Single.amb(new Iterable>() { @Override public Iterator> iterator() { return null; } - }).blockingGet(); + }).test().assertError(NullPointerException.class); } @SuppressWarnings("unchecked") - @Test(expected = NullPointerException.class) + @Test public void ambIterableOneIsNull() { - Single.amb(Arrays.asList(null, just1)).blockingGet(); + Single.amb(Arrays.asList(null, just1)) + .test() + .assertError(NullPointerException.class); } @Test(expected = NullPointerException.class) @@ -60,9 +62,11 @@ public void ambArrayNull() { } @SuppressWarnings("unchecked") - @Test(expected = NullPointerException.class) + @Test public void ambArrayOneIsNull() { - Single.ambArray(null, just1).blockingGet(); + Single.ambArray(null, just1) + .test() + .assertError(NullPointerException.class); } @Test(expected = NullPointerException.class)