diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableRepeatWhen.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableRepeatWhen.java index af27f2f6d0..24725997d1 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableRepeatWhen.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableRepeatWhen.java @@ -108,8 +108,8 @@ public void onError(Throwable e) { @Override public void onComplete() { - active = false; DisposableHelper.replace(upstream, null); + active = false; signaller.onNext(0); } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableRepeatTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableRepeatTest.java index 51376bc594..fd5061d9ed 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableRepeatTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableRepeatTest.java @@ -29,6 +29,7 @@ import io.reactivex.exceptions.TestException; import io.reactivex.functions.*; import io.reactivex.internal.subscriptions.BooleanSubscription; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.PublishProcessor; import io.reactivex.schedulers.Schedulers; import io.reactivex.subscribers.TestSubscriber; @@ -441,4 +442,56 @@ public boolean test(Object v) throws Exception { assertEquals(0, counter.get()); } + + @Test + public void repeatFloodNoSubscriptionError() { + List errors = TestHelper.trackPluginErrors(); + + try { + final PublishProcessor source = PublishProcessor.create(); + final PublishProcessor signaller = PublishProcessor.create(); + + for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) { + + TestSubscriber ts = source.take(1) + .repeatWhen(new Function, Flowable>() { + @Override + public Flowable apply(Flowable v) + throws Exception { + return signaller; + } + }).test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) { + source.onNext(1); + } + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) { + signaller.offer(1); + } + } + }; + + TestHelper.race(r1, r2); + + ts.dispose(); + } + + if (!errors.isEmpty()) { + for (Throwable e : errors) { + e.printStackTrace(); + } + fail(errors + ""); + } + } finally { + RxJavaPlugins.reset(); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableRetryTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableRetryTest.java index a5dd4918ab..a47a5d6eca 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableRetryTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableRetryTest.java @@ -32,6 +32,7 @@ import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; import io.reactivex.internal.subscriptions.BooleanSubscription; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.PublishProcessor; import io.reactivex.schedulers.Schedulers; import io.reactivex.subscribers.*; @@ -1221,4 +1222,64 @@ public boolean test(Object v) throws Exception { assertEquals(0, counter.get()); } + + @Test + public void repeatFloodNoSubscriptionError() { + List errors = TestHelper.trackPluginErrors(); + + final TestException error = new TestException(); + + try { + final PublishProcessor source = PublishProcessor.create(); + final PublishProcessor signaller = PublishProcessor.create(); + + for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) { + + TestSubscriber ts = source.take(1) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + throw error; + } + }) + .retryWhen(new Function, Flowable>() { + @Override + public Flowable apply(Flowable v) + throws Exception { + return signaller; + } + }).test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) { + source.onNext(1); + } + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) { + signaller.offer(1); + } + } + }; + + TestHelper.race(r1, r2); + + ts.dispose(); + } + + if (!errors.isEmpty()) { + for (Throwable e : errors) { + e.printStackTrace(); + } + fail(errors + ""); + } + } finally { + RxJavaPlugins.reset(); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableRepeatTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableRepeatTest.java index 3afc1f1cc7..dbb72e21ef 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableRepeatTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableRepeatTest.java @@ -30,6 +30,7 @@ import io.reactivex.exceptions.TestException; import io.reactivex.functions.*; import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.Schedulers; import io.reactivex.subjects.PublishSubject; @@ -397,4 +398,56 @@ public boolean test(Object v) throws Exception { assertEquals(0, counter.get()); } + + @Test + public void repeatFloodNoSubscriptionError() { + List errors = TestHelper.trackPluginErrors(); + + try { + final PublishSubject source = PublishSubject.create(); + final PublishSubject signaller = PublishSubject.create(); + + for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) { + + TestObserver to = source.take(1) + .repeatWhen(new Function, ObservableSource>() { + @Override + public ObservableSource apply(Observable v) + throws Exception { + return signaller; + } + }).test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) { + source.onNext(1); + } + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) { + signaller.onNext(1); + } + } + }; + + TestHelper.race(r1, r2); + + to.dispose(); + } + + if (!errors.isEmpty()) { + for (Throwable e : errors) { + e.printStackTrace(); + } + fail(errors + ""); + } + } finally { + RxJavaPlugins.reset(); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableRetryTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableRetryTest.java index e576479921..0055449ef5 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableRetryTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableRetryTest.java @@ -34,6 +34,7 @@ import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.observables.GroupedObservable; import io.reactivex.observers.*; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.Schedulers; import io.reactivex.subjects.PublishSubject; @@ -1131,4 +1132,64 @@ public boolean test(Object v) throws Exception { assertEquals(0, counter.get()); } + + @Test + public void repeatFloodNoSubscriptionError() { + List errors = TestHelper.trackPluginErrors(); + + final TestException error = new TestException(); + + try { + final PublishSubject source = PublishSubject.create(); + final PublishSubject signaller = PublishSubject.create(); + + for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) { + + TestObserver to = source.take(1) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + throw error; + } + }) + .retryWhen(new Function, ObservableSource>() { + @Override + public ObservableSource apply(Observable v) + throws Exception { + return signaller; + } + }).test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) { + source.onNext(1); + } + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) { + signaller.onNext(1); + } + } + }; + + TestHelper.race(r1, r2); + + to.dispose(); + } + + if (!errors.isEmpty()) { + for (Throwable e : errors) { + e.printStackTrace(); + } + fail(errors + ""); + } + } finally { + RxJavaPlugins.reset(); + } + } }