From 5930ff0aa340e957e889e0066eb0aa48685c0c79 Mon Sep 17 00:00:00 2001 From: pjastrz Date: Sat, 12 Jan 2019 17:31:43 +0100 Subject: [PATCH 1/3] 2.x: Fix Completable.andThen(Completable) not running on scheduler defined with observeOn. --- src/main/java/io/reactivex/Completable.java | 7 +- .../CompletableAndThenCompletable.java | 107 ++++++++++ .../CompletableAndThenCompletableabTest.java | 184 ++++++++++++++++++ .../completable/CompletableAndThenTest.java | 45 +---- 4 files changed, 297 insertions(+), 46 deletions(-) create mode 100644 src/main/java/io/reactivex/internal/operators/completable/CompletableAndThenCompletable.java create mode 100644 src/test/java/io/reactivex/internal/operators/completable/CompletableAndThenCompletableabTest.java diff --git a/src/main/java/io/reactivex/Completable.java b/src/main/java/io/reactivex/Completable.java index 9d8325e3b6..7bcfb9038d 100644 --- a/src/main/java/io/reactivex/Completable.java +++ b/src/main/java/io/reactivex/Completable.java @@ -1171,7 +1171,8 @@ public final Maybe andThen(MaybeSource next) { @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Completable andThen(CompletableSource next) { - return concatWith(next); + ObjectHelper.requireNonNull(next, "next is null"); + return RxJavaPlugins.onAssembly(new CompletableAndThenCompletable(this, next)); } /** @@ -1356,8 +1357,8 @@ public final Completable compose(CompletableTransformer transformer) { @NonNull @SchedulerSupport(SchedulerSupport.NONE) public final Completable concatWith(CompletableSource other) { - ObjectHelper.requireNonNull(other, "other is null"); - return concatArray(this, other); + ObjectHelper.requireNonNull(other, "next is null"); + return RxJavaPlugins.onAssembly(new CompletableAndThenCompletable(this, other)); } /** diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableAndThenCompletable.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableAndThenCompletable.java new file mode 100644 index 0000000000..20d2332214 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableAndThenCompletable.java @@ -0,0 +1,107 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.completable; + +import java.util.concurrent.atomic.AtomicReference; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.internal.disposables.DisposableHelper; + +public final class CompletableAndThenCompletable extends Completable { + + final CompletableSource source; + + final CompletableSource next; + + public CompletableAndThenCompletable(CompletableSource source, CompletableSource next) { + this.source = source; + this.next = next; + } + + @Override + protected void subscribeActual(CompletableObserver observer) { + source.subscribe(new SourceObserver(observer, next)); + } + + static final class SourceObserver + extends AtomicReference + implements CompletableObserver, Disposable { + + private static final long serialVersionUID = -4101678820158072998L; + + final CompletableObserver actualObserver; + + final CompletableSource next; + + SourceObserver(CompletableObserver actualObserver, CompletableSource next) { + this.actualObserver = actualObserver; + this.next = next; + } + + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.setOnce(this, d)) { + actualObserver.onSubscribe(this); + } + } + + @Override + public void onError(Throwable e) { + actualObserver.onError(e); + } + + @Override + public void onComplete() { + next.subscribe(new NextObserver(this, actualObserver)); + } + + @Override + public void dispose() { + DisposableHelper.dispose(this); + } + + @Override + public boolean isDisposed() { + return DisposableHelper.isDisposed(get()); + } + } + + static final class NextObserver implements CompletableObserver { + + final AtomicReference parent; + + final CompletableObserver downstream; + + public NextObserver(AtomicReference parent, CompletableObserver downstream) { + this.parent = parent; + this.downstream = downstream; + } + + @Override + public void onSubscribe(Disposable d) { + DisposableHelper.replace(parent, d); + } + + @Override + public void onComplete() { + downstream.onComplete(); + } + + @Override + public void onError(Throwable e) { + downstream.onError(e); + } + } +} diff --git a/src/test/java/io/reactivex/internal/operators/completable/CompletableAndThenCompletableabTest.java b/src/test/java/io/reactivex/internal/operators/completable/CompletableAndThenCompletableabTest.java new file mode 100644 index 0000000000..f7b2ed9190 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/completable/CompletableAndThenCompletableabTest.java @@ -0,0 +1,184 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.completable; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.functions.Action; +import io.reactivex.observers.TestObserver; +import io.reactivex.schedulers.Schedulers; + +import static org.junit.Assert.*; + +public class CompletableAndThenCompletableabTest { + @Test(expected = NullPointerException.class) + public void andThenCompletableCompleteNull() { + Completable.complete() + .andThen((Completable) null); + } + + @Test + public void andThenCompletableCompleteComplete() { + Completable.complete() + .andThen(Completable.complete()) + .test() + .assertComplete(); + } + + @Test + public void andThenCompletableCompleteError() { + Completable.complete() + .andThen(Completable.error(new RuntimeException("test"))) + .test() + .assertNotComplete() + .assertNoValues() + .assertError(RuntimeException.class) + .assertErrorMessage("test"); + } + + @Test + public void andThenCompletableCompleteNever() { + Completable.complete() + .andThen(Completable.never()) + .test() + .assertNoValues() + .assertNoErrors() + .assertNotComplete(); + } + + @Test + public void andThenCompletableErrorComplete() { + Completable.error(new RuntimeException("bla")) + .andThen(Completable.complete()) + .test() + .assertNotComplete() + .assertNoValues() + .assertError(RuntimeException.class) + .assertErrorMessage("bla"); + } + + @Test + public void andThenCompletableErrorNever() { + Completable.error(new RuntimeException("bla")) + .andThen(Completable.never()) + .test() + .assertNotComplete() + .assertNoValues() + .assertError(RuntimeException.class) + .assertErrorMessage("bla"); + } + + @Test + public void andThenCompletableErrorError() { + Completable.error(new RuntimeException("error1")) + .andThen(Completable.error(new RuntimeException("error2"))) + .test() + .assertNotComplete() + .assertNoValues() + .assertError(RuntimeException.class) + .assertErrorMessage("error1"); + } + + @Test + public void andThenCanceled() { + final AtomicInteger completableRunCount = new AtomicInteger(); + Completable.fromRunnable(new Runnable() { + @Override + public void run() { + completableRunCount.incrementAndGet(); + } + }) + .andThen(Completable.complete()) + .test(true) + .assertEmpty(); + assertEquals(1, completableRunCount.get()); + } + + @Test + public void andThenFirstCancels() { + final TestObserver to = new TestObserver(); + Completable.fromRunnable(new Runnable() { + @Override + public void run() { + to.cancel(); + } + }) + .andThen(Completable.complete()) + .subscribe(to); + to + .assertNotComplete() + .assertNoErrors(); + } + + @Test + public void andThenSecondCancels() { + final TestObserver to = new TestObserver(); + Completable.complete() + .andThen(Completable.fromRunnable(new Runnable() { + @Override + public void run() { + to.cancel(); + } + })) + .subscribe(to); + to + .assertNotComplete() + .assertNoErrors(); + } + + @Test + public void andThenDisposed() { + TestHelper.checkDisposed(Completable.complete() + .andThen(Completable.complete())); + } + + @Test + public void andThenNoInterrupt() throws InterruptedException { + for (int k = 0; k < 100; k++) { + final int count = 10; + final CountDownLatch latch = new CountDownLatch(count); + final boolean[] interrupted = {false}; + + for (int i = 0; i < count; i++) { + Completable.complete() + .subscribeOn(Schedulers.io()) + .observeOn(Schedulers.io()) + .andThen(Completable.fromAction(new Action() { + @Override + public void run() throws Exception { + try { + Thread.sleep(30); + } catch (InterruptedException e) { + System.out.println("Interrupted! " + Thread.currentThread()); + interrupted[0] = true; + } + } + })) + .subscribe(new Action() { + @Override + public void run() throws Exception { + latch.countDown(); + } + }); + } + + latch.await(); + assertFalse("The second Completable was interrupted!", interrupted[0]); + } + } +} diff --git a/src/test/java/io/reactivex/internal/operators/completable/CompletableAndThenTest.java b/src/test/java/io/reactivex/internal/operators/completable/CompletableAndThenTest.java index a5d9b3a279..c873d5472d 100644 --- a/src/test/java/io/reactivex/internal/operators/completable/CompletableAndThenTest.java +++ b/src/test/java/io/reactivex/internal/operators/completable/CompletableAndThenTest.java @@ -13,15 +13,9 @@ package io.reactivex.internal.operators.completable; -import io.reactivex.Completable; -import io.reactivex.Maybe; -import io.reactivex.functions.Action; -import io.reactivex.schedulers.Schedulers; - -import java.util.concurrent.CountDownLatch; - import org.junit.Test; -import static org.junit.Assert.*; + +import io.reactivex.*; public class CompletableAndThenTest { @Test(expected = NullPointerException.class) @@ -69,39 +63,4 @@ public void andThenMaybeError() { .assertError(RuntimeException.class) .assertErrorMessage("bla"); } - - @Test - public void andThenNoInterrupt() throws InterruptedException { - for (int k = 0; k < 100; k++) { - final int count = 10; - final CountDownLatch latch = new CountDownLatch(count); - final boolean[] interrupted = { false }; - - for (int i = 0; i < count; i++) { - Completable.complete() - .subscribeOn(Schedulers.io()) - .observeOn(Schedulers.io()) - .andThen(Completable.fromAction(new Action() { - @Override - public void run() throws Exception { - try { - Thread.sleep(30); - } catch (InterruptedException e) { - System.out.println("Interrupted! " + Thread.currentThread()); - interrupted[0] = true; - } - } - })) - .subscribe(new Action() { - @Override - public void run() throws Exception { - latch.countDown(); - } - }); - } - - latch.await(); - assertFalse("The second Completable was interrupted!", interrupted[0]); - } - } } From 7d836cfd499740713e20eba0ab229fca0f9e0c0c Mon Sep 17 00:00:00 2001 From: pjastrz Date: Sat, 12 Jan 2019 17:45:16 +0100 Subject: [PATCH 2/3] 2.x: Fix Completable.andThen(Completable) not running on scheduler defined with observeOn. --- src/main/java/io/reactivex/Completable.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/reactivex/Completable.java b/src/main/java/io/reactivex/Completable.java index 7bcfb9038d..7ac7ead344 100644 --- a/src/main/java/io/reactivex/Completable.java +++ b/src/main/java/io/reactivex/Completable.java @@ -1357,7 +1357,7 @@ public final Completable compose(CompletableTransformer transformer) { @NonNull @SchedulerSupport(SchedulerSupport.NONE) public final Completable concatWith(CompletableSource other) { - ObjectHelper.requireNonNull(other, "next is null"); + ObjectHelper.requireNonNull(other, "other is null"); return RxJavaPlugins.onAssembly(new CompletableAndThenCompletable(this, other)); } From e424b4b00b00634ab08a911eb5778645eccbf60c Mon Sep 17 00:00:00 2001 From: pjastrz Date: Sat, 12 Jan 2019 18:04:30 +0100 Subject: [PATCH 3/3] 2.x: Fix Completable.andThen(Completable) not running on scheduler defined with observeOn. --- .../CompletableAndThenCompletableabTest.java | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/src/test/java/io/reactivex/internal/operators/completable/CompletableAndThenCompletableabTest.java b/src/test/java/io/reactivex/internal/operators/completable/CompletableAndThenCompletableabTest.java index f7b2ed9190..34b9c82436 100644 --- a/src/test/java/io/reactivex/internal/operators/completable/CompletableAndThenCompletableabTest.java +++ b/src/test/java/io/reactivex/internal/operators/completable/CompletableAndThenCompletableabTest.java @@ -19,6 +19,7 @@ import org.junit.Test; import io.reactivex.*; +import io.reactivex.exceptions.TestException; import io.reactivex.functions.Action; import io.reactivex.observers.TestObserver; import io.reactivex.schedulers.Schedulers; @@ -43,11 +44,11 @@ public void andThenCompletableCompleteComplete() { @Test public void andThenCompletableCompleteError() { Completable.complete() - .andThen(Completable.error(new RuntimeException("test"))) + .andThen(Completable.error(new TestException("test"))) .test() .assertNotComplete() .assertNoValues() - .assertError(RuntimeException.class) + .assertError(TestException.class) .assertErrorMessage("test"); } @@ -63,34 +64,34 @@ public void andThenCompletableCompleteNever() { @Test public void andThenCompletableErrorComplete() { - Completable.error(new RuntimeException("bla")) + Completable.error(new TestException("bla")) .andThen(Completable.complete()) .test() .assertNotComplete() .assertNoValues() - .assertError(RuntimeException.class) + .assertError(TestException.class) .assertErrorMessage("bla"); } @Test public void andThenCompletableErrorNever() { - Completable.error(new RuntimeException("bla")) + Completable.error(new TestException("bla")) .andThen(Completable.never()) .test() .assertNotComplete() .assertNoValues() - .assertError(RuntimeException.class) + .assertError(TestException.class) .assertErrorMessage("bla"); } @Test public void andThenCompletableErrorError() { - Completable.error(new RuntimeException("error1")) - .andThen(Completable.error(new RuntimeException("error2"))) + Completable.error(new TestException("error1")) + .andThen(Completable.error(new TestException("error2"))) .test() .assertNotComplete() .assertNoValues() - .assertError(RuntimeException.class) + .assertError(TestException.class) .assertErrorMessage("error1"); }