From 4a614dca0a3777116c3364aa49ea4191791bae0c Mon Sep 17 00:00:00 2001 From: Niklas Baudy Date: Wed, 12 Oct 2016 23:57:09 +0200 Subject: [PATCH] 2.x: Clean up null usages by using ObjectHelper.requireNonNull (#4699) --- .../CompletableConcatIterable.java | 7 +-- .../completable/CompletableErrorSupplier.java | 6 +- .../CompletableMergeDelayErrorIterable.java | 7 +-- .../completable/CompletableMergeIterable.java | 8 +-- .../operators/flowable/FlowableBuffer.java | 27 ++------ .../flowable/FlowableBufferBoundary.java | 15 +---- .../FlowableBufferBoundarySupplier.java | 35 ++--------- .../flowable/FlowableBufferExactBoundary.java | 17 +----- .../flowable/FlowableBufferTimed.java | 61 +++---------------- .../operators/flowable/FlowableCollect.java | 9 +-- .../flowable/FlowableCombineLatest.java | 17 +----- .../operators/flowable/FlowableConcatMap.java | 19 ++---- .../operators/flowable/FlowableDebounce.java | 9 +-- .../operators/flowable/FlowableDefer.java | 7 +-- .../operators/flowable/FlowableDistinct.java | 16 +---- .../operators/flowable/FlowableError.java | 6 +- .../flowable/FlowableMapNotification.java | 22 ++----- .../operators/flowable/FlowableReplay.java | 15 ++--- .../operators/flowable/FlowableScan.java | 9 +-- .../operators/flowable/FlowableScanSeed.java | 16 +---- .../operators/flowable/FlowableSwitchMap.java | 9 +-- .../operators/flowable/FlowableTimeout.java | 16 +---- .../FlowableWindowBoundarySelector.java | 9 +-- .../FlowableWindowBoundarySupplier.java | 17 +----- .../flowable/FlowableZipIterable.java | 24 ++------ .../operators/maybe/MaybeErrorCallable.java | 7 +-- .../observable/ObservableBuffer.java | 13 +--- .../observable/ObservableBufferBoundary.java | 15 +---- .../ObservableBufferBoundarySupplier.java | 35 ++--------- .../ObservableBufferExactBoundary.java | 17 +----- .../observable/ObservableBufferTimed.java | 36 ++--------- .../observable/ObservableCollect.java | 8 +-- .../observable/ObservableCombineLatest.java | 10 +-- .../observable/ObservableDebounce.java | 9 +-- .../operators/observable/ObservableDefer.java | 7 +-- .../observable/ObservableDistinct.java | 16 +---- .../operators/observable/ObservableError.java | 6 +- .../observable/ObservableFromCallable.java | 11 ++-- .../observable/ObservableFromFuture.java | 11 ++-- .../observable/ObservableGenerate.java | 6 +- .../observable/ObservableGroupBy.java | 9 +-- .../observable/ObservableMapNotification.java | 22 ++----- .../operators/observable/ObservableScan.java | 9 +-- .../observable/ObservableScanSeed.java | 16 +---- .../observable/ObservableSwitchMap.java | 9 +-- .../observable/ObservableTimeout.java | 16 +---- .../ObservableWindowBoundarySelector.java | 9 +-- .../ObservableWindowBoundarySupplier.java | 17 +----- .../operators/observable/ObservableZip.java | 9 +-- .../observable/ObservableZipIterable.java | 24 ++------ .../operators/single/SingleError.java | 7 +-- 51 files changed, 146 insertions(+), 611 deletions(-) diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableConcatIterable.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableConcatIterable.java index 1409734e58..f3ce249001 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableConcatIterable.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableConcatIterable.java @@ -109,18 +109,13 @@ void next() { CompletableSource c; try { - c = a.next(); + c = ObjectHelper.requireNonNull(a.next(), "The CompletableSource returned is null"); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); actual.onError(ex); return; } - if (c == null) { - actual.onError(new NullPointerException("The completable returned is null")); - return; - } - c.subscribe(this); } while (decrementAndGet() != 0); } diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableErrorSupplier.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableErrorSupplier.java index e0344b7b1d..bd4ae01f03 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableErrorSupplier.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableErrorSupplier.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.completable; +import io.reactivex.internal.functions.ObjectHelper; import java.util.concurrent.Callable; import io.reactivex.*; @@ -32,15 +33,12 @@ protected void subscribeActual(CompletableObserver s) { Throwable error; try { - error = errorSupplier.call(); + error = ObjectHelper.requireNonNull(errorSupplier.call(), "The error returned is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); error = e; } - if (error == null) { - error = new NullPointerException("The error supplied is null"); - } EmptyDisposable.error(error, s); } diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableMergeDelayErrorIterable.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableMergeDelayErrorIterable.java index 82587b4c27..5c9906f541 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableMergeDelayErrorIterable.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableMergeDelayErrorIterable.java @@ -40,18 +40,13 @@ public void subscribeActual(final CompletableObserver s) { Iterator iterator; try { - iterator = sources.iterator(); + iterator = ObjectHelper.requireNonNull(sources.iterator(), "The source iterator returned is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); s.onError(e); return; } - if (iterator == null) { - s.onError(new NullPointerException("The source iterator returned is null")); - return; - } - final AtomicInteger wip = new AtomicInteger(1); final AtomicThrowable error = new AtomicThrowable(); diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableMergeIterable.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableMergeIterable.java index 55b952b5de..660d3911f3 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableMergeIterable.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableMergeIterable.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.completable; +import io.reactivex.internal.functions.ObjectHelper; import java.util.Iterator; import java.util.concurrent.atomic.*; @@ -38,18 +39,13 @@ public void subscribeActual(final CompletableObserver s) { Iterator iterator; try { - iterator = sources.iterator(); + iterator = ObjectHelper.requireNonNull(sources.iterator(), "The source iterator returned is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); s.onError(e); return; } - if (iterator == null) { - s.onError(new NullPointerException("The source iterator returned is null")); - return; - } - final AtomicInteger wip = new AtomicInteger(1); MergeCompletableObserver shared = new MergeCompletableObserver(s, set, wip); diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBuffer.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBuffer.java index 319dcdc1a2..2fae45da89 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBuffer.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBuffer.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.flowable; +import io.reactivex.internal.functions.ObjectHelper; import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.atomic.*; @@ -104,7 +105,7 @@ public void onNext(T t) { if (b == null) { try { - b = bufferSupplier.call(); + b = ObjectHelper.requireNonNull(bufferSupplier.call(), "The bufferSupplier returned a null buffer"); } catch (Throwable e) { Exceptions.throwIfFatal(e); cancel(); @@ -112,12 +113,6 @@ public void onNext(T t) { return; } - if (b == null) { - cancel(); - - onError(new NullPointerException("The bufferSupplier returned a null buffer")); - return; - } buffer = b; } @@ -231,7 +226,7 @@ public void onNext(T t) { if (i % skip == 0L) { // FIXME no need for modulo try { - b = bufferSupplier.call(); + b = ObjectHelper.requireNonNull(bufferSupplier.call(), "The bufferSupplier returned a null buffer"); } catch (Throwable e) { Exceptions.throwIfFatal(e); cancel(); @@ -240,13 +235,6 @@ public void onNext(T t) { return; } - if (b == null) { - cancel(); - - onError(new NullPointerException("The bufferSupplier returned a null buffer")); - return; - } - buffer = b; } @@ -390,7 +378,7 @@ public void onNext(T t) { C b; try { - b = bufferSupplier.call(); + b = ObjectHelper.requireNonNull(bufferSupplier.call(), "The bufferSupplier returned a null buffer"); } catch (Throwable e) { Exceptions.throwIfFatal(e); cancel(); @@ -398,13 +386,6 @@ public void onNext(T t) { return; } - if (b == null) { - cancel(); - - onError(new NullPointerException("The bufferSupplier returned a null buffer")); - return; - } - bs.offer(b); } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferBoundary.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferBoundary.java index 0164ecb9ad..891a8d43fe 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferBoundary.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferBoundary.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.flowable; +import io.reactivex.internal.functions.ObjectHelper; import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicInteger; @@ -173,33 +174,23 @@ void open(Open window) { U b; try { - b = bufferSupplier.call(); + b = ObjectHelper.requireNonNull(bufferSupplier.call(), "The buffer supplied is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); onError(e); return; } - if (b == null) { - onError(new NullPointerException("The buffer supplied is null")); - return; - } - Publisher p; try { - p = bufferClose.apply(window); + p = ObjectHelper.requireNonNull(bufferClose.apply(window), "The buffer closing publisher is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); onError(e); return; } - if (p == null) { - onError(new NullPointerException("The buffer closing publisher is null")); - return; - } - if (cancelled) { return; } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferBoundarySupplier.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferBoundarySupplier.java index e042136e32..55d49471cc 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferBoundarySupplier.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferBoundarySupplier.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.flowable; +import io.reactivex.internal.functions.ObjectHelper; import java.util.Collection; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicReference; @@ -76,7 +77,7 @@ public void onSubscribe(Subscription s) { U b; try { - b = bufferSupplier.call(); + b = ObjectHelper.requireNonNull(bufferSupplier.call(), "The buffer supplied is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); cancelled = true; @@ -85,18 +86,12 @@ public void onSubscribe(Subscription s) { return; } - if (b == null) { - cancelled = true; - s.cancel(); - EmptySubscription.error(new NullPointerException("The buffer supplied is null"), actual); - return; - } buffer = b; Publisher boundary; try { - boundary = boundarySupplier.call(); + boundary = ObjectHelper.requireNonNull(boundarySupplier.call(), "The boundary publisher supplied is null"); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); cancelled = true; @@ -105,13 +100,6 @@ public void onSubscribe(Subscription s) { return; } - if (boundary == null) { - cancelled = true; - s.cancel(); - EmptySubscription.error(new NullPointerException("The boundary publisher supplied is null"), actual); - return; - } - BufferBoundarySubscriber bs = new BufferBoundarySubscriber(this); other.set(bs); @@ -185,7 +173,7 @@ void next() { U next; try { - next = bufferSupplier.call(); + next = ObjectHelper.requireNonNull(bufferSupplier.call(), "The buffer supplied is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); cancel(); @@ -193,16 +181,10 @@ void next() { return; } - if (next == null) { - cancel(); - actual.onError(new NullPointerException("The buffer supplied is null")); - return; - } - Publisher boundary; try { - boundary = boundarySupplier.call(); + boundary = ObjectHelper.requireNonNull(boundarySupplier.call(), "The boundary publisher supplied is null"); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); cancelled = true; @@ -211,13 +193,6 @@ void next() { return; } - if (boundary == null) { - cancelled = true; - s.cancel(); - actual.onError(new NullPointerException("The boundary publisher supplied is null")); - return; - } - BufferBoundarySubscriber bs = new BufferBoundarySubscriber(this); Disposable o = other.get(); diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferExactBoundary.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferExactBoundary.java index 7fbce6a68c..ccad9a66bb 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferExactBoundary.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferExactBoundary.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.flowable; +import io.reactivex.internal.functions.ObjectHelper; import java.util.Collection; import java.util.concurrent.Callable; @@ -71,7 +72,7 @@ public void onSubscribe(Subscription s) { U b; try { - b = bufferSupplier.call(); + b = ObjectHelper.requireNonNull(bufferSupplier.call(), "The buffer supplied is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); cancelled = true; @@ -80,12 +81,6 @@ public void onSubscribe(Subscription s) { return; } - if (b == null) { - cancelled = true; - s.cancel(); - EmptySubscription.error(new NullPointerException("The buffer supplied is null"), actual); - return; - } buffer = b; BufferBoundarySubscriber bs = new BufferBoundarySubscriber(this); @@ -157,7 +152,7 @@ void next() { U next; try { - next = bufferSupplier.call(); + next = ObjectHelper.requireNonNull(bufferSupplier.call(), "The buffer supplied is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); cancel(); @@ -165,12 +160,6 @@ void next() { return; } - if (next == null) { - cancel(); - actual.onError(new NullPointerException("The buffer supplied is null")); - return; - } - U b; synchronized (this) { b = buffer; diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferTimed.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferTimed.java index 56d6ad3299..535f532730 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferTimed.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferTimed.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.flowable; +import io.reactivex.internal.functions.ObjectHelper; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; @@ -113,7 +114,7 @@ public void onSubscribe(Subscription s) { U b; try { - b = bufferSupplier.call(); + b = ObjectHelper.requireNonNull(bufferSupplier.call(), "The supplied buffer is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); cancel(); @@ -121,12 +122,6 @@ public void onSubscribe(Subscription s) { return; } - if (b == null) { - cancel(); - EmptySubscription.error(new NullPointerException("buffer supplied is null"), actual); - return; - } - buffer = b; actual.onSubscribe(this); @@ -206,7 +201,7 @@ public void run() { U next; try { - next = bufferSupplier.call(); + next = ObjectHelper.requireNonNull(bufferSupplier.call(), "The supplied buffer is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); selfCancel = true; @@ -215,13 +210,6 @@ public void run() { return; } - if (next == null) { - selfCancel = true; - cancel(); - actual.onError(new NullPointerException("buffer supplied is null")); - return; - } - U current; synchronized (this) { @@ -292,7 +280,7 @@ public void onSubscribe(Subscription s) { final U b; // NOPMD try { - b = bufferSupplier.call(); + b = ObjectHelper.requireNonNull(bufferSupplier.call(), "The supplied buffer is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); w.dispose(); @@ -301,13 +289,6 @@ public void onSubscribe(Subscription s) { return; } - if (b == null) { - w.dispose(); - s.cancel(); - EmptySubscription.error(new NullPointerException("The supplied buffer is null"), actual); - return; - } - buffers.add(b); actual.onSubscribe(this); @@ -388,7 +369,7 @@ public void run() { final U b; // NOPMD try { - b = bufferSupplier.call(); + b = ObjectHelper.requireNonNull(bufferSupplier.call(), "The supplied buffer is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); cancel(); @@ -396,11 +377,6 @@ public void run() { return; } - if (b == null) { - cancel(); - actual.onError(new NullPointerException("The supplied buffer is null")); - return; - } synchronized (this) { if (cancelled) { return; @@ -470,7 +446,7 @@ public void onSubscribe(Subscription s) { U b; try { - b = bufferSupplier.call(); + b = ObjectHelper.requireNonNull(bufferSupplier.call(), "The supplied buffer is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); w.dispose(); @@ -479,13 +455,6 @@ public void onSubscribe(Subscription s) { return; } - if (b == null) { - w.dispose(); - s.cancel(); - EmptySubscription.error(new NullPointerException("The supplied buffer is null"), actual); - return; - } - buffer = b; actual.onSubscribe(this); @@ -521,7 +490,7 @@ public void onNext(T t) { fastPathOrderedEmitMax(b, false, this); try { - b = bufferSupplier.call(); + b = ObjectHelper.requireNonNull(bufferSupplier.call(), "The supplied buffer is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); cancel(); @@ -529,14 +498,6 @@ public void onNext(T t) { return; } - if (b == null) { - cancel(); - actual.onError(new NullPointerException("The buffer supplied is null")); - return; - } - - - if (restartTimerOnMaxSize) { synchronized (this) { buffer = b; @@ -616,7 +577,7 @@ public void run() { U next; try { - next = bufferSupplier.call(); + next = ObjectHelper.requireNonNull(bufferSupplier.call(), "The supplied buffer is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); cancel(); @@ -624,12 +585,6 @@ public void run() { return; } - if (next == null) { - cancel(); - actual.onError(new NullPointerException("The buffer supplied is null")); - return; - } - U current; synchronized (this) { diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableCollect.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableCollect.java index c179e19892..68104c6993 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableCollect.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableCollect.java @@ -12,6 +12,7 @@ */ package io.reactivex.internal.operators.flowable; +import io.reactivex.internal.functions.ObjectHelper; import java.util.concurrent.Callable; import org.reactivestreams.*; @@ -36,15 +37,11 @@ public FlowableCollect(Publisher source, Callable initialSupplie protected void subscribeActual(Subscriber s) { U u; try { - u = initialSupplier.call(); + u = ObjectHelper.requireNonNull(initialSupplier.call(), "The initial value supplied is null"); } catch (Throwable e) { EmptySubscription.error(e, s); return; } - if (u == null) { - EmptySubscription.error(new NullPointerException("The initial value supplied is null"), s); - return; - } source.subscribe(new CollectSubscriber(s, u, collector)); } @@ -115,4 +112,4 @@ public void cancel() { s.cancel(); } } -} \ No newline at end of file +} diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableCombineLatest.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableCombineLatest.java index d3405d0b17..ce717b07ea 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableCombineLatest.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableCombineLatest.java @@ -78,18 +78,13 @@ public void subscribeActual(Subscriber s) { Iterator> it; try { - it = iterable.iterator(); + it = ObjectHelper.requireNonNull(iterable.iterator(), "The iterator returned is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); EmptySubscription.error(e, s); return; } - if (it == null) { - EmptySubscription.error(new NullPointerException("The iterator returned is null"), s); - return; - } - for (;;) { boolean b; @@ -109,19 +104,13 @@ public void subscribeActual(Subscriber s) { Publisher p; try { - p = it.next(); + p = ObjectHelper.requireNonNull(it.next(), "The publisher returned by the iterator is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); EmptySubscription.error(e, s); return; } - if (p == null) { - EmptySubscription.error(new NullPointerException("The Publisher returned by the iterator is " + - "null"), s); - return; - } - if (n == a.length) { Publisher[] c = new Publisher[n + (n >> 2)]; System.arraycopy(a, 0, c, 0, n); @@ -562,4 +551,4 @@ public void requestOne() { } } -} \ No newline at end of file +} diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableConcatMap.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableConcatMap.java index ebe5ee955f..713565bfbb 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableConcatMap.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableConcatMap.java @@ -12,6 +12,7 @@ */ package io.reactivex.internal.operators.flowable; +import io.reactivex.internal.functions.ObjectHelper; import java.util.concurrent.Callable; import java.util.concurrent.atomic.*; @@ -289,7 +290,7 @@ void drain() { Publisher p; try { - p = mapper.apply(v); + p = ObjectHelper.requireNonNull(mapper.apply(v), "The mapper returned a null Publisher"); } catch (Throwable e) { Exceptions.throwIfFatal(e); @@ -298,12 +299,6 @@ void drain() { return; } - if (p == null) { - s.cancel(); - actual.onError(new NullPointerException("The mapper returned a null Publisher")); - return; - } - if (sourceMode != QueueSubscription.SYNC) { int c = consumed + 1; if (c == limit) { @@ -511,7 +506,7 @@ void drain() { Publisher p; try { - p = mapper.apply(v); + p = ObjectHelper.requireNonNull(mapper.apply(v), "The mapper returned a null Publisher"); } catch (Throwable e) { Exceptions.throwIfFatal(e); @@ -520,12 +515,6 @@ void drain() { return; } - if (p == null) { - s.cancel(); - actual.onError(new NullPointerException("The mapper returned a null Publisher")); - return; - } - if (sourceMode != QueueSubscription.SYNC) { int c = consumed + 1; if (c == limit) { @@ -636,4 +625,4 @@ public void onComplete() { parent.innerComplete(); } } -} \ No newline at end of file +} diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableDebounce.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableDebounce.java index 0e9501a15d..9d8a64aacf 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableDebounce.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableDebounce.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.flowable; +import io.reactivex.internal.functions.ObjectHelper; import java.util.concurrent.atomic.*; import org.reactivestreams.*; @@ -86,7 +87,7 @@ public void onNext(T t) { Publisher p; try { - p = debounceSelector.apply(t); + p = ObjectHelper.requireNonNull(debounceSelector.apply(t), "The publisher supplied is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); cancel(); @@ -94,12 +95,6 @@ public void onNext(T t) { return; } - if (p == null) { - cancel(); - actual.onError(new NullPointerException("The publisher supplied is null")); - return; - } - DebounceInnerSubscriber dis = new DebounceInnerSubscriber(this, idx, t); if (debouncer.compareAndSet(d, dis)) { diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableDefer.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableDefer.java index 1ce06b0ff0..9f1cedf58c 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableDefer.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableDefer.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.flowable; +import io.reactivex.internal.functions.ObjectHelper; import java.util.concurrent.Callable; import org.reactivestreams.*; @@ -30,17 +31,13 @@ public FlowableDefer(Callable> supplier) { public void subscribeActual(Subscriber s) { Publisher pub; try { - pub = supplier.call(); + pub = ObjectHelper.requireNonNull(supplier.call(), "The publisher supplied is null"); } catch (Throwable t) { Exceptions.throwIfFatal(t); EmptySubscription.error(t, s); return; } - if (pub == null) { - EmptySubscription.error(new NullPointerException("null publisher supplied"), s); - return; - } pub.subscribe(s); } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableDistinct.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableDistinct.java index fd22a9ff0e..1691b3bdf5 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableDistinct.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableDistinct.java @@ -107,18 +107,13 @@ public boolean test(K t) { protected void subscribeActual(Subscriber s) { Predicate coll; try { - coll = predicateSupplier.call(); + coll = ObjectHelper.requireNonNull(predicateSupplier.call(), "predicateSupplier returned null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); EmptySubscription.error(e, s); return; } - if (coll == null) { - EmptySubscription.error(new NullPointerException("predicateSupplier returned null"), s); - return; - } - source.subscribe(new DistinctSubscriber(s, keySelector, coll)); } @@ -148,7 +143,7 @@ public void onNext(T t) { K key; try { - key = keySelector.apply(t); + key = ObjectHelper.requireNonNull(keySelector.apply(t), "Null key supplied"); } catch (Throwable e) { Exceptions.throwIfFatal(e); s.cancel(); @@ -156,13 +151,6 @@ public void onNext(T t) { return; } - if (key == null) { - s.cancel(); - actual.onError(new NullPointerException("Null key supplied")); - return; - } - - boolean b; try { b = predicate.test(key); diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableError.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableError.java index 4ce527a649..c9117c9b1e 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableError.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableError.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.flowable; +import io.reactivex.internal.functions.ObjectHelper; import java.util.concurrent.Callable; import org.reactivestreams.Subscriber; @@ -30,14 +31,11 @@ public FlowableError(Callable errorSupplier) { public void subscribeActual(Subscriber s) { Throwable error; try { - error = errorSupplier.call(); + error = ObjectHelper.requireNonNull(errorSupplier.call(), "Callable returned null throwable. Null values are generally not allowed in 2.x operators and sources."); } catch (Throwable t) { Exceptions.throwIfFatal(t); error = t; } - if (error == null) { - error = new NullPointerException("Callable returned null throwable. Null values are generally not allowed in 2.x operators and sources."); - } EmptySubscription.error(error, s); } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableMapNotification.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableMapNotification.java index be7b5d8271..bfd5c4930f 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableMapNotification.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableMapNotification.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.flowable; +import io.reactivex.internal.functions.ObjectHelper; import java.util.concurrent.Callable; import java.util.concurrent.atomic.*; @@ -93,18 +94,13 @@ public void onNext(T t) { R p; try { - p = onNextMapper.apply(t); + p = ObjectHelper.requireNonNull(onNextMapper.apply(t), "The onNext publisher returned is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); actual.onError(e); return; } - if (p == null) { - actual.onError(new NullPointerException("The onNext publisher returned is null")); - return; - } - actual.onNext(p); long r = get(); @@ -118,18 +114,13 @@ public void onError(Throwable t) { R p; try { - p = onErrorMapper.apply(t); + p = ObjectHelper.requireNonNull(onErrorMapper.apply(t), "The onError publisher returned is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); actual.onError(e); return; } - if (p == null) { - actual.onError(new NullPointerException("The onError publisher returned is null")); - return; - } - tryEmit(p); } @@ -138,18 +129,13 @@ public void onComplete() { R p; try { - p = onCompleteSupplier.call(); + p = ObjectHelper.requireNonNull(onCompleteSupplier.call(), "The onComplete publisher returned is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); actual.onError(e); return; } - if (p == null) { - actual.onError(new NullPointerException("The onComplete publisher returned is null")); - return; - } - tryEmit(p); } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java index 7b6f0318fc..1d16b92392 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.flowable; +import io.reactivex.internal.functions.ObjectHelper; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; @@ -66,29 +67,21 @@ public static Flowable multicastSelector( public void subscribe(Subscriber child) { ConnectableFlowable co; try { - co = connectableFactory.call(); + co = ObjectHelper.requireNonNull(connectableFactory.call(), "The connectableFactory returned null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); EmptySubscription.error(e, child); return; } - if (co == null) { - EmptySubscription.error(new NullPointerException("The connectableFactory returned null"), child); - return; - } Publisher observable; try { - observable = selector.apply(co); + observable = ObjectHelper.requireNonNull(selector.apply(co), "The selector returned a null Publisher"); } catch (Throwable e) { Exceptions.throwIfFatal(e); EmptySubscription.error(e, child); return; } - if (observable == null) { - EmptySubscription.error(new NullPointerException("The selector returned a null Publisher"), child); - return; - } final SubscriberResourceWrapper srw = new SubscriberResourceWrapper(child); @@ -1224,4 +1217,4 @@ void truncateFinal() { } } } -} \ No newline at end of file +} diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableScan.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableScan.java index 7ce55c5386..ed73e01841 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableScan.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableScan.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.flowable; +import io.reactivex.internal.functions.ObjectHelper; import org.reactivestreams.*; import io.reactivex.exceptions.Exceptions; @@ -63,7 +64,7 @@ public void onNext(T t) { T u; try { - u = accumulator.apply(v, t); + u = ObjectHelper.requireNonNull(accumulator.apply(v, t), "The value returned by the accumulator is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); s.cancel(); @@ -71,12 +72,6 @@ public void onNext(T t) { return; } - if (u == null) { - s.cancel(); - a.onError(new NullPointerException("The value returned by the accumulator is null")); - return; - } - value = u; a.onNext(u); } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableScanSeed.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableScanSeed.java index 043b730aed..f382dc199b 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableScanSeed.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableScanSeed.java @@ -12,6 +12,7 @@ */ package io.reactivex.internal.operators.flowable; +import io.reactivex.internal.functions.ObjectHelper; import java.util.concurrent.Callable; import org.reactivestreams.*; @@ -38,18 +39,13 @@ protected void subscribeActual(Subscriber s) { R r; try { - r = seedSupplier.call(); + r = ObjectHelper.requireNonNull(seedSupplier.call(), "The seed supplied is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); EmptySubscription.error(e, s); return; } - if (r == null) { - EmptySubscription.error(new NullPointerException("The seed supplied is null"), s); - return; - } - source.subscribe(new ScanSeedSubscriber(s, accumulator, r)); } @@ -83,7 +79,7 @@ public void onNext(T t) { R u; try { - u = accumulator.apply(v, t); + u = ObjectHelper.requireNonNull(accumulator.apply(v, t), "The accumulator returned a null value"); } catch (Throwable e) { Exceptions.throwIfFatal(e); s.cancel(); @@ -91,12 +87,6 @@ public void onNext(T t) { return; } - if (u == null) { - s.cancel(); - onError(new NullPointerException("The accumulator returned a null value")); - return; - } - value = u; if (!queue.offer(u)) { diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSwitchMap.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSwitchMap.java index c5b76dde8f..b6be3978c3 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSwitchMap.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSwitchMap.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.flowable; +import io.reactivex.internal.functions.ObjectHelper; import java.util.concurrent.atomic.*; import org.reactivestreams.*; @@ -104,7 +105,7 @@ public void onNext(T t) { Publisher p; try { - p = mapper.apply(t); + p = ObjectHelper.requireNonNull(mapper.apply(t), "The publisher returned is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); s.cancel(); @@ -112,12 +113,6 @@ public void onNext(T t) { return; } - if (p == null) { - s.cancel(); - onError(new NullPointerException("The publisher returned is null")); - return; - } - SwitchMapInnerSubscriber nextInner = new SwitchMapInnerSubscriber(this, c, bufferSize); for (;;) { diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeout.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeout.java index 7ebad0e418..9590bfe3b6 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeout.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeout.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.flowable; +import io.reactivex.internal.functions.ObjectHelper; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; @@ -118,7 +119,7 @@ public void onNext(T t) { Publisher p; try { - p = itemTimeoutIndicator.apply(t); + p = ObjectHelper.requireNonNull(itemTimeoutIndicator.apply(t), "The publisher returned is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); cancel(); @@ -126,12 +127,6 @@ public void onNext(T t) { return; } - if (p == null) { - cancel(); - actual.onError(new NullPointerException("The publisher returned is null")); - return; - } - TimeoutInnerSubscriber tis = new TimeoutInnerSubscriber(this, idx); if (timeout.compareAndSet(d, tis)) { @@ -287,18 +282,13 @@ public void onNext(T t) { Publisher p; try { - p = itemTimeoutIndicator.apply(t); + p = ObjectHelper.requireNonNull(itemTimeoutIndicator.apply(t), "The publisher returned is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); actual.onError(e); return; } - if (p == null) { - actual.onError(new NullPointerException("The publisher returned is null")); - return; - } - TimeoutInnerSubscriber tis = new TimeoutInnerSubscriber(this, idx); if (timeout.compareAndSet(d, tis)) { diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundarySelector.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundarySelector.java index 9bb573f749..4240d59a55 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundarySelector.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundarySelector.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.flowable; +import io.reactivex.internal.functions.ObjectHelper; import java.util.*; import java.util.concurrent.atomic.*; @@ -281,19 +282,13 @@ void drainLoop() { Publisher p; try { - p = close.apply(wo.open); + p = ObjectHelper.requireNonNull(close.apply(wo.open), "The publisher supplied is null"); } catch (Throwable e) { cancelled = true; a.onError(e); continue; } - if (p == null) { - cancelled = true; - a.onError(new NullPointerException("The publisher supplied is null")); - continue; - } - OperatorWindowBoundaryCloseSubscriber cl = new OperatorWindowBoundaryCloseSubscriber(this, w); if (resources.add(cl)) { diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundarySupplier.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundarySupplier.java index 26438353e0..e94a8017b6 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundarySupplier.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundarySupplier.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.flowable; +import io.reactivex.internal.functions.ObjectHelper; import java.util.concurrent.Callable; import java.util.concurrent.atomic.*; @@ -90,7 +91,7 @@ public void onSubscribe(Subscription s) { Publisher p; try { - p = other.call(); + p = ObjectHelper.requireNonNull(other.call(), "The first window publisher supplied is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); s.cancel(); @@ -98,12 +99,6 @@ public void onSubscribe(Subscription s) { return; } - if (p == null) { - s.cancel(); - a.onError(new NullPointerException("The first window publisher supplied is null")); - return; - } - UnicastProcessor w = UnicastProcessor.create(bufferSize); long r = requested(); @@ -248,7 +243,7 @@ void drainLoop() { Publisher p; try { - p = other.call(); + p = ObjectHelper.requireNonNull(other.call(), "The publisher supplied is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); DisposableHelper.dispose(boundary); @@ -256,12 +251,6 @@ void drainLoop() { return; } - if (p == null) { - DisposableHelper.dispose(boundary); - a.onError(new NullPointerException("The publisher supplied is null")); - return; - } - w = UnicastProcessor.create(bufferSize); long r = requested(); diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableZipIterable.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableZipIterable.java index ea0a27763b..5719b3f812 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableZipIterable.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableZipIterable.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.flowable; +import io.reactivex.internal.functions.ObjectHelper; import java.util.Iterator; import org.reactivestreams.*; @@ -41,18 +42,13 @@ public void subscribeActual(Subscriber t) { Iterator it; try { - it = other.iterator(); + it = ObjectHelper.requireNonNull(other.iterator(), "The iterator returned by other is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); EmptySubscription.error(e, t); return; } - if (it == null) { - EmptySubscription.error(new NullPointerException("The iterator returned by other is null"), t); - return; - } - boolean b; try { @@ -104,27 +100,17 @@ public void onNext(T t) { U u; try { - u = iterator.next(); + u = ObjectHelper.requireNonNull(iterator.next(), "The iterator returned a null value"); } catch (Throwable e) { error(e); return; } - if (u == null) { - error(new NullPointerException("The iterator returned a null value")); - return; - } - V v; try { - v = zipper.apply(t, u); + v = ObjectHelper.requireNonNull(zipper.apply(t, u), "The zipper function returned a null value"); } catch (Throwable e) { - error(new NullPointerException("The iterator returned a null value")); - return; - } - - if (v == null) { - error(new NullPointerException("The zipper function returned a null value")); + error(e); return; } diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeErrorCallable.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeErrorCallable.java index d913b4ffb6..8e29b77e4b 100644 --- a/src/main/java/io/reactivex/internal/operators/maybe/MaybeErrorCallable.java +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeErrorCallable.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.maybe; +import io.reactivex.internal.functions.ObjectHelper; import java.util.concurrent.Callable; import io.reactivex.*; @@ -38,16 +39,12 @@ protected void subscribeActual(MaybeObserver observer) { Throwable ex; try { - ex = errorSupplier.call(); + ex = ObjectHelper.requireNonNull(errorSupplier.call(), "Callable returned null throwable. Null values are generally not allowed in 2.x operators and sources."); } catch (Throwable ex1) { Exceptions.throwIfFatal(ex1); ex = ex1; } - if (ex == null) { - ex = new NullPointerException("Callable returned null throwable. Null values are generally not allowed in 2.x operators and sources."); - } - observer.onError(ex); } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableBuffer.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableBuffer.java index 26c64a76db..58adba16c7 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableBuffer.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableBuffer.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.observable; +import io.reactivex.internal.functions.ObjectHelper; import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicBoolean; @@ -66,7 +67,7 @@ static final class BufferExactObserver> imple boolean createBuffer() { U b; try { - b = bufferSupplier.call(); + b = ObjectHelper.requireNonNull(bufferSupplier.call(), "Empty buffer supplied"); } catch (Throwable t) { Exceptions.throwIfFatal(t); buffer = null; @@ -80,16 +81,6 @@ boolean createBuffer() { } buffer = b; - if (b == null) { - Throwable t = new NullPointerException("Empty buffer supplied"); - if (s == null) { - EmptyDisposable.error(t, actual); - } else { - s.dispose(); - actual.onError(t); - } - return false; - } return true; } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferBoundary.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferBoundary.java index 5d39f5696b..8528560c4b 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferBoundary.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferBoundary.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.observable; +import io.reactivex.internal.functions.ObjectHelper; import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicInteger; @@ -159,33 +160,23 @@ void open(Open window) { U b; try { - b = bufferSupplier.call(); + b = ObjectHelper.requireNonNull(bufferSupplier.call(), "The buffer supplied is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); onError(e); return; } - if (b == null) { - onError(new NullPointerException("The buffer supplied is null")); - return; - } - ObservableSource p; try { - p = bufferClose.apply(window); + p = ObjectHelper.requireNonNull(bufferClose.apply(window), "The buffer closing Observable is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); onError(e); return; } - if (p == null) { - onError(new NullPointerException("The buffer closing Observable is null")); - return; - } - if (cancelled) { return; } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferBoundarySupplier.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferBoundarySupplier.java index 83397e806b..6c77ac5b41 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferBoundarySupplier.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferBoundarySupplier.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.observable; +import io.reactivex.internal.functions.ObjectHelper; import java.util.Collection; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicReference; @@ -72,7 +73,7 @@ public void onSubscribe(Disposable s) { U b; try { - b = bufferSupplier.call(); + b = ObjectHelper.requireNonNull(bufferSupplier.call(), "The buffer supplied is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); cancelled = true; @@ -81,18 +82,12 @@ public void onSubscribe(Disposable s) { return; } - if (b == null) { - cancelled = true; - s.dispose(); - EmptyDisposable.error(new NullPointerException("The buffer supplied is null"), actual); - return; - } buffer = b; ObservableSource boundary; try { - boundary = boundarySupplier.call(); + boundary = ObjectHelper.requireNonNull(boundarySupplier.call(), "The boundary publisher supplied is null"); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); cancelled = true; @@ -101,13 +96,6 @@ public void onSubscribe(Disposable s) { return; } - if (boundary == null) { - cancelled = true; - s.dispose(); - EmptyDisposable.error(new NullPointerException("The boundary publisher supplied is null"), actual); - return; - } - BufferBoundaryObserver bs = new BufferBoundaryObserver(this); other.set(bs); @@ -180,7 +168,7 @@ void next() { U next; try { - next = bufferSupplier.call(); + next = ObjectHelper.requireNonNull(bufferSupplier.call(), "The buffer supplied is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); dispose(); @@ -188,16 +176,10 @@ void next() { return; } - if (next == null) { - dispose(); - actual.onError(new NullPointerException("The buffer supplied is null")); - return; - } - ObservableSource boundary; try { - boundary = boundarySupplier.call(); + boundary = ObjectHelper.requireNonNull(boundarySupplier.call(), "The boundary publisher supplied is null"); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); cancelled = true; @@ -206,13 +188,6 @@ void next() { return; } - if (boundary == null) { - cancelled = true; - s.dispose(); - actual.onError(new NullPointerException("The boundary publisher supplied is null")); - return; - } - BufferBoundaryObserver bs = new BufferBoundaryObserver(this); Disposable o = other.get(); diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferExactBoundary.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferExactBoundary.java index 4b73016211..309e7e6fd9 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferExactBoundary.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferExactBoundary.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.observable; +import io.reactivex.internal.functions.ObjectHelper; import java.util.Collection; import java.util.concurrent.Callable; @@ -68,7 +69,7 @@ public void onSubscribe(Disposable s) { U b; try { - b = bufferSupplier.call(); + b = ObjectHelper.requireNonNull(bufferSupplier.call(), "The buffer supplied is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); cancelled = true; @@ -77,12 +78,6 @@ public void onSubscribe(Disposable s) { return; } - if (b == null) { - cancelled = true; - s.dispose(); - EmptyDisposable.error(new NullPointerException("The buffer supplied is null"), actual); - return; - } buffer = b; BufferBoundaryObserver bs = new BufferBoundaryObserver(this); @@ -153,7 +148,7 @@ void next() { U next; try { - next = bufferSupplier.call(); + next = ObjectHelper.requireNonNull(bufferSupplier.call(), "The buffer supplied is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); dispose(); @@ -161,12 +156,6 @@ void next() { return; } - if (next == null) { - dispose(); - actual.onError(new NullPointerException("The buffer supplied is null")); - return; - } - U b; synchronized (this) { b = buffer; diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferTimed.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferTimed.java index 68b8159ecd..c3469c35ad 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferTimed.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferTimed.java @@ -111,7 +111,7 @@ public void onSubscribe(Disposable s) { U b; try { - b = bufferSupplier.call(); + b = ObjectHelper.requireNonNull(bufferSupplier.call(), "The buffer supplied is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); dispose(); @@ -119,12 +119,6 @@ public void onSubscribe(Disposable s) { return; } - if (b == null) { - dispose(); - EmptyDisposable.error(new NullPointerException("buffer supplied is null"), actual); - return; - } - buffer = b; actual.onSubscribe(this); @@ -267,7 +261,7 @@ public void onSubscribe(Disposable s) { final U b; // NOPMD try { - b = bufferSupplier.call(); + b = ObjectHelper.requireNonNull(bufferSupplier.call(), "The buffer supplied is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); w.dispose(); @@ -276,13 +270,6 @@ public void onSubscribe(Disposable s) { return; } - if (b == null) { - w.dispose(); - s.dispose(); - EmptyDisposable.error(new NullPointerException("The supplied buffer is null"), actual); - return; - } - buffers.add(b); actual.onSubscribe(this); @@ -439,7 +426,7 @@ public void onSubscribe(Disposable s) { U b; try { - b = bufferSupplier.call(); + b = ObjectHelper.requireNonNull(bufferSupplier.call(), "The buffer supplied is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); w.dispose(); @@ -448,13 +435,6 @@ public void onSubscribe(Disposable s) { return; } - if (b == null) { - w.dispose(); - s.dispose(); - EmptyDisposable.error(new NullPointerException("The supplied buffer is null"), actual); - return; - } - buffer = b; actual.onSubscribe(this); @@ -489,7 +469,7 @@ public void onNext(T t) { fastPathOrderedEmit(b, false, this); try { - b = bufferSupplier.call(); + b = ObjectHelper.requireNonNull(bufferSupplier.call(), "The buffer supplied is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); dispose(); @@ -497,14 +477,6 @@ public void onNext(T t) { return; } - if (b == null) { - dispose(); - actual.onError(new NullPointerException("The buffer supplied is null")); - return; - } - - - if (restartTimerOnMaxSize) { synchronized (this) { buffer = b; diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableCollect.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableCollect.java index 44d70d3127..bb55744374 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableCollect.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableCollect.java @@ -12,6 +12,7 @@ */ package io.reactivex.internal.operators.observable; +import io.reactivex.internal.functions.ObjectHelper; import java.util.concurrent.Callable; import io.reactivex.*; @@ -35,17 +36,12 @@ public ObservableCollect(ObservableSource source, protected void subscribeActual(Observer t) { U u; try { - u = initialSupplier.call(); + u = ObjectHelper.requireNonNull(initialSupplier.call(), "The initialSupplier returned a null value"); } catch (Throwable e) { EmptyDisposable.error(e, t); return; } - if (u == null) { - EmptyDisposable.error(new NullPointerException("The initialSupplier returned a null value"), t); - return; - } - source.subscribe(new CollectObserver(t, u, collector)); } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableCombineLatest.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableCombineLatest.java index 12d9230d27..441a68531f 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableCombineLatest.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableCombineLatest.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.observable; +import io.reactivex.internal.functions.ObjectHelper; import java.util.Arrays; import java.util.concurrent.atomic.*; @@ -235,7 +236,7 @@ void drain() { R v; try { - v = combiner.apply(array); + v = ObjectHelper.requireNonNull(combiner.apply(array), "The combiner returned a null"); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); cancelled = true; @@ -244,13 +245,6 @@ void drain() { return; } - if (v == null) { - cancelled = true; - cancel(q); - a.onError(new NullPointerException("The combiner returned a null")); - return; - } - a.onNext(v); } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableDebounce.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableDebounce.java index ed86d1c68f..650fe7880b 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableDebounce.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableDebounce.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.observable; +import io.reactivex.internal.functions.ObjectHelper; import java.util.concurrent.atomic.*; import io.reactivex.*; @@ -80,7 +81,7 @@ public void onNext(T t) { ObservableSource p; try { - p = debounceSelector.apply(t); + p = ObjectHelper.requireNonNull(debounceSelector.apply(t), "The publisher supplied is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); dispose(); @@ -88,12 +89,6 @@ public void onNext(T t) { return; } - if (p == null) { - dispose(); - actual.onError(new NullPointerException("The publisher supplied is null")); - return; - } - DebounceInnerObserver dis = new DebounceInnerObserver(this, idx, t); if (debouncer.compareAndSet(d, dis)) { diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableDefer.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableDefer.java index 3248b24aa0..5954007b43 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableDefer.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableDefer.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.observable; +import io.reactivex.internal.functions.ObjectHelper; import java.util.concurrent.Callable; import io.reactivex.*; @@ -28,17 +29,13 @@ public ObservableDefer(Callable> supplie public void subscribeActual(Observer s) { ObservableSource pub; try { - pub = supplier.call(); + pub = ObjectHelper.requireNonNull(supplier.call(), "null publisher supplied"); } catch (Throwable t) { Exceptions.throwIfFatal(t); EmptyDisposable.error(t, s); return; } - if (pub == null) { - EmptyDisposable.error(new NullPointerException("null publisher supplied"), s); - return; - } pub.subscribe(s); } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableDistinct.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableDistinct.java index d1317f14ff..d3f0b72886 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableDistinct.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableDistinct.java @@ -106,18 +106,13 @@ public boolean test(K t) { public void subscribeActual(Observer t) { Predicate coll; try { - coll = predicateSupplier.call(); + coll = ObjectHelper.requireNonNull(predicateSupplier.call(), "predicateSupplier returned null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); EmptyDisposable.error(e, t); return; } - if (coll == null) { - EmptyDisposable.error(new NullPointerException("predicateSupplier returned null"), t); - return; - } - source.subscribe(new DistinctObserver(t, keySelector, coll)); } @@ -158,7 +153,7 @@ public void onNext(T t) { K key; try { - key = keySelector.apply(t); + key = ObjectHelper.requireNonNull(keySelector.apply(t), "Null key supplied"); } catch (Throwable e) { Exceptions.throwIfFatal(e); s.dispose(); @@ -166,13 +161,6 @@ public void onNext(T t) { return; } - if (key == null) { - s.dispose(); - actual.onError(new NullPointerException("Null key supplied")); - return; - } - - boolean b; try { b = predicate.test(key); diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableError.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableError.java index 6123e285fc..4157e10037 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableError.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableError.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.observable; +import io.reactivex.internal.functions.ObjectHelper; import java.util.concurrent.Callable; import io.reactivex.*; @@ -28,14 +29,11 @@ public ObservableError(Callable errorSupplier) { public void subscribeActual(Observer s) { Throwable error; try { - error = errorSupplier.call(); + error = ObjectHelper.requireNonNull(errorSupplier.call(), "Callable returned null throwable. Null values are generally not allowed in 2.x operators and sources."); } catch (Throwable t) { Exceptions.throwIfFatal(t); error = t; } - if (error == null) { - error = new NullPointerException("Callable returned null throwable. Null values are generally not allowed in 2.x operators and sources."); - } EmptyDisposable.error(error, s); } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableFromCallable.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableFromCallable.java index bd33a2e66b..f3ae6d8383 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableFromCallable.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableFromCallable.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.observable; +import io.reactivex.internal.functions.ObjectHelper; import java.util.concurrent.Callable; import io.reactivex.*; @@ -33,7 +34,7 @@ public void subscribeActual(Observer s) { } T value; try { - value = callable.call(); + value = ObjectHelper.requireNonNull(callable.call(), "Callable returned null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); if (!d.isDisposed()) { @@ -44,11 +45,7 @@ public void subscribeActual(Observer s) { if (d.isDisposed()) { return; } - if (value != null) { - s.onNext(value); - s.onComplete(); - } else { - s.onError(new NullPointerException("Callable returned null")); - } + s.onNext(value); + s.onComplete(); } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableFromFuture.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableFromFuture.java index bafcb3025c..6fa6a68ded 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableFromFuture.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableFromFuture.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.observable; +import io.reactivex.internal.functions.ObjectHelper; import java.util.concurrent.*; import io.reactivex.*; @@ -37,7 +38,7 @@ public void subscribeActual(Observer s) { if (!d.isDisposed()) { T v; try { - v = unit != null ? future.get(timeout, unit) : future.get(); + v = ObjectHelper.requireNonNull(unit != null ? future.get(timeout, unit) : future.get(), "Future returned null"); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); if (!d.isDisposed()) { @@ -48,12 +49,8 @@ public void subscribeActual(Observer s) { future.cancel(true); // TODO ?? not sure about this } if (!d.isDisposed()) { - if (v != null) { - s.onNext(v); - s.onComplete(); - } else { - s.onError(new NullPointerException("Future returned null")); - } + s.onNext(v); + s.onComplete(); } } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableGenerate.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableGenerate.java index f5bcc6516d..f9fc5c8313 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableGenerate.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableGenerate.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.observable; +import io.reactivex.internal.functions.ObjectHelper; import java.util.concurrent.Callable; import io.reactivex.*; @@ -142,11 +143,8 @@ public void onNext(T t) { @Override public void onError(Throwable t) { - if (t == null) { - t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."); - } terminate = true; - actual.onError(t); + actual.onError(ObjectHelper.requireNonNull(t, "onError called with null. Null values are generally not allowed in 2.x operators and sources.")); } @Override diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableGroupBy.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableGroupBy.java index 83aeb7f2d4..f899bd6062 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableGroupBy.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableGroupBy.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.observable; +import io.reactivex.internal.functions.ObjectHelper; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.*; @@ -113,7 +114,7 @@ public void onNext(T t) { V v; try { - v = valueSelector.apply(t); + v = ObjectHelper.requireNonNull(valueSelector.apply(t), "The value supplied is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); s.dispose(); @@ -121,12 +122,6 @@ public void onNext(T t) { return; } - if (v == null) { - s.dispose(); - onError(new NullPointerException("The value supplied is null")); - return; - } - group.onNext(v); } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableMapNotification.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableMapNotification.java index 2cc64003cc..cfec07fbb9 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableMapNotification.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableMapNotification.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.observable; +import io.reactivex.internal.functions.ObjectHelper; import java.util.concurrent.Callable; import io.reactivex.*; @@ -87,18 +88,13 @@ public void onNext(T t) { ObservableSource p; try { - p = onNextMapper.apply(t); + p = ObjectHelper.requireNonNull(onNextMapper.apply(t), "The onNext publisher returned is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); actual.onError(e); return; } - if (p == null) { - actual.onError(new NullPointerException("The onNext publisher returned is null")); - return; - } - actual.onNext(p); } @@ -107,18 +103,13 @@ public void onError(Throwable t) { ObservableSource p; try { - p = onErrorMapper.apply(t); + p = ObjectHelper.requireNonNull(onErrorMapper.apply(t), "The onError publisher returned is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); actual.onError(e); return; } - if (p == null) { - actual.onError(new NullPointerException("The onError publisher returned is null")); - return; - } - actual.onNext(p); actual.onComplete(); } @@ -128,18 +119,13 @@ public void onComplete() { ObservableSource p; try { - p = onCompleteSupplier.call(); + p = ObjectHelper.requireNonNull(onCompleteSupplier.call(), "The onComplete publisher returned is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); actual.onError(e); return; } - if (p == null) { - actual.onError(new NullPointerException("The onComplete publisher returned is null")); - return; - } - actual.onNext(p); actual.onComplete(); } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableScan.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableScan.java index 84a6f3ff60..86f323b295 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableScan.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableScan.java @@ -18,6 +18,7 @@ import io.reactivex.exceptions.Exceptions; import io.reactivex.functions.BiFunction; import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.functions.ObjectHelper; public final class ObservableScan extends AbstractObservableWithUpstream { final BiFunction accumulator; @@ -75,7 +76,7 @@ public void onNext(T t) { T u; try { - u = accumulator.apply(v, t); + u = ObjectHelper.requireNonNull(accumulator.apply(v, t), "The value returned by the accumulator is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); s.dispose(); @@ -83,12 +84,6 @@ public void onNext(T t) { return; } - if (u == null) { - s.dispose(); - a.onError(new NullPointerException("The value returned by the accumulator is null")); - return; - } - value = u; a.onNext(u); } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableScanSeed.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableScanSeed.java index e6649f335f..78b73de5e2 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableScanSeed.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableScanSeed.java @@ -12,6 +12,7 @@ */ package io.reactivex.internal.operators.observable; +import io.reactivex.internal.functions.ObjectHelper; import java.util.concurrent.Callable; import io.reactivex.*; @@ -36,18 +37,13 @@ public void subscribeActual(Observer t) { R r; try { - r = seedSupplier.call(); + r = ObjectHelper.requireNonNull(seedSupplier.call(), "The seed supplied is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); EmptyDisposable.error(e, t); return; } - if (r == null) { - EmptyDisposable.error(new NullPointerException("The seed supplied is null"), t); - return; - } - source.subscribe(new ScanSeedObserver(t, accumulator, r)); } @@ -96,7 +92,7 @@ public void onNext(T t) { R u; try { - u = accumulator.apply(v, t); + u = ObjectHelper.requireNonNull(accumulator.apply(v, t), "The accumulator returned a null value"); } catch (Throwable e) { Exceptions.throwIfFatal(e); s.dispose(); @@ -104,12 +100,6 @@ public void onNext(T t) { return; } - if (u == null) { - s.dispose(); - onError(new NullPointerException("The accumulator returned a null value")); - return; - } - value = u; actual.onNext(u); diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableSwitchMap.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableSwitchMap.java index 78318a0c41..c1f34feba3 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableSwitchMap.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableSwitchMap.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.observable; +import io.reactivex.internal.functions.ObjectHelper; import java.util.concurrent.atomic.*; import io.reactivex.*; @@ -106,7 +107,7 @@ public void onNext(T t) { ObservableSource p; try { - p = mapper.apply(t); + p = ObjectHelper.requireNonNull(mapper.apply(t), "The ObservableSource returned is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); s.dispose(); @@ -114,12 +115,6 @@ public void onNext(T t) { return; } - if (p == null) { - s.dispose(); - onError(new NullPointerException("The publisher returned is null")); - return; - } - SwitchMapInnerObserver nextInner = new SwitchMapInnerObserver(this, c, bufferSize); for (;;) { diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableTimeout.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableTimeout.java index ced337acbe..50bf37217f 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableTimeout.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableTimeout.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.observable; +import io.reactivex.internal.functions.ObjectHelper; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; @@ -111,7 +112,7 @@ public void onNext(T t) { ObservableSource p; try { - p = itemTimeoutIndicator.apply(t); + p = ObjectHelper.requireNonNull(itemTimeoutIndicator.apply(t), "The ObservableSource returned is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); dispose(); @@ -119,12 +120,6 @@ public void onNext(T t) { return; } - if (p == null) { - dispose(); - actual.onError(new NullPointerException("The ObservableSource returned is null")); - return; - } - TimeoutInnerObserver tis = new TimeoutInnerObserver(this, idx); if (compareAndSet(d, tis)) { @@ -279,18 +274,13 @@ public void onNext(T t) { ObservableSource p; try { - p = itemTimeoutIndicator.apply(t); + p = ObjectHelper.requireNonNull(itemTimeoutIndicator.apply(t), "The ObservableSource returned is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); actual.onError(e); return; } - if (p == null) { - actual.onError(new NullPointerException("The ObservableSource returned is null")); - return; - } - TimeoutInnerObserver tis = new TimeoutInnerObserver(this, idx); if (compareAndSet(d, tis)) { diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowBoundarySelector.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowBoundarySelector.java index ea0c343ec6..9d1cc782fc 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowBoundarySelector.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowBoundarySelector.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.observable; +import io.reactivex.internal.functions.ObjectHelper; import java.util.*; import java.util.concurrent.atomic.*; @@ -252,7 +253,7 @@ void drainLoop() { ObservableSource p; try { - p = close.apply(wo.open); + p = ObjectHelper.requireNonNull(close.apply(wo.open), "The ObservableSource supplied is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); cancelled = true; @@ -260,12 +261,6 @@ void drainLoop() { continue; } - if (p == null) { - cancelled = true; - a.onError(new NullPointerException("The ObservableSource supplied is null")); - continue; - } - OperatorWindowBoundaryCloseObserver cl = new OperatorWindowBoundaryCloseObserver(this, w); if (resources.add(cl)) { diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowBoundarySupplier.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowBoundarySupplier.java index bfbe63ec60..c5218df97d 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowBoundarySupplier.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowBoundarySupplier.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.observable; +import io.reactivex.internal.functions.ObjectHelper; import java.util.concurrent.Callable; import java.util.concurrent.atomic.*; @@ -85,7 +86,7 @@ public void onSubscribe(Disposable s) { ObservableSource p; try { - p = other.call(); + p = ObjectHelper.requireNonNull(other.call(), "The first window ObservableSource supplied is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); s.dispose(); @@ -93,12 +94,6 @@ public void onSubscribe(Disposable s) { return; } - if (p == null) { - s.dispose(); - a.onError(new NullPointerException("The first window ObservableSource supplied is null")); - return; - } - UnicastSubject w = UnicastSubject.create(bufferSize); window = w; @@ -232,7 +227,7 @@ void drainLoop() { ObservableSource p; try { - p = other.call(); + p = ObjectHelper.requireNonNull(other.call(), "The ObservableSource supplied is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); DisposableHelper.dispose(boundary); @@ -240,12 +235,6 @@ void drainLoop() { return; } - if (p == null) { - DisposableHelper.dispose(boundary); - a.onError(new NullPointerException("The ObservableSource supplied is null")); - return; - } - w = UnicastSubject.create(bufferSize); windows.getAndIncrement(); diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableZip.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableZip.java index 43e5e8c549..0921538132 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableZip.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableZip.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.observable; +import io.reactivex.internal.functions.ObjectHelper; import java.util.Arrays; import java.util.concurrent.atomic.*; @@ -182,7 +183,7 @@ public void drain() { R v; try { - v = zipper.apply(os.clone()); + v = ObjectHelper.requireNonNull(zipper.apply(os.clone()), "The zipper returned a null value"); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); clear(); @@ -190,12 +191,6 @@ public void drain() { return; } - if (v == null) { - clear(); - a.onError(new NullPointerException("The zipper returned a null value")); - return; - } - a.onNext(v); Arrays.fill(os, null); diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableZipIterable.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableZipIterable.java index 27e75eb5e9..a66810e459 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableZipIterable.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableZipIterable.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.observable; +import io.reactivex.internal.functions.ObjectHelper; import java.util.Iterator; import io.reactivex.*; @@ -40,18 +41,13 @@ public void subscribeActual(Observer t) { Iterator it; try { - it = other.iterator(); + it = ObjectHelper.requireNonNull(other.iterator(), "The iterator returned by other is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); EmptyDisposable.error(e, t); return; } - if (it == null) { - EmptyDisposable.error(new NullPointerException("The iterator returned by other is null"), t); - return; - } - boolean b; try { @@ -115,29 +111,19 @@ public void onNext(T t) { U u; try { - u = iterator.next(); + u = ObjectHelper.requireNonNull(iterator.next(), "The iterator returned a null value"); } catch (Throwable e) { Exceptions.throwIfFatal(e); error(e); return; } - if (u == null) { - error(new NullPointerException("The iterator returned a null value")); - return; - } - V v; try { - v = zipper.apply(t, u); + v = ObjectHelper.requireNonNull(zipper.apply(t, u), "The zipper function returned a null value"); } catch (Throwable e) { Exceptions.throwIfFatal(e); - error(new NullPointerException("The iterator returned a null value")); - return; - } - - if (v == null) { - error(new NullPointerException("The zipper function returned a null value")); + error(e); return; } diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleError.java b/src/main/java/io/reactivex/internal/operators/single/SingleError.java index 8c784299ce..5dabcdb9a6 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleError.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleError.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.single; +import io.reactivex.internal.functions.ObjectHelper; import java.util.concurrent.Callable; import io.reactivex.*; @@ -32,16 +33,12 @@ protected void subscribeActual(SingleObserver s) { Throwable error; try { - error = errorSupplier.call(); + error = ObjectHelper.requireNonNull(errorSupplier.call(), "Callable returned null throwable. Null values are generally not allowed in 2.x operators and sources."); } catch (Throwable e) { Exceptions.throwIfFatal(e); error = e; } - if (error == null) { - error = new NullPointerException("Callable returned null throwable. Null values are generally not allowed in 2.x operators and sources."); - } - EmptyDisposable.error(error, s); }