From a30fb4d50f324ffb5c1daa79a2f29c646f7774e7 Mon Sep 17 00:00:00 2001 From: Prat Date: Thu, 29 Sep 2016 02:57:29 -0400 Subject: [PATCH] 2.x: Add Completable.fromRunnable() (#4629) --- src/main/java/io/reactivex/Completable.java | 17 +++++++ .../completable/CompletableFromRunnable.java | 47 +++++++++++++++++++ .../completable/CompletableTest.java | 31 ++++++++++++ 3 files changed, 95 insertions(+) create mode 100644 src/main/java/io/reactivex/internal/operators/completable/CompletableFromRunnable.java diff --git a/src/main/java/io/reactivex/Completable.java b/src/main/java/io/reactivex/Completable.java index 4ae993775b..cfcd1c5935 100644 --- a/src/main/java/io/reactivex/Completable.java +++ b/src/main/java/io/reactivex/Completable.java @@ -339,6 +339,23 @@ public static Completable fromFuture(final Future future) { return fromAction(Functions.futureAction(future)); } + /** + * Returns a Completable instance that runs the given Runnable for each subscriber and + * emits either its exception or simply completes. + *
+ *
Scheduler:
+ *
{@code fromRunnable} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param run the runnable to run for each subscriber + * @return the new Completable instance + * @throws NullPointerException if run is null + */ + @SchedulerSupport(SchedulerSupport.NONE) + public static Completable fromRunnable(final Runnable run) { + ObjectHelper.requireNonNull(run, "run is null"); + return RxJavaPlugins.onAssembly(new CompletableFromRunnable(run)); + } + /** * Returns a Completable instance that subscribes to the given Observable, ignores all values and * emits only the terminal event. diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableFromRunnable.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableFromRunnable.java new file mode 100644 index 0000000000..851b0df4c3 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableFromRunnable.java @@ -0,0 +1,47 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.completable; + +import io.reactivex.Completable; +import io.reactivex.CompletableObserver; +import io.reactivex.disposables.Disposable; +import io.reactivex.disposables.Disposables; +import io.reactivex.exceptions.Exceptions; + +public final class CompletableFromRunnable extends Completable { + + final Runnable runnable; + + public CompletableFromRunnable(Runnable runnable) { + this.runnable = runnable; + } + + @Override + protected void subscribeActual(CompletableObserver s) { + Disposable d = Disposables.empty(); + s.onSubscribe(d); + try { + runnable.run(); + } catch (Throwable e) { + Exceptions.throwIfFatal(e); + if (!d.isDisposed()) { + s.onError(e); + } + return; + } + if (!d.isDisposed()) { + s.onComplete(); + } + } +} diff --git a/src/test/java/io/reactivex/completable/CompletableTest.java b/src/test/java/io/reactivex/completable/CompletableTest.java index 682aeb32a5..421c0ad09f 100644 --- a/src/test/java/io/reactivex/completable/CompletableTest.java +++ b/src/test/java/io/reactivex/completable/CompletableTest.java @@ -4473,6 +4473,37 @@ public void run() { } } + @Test(expected = NullPointerException.class) + public void fromRunnableNull() { + Completable.fromRunnable(null); + } + + @Test(timeout = 1000) + public void fromRunnableNormal() { + final AtomicInteger calls = new AtomicInteger(); + + Completable c = Completable.fromRunnable(new Runnable() { + @Override + public void run() { + calls.getAndIncrement(); + } + }); + + c.blockingAwait(); + + Assert.assertEquals(1, calls.get()); + } + + @Test(timeout = 1000, expected = TestException.class) + public void fromRunnableThrows() { + Completable c = Completable.fromRunnable(new Runnable() { + @Override + public void run() { throw new TestException(); } + }); + + c.blockingAwait(); + } + @Test(expected = NullPointerException.class) public void doOnErrorNullValue() { Completable.complete().doOnError(null);