From f059ded2c1d02d9797cae451a5c879eb0837e3b9 Mon Sep 17 00:00:00 2001 From: David Karnok Date: Fri, 10 Mar 2017 18:22:23 +0100 Subject: [PATCH] 2.x: fix LambdaObserver not cancelling the upstream (#5170) --- src/main/java/io/reactivex/Observable.java | 4 +- .../internal/observers/LambdaObserver.java | 2 + .../observable/ObservableInternalHelper.java | 10 +-- .../observers/LambdaObserverTest.java | 62 +++++++++++++++++++ .../observable/ObservableSwitchTest.java | 12 ++-- .../subscribers/LambdaSubscriberTest.java | 62 +++++++++++++++++++ 6 files changed, 139 insertions(+), 13 deletions(-) diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index d51bac4d81..e1361a22fe 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -10894,7 +10894,7 @@ public final Observable switchMap(Function Observable switchMapSingle(@NonNull Function> mapper) { return ObservableInternalHelper.switchMapSingle(this, mapper); } - + /** * Returns a new ObservableSource by applying a function that you supply to each item emitted by the source * ObservableSource that returns a SingleSource, and then emitting the item emitted by the most recently emitted @@ -10925,7 +10925,7 @@ public final Observable switchMapSingle(@NonNull Function Observable switchMapSingleDelayError(@NonNull Function> mapper) { return ObservableInternalHelper.switchMapSingleDelayError(this, mapper); } - + /** * Returns a new ObservableSource by applying a function that you supply to each item emitted by the source * ObservableSource that returns an ObservableSource, and then emitting the items emitted by the most recently emitted diff --git a/src/main/java/io/reactivex/internal/observers/LambdaObserver.java b/src/main/java/io/reactivex/internal/observers/LambdaObserver.java index 17e386fa52..9f06ecac75 100644 --- a/src/main/java/io/reactivex/internal/observers/LambdaObserver.java +++ b/src/main/java/io/reactivex/internal/observers/LambdaObserver.java @@ -47,6 +47,7 @@ public void onSubscribe(Disposable s) { onSubscribe.accept(this); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); + s.dispose(); onError(ex); } } @@ -59,6 +60,7 @@ public void onNext(T t) { onNext.accept(t); } catch (Throwable e) { Exceptions.throwIfFatal(e); + get().dispose(); onError(e); } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java index a092256501..faf8957d9e 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java @@ -326,27 +326,27 @@ public static Observable switchMapSingleDelayError(Observable source Function> mapper) { return source.switchMapDelayError(convertSingleMapperToObservableMapper(mapper), 1); } - + private static Function> convertSingleMapperToObservableMapper( final Function> mapper) { ObjectHelper.requireNonNull(mapper, "mapper is null"); return new ObservableMapper(mapper); } - + static final class ObservableMapper implements Function> { - + final Function> mapper; ObservableMapper(Function> mapper) { this.mapper = mapper; } - + @Override public Observable apply(T t) throws Exception { return RxJavaPlugins.onAssembly(new SingleToObservable( ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null value"))); } - + } } diff --git a/src/test/java/io/reactivex/internal/observers/LambdaObserverTest.java b/src/test/java/io/reactivex/internal/observers/LambdaObserverTest.java index 4683379f57..d28a92be4b 100644 --- a/src/test/java/io/reactivex/internal/observers/LambdaObserverTest.java +++ b/src/test/java/io/reactivex/internal/observers/LambdaObserverTest.java @@ -26,6 +26,7 @@ import io.reactivex.exceptions.*; import io.reactivex.functions.*; import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.subjects.PublishSubject; public class LambdaObserverTest { @@ -280,4 +281,65 @@ public void accept(Disposable s) throws Exception { assertEquals(Arrays.asList(1, 100), received); } + + @Test + public void onNextThrowsCancelsUpstream() { + PublishSubject ps = PublishSubject.create(); + + final List errors = new ArrayList(); + + ps.subscribe(new Consumer() { + @Override + public void accept(Integer v) throws Exception { + throw new TestException(); + } + }, new Consumer() { + @Override + public void accept(Throwable e) throws Exception { + errors.add(e); + } + }); + + assertTrue("No observers?!", ps.hasObservers()); + assertTrue("Has errors already?!", errors.isEmpty()); + + ps.onNext(1); + + assertFalse("Has observers?!", ps.hasObservers()); + assertFalse("No errors?!", errors.isEmpty()); + + assertTrue(errors.toString(), errors.get(0) instanceof TestException); + } + + @Test + public void onSubscribeThrowsCancelsUpstream() { + PublishSubject ps = PublishSubject.create(); + + final List errors = new ArrayList(); + + ps.subscribe(new Consumer() { + @Override + public void accept(Integer v) throws Exception { + } + }, new Consumer() { + @Override + public void accept(Throwable e) throws Exception { + errors.add(e); + } + }, new Action() { + @Override + public void run() throws Exception { + } + }, new Consumer() { + @Override + public void accept(Disposable s) throws Exception { + throw new TestException(); + } + }); + + assertFalse("Has observers?!", ps.hasObservers()); + assertFalse("No errors?!", errors.isEmpty()); + + assertTrue(errors.toString(), errors.get(0) instanceof TestException); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableSwitchTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableSwitchTest.java index 94c36df7c9..cb6587f589 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableSwitchTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableSwitchTest.java @@ -622,7 +622,7 @@ public void switchMapInnerCancelled() { assertFalse(pp.hasObservers()); } - + @Test public void switchMapSingleJustSource() { Observable.just(0) @@ -635,7 +635,7 @@ public SingleSource apply(Object v) throws Exception { .test() .assertResult(1); } - + @Test public void switchMapSingleMapperReturnsNull() { Observable.just(0) @@ -648,13 +648,13 @@ public SingleSource apply(Object v) throws Exception { .test() .assertError(NullPointerException.class); } - - @Test(expected=NullPointerException.class) + + @Test(expected = NullPointerException.class) public void switchMapSingleMapperIsNull() { Observable.just(0) .switchMapSingle(null); } - + @Test public void switchMapSingleFunctionDoesntReturnSingle() { Observable.just(0) @@ -698,7 +698,7 @@ public void accept(Integer n) throws Exception { .assertError(RuntimeException.class); assertTrue(completed.get()); } - + @Test public void scalarMap() { Observable.switchOnNext(Observable.just(Observable.just(1))) diff --git a/src/test/java/io/reactivex/internal/subscribers/LambdaSubscriberTest.java b/src/test/java/io/reactivex/internal/subscribers/LambdaSubscriberTest.java index 71f64924e7..4666e77569 100644 --- a/src/test/java/io/reactivex/internal/subscribers/LambdaSubscriberTest.java +++ b/src/test/java/io/reactivex/internal/subscribers/LambdaSubscriberTest.java @@ -25,6 +25,7 @@ import io.reactivex.functions.*; import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.processors.PublishProcessor; public class LambdaSubscriberTest { @@ -285,4 +286,65 @@ public void accept(Subscription s) throws Exception { assertEquals(Arrays.asList(1, 100), received); } + + @Test + public void onNextThrowsCancelsUpstream() { + PublishProcessor ps = PublishProcessor.create(); + + final List errors = new ArrayList(); + + ps.subscribe(new Consumer() { + @Override + public void accept(Integer v) throws Exception { + throw new TestException(); + } + }, new Consumer() { + @Override + public void accept(Throwable e) throws Exception { + errors.add(e); + } + }); + + assertTrue("No observers?!", ps.hasSubscribers()); + assertTrue("Has errors already?!", errors.isEmpty()); + + ps.onNext(1); + + assertFalse("Has observers?!", ps.hasSubscribers()); + assertFalse("No errors?!", errors.isEmpty()); + + assertTrue(errors.toString(), errors.get(0) instanceof TestException); + } + + @Test + public void onSubscribeThrowsCancelsUpstream() { + PublishProcessor ps = PublishProcessor.create(); + + final List errors = new ArrayList(); + + ps.subscribe(new Consumer() { + @Override + public void accept(Integer v) throws Exception { + } + }, new Consumer() { + @Override + public void accept(Throwable e) throws Exception { + errors.add(e); + } + }, new Action() { + @Override + public void run() throws Exception { + } + }, new Consumer() { + @Override + public void accept(Subscription s) throws Exception { + throw new TestException(); + } + }); + + assertFalse("Has observers?!", ps.hasSubscribers()); + assertFalse("No errors?!", errors.isEmpty()); + + assertTrue(errors.toString(), errors.get(0) instanceof TestException); + } }