Skip to content

Commit

Permalink
Merge pull request #1454 from zsxwing/issue1451
Browse files Browse the repository at this point in the history
Fix issue #1451
  • Loading branch information
benjchristensen committed Jul 17, 2014
2 parents 201f54d + 6bbd921 commit 877ee89
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ public void request(long n) {
}
o.onNext(it.next());
}
o.onCompleted();
if (!o.isUnsubscribed()) {
o.onCompleted();
}
} else if(n > 0) {
// backpressure is requested
long _c = REQUESTED_UPDATER.getAndAdd(this, n);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ public void request(long n) {
}
o.onNext((int) i);
}
o.onCompleted();
if (!o.isUnsubscribed()) {
o.onCompleted();
}
} else if (n > 0) {
// backpressure is requested
long _c = REQUESTED_UPDATER.getAndAdd(this, n);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,31 @@ public OperatorDoOnEach(Observer<? super T> doOnEachObserver) {
@Override
public Subscriber<? super T> call(final Subscriber<? super T> observer) {
return new Subscriber<T>(observer) {

private boolean done = false;

@Override
public void onCompleted() {
if (done) {
return;
}
try {
doOnEachObserver.onCompleted();
} catch (Throwable e) {
onError(e);
return;
}
// Set `done` here so that the error in `doOnEachObserver.onCompleted()` can be noticed by observer
done = true;
observer.onCompleted();
}

@Override
public void onError(Throwable e) {
if (done) {
return;
}
done = true;
try {
doOnEachObserver.onError(e);
} catch (Throwable e2) {
Expand All @@ -57,6 +69,9 @@ public void onError(Throwable e) {

@Override
public void onNext(T value) {
if (done) {
return;
}
try {
doOnEachObserver.onNext(value);
} catch (Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,32 +49,40 @@ public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {

private int counter = 0;

private boolean done = false;

@Override
public void onNext(T args) {
boolean isSelected;
try {
isSelected = predicate.call(args, counter++);
} catch (Throwable e) {
done = true;
subscriber.onError(e);
unsubscribe();
return;
}
if (isSelected) {
subscriber.onNext(args);
} else {
done = true;
subscriber.onCompleted();
unsubscribe();
}
}

@Override
public void onCompleted() {
subscriber.onCompleted();
if (!done) {
subscriber.onCompleted();
}
}

@Override
public void onError(Throwable e) {
subscriber.onError(e);
if (!done) {
subscriber.onError(e);
}
}

};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package rx.internal.operators;

import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
Expand All @@ -30,6 +31,9 @@
import rx.functions.Action1;
import rx.functions.Func1;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class OperatorDoOnEachTest {

@Mock
Expand Down Expand Up @@ -114,4 +118,55 @@ public void call(String s) {

}

@Test
public void testIssue1451Case1() {
// https://github.com/Netflix/RxJava/issues/1451
int[] nums = {1, 2, 3};
final AtomicInteger count = new AtomicInteger();
for (final int n : nums) {
Observable
.from(Boolean.TRUE, Boolean.FALSE)
.takeWhile(new Func1<Boolean, Boolean>() {
@Override
public Boolean call(Boolean value) {
return value;
}
})
.toList()
.doOnNext(new Action1<List<Boolean>>() {
@Override
public void call(List<Boolean> booleans) {
count.incrementAndGet();
}
})
.subscribe();
}
assertEquals(nums.length, count.get());
}

@Test
public void testIssue1451Case2() {
// https://github.com/Netflix/RxJava/issues/1451
int[] nums = {1, 2, 3};
final AtomicInteger count = new AtomicInteger();
for (final int n : nums) {
Observable
.from(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE)
.takeWhile(new Func1<Boolean, Boolean>() {
@Override
public Boolean call(Boolean value) {
return value;
}
})
.toList()
.doOnNext(new Action1<List<Boolean>>() {
@Override
public void call(List<Boolean> booleans) {
count.incrementAndGet();
}
})
.subscribe();
}
assertEquals(nums.length, count.get());
}
}

0 comments on commit 877ee89

Please sign in to comment.