Skip to content

Commit

Permalink
Fix issue ReactiveX#1451
Browse files Browse the repository at this point in the history
  • Loading branch information
zsxwing committed Jul 17, 2014
1 parent cb75b97 commit 53b4659
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,30 +33,45 @@ public OperatorDoOnEach(Observer<? super T> doOnEachObserver) {
@Override
public Subscriber<? super T> call(final Subscriber<? super T> observer) {
return new Subscriber<T>(observer) {

private boolean done = false;

@Override
public void onCompleted() {
if (done) {
return;
}
try {
doOnEachObserver.onCompleted();
} catch (Throwable e) {
onError(e);
return;
}
observer.onCompleted();
// Set `done` here so that the error in `doOnEachObserver.onCompleted()` can be noticed by observer
done = true;
}

@Override
public void onError(Throwable e) {
if (done) {
return;
}
try {
doOnEachObserver.onError(e);
} catch (Throwable e2) {
observer.onError(e2);
return;
}
observer.onError(e);
done = true;
}

@Override
public void onNext(T value) {
if (done) {
return;
}
try {
doOnEachObserver.onNext(value);
} catch (Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package rx.internal.operators;

import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
Expand All @@ -30,6 +31,9 @@
import rx.functions.Action1;
import rx.functions.Func1;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class OperatorDoOnEachTest {

@Mock
Expand Down Expand Up @@ -114,4 +118,55 @@ public void call(String s) {

}

@Test
public void testIssue1451Case1() {
// https://github.com/Netflix/RxJava/issues/1451
int[] nums = {1, 2, 3};
final AtomicInteger count = new AtomicInteger();
for (final int n : nums) {
Observable
.from(Boolean.TRUE, Boolean.FALSE)
.takeWhile(new Func1<Boolean, Boolean>() {
@Override
public Boolean call(Boolean value) {
return value;
}
})
.toList()
.doOnNext(new Action1<List<Boolean>>() {
@Override
public void call(List<Boolean> booleans) {
count.incrementAndGet();
}
})
.subscribe();
}
assertEquals(nums.length, count.get());
}

@Test
public void testIssue1451Case2() {
// https://github.com/Netflix/RxJava/issues/1451
int[] nums = {1, 2, 3};
final AtomicInteger count = new AtomicInteger();
for (final int n : nums) {
Observable
.from(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE)
.takeWhile(new Func1<Boolean, Boolean>() {
@Override
public Boolean call(Boolean value) {
return value;
}
})
.toList()
.doOnNext(new Action1<List<Boolean>>() {
@Override
public void call(List<Boolean> booleans) {
count.incrementAndGet();
}
})
.subscribe();
}
assertEquals(nums.length, count.get());
}
}

2 comments on commit 53b4659

@akarnokd
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This just hides the issue in takeWhile and/or from: calling onCompleted() on L72 after the onNext() has unsubscribed the upstream. Could you fix:

  • OnSubscribeFromIterable L73 by surrounding the o.onCompleted() with if (!o.isUnsubscribed()) { }. Same for OnSubscribeRange L67,
  • OperatorTakeWhile move isSelected into an instance variable and check for it in onCompleted and onError,
  • DoOnEach: I'd not modify this but if so, put the done = true; assignment before the downstream onXYZ() is called

instead?

@zsxwing
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DoOnEach: I'd not modify this but if so, put the done = true; assignment before the downstream onXYZ() is called

Considering we don't check isUnsubscribed everywhere, the upstream events may come after calling unsubscribe. So I think the subscriber of DoOnEach should be similar to SafeSubscriber.

Please sign in to comment.