Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SwitchOnNext: fix upstream producer replacing the ops own producer #2655

Merged
merged 2 commits into from
Feb 16, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/main/java/rx/internal/operators/OperatorSwitch.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ public static <T> OperatorSwitch<T> instance() {
private OperatorSwitch() { }
@Override
public Subscriber<? super Observable<? extends T>> call(final Subscriber<? super T> child) {
return new SwitchSubscriber<T>(child);
SwitchSubscriber<T> sws = new SwitchSubscriber<T>(child);
child.add(sws);
return sws;
}

private static final class SwitchSubscriber<T> extends Subscriber<Observable<? extends T>> {
Expand All @@ -75,7 +77,6 @@ private static final class SwitchSubscriber<T> extends Subscriber<Observable<? e
volatile boolean infinite = false;

public SwitchSubscriber(Subscriber<? super T> child) {
super(child);
s = new SerializedSubscriber<T>(child);
ssub = new SerialSubscription();
child.add(ssub);
Expand Down
56 changes: 50 additions & 6 deletions src/test/java/rx/internal/operators/OperatorSwitchTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,25 @@
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.*;

import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.InOrder;

import rx.*;
import rx.Observable;
import rx.Observer;
import rx.Producer;
import rx.Scheduler;
import rx.Subscriber;
import rx.exceptions.TestException;
import rx.functions.Action0;
import rx.functions.Func1;
import rx.observers.TestSubscriber;
import rx.schedulers.TestScheduler;

Expand Down Expand Up @@ -530,4 +532,46 @@ public void call(final Subscriber<? super Observable<Integer>> subscriber) {
).take(1).subscribe();
assertTrue("Switch doesn't propagate 'unsubscribe'", isUnsubscribed.get());
}
/** The upstream producer hijacked the switch producer stopping the requests aimed at the inner observables. */
@Test
public void testIssue2654() {
Observable<String> oneItem = Observable.just("Hello").mergeWith(Observable.<String>never());

Observable<String> src = oneItem.switchMap(new Func1<String, Observable<String>>() {
@Override
public Observable<String> call(final String s) {
return Observable.just(s)
.mergeWith(Observable.interval(10, TimeUnit.MILLISECONDS)
.map(new Func1<Long, String>() {
@Override
public String call(Long i) {
return s + " " + i;
}
})).take(250);
}
})
.share()
;

TestSubscriber<String> ts = new TestSubscriber<String>() {
@Override
public void onNext(String t) {
super.onNext(t);
if (getOnNextEvents().size() == 250) {
onCompleted();
unsubscribe();
}
}
};
src.subscribe(ts);

ts.awaitTerminalEvent(10, TimeUnit.SECONDS);

System.out.println("> testIssue2654: " + ts.getOnNextEvents().size());

ts.assertTerminalEvent();
ts.assertNoErrors();

Assert.assertEquals(250, ts.getOnNextEvents().size());
}
}