Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance NPE in operators that invoke functions #2262

Merged
merged 2 commits into from
Jul 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ final class CompletableDefer extends Completable implements CompletableSource {
protected void handleSubscribe(Subscriber subscriber) {
final Completable completable;
try {
completable = requireNonNull(completableFactory.get());
completable = requireNonNull(completableFactory.get(),
() -> "Factory " + completableFactory + "returned null");
} catch (Throwable cause) {
deliverErrorFromSource(subscriber, cause);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public Subscriber<? super T> apply(final Subscriber<? super T> subscriber) {
return new Subscriber<T>() {
@Nullable
private Subscription subscription;
private final Predicate<? super T> predicate = requireNonNull(filterSupplier.get());
private final Predicate<? super T> predicate = requireNonNull(filterSupplier.get(),
() -> "Supplier " + filterSupplier + " returned null");

@Override
public void onSubscribe(Subscription s) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ public void onSubscribe(Subscription s) {
public void onNext(T u) {
// If Function.apply(...) throws we just propagate it to the caller which is responsible to terminate
// its subscriber and cancel the subscription.
currentIterator = requireNonNull(mapper.apply(u).iterator());
currentIterator = requireNonNull(mapper.apply(u).iterator(),
() -> "Iterator from mapper " + mapper + " is null");
tryDrainIterator(ErrorHandlingStrategyInDrain.Throw);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ final class PublisherDefer<T> extends Publisher<T> implements PublisherSource<T>
protected void handleSubscribe(Subscriber<? super T> subscriber) {
final Publisher<? extends T> publisher;
try {
publisher = requireNonNull(publisherFactory.get());
publisher = requireNonNull(publisherFactory.get(), () -> "Factory " + publisherFactory + " returned null");
} catch (Throwable cause) {
deliverErrorFromSource(subscriber, cause);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,8 @@ public void onSubscribe(final Subscription s) {

@Override
public void onNext(@Nullable final T t) {
final Publisher<? extends R> publisher = requireNonNull(source.mapper.apply(t));
final Publisher<? extends R> publisher = requireNonNull(source.mapper.apply(t),
() -> "Mapper " + source.mapper + " returned null");
FlatMapPublisherSubscriber<T, R> subscriber = new FlatMapPublisherSubscriber<>(this);
if (cancellableSet.add(subscriber) && activeMappedSourcesUpdater.incrementAndGet(this) > 0) {
publisher.subscribeInternal(subscriber);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ public void onSubscribe(Subscription s) {

@Override
public void onNext(T t) {
final Single<? extends R> next = requireNonNull(source.mapper.apply(t));
final Single<? extends R> next = requireNonNull(source.mapper.apply(t),
() -> "Mapper " + source.mapper + " returned null");
if (activeMappedSourcesUpdater.incrementAndGet(this) > 0) {
next.subscribeInternal(new FlatMapSingleSubscriber());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ private final class GroupBySubscriber extends AbstractGroupBySubscriber<Key, T>

@Override
public void onNext(@Nullable final T t) {
onNext(requireNonNull(keySelector.apply(t)), t);
onNext(requireNonNull(keySelector.apply(t), () -> "Selector " + keySelector + " returned null"), t);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ private final class GroupBySubscriber extends AbstractGroupBySubscriber<Key, T>

@Override
public void onNext(@Nullable final T t) {
final Iterator<? extends Key> keys = requireNonNull(keySelector.apply(t));
final Iterator<? extends Key> keys = requireNonNull(keySelector.apply(t),
() -> "Selector " + keySelector + " returned null");
keys.forEachRemaining(key -> onNext(key, t));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ Subscription decorate(Subscription s) {
private void redoIfRequired(TerminalNotification terminalNotification) {
final Completable redoDecider;
try {
redoDecider = requireNonNull(redoPublisher.shouldRedo.apply(++redoCount, terminalNotification));
redoDecider = requireNonNull(redoPublisher.shouldRedo.apply(++redoCount, terminalNotification),
() -> "Redo decider " + redoPublisher.shouldRedo + " returned null");
} catch (Throwable cause) {
Throwable originalCause = terminalNotification.cause();
if (originalCause != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ public void onSuccess(@Nullable final T result) {
final Completable completable;
try {
subscriber.onNext(result);
completable = requireNonNull(outer.repeater.apply(++repeatCount, result));
completable = requireNonNull(outer.repeater.apply(++repeatCount, result),
() -> "Repeat decider " + outer.repeater + " returned null");
} catch (Throwable cause) {
onErrorInternal(cause);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ public void onSuccess(@Nullable T t) {
public void onError(Throwable t) {
final Completable retryDecider;
try {
retryDecider = requireNonNull(retrySingle.shouldRetry.apply(++retryCount, t));
retryDecider = requireNonNull(retrySingle.shouldRetry.apply(++retryCount, t),
() -> "Retry decider " + retrySingle.shouldRetry + " returned null");
} catch (Throwable cause) {
target.onError(addSuppressed(cause, t));
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ final class SingleDefer<T> extends Single<T> implements SingleSource<T> {
protected void handleSubscribe(Subscriber<? super T> subscriber) {
final Single<? extends T> single;
try {
single = requireNonNull(singleFactory.get());
single = requireNonNull(singleFactory.get(), () -> "Factory " + singleFactory + " returned null");
} catch (Throwable cause) {
deliverErrorFromSource(subscriber, cause);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void onComplete() {
public void onSuccess(@Nullable T result) {
final Completable next;
try {
next = requireNonNull(nextFactory.apply(result));
next = requireNonNull(nextFactory.apply(result), () -> "Mapper " + nextFactory + " returned null");
} catch (Throwable cause) {
subscriber.onError(cause);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void onSubscribe(Subscription s) {
public void onSuccess(@Nullable T result) {
final Publisher<? extends R> next;
try {
next = requireNonNull(nextFactory.apply(result));
next = requireNonNull(nextFactory.apply(result), () -> "Mapper " + nextFactory + " returned null");
} catch (Throwable cause) {
subscriber.onError(cause);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void onSuccess(@Nullable T result) {
// new object.
final Single<? extends R> next;
try {
next = requireNonNull(nextFactory.apply(result));
next = requireNonNull(nextFactory.apply(result), () -> "Mapper " + nextFactory + " returned null");
} catch (Throwable cause) {
subscriber.onError(cause);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ static TimeoutSubscriber newInstance(TimeoutCompletable parent, Subscriber targe
// it enabled for the Subscriber, however the user explicitly specifies the Executor with this operator
// so they can wrap the Executor in this case.
localTimerCancellable = requireNonNull(
parent.timeoutExecutor.schedule(s::timerFires, parent.durationNs, NANOSECONDS));
parent.timeoutExecutor.schedule(s::timerFires, parent.durationNs, NANOSECONDS),
() -> "Executor.schedule " + parent.timeoutExecutor + " returned null");
} catch (Throwable cause) {
localTimerCancellable = IGNORE_CANCEL;
// We must set this to ignore so there are no further interactions with Subscriber in the future.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,8 @@ private void timerFires() {
final Cancellable nextTimerCancellable;
try {
nextTimerCancellable = requireNonNull(
parent.timeoutExecutor.schedule(this::timerFires, nextTimeoutNs, NANOSECONDS));
parent.timeoutExecutor.schedule(this::timerFires, nextTimeoutNs, NANOSECONDS),
() -> "Executor.schedule " + parent.timeoutExecutor + " returned null");
} catch (Throwable cause) {
offloadTimeout(cause);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ static <X> TimeoutSubscriber<X> newInstance(TimeoutSingle<X> parent, Subscriber<
// it enabled for the Subscriber, however the user explicitly specifies the Executor with this operator
// so they can wrap the Executor in this case.
localTimerCancellable = requireNonNull(
parent.timeoutExecutor.schedule(s::timerFires, parent.durationNs, NANOSECONDS));
parent.timeoutExecutor.schedule(s::timerFires, parent.durationNs, NANOSECONDS),
() -> "Executor.schedule " + parent.timeoutExecutor + " returned null");
} catch (Throwable cause) {
localTimerCancellable = IGNORE_CANCEL;
// We must set this to ignore so there are no further interactions with Subscriber in the future.
Expand Down