Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Subject NPE fixes, add UnicastProcessor TCK #5760

Merged
merged 4 commits into from
Dec 15, 2017
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 5 additions & 18 deletions src/main/java/io/reactivex/processors/AsyncProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReference;

import org.reactivestreams.*;

import io.reactivex.annotations.*;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.subscriptions.DeferredScalarSubscription;
import io.reactivex.plugins.RxJavaPlugins;
import org.reactivestreams.*;

/**
* Processor that emits the very last value followed by a completion event or the received error
Expand Down Expand Up @@ -77,32 +79,17 @@ public void onSubscribe(Subscription s) {

@Override
public void onNext(T t) {
ObjectHelper.requireNonNull(t, "onNext called with null. Null values are generally not allowed in 2.x operators and sources.");
if (subscribers.get() == TERMINATED) {
return;
}
if (t == null) {
nullOnNext();
return;
}
value = t;
}

@SuppressWarnings("unchecked")
void nullOnNext() {
value = null;
Throwable ex = new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.");
error = ex;
for (AsyncSubscription<T> as : subscribers.getAndSet(TERMINATED)) {
as.onError(ex);
}
}

@SuppressWarnings("unchecked")
@Override
public void onError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
ObjectHelper.requireNonNull(t, "onError called with null. Null values are generally not allowed in 2.x operators and sources.");
if (subscribers.get() == TERMINATED) {
RxJavaPlugins.onError(t);
return;
Expand Down
10 changes: 3 additions & 7 deletions src/main/java/io/reactivex/processors/BehaviorProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,8 @@ public void onSubscribe(Subscription s) {

@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😰😰😰 well this is a little bit breaking change, but since it was always part of RS spec I guess it falls into patch category for RxJava

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You were not supposed to call onNext with null anyway and there is no reason to take this extra long path to get the end subscriber a NullPointerException via onError.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep I understand :)

Just noted that it's a little bit breaking behavior for existing users, but since RS doesn't allow nulls anyway we can make this change as "patch" rather than wait for 3.x

return;
}
ObjectHelper.requireNonNull(t, "onNext called with null. Null values are generally not allowed in 2.x operators and sources.");

if (terminalEvent.get() != null) {
return;
}
Expand All @@ -191,9 +189,7 @@ public void onNext(T t) {

@Override
public void onError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
ObjectHelper.requireNonNull(t, "onError called with null. Null values are generally not allowed in 2.x operators and sources.");
if (!terminalEvent.compareAndSet(null, t)) {
RxJavaPlugins.onError(t);
return;
Expand Down
47 changes: 35 additions & 12 deletions src/main/java/io/reactivex/processors/FlowableProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@

package io.reactivex.processors;

import io.reactivex.*;
import io.reactivex.annotations.NonNull;
import org.reactivestreams.Processor;

import io.reactivex.*;
import io.reactivex.annotations.*;

/**
* Represents a Subscriber and a Flowable (Publisher) at the same time, allowing
* multicasting events from a single source to multiple child Subscribers.
Expand All @@ -28,49 +29,71 @@
public abstract class FlowableProcessor<T> extends Flowable<T> implements Processor<T, T>, FlowableSubscriber<T> {

/**
* Returns true if the subject has subscribers.
* Returns true if the FlowableProcessor has subscribers.
* <p>The method is thread-safe.
* @return true if the subject has subscribers
* @return true if the FlowableProcessor has subscribers
*/
public abstract boolean hasSubscribers();

/**
* Returns true if the subject has reached a terminal state through an error event.
* Returns true if the FlowableProcessor has reached a terminal state through an error event.
* <p>The method is thread-safe.
* @return true if the subject has reached a terminal state through an error event
* @return true if the FlowableProcessor has reached a terminal state through an error event
* @see #getThrowable()
* @see #hasComplete()
*/
public abstract boolean hasThrowable();

/**
* Returns true if the subject has reached a terminal state through a complete event.
* Returns true if the FlowableProcessor has reached a terminal state through a complete event.
* <p>The method is thread-safe.
* @return true if the subject has reached a terminal state through a complete event
* @return true if the FlowableProcessor has reached a terminal state through a complete event
* @see #hasThrowable()
*/
public abstract boolean hasComplete();

/**
* Returns the error that caused the Subject to terminate or null if the Subject
* Returns the error that caused the FlowableProcessor to terminate or null if the FlowableProcessor
* hasn't terminated yet.
* <p>The method is thread-safe.
* @return the error that caused the Subject to terminate or null if the Subject
* @return the error that caused the FlowableProcessor to terminate or null if the FlowableProcessor
* hasn't terminated yet
*/
@Nullable
public abstract Throwable getThrowable();

/**
* Wraps this Subject and serializes the calls to the onSubscribe, onNext, onError and
* Wraps this FlowableProcessor and serializes the calls to the onSubscribe, onNext, onError and
* onComplete methods, making them thread-safe.
* <p>The method is thread-safe.
* @return the wrapped and serialized subject
* @return the wrapped and serialized FlowableProcessor
*/
@NonNull
@CheckReturnValue
public final FlowableProcessor<T> toSerialized() {
if (this instanceof SerializedProcessor) {
return this;
}
return new SerializedProcessor<T>(this);
}

/**
* Wraps this FlowableProcessor and makes sure if all subscribers cancel
* their subscriptions, the upstream's Subscription gets cancelled as well.
* <p>
* This operator is similar to {@link io.reactivex.flowables.ConnectableFlowable#refCount()}
* except the first Subscriber doesn't trigger any sort of connection; that happens
* when the resulting FlowableProcessor is subscribed to a Publisher manually.
* @return the wrapped and reference-counted FlowableProcessor
* @since 2.1.8 - experimental
*/
@NonNull
@CheckReturnValue
@Experimental
public final FlowableProcessor<T> refCount() {
if (this instanceof RefCountProcessor) {
return this;
}
return new RefCountProcessor<T>(this);
}
}
10 changes: 3 additions & 7 deletions src/main/java/io/reactivex/processors/PublishProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import io.reactivex.annotations.*;
import io.reactivex.exceptions.MissingBackpressureException;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.internal.util.BackpressureHelper;
import io.reactivex.plugins.RxJavaPlugins;
Expand Down Expand Up @@ -186,13 +187,10 @@ public void onSubscribe(Subscription s) {

@Override
public void onNext(T t) {
ObjectHelper.requireNonNull(t, "onNext called with null. Null values are generally not allowed in 2.x operators and sources.");
if (subscribers.get() == TERMINATED) {
return;
}
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
for (PublishSubscription<T> s : subscribers.get()) {
s.onNext(t);
}
Expand All @@ -201,13 +199,11 @@ public void onNext(T t) {
@SuppressWarnings("unchecked")
@Override
public void onError(Throwable t) {
ObjectHelper.requireNonNull(t, "onError called with null. Null values are generally not allowed in 2.x operators and sources.");
if (subscribers.get() == TERMINATED) {
RxJavaPlugins.onError(t);
return;
}
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
error = t;

for (PublishSubscription<T> s : subscribers.getAndSet(TERMINATED)) {
Expand Down
Loading