Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1.x: PublishSubject fail-fast when backpressured #4225

Merged
merged 2 commits into from
Jul 27, 2016
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
286 changes: 231 additions & 55 deletions src/main/java/rx/subjects/PublishSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
package rx.subjects;

import java.util.*;
import java.util.concurrent.atomic.*;

import rx.*;
import rx.Observer;
import rx.annotations.Beta;
import rx.exceptions.Exceptions;
import rx.functions.Action1;
import rx.internal.operators.NotificationLite;
import rx.subjects.SubjectSubscriptionManager.SubjectObserver;
import rx.exceptions.*;
import rx.internal.operators.BackpressureUtils;

/**
* Subject that, once an {@link Observer} has subscribed, emits all subsequently observed items to the
Expand Down Expand Up @@ -50,8 +50,8 @@
* the type of items observed and emitted by the Subject
*/
public final class PublishSubject<T> extends Subject<T, T> {
final SubjectSubscriptionManager<T> state;
private final NotificationLite<T> nl = NotificationLite.instance();

final PublishSubjectState<T> state;

/**
* Creates and returns a new {@code PublishSubject}.
Expand All @@ -60,63 +60,33 @@ public final class PublishSubject<T> extends Subject<T, T> {
* @return the new {@code PublishSubject}
*/
public static <T> PublishSubject<T> create() {
final SubjectSubscriptionManager<T> state = new SubjectSubscriptionManager<T>();
state.onTerminated = new Action1<SubjectObserver<T>>() {

@Override
public void call(SubjectObserver<T> o) {
o.emitFirst(state.getLatest(), state.nl);
}

};
return new PublishSubject<T>(state, state);
return new PublishSubject<T>(new PublishSubjectState<T>());
}

protected PublishSubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<T> state) {
super(onSubscribe);
protected PublishSubject(PublishSubjectState<T> state) {
super(state);
this.state = state;
}

@Override
public void onCompleted() {
if (state.active) {
Object n = nl.completed();
for (SubjectObserver<T> bo : state.terminate(n)) {
bo.emitNext(n, state.nl);
}
}

public void onNext(T v) {
state.onNext(v);
}

@Override
public void onError(final Throwable e) {
if (state.active) {
Object n = nl.error(e);
List<Throwable> errors = null;
for (SubjectObserver<T> bo : state.terminate(n)) {
try {
bo.emitNext(n, state.nl);
} catch (Throwable e2) {
if (errors == null) {
errors = new ArrayList<Throwable>();
}
errors.add(e2);
}
}
Exceptions.throwIfAny(errors);
}
public void onError(Throwable e) {
state.onError(e);
}

@Override
public void onNext(T v) {
for (SubjectObserver<T> bo : state.observers()) {
bo.onNext(v);
}
public void onCompleted() {
state.onCompleted();
}


@Override
public boolean hasObservers() {
return state.observers().length > 0;
return state.get().length != 0;
}

/**
Expand All @@ -125,17 +95,15 @@ public boolean hasObservers() {
*/
@Beta
public boolean hasThrowable() {
Object o = state.getLatest();
return nl.isError(o);
return state.get() == PublishSubjectState.TERMINATED && state.error != null;
}
/**
* Check if the Subject has terminated normally.
* @return true if the subject completed normally via {@code onCompleted}
*/
@Beta
public boolean hasCompleted() {
Object o = state.getLatest();
return o != null && !nl.isError(o);
return state.get() == PublishSubjectState.TERMINATED && state.error == null;
}
/**
* Returns the Throwable that terminated the Subject.
Expand All @@ -144,10 +112,218 @@ public boolean hasCompleted() {
*/
@Beta
public Throwable getThrowable() {
Object o = state.getLatest();
if (nl.isError(o)) {
return nl.getError(o);
if (state.get() == PublishSubjectState.TERMINATED) {
return state.error;
}
return null;
}

static final class PublishSubjectState<T>
extends AtomicReference<PublishSubjectProducer<T>[]>
implements OnSubscribe<T>, Observer<T> {

/** */
private static final long serialVersionUID = -7568940796666027140L;

@SuppressWarnings("rawtypes")
static final PublishSubjectProducer[] EMPTY = new PublishSubjectProducer[0];
@SuppressWarnings("rawtypes")
static final PublishSubjectProducer[] TERMINATED = new PublishSubjectProducer[0];

Throwable error;

@SuppressWarnings("unchecked")
public PublishSubjectState() {
lazySet(EMPTY);
}

@Override
public void call(Subscriber<? super T> t) {
PublishSubjectProducer<T> pp = new PublishSubjectProducer<T>(this, t);
t.add(pp);
t.setProducer(pp);

if (add(pp)) {
if (pp.isUnsubscribed()) {
remove(pp);
}
} else {
Throwable ex = error;
if (ex != null) {
t.onError(ex);
} else {
t.onCompleted();
}
}
}


boolean add(PublishSubjectProducer<T> inner) {
for (;;) {
PublishSubjectProducer<T>[] curr = get();
if (curr == TERMINATED) {
return false;
}

int n = curr.length;

@SuppressWarnings("unchecked")
PublishSubjectProducer<T>[] next = new PublishSubjectProducer[n + 1];
System.arraycopy(curr, 0, next, 0, n);

next[n] = inner;
if (compareAndSet(curr, next)) {
return true;
}
}
}

@SuppressWarnings("unchecked")
void remove(PublishSubjectProducer<T> inner) {
for (;;) {
PublishSubjectProducer<T>[] curr = get();
if (curr == TERMINATED || curr == EMPTY) {
return;
}

int n = curr.length;
int j = -1;
for (int i = 0; i < n; i++) {
if (curr[i] == inner) {
j = i;
break;
}
}

if (j < 0) {
return;
}

PublishSubjectProducer<T>[] next;
if (n == 1) {
next = EMPTY;
} else {
next = new PublishSubjectProducer[n - 1];
System.arraycopy(curr, 0, next, 0, j);
System.arraycopy(curr, j + 1, next, j, n - j - 1);
}

if (compareAndSet(curr, next)) {
return;
}
}
}

@Override
public void onNext(T t) {
for (PublishSubjectProducer<T> pp : get()) {
pp.onNext(t);
}
}

@SuppressWarnings("unchecked")
@Override
public void onError(Throwable e) {
error = e;
List<Throwable> errors = null;
for (PublishSubjectProducer<T> pp : getAndSet(TERMINATED)) {
try {
pp.onError(e);
} catch (Throwable ex) {
if (errors == null) {
errors = new ArrayList<Throwable>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: (1)

}
errors.add(ex);
}
}

Exceptions.throwIfAny(errors);
}

@SuppressWarnings("unchecked")
@Override
public void onCompleted() {
for (PublishSubjectProducer<T> pp : getAndSet(TERMINATED)) {
pp.onCompleted();
}
}

}

static final class PublishSubjectProducer<T>
extends AtomicLong
implements Producer, Subscription, Observer<T> {
/** */
private static final long serialVersionUID = 6451806817170721536L;

final PublishSubjectState<T> parent;

final Subscriber<? super T> actual;

long produced;

public PublishSubjectProducer(PublishSubjectState<T> parent, Subscriber<? super T> actual) {
this.parent = parent;
this.actual = actual;
}

@Override
public void request(long n) {
if (BackpressureUtils.validate(n)) {
for (;;) {
long r = get();
if (r == Long.MIN_VALUE) {
return;
}
long u = BackpressureUtils.addCap(r, n);
if (compareAndSet(r, u)) {
return;
}
}
}
}

@Override
public boolean isUnsubscribed() {
return get() == Long.MIN_VALUE;
}

@Override
public void unsubscribe() {
if (getAndSet(Long.MIN_VALUE) != Long.MIN_VALUE) {
parent.remove(this);
}
}

@Override
public void onNext(T t) {
long r = get();
if (r != Long.MIN_VALUE) {
long p = produced;
if (r != p) {
produced = p + 1;
actual.onNext(t);
} else {
unsubscribe();
actual.onError(new MissingBackpressureException("PublishSubject: could not emit value due to lack of requests"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}
}
}

@Override
public void onError(Throwable e) {
long r = get();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: inline

if (r != Long.MIN_VALUE) {
actual.onError(e);
}
}

@Override
public void onCompleted() {
long r = get();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: inline

if (r != Long.MIN_VALUE) {
actual.onCompleted();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,16 @@ public Integer call() {
ps.onNext(3);
ps.onCompleted();

ts.assertValues(2, 3, 4, 5);
ts.assertNoValues();
ts.assertNoErrors();
ts.assertNotCompleted();

ts.requestMore(1);

ts.assertValue(0);
ts.assertNoErrors();
ts.assertCompleted();

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ public Observable<Integer> call(Observable<Integer> o) {
ts.assertError(MissingBackpressureException.class);
ts.assertNotCompleted();

Assert.assertEquals("Queue full?!", ts.getOnErrorEvents().get(0).getMessage());
Assert.assertEquals("PublishSubject: could not emit value due to lack of requests", ts.getOnErrorEvents().get(0).getMessage());
Assert.assertFalse("Source has subscribers?", ps.hasObservers());
}

Expand All @@ -249,7 +249,7 @@ public Observable<Integer> call(Observable<Integer> o) {
ts.assertError(MissingBackpressureException.class);
ts.assertNotCompleted();

Assert.assertEquals("Queue full?!", ts.getOnErrorEvents().get(0).getMessage());
Assert.assertEquals("PublishSubject: could not emit value due to lack of requests", ts.getOnErrorEvents().get(0).getMessage());
Assert.assertFalse("Source has subscribers?", ps.hasObservers());
}
}
4 changes: 3 additions & 1 deletion src/test/java/rx/internal/operators/OperatorTakeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,9 @@ public void testReentrantTake() {

TestSubscriber<Integer> ts = new TestSubscriber<Integer>();

source.take(1).doOnNext(new Action1<Integer>() {
source
.rebatchRequests(2) // take(1) requests 1
.take(1).doOnNext(new Action1<Integer>() {
@Override
public void call(Integer v) {
source.onNext(2);
Expand Down
Loading