Skip to content

Commit

Permalink
Fix related classes for issue ReactiveX#1451
Browse files Browse the repository at this point in the history
  • Loading branch information
zsxwing committed Jul 17, 2014
1 parent 53b4659 commit 6bbd921
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ public void request(long n) {
}
o.onNext(it.next());
}
o.onCompleted();
if (!o.isUnsubscribed()) {
o.onCompleted();
}
} else if(n > 0) {
// backpressure is requested
long _c = REQUESTED_UPDATER.getAndAdd(this, n);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ public void request(long n) {
}
o.onNext((int) i);
}
o.onCompleted();
if (!o.isUnsubscribed()) {
o.onCompleted();
}
} else if (n > 0) {
// backpressure is requested
long _c = REQUESTED_UPDATER.getAndAdd(this, n);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,24 +47,24 @@ public void onCompleted() {
onError(e);
return;
}
observer.onCompleted();
// Set `done` here so that the error in `doOnEachObserver.onCompleted()` can be noticed by observer
done = true;
observer.onCompleted();
}

@Override
public void onError(Throwable e) {
if (done) {
return;
}
done = true;
try {
doOnEachObserver.onError(e);
} catch (Throwable e2) {
observer.onError(e2);
return;
}
observer.onError(e);
done = true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,32 +49,40 @@ public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {

private int counter = 0;

private boolean done = false;

@Override
public void onNext(T args) {
boolean isSelected;
try {
isSelected = predicate.call(args, counter++);
} catch (Throwable e) {
done = true;
subscriber.onError(e);
unsubscribe();
return;
}
if (isSelected) {
subscriber.onNext(args);
} else {
done = true;
subscriber.onCompleted();
unsubscribe();
}
}

@Override
public void onCompleted() {
subscriber.onCompleted();
if (!done) {
subscriber.onCompleted();
}
}

@Override
public void onError(Throwable e) {
subscriber.onError(e);
if (!done) {
subscriber.onError(e);
}
}

};
Expand Down

0 comments on commit 6bbd921

Please sign in to comment.