From 0b3b300425c3ae471ad5136a2bbeb9bd50f58849 Mon Sep 17 00:00:00 2001 From: David Karnok Date: Thu, 9 Jan 2020 08:57:22 +0100 Subject: [PATCH] 2.x: Fix Flowable.flatMap not canceling the inner sources on outer error (#6827) --- .../operators/flowable/FlowableFlatMap.java | 5 +++ .../flowable/FlowableFlatMapTest.java | 34 +++++++++++++++++++ .../observable/ObservableFlatMapTest.java | 34 +++++++++++++++++++ 3 files changed, 73 insertions(+) diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java index c68e82bb93..a7631de8b8 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java @@ -322,6 +322,11 @@ public void onError(Throwable t) { } if (errs.addThrowable(t)) { done = true; + if (!delayErrors) { + for (InnerSubscriber a : subscribers.getAndSet(CANCELLED)) { + a.dispose(); + } + } drain(); } else { RxJavaPlugins.onError(t); diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapTest.java index 953a5f44dc..d121eea54e 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapTest.java @@ -1123,4 +1123,38 @@ public void accept(Integer v) throws Exception { assertFalse(pp3.hasSubscribers()); assertFalse(pp4.hasSubscribers()); } + + @Test + public void mainErrorsInnerCancelled() { + PublishProcessor pp1 = PublishProcessor.create(); + PublishProcessor pp2 = PublishProcessor.create(); + + pp1 + .flatMap(Functions.justFunction(pp2)) + .test(); + + pp1.onNext(1); + assertTrue("No subscribers?", pp2.hasSubscribers()); + + pp1.onError(new TestException()); + + assertFalse("Has subscribers?", pp2.hasSubscribers()); + } + + @Test + public void innerErrorsMainCancelled() { + PublishProcessor pp1 = PublishProcessor.create(); + PublishProcessor pp2 = PublishProcessor.create(); + + pp1 + .flatMap(Functions.justFunction(pp2)) + .test(); + + pp1.onNext(1); + assertTrue("No subscribers?", pp2.hasSubscribers()); + + pp2.onError(new TestException()); + + assertFalse("Has subscribers?", pp1.hasSubscribers()); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapTest.java index b4da14e9a1..5e0fa7cf8a 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapTest.java @@ -1084,4 +1084,38 @@ public void accept(Integer v) throws Exception { assertFalse(ps3.hasObservers()); assertFalse(ps4.hasObservers()); } + + @Test + public void mainErrorsInnerCancelled() { + PublishSubject ps1 = PublishSubject.create(); + PublishSubject ps2 = PublishSubject.create(); + + ps1 + .flatMap(Functions.justFunction(ps2)) + .test(); + + ps1.onNext(1); + assertTrue("No subscribers?", ps2.hasObservers()); + + ps1.onError(new TestException()); + + assertFalse("Has subscribers?", ps2.hasObservers()); + } + + @Test + public void innerErrorsMainCancelled() { + PublishSubject ps1 = PublishSubject.create(); + PublishSubject ps2 = PublishSubject.create(); + + ps1 + .flatMap(Functions.justFunction(ps2)) + .test(); + + ps1.onNext(1); + assertTrue("No subscribers?", ps2.hasObservers()); + + ps2.onError(new TestException()); + + assertFalse("Has subscribers?", ps1.hasObservers()); + } }