Skip to content

Commit

Permalink
2.x: Fix Flowable.flatMap not canceling the inner sources on outer er…
Browse files Browse the repository at this point in the history
…ror (#6827)
  • Loading branch information
akarnokd authored Jan 9, 2020
1 parent ea2c796 commit 0b3b300
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,11 @@ public void onError(Throwable t) {
}
if (errs.addThrowable(t)) {
done = true;
if (!delayErrors) {
for (InnerSubscriber<?, ?> a : subscribers.getAndSet(CANCELLED)) {
a.dispose();
}
}
drain();
} else {
RxJavaPlugins.onError(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1123,4 +1123,38 @@ public void accept(Integer v) throws Exception {
assertFalse(pp3.hasSubscribers());
assertFalse(pp4.hasSubscribers());
}

@Test
public void mainErrorsInnerCancelled() {
PublishProcessor<Integer> pp1 = PublishProcessor.create();
PublishProcessor<Integer> pp2 = PublishProcessor.create();

pp1
.flatMap(Functions.justFunction(pp2))
.test();

pp1.onNext(1);
assertTrue("No subscribers?", pp2.hasSubscribers());

pp1.onError(new TestException());

assertFalse("Has subscribers?", pp2.hasSubscribers());
}

@Test
public void innerErrorsMainCancelled() {
PublishProcessor<Integer> pp1 = PublishProcessor.create();
PublishProcessor<Integer> pp2 = PublishProcessor.create();

pp1
.flatMap(Functions.justFunction(pp2))
.test();

pp1.onNext(1);
assertTrue("No subscribers?", pp2.hasSubscribers());

pp2.onError(new TestException());

assertFalse("Has subscribers?", pp1.hasSubscribers());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1084,4 +1084,38 @@ public void accept(Integer v) throws Exception {
assertFalse(ps3.hasObservers());
assertFalse(ps4.hasObservers());
}

@Test
public void mainErrorsInnerCancelled() {
PublishSubject<Integer> ps1 = PublishSubject.create();
PublishSubject<Integer> ps2 = PublishSubject.create();

ps1
.flatMap(Functions.justFunction(ps2))
.test();

ps1.onNext(1);
assertTrue("No subscribers?", ps2.hasObservers());

ps1.onError(new TestException());

assertFalse("Has subscribers?", ps2.hasObservers());
}

@Test
public void innerErrorsMainCancelled() {
PublishSubject<Integer> ps1 = PublishSubject.create();
PublishSubject<Integer> ps2 = PublishSubject.create();

ps1
.flatMap(Functions.justFunction(ps2))
.test();

ps1.onNext(1);
assertTrue("No subscribers?", ps2.hasObservers());

ps2.onError(new TestException());

assertFalse("Has subscribers?", ps1.hasObservers());
}
}

0 comments on commit 0b3b300

Please sign in to comment.