Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Fix switchMap to indicate boundary fusion #5991

Merged
merged 1 commit into from
May 4, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ public void onSubscribe(Subscription s) {
@SuppressWarnings("unchecked")
QueueSubscription<R> qs = (QueueSubscription<R>) s;

int m = qs.requestFusion(QueueSubscription.ANY);
int m = qs.requestFusion(QueueSubscription.ANY | QueueSubscription.BOUNDARY);
if (m == QueueSubscription.SYNC) {
fusionMode = m;
queue = qs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ public void onSubscribe(Disposable s) {
@SuppressWarnings("unchecked")
QueueDisposable<R> qd = (QueueDisposable<R>) s;

int m = qd.requestFusion(QueueDisposable.ANY);
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
if (m == QueueDisposable.SYNC) {
queue = qd;
done = true;
Expand Down
210 changes: 210 additions & 0 deletions src/test/java/io/reactivex/TestHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -2922,4 +2922,214 @@ public void request(long n) {
}
};
}

static final class FlowableStripBoundary<T> extends Flowable<T> implements FlowableTransformer<T, T> {

final Flowable<T> source;

FlowableStripBoundary(Flowable<T> source) {
this.source = source;
}

@Override
public Flowable<T> apply(Flowable<T> upstream) {
return new FlowableStripBoundary<T>(upstream);
}

@Override
protected void subscribeActual(Subscriber<? super T> s) {
source.subscribe(new StripBoundarySubscriber<T>(s));
}

static final class StripBoundarySubscriber<T> implements FlowableSubscriber<T>, QueueSubscription<T> {

final Subscriber<? super T> actual;

Subscription upstream;

QueueSubscription<T> qs;

StripBoundarySubscriber(Subscriber<? super T> actual) {
this.actual = actual;
}

@SuppressWarnings("unchecked")
@Override
public void onSubscribe(Subscription subscription) {
this.upstream = subscription;
if (subscription instanceof QueueSubscription) {
qs = (QueueSubscription<T>)subscription;
}
actual.onSubscribe(this);
}

@Override
public void onNext(T t) {
actual.onNext(t);
}

@Override
public void onError(Throwable throwable) {
actual.onError(throwable);
}

@Override
public void onComplete() {
actual.onComplete();
}

@Override
public int requestFusion(int mode) {
QueueSubscription<T> fs = qs;
if (fs != null) {
return fs.requestFusion(mode & ~BOUNDARY);
}
return NONE;
}

@Override
public boolean offer(T value) {
throw new UnsupportedOperationException("Should not be called");
}

@Override
public boolean offer(T v1, T v2) {
throw new UnsupportedOperationException("Should not be called");
}

@Override
public T poll() throws Exception {
return qs.poll();
}

@Override
public void clear() {
qs.clear();
}

@Override
public boolean isEmpty() {
return qs.isEmpty();
}

@Override
public void request(long n) {
upstream.request(n);
}

@Override
public void cancel() {
upstream.cancel();
}
}
}

public static <T> FlowableTransformer<T, T> flowableStripBoundary() {
return new FlowableStripBoundary<T>(null);
}

static final class ObservableStripBoundary<T> extends Observable<T> implements ObservableTransformer<T, T> {

final Observable<T> source;

ObservableStripBoundary(Observable<T> source) {
this.source = source;
}

@Override
public Observable<T> apply(Observable<T> upstream) {
return new ObservableStripBoundary<T>(upstream);
}

@Override
protected void subscribeActual(Observer<? super T> s) {
source.subscribe(new StripBoundaryObserver<T>(s));
}

static final class StripBoundaryObserver<T> implements Observer<T>, QueueDisposable<T> {

final Observer<? super T> actual;

Disposable upstream;

QueueDisposable<T> qd;

StripBoundaryObserver(Observer<? super T> actual) {
this.actual = actual;
}

@SuppressWarnings("unchecked")
@Override
public void onSubscribe(Disposable d) {
this.upstream = d;
if (d instanceof QueueDisposable) {
qd = (QueueDisposable<T>)d;
}
actual.onSubscribe(this);
}

@Override
public void onNext(T t) {
actual.onNext(t);
}

@Override
public void onError(Throwable throwable) {
actual.onError(throwable);
}

@Override
public void onComplete() {
actual.onComplete();
}

@Override
public int requestFusion(int mode) {
QueueDisposable<T> fs = qd;
if (fs != null) {
return fs.requestFusion(mode & ~BOUNDARY);
}
return NONE;
}

@Override
public boolean offer(T value) {
throw new UnsupportedOperationException("Should not be called");
}

@Override
public boolean offer(T v1, T v2) {
throw new UnsupportedOperationException("Should not be called");
}

@Override
public T poll() throws Exception {
return qd.poll();
}

@Override
public void clear() {
qd.clear();
}

@Override
public boolean isEmpty() {
return qd.isEmpty();
}

@Override
public void dispose() {
upstream.dispose();
}

@Override
public boolean isDisposed() {
return upstream.isDisposed();
}
}
}

public static <T> ObservableTransformer<T, T> observableStripBoundary() {
return new ObservableStripBoundary<T>(null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import io.reactivex.internal.util.ExceptionHelper;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.processors.PublishProcessor;
import io.reactivex.schedulers.TestScheduler;
import io.reactivex.schedulers.*;
import io.reactivex.subscribers.*;

public class FlowableSwitchTest {
Expand Down Expand Up @@ -1144,12 +1144,16 @@ public void run() {
@Test
public void fusedInnerCrash() {
Flowable.just(1).hide()
.switchMap(Functions.justFunction(Flowable.just(1).map(new Function<Integer, Object>() {
@Override
public Object apply(Integer v) throws Exception {
throw new TestException();
}
})))
.switchMap(Functions.justFunction(Flowable.just(1)
.map(new Function<Integer, Object>() {
@Override
public Object apply(Integer v) throws Exception {
throw new TestException();
}
})
.compose(TestHelper.<Object>flowableStripBoundary())
)
)
.test()
.assertFailure(TestException.class);
}
Expand All @@ -1174,4 +1178,30 @@ public void innerCancelledOnMainError() {

ts.assertFailure(TestException.class);
}

@Test
public void fusedBoundary() {
String thread = Thread.currentThread().getName();

Flowable.range(1, 10000)
.switchMap(new Function<Integer, Flowable<? extends Object>>() {
@Override
public Flowable<? extends Object> apply(Integer v)
throws Exception {
return Flowable.just(2).hide()
.observeOn(Schedulers.single())
.map(new Function<Integer, Object>() {
@Override
public Object apply(Integer w) throws Exception {
return Thread.currentThread().getName();
}
});
}
})
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertNever(thread)
.assertNoErrors()
.assertComplete();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.reactivex.internal.operators.observable;

import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;

import java.util.List;
Expand All @@ -26,15 +27,14 @@
import io.reactivex.*;
import io.reactivex.disposables.*;
import io.reactivex.exceptions.*;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.functions.*;
import io.reactivex.internal.functions.Functions;
import io.reactivex.internal.schedulers.ImmediateThinScheduler;
import io.reactivex.internal.util.ExceptionHelper;
import io.reactivex.observers.TestObserver;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.TestScheduler;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.schedulers.*;
import io.reactivex.subjects.*;

public class ObservableSwitchTest {

Expand Down Expand Up @@ -1121,6 +1121,7 @@ public Integer apply(Integer v) throws Exception {
throw new TestException();
}
})
.compose(TestHelper.<Integer>observableStripBoundary())
))
.test();

Expand Down Expand Up @@ -1148,6 +1149,7 @@ public Integer apply(Integer v) throws Exception {
throw new TestException();
}
})
.compose(TestHelper.<Integer>observableStripBoundary())
))
.test();

Expand All @@ -1166,4 +1168,30 @@ public Integer apply(Integer v) throws Exception {

assertFalse(ps.hasObservers());
}

@Test
public void fusedBoundary() {
String thread = Thread.currentThread().getName();

Observable.range(1, 10000)
.switchMap(new Function<Integer, ObservableSource<? extends Object>>() {
@Override
public ObservableSource<? extends Object> apply(Integer v)
throws Exception {
return Observable.just(2).hide()
.observeOn(Schedulers.single())
.map(new Function<Integer, Object>() {
@Override
public Object apply(Integer w) throws Exception {
return Thread.currentThread().getName();
}
});
}
})
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertNever(thread)
.assertNoErrors()
.assertComplete();
}
}