Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: add missing null checks on values returned by user functions #5379

Merged
merged 1 commit into from
Jun 1, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ public R poll() throws Exception {
return null;
}
T[] a = (T[])queue.poll();
R r = combiner.apply(a);
R r = ObjectHelper.requireNonNull(combiner.apply(a), "The combiner returned a null value");
((CombineLatestInnerSubscriber<T>)e).requestOne();
return r;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import io.reactivex.*;
import io.reactivex.flowables.ConnectableFlowable;
import io.reactivex.functions.*;
import io.reactivex.internal.functions.Functions;
import io.reactivex.internal.functions.*;

/**
* Helper utility class to support Flowable with inner classes.
Expand Down Expand Up @@ -77,7 +77,8 @@ static final class ItemDelayFunction<T, U> implements Function<T, Publisher<T>>

@Override
public Publisher<T> apply(final T v) throws Exception {
return new FlowableTakePublisher<U>(itemDelay.apply(v), 1).map(Functions.justFunction(v)).defaultIfEmpty(v);
Publisher<U> p = ObjectHelper.requireNonNull(itemDelay.apply(v), "The itemDelay returned a null Publisher");
return new FlowableTakePublisher<U>(p, 1).map(Functions.justFunction(v)).defaultIfEmpty(v);
}
}

Expand Down Expand Up @@ -164,7 +165,7 @@ static final class FlatMapWithCombinerOuter<T, R, U> implements Function<T, Publ
@Override
public Publisher<R> apply(final T t) throws Exception {
@SuppressWarnings("unchecked")
Publisher<U> u = (Publisher<U>)mapper.apply(t);
Publisher<U> u = (Publisher<U>)ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null Publisher");
return new FlowableMapPublisher<U, R>(u, new FlatMapWithCombinerInner<U, R, T>(combiner, t));
}
}
Expand All @@ -184,7 +185,7 @@ static final class FlatMapIntoIterable<T, U> implements Function<T, Publisher<U>

@Override
public Publisher<U> apply(T t) throws Exception {
return new FlowableFromIterable<U>(mapper.apply(t));
return new FlowableFromIterable<U>(ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null Iterable"));
}
}

Expand Down Expand Up @@ -317,7 +318,8 @@ static final class ReplayFunction<T, R> implements Function<Flowable<T>, Publish

@Override
public Publisher<R> apply(Flowable<T> t) throws Exception {
return Flowable.fromPublisher(selector.apply(t)).observeOn(scheduler);
Publisher<R> p = ObjectHelper.requireNonNull(selector.apply(t), "The selector returned a null Publisher");
return Flowable.fromPublisher(p).observeOn(scheduler);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import org.reactivestreams.Subscriber;

import io.reactivex.Flowable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.exceptions.*;
import io.reactivex.functions.Function;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.subscribers.SinglePostCompleteSubscriber;
Expand Down Expand Up @@ -87,7 +87,7 @@ public void onError(Throwable t) {
p = ObjectHelper.requireNonNull(onErrorMapper.apply(t), "The onError publisher returned is null");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
actual.onError(e);
actual.onError(new CompositeException(t, e));
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.reactivex.*;
import io.reactivex.exceptions.*;
import io.reactivex.functions.*;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.subscriptions.*;
import io.reactivex.plugins.RxJavaPlugins;

Expand Down Expand Up @@ -54,7 +55,7 @@ public void subscribeActual(Subscriber<? super T> s) {

Publisher<? extends T> source;
try {
source = sourceSupplier.apply(resource);
source = ObjectHelper.requireNonNull(sourceSupplier.apply(resource), "The sourceSupplier returned a null Publisher");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public void onNext(T t) {
R v;

try {
v = ObjectHelper.requireNonNull(combiner.apply(objects), "combiner returned a null value");
v = ObjectHelper.requireNonNull(combiner.apply(objects), "The combiner returned a null value");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
cancel();
Expand Down Expand Up @@ -297,7 +297,7 @@ public void dispose() {
final class SingletonArrayFunc implements Function<T, R> {
@Override
public R apply(T t) throws Exception {
return combiner.apply(new Object[] { t });
return ObjectHelper.requireNonNull(combiner.apply(new Object[] { t }), "The combiner returned a null value");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public void onComplete() {
final class SingletonArrayFunc implements Function<T, R> {
@Override
public R apply(T t) throws Exception {
return zipper.apply(new Object[] { t });
return ObjectHelper.requireNonNull(zipper.apply(new Object[] { t }), "The zipper returned a null value");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.Function;
import io.reactivex.internal.disposables.EmptyDisposable;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.operators.maybe.MaybeZipArray.ZipCoordinator;

public final class MaybeZipIterable<T, R> extends Maybe<R> {
Expand Down Expand Up @@ -81,7 +82,7 @@ protected void subscribeActual(MaybeObserver<? super R> observer) {
final class SingletonArrayFunc implements Function<T, R> {
@Override
public R apply(T t) throws Exception {
return zipper.apply(new Object[] { t });
return ObjectHelper.requireNonNull(zipper.apply(new Object[] { t }), "The zipper returned a null value");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ static final class ItemDelayFunction<T, U> implements Function<T, ObservableSour

@Override
public ObservableSource<T> apply(final T v) throws Exception {
return new ObservableTake<U>(itemDelay.apply(v), 1).map(Functions.justFunction(v)).defaultIfEmpty(v);
ObservableSource<U> o = ObjectHelper.requireNonNull(itemDelay.apply(v), "The itemDelay returned a null ObservableSource");
return new ObservableTake<U>(o, 1).map(Functions.justFunction(v)).defaultIfEmpty(v);
}
}

Expand Down Expand Up @@ -165,7 +166,7 @@ static final class FlatMapWithCombinerOuter<T, R, U> implements Function<T, Obse
@Override
public ObservableSource<R> apply(final T t) throws Exception {
@SuppressWarnings("unchecked")
ObservableSource<U> u = (ObservableSource<U>)mapper.apply(t);
ObservableSource<U> u = (ObservableSource<U>)ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
return new ObservableMap<U, R>(u, new FlatMapWithCombinerInner<U, R, T>(combiner, t));
}
}
Expand All @@ -185,7 +186,7 @@ static final class FlatMapIntoIterable<T, U> implements Function<T, ObservableSo

@Override
public ObservableSource<U> apply(T t) throws Exception {
return new ObservableFromIterable<U>(mapper.apply(t));
return new ObservableFromIterable<U>(ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null Iterable"));
}
}

Expand Down Expand Up @@ -319,7 +320,7 @@ static final class ObservableMapper<T,R> implements Function<T,Observable<R>> {
@Override
public Observable<R> apply(T t) throws Exception {
return RxJavaPlugins.onAssembly(new SingleToObservable<R>(
ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null value")));
ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null SingleSource")));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't it Observable / ObservableSource?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just open the collapsed section and see what mapper is: final Function<? super T, ? extends SingleSource<? extends R>> mapper;

}

}
Expand Down Expand Up @@ -403,7 +404,8 @@ static final class ReplayFunction<T, R> implements Function<Observable<T>, Obser

@Override
public ObservableSource<R> apply(Observable<T> t) throws Exception {
return Observable.wrap(selector.apply(t)).observeOn(scheduler);
ObservableSource<R> apply = ObjectHelper.requireNonNull(selector.apply(t), "The selector returned a null ObservableSource");
return Observable.wrap(apply).observeOn(scheduler);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@

package io.reactivex.internal.operators.observable;

import io.reactivex.internal.functions.ObjectHelper;
import java.util.concurrent.Callable;

import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.exceptions.*;
import io.reactivex.functions.Function;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.functions.ObjectHelper;

public final class ObservableMapNotification<T, R> extends AbstractObservableWithUpstream<T, ObservableSource<? extends R>> {

Expand Down Expand Up @@ -106,7 +106,7 @@ public void onError(Throwable t) {
p = ObjectHelper.requireNonNull(onErrorMapper.apply(t), "The onError ObservableSource returned is null");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
actual.onError(e);
actual.onError(new CompositeException(t, e));
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.*;
import io.reactivex.internal.disposables.*;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.fuseable.HasUpstreamObservableSource;
import io.reactivex.internal.util.*;
import io.reactivex.observables.ConnectableObservable;
Expand Down Expand Up @@ -1026,8 +1027,8 @@ protected void subscribeActual(Observer<? super R> child) {
ConnectableObservable<U> co;
ObservableSource<R> observable;
try {
co = connectableFactory.call();
observable = selector.apply(co);
co = ObjectHelper.requireNonNull(connectableFactory.call(), "The connectableFactory returned a null ConnectableObservable");
observable = ObjectHelper.requireNonNull(selector.apply(co), "The selector returned a null ObservableSource");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
EmptyDisposable.error(e, child);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.reactivex.exceptions.*;
import io.reactivex.functions.*;
import io.reactivex.internal.disposables.*;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.plugins.RxJavaPlugins;

public final class ObservableUsing<T, D> extends Observable<T> {
Expand Down Expand Up @@ -53,7 +54,7 @@ public void subscribeActual(Observer<? super T> s) {

ObservableSource<? extends T> source;
try {
source = sourceSupplier.apply(resource);
source = ObjectHelper.requireNonNull(sourceSupplier.apply(resource), "The sourceSupplier returned a null ObservableSource");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ public void dispose() {
final class SingletonArrayFunc implements Function<T, R> {
@Override
public R apply(T t) throws Exception {
return combiner.apply(new Object[] { t });
return ObjectHelper.requireNonNull(combiner.apply(new Object[] { t }), "The combiner returned a null value");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ public void onError(Throwable e) {
final class SingletonArrayFunc implements Function<T, R> {
@Override
public R apply(T t) throws Exception {
return zipper.apply(new Object[] { t });
return ObjectHelper.requireNonNull(zipper.apply(new Object[] { t }), "The zipper returned a null value");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.Function;
import io.reactivex.internal.disposables.EmptyDisposable;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.operators.single.SingleZipArray.ZipCoordinator;

public final class SingleZipIterable<T, R> extends Single<R> {
Expand Down Expand Up @@ -81,7 +82,7 @@ protected void subscribeActual(SingleObserver<? super R> observer) {
final class SingletonArrayFunc implements Function<T, R> {
@Override
public R apply(T t) throws Exception {
return zipper.apply(new Object[] { t });
return ObjectHelper.requireNonNull(zipper.apply(new Object[] { t }), "The zipper returned a null value");
}
}
}
1 change: 1 addition & 0 deletions src/main/java/io/reactivex/observers/TestObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ public void onNext(T t) {
} catch (Throwable ex) {
// Exceptions.throwIfFatal(e); TODO add fatal exceptions?
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about this one?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still undecided.

errors.add(ex);
qs.dispose();
}
return;
}
Expand Down
1 change: 1 addition & 0 deletions src/main/java/io/reactivex/subscribers/TestSubscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ public void onNext(T t) {
} catch (Throwable ex) {
// Exceptions.throwIfFatal(e); TODO add fatal exceptions?
errors.add(ex);
qs.cancel();
}
return;
}
Expand Down
1 change: 1 addition & 0 deletions src/test/java/io/reactivex/flowable/FlowableNullTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -1341,6 +1341,7 @@ public Publisher<Integer> call() {
}

@Test(expected = NullPointerException.class)
@Ignore("No longer crashes with NPE but signals it; tested elsewhere.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there reasons to keep @Ignored tests in this PR?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had two options: I remove it and I get a question on why or ignore it with comments and get a question why...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Haha :D

But now you have explanation in GitHub history so maybe remove them?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First it has to get into the 2.x branch for that. Otherwise, there could be a purge of ignored unit test in the future.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remove it and I get a question on why or ignore it with comments and get a question why...

I like this way of thinking. :D

public void flatMapNotificationOnErrorReturnsNull() {
Flowable.error(new TestException()).flatMap(new Function<Object, Publisher<Integer>>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.reactivex.exceptions.*;
import io.reactivex.functions.*;
import io.reactivex.internal.functions.Functions;
import io.reactivex.internal.fuseable.QueueFuseable;
import io.reactivex.internal.operators.flowable.FlowableZipTest.ArgsToString;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.processors.PublishProcessor;
Expand Down Expand Up @@ -1551,4 +1552,21 @@ public Integer apply(Integer t1, Integer t2) throws Exception {
pp2.onNext(2);
ts.assertResult(3);
}

@Test
public void fusedNullCheck() {
TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.ASYNC);

Flowable.combineLatest(Flowable.just(1), Flowable.just(2), new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer t1, Integer t2) throws Exception {
return null;
}
})
.subscribe(ts);

ts
.assertOf(SubscriberFusion.<Integer>assertFusionMode(QueueFuseable.ASYNC))
.assertFailureAndMessage(NullPointerException.class, "The combiner returned a null value");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1018,4 +1018,15 @@ public void onComplete() {
}
}

@Test
public void itemDelayReturnsNull() {
Flowable.just(1).delay(new Function<Integer, Publisher<Object>>() {
@Override
public Publisher<Object> apply(Integer t) throws Exception {
return null;
}
})
.test()
.assertFailureAndMessage(NullPointerException.class, "The itemDelay returned a null Publisher");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ public void testFlatMapTransformsOnErrorFuncThrows() {

source.flatMap(just(onNext), funcThrow((Throwable) null, onError), just0(onComplete)).subscribe(o);

verify(o).onError(any(TestException.class));
verify(o).onError(any(CompositeException.class));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

verify that the exceptions are in right order within the CompositeException?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tested separately in the FlowableMapNotificationTest.java additions.

verify(o, never()).onNext(any());
verify(o, never()).onComplete();
}
Expand Down Expand Up @@ -997,4 +997,40 @@ public void run() {
}
}
}

@Test
public void iterableMapperFunctionReturnsNull() {
Flowable.just(1)
.flatMapIterable(new Function<Integer, Iterable<Object>>() {
@Override
public Iterable<Object> apply(Integer v) throws Exception {
return null;
}
}, new BiFunction<Integer, Object, Object>() {
@Override
public Object apply(Integer v, Object w) throws Exception {
return v;
}
})
.test()
.assertFailureAndMessage(NullPointerException.class, "The mapper returned a null Iterable");
}

@Test
public void combinerMapperFunctionReturnsNull() {
Flowable.just(1)
.flatMap(new Function<Integer, Publisher<Object>>() {
@Override
public Publisher<Object> apply(Integer v) throws Exception {
return null;
}
}, new BiFunction<Integer, Object, Object>() {
@Override
public Object apply(Integer v, Object w) throws Exception {
return v;
}
})
.test()
.assertFailureAndMessage(NullPointerException.class, "The mapper returned a null Publisher");
}
}
Loading