diff --git a/src/main/java/io/reactivex/Completable.java b/src/main/java/io/reactivex/Completable.java index 319e31272d..e82664e0a6 100644 --- a/src/main/java/io/reactivex/Completable.java +++ b/src/main/java/io/reactivex/Completable.java @@ -388,6 +388,8 @@ public static Completable error(final Throwable error) { /** * Returns a Completable instance that runs the given Action for each subscriber and * emits either an unchecked exception or simply completes. + *

+ * *

*
Scheduler:
*
{@code fromAction} does not operate by default on a particular {@link Scheduler}.
@@ -406,6 +408,8 @@ public static Completable fromAction(final Action run) { /** * Returns a Completable which when subscribed, executes the callable function, ignores its * normal result and emits onError or onComplete only. + *

+ * *

*
Scheduler:
*
{@code fromCallable} does not operate by default on a particular {@link Scheduler}.
@@ -423,6 +427,8 @@ public static Completable fromCallable(final Callable callable) { /** * Returns a Completable instance that reacts to the termination of the given Future in a blocking fashion. *

+ * + *

* Note that cancellation from any of the subscribers to this Completable will cancel the future. *

*
Scheduler:
@@ -438,9 +444,33 @@ public static Completable fromFuture(final Future future) { return fromAction(Functions.futureAction(future)); } + /** + * Returns a Completable instance that when subscribed to, subscribes to the {@code Maybe} instance and + * emits a completion event if the maybe emits {@code onSuccess}/{@code onComplete} or forwards any + * {@code onError} events. + *

+ * + *

+ *
Scheduler:
+ *
{@code fromMaybe} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type of the {@link MaybeSource} element + * @param maybe the Maybe instance to subscribe to, not null + * @return the new Completable instance + * @throws NullPointerException if single is null + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + public static Completable fromMaybe(final MaybeSource maybe) { + ObjectHelper.requireNonNull(maybe, "maybe is null"); + return RxJavaPlugins.onAssembly(new MaybeIgnoreElementCompletable(maybe)); + } + /** * Returns a Completable instance that runs the given Runnable for each subscriber and * emits either its exception or simply completes. + *

+ * *

*
Scheduler:
*
{@code fromRunnable} does not operate by default on a particular {@link Scheduler}.
@@ -459,6 +489,8 @@ public static Completable fromRunnable(final Runnable run) { /** * Returns a Completable instance that subscribes to the given Observable, ignores all values and * emits only the terminal event. + *

+ * *

*
Scheduler:
*
{@code fromObservable} does not operate by default on a particular {@link Scheduler}.
@@ -479,6 +511,8 @@ public static Completable fromObservable(final ObservableSource observabl * Returns a Completable instance that subscribes to the given publisher, ignores all values and * emits only the terminal event. *

+ * + *

* The {@link Publisher} must follow the * Reactive-Streams specification. * Violating the specification may result in undefined behavior. @@ -513,6 +547,8 @@ public static Completable fromPublisher(final Publisher publisher) { /** * Returns a Completable instance that when subscribed to, subscribes to the Single instance and * emits a completion event if the single emits onSuccess or forwards any onError events. + *

+ * *

*
Scheduler:
*
{@code fromSingle} does not operate by default on a particular {@link Scheduler}.
@@ -532,6 +568,8 @@ public static Completable fromSingle(final SingleSource single) { /** * Returns a Completable instance that subscribes to all sources at once and * completes only when all source Completables complete or one of them emits an error. + *

+ * *

*
Scheduler:
*
{@code mergeArray} does not operate by default on a particular {@link Scheduler}.
@@ -569,6 +607,8 @@ public static Completable mergeArray(CompletableSource... sources) { /** * Returns a Completable instance that subscribes to all sources at once and * completes only when all source Completables complete or one of them emits an error. + *

+ * *

*
Scheduler:
*
{@code merge} does not operate by default on a particular {@link Scheduler}.
@@ -601,6 +641,8 @@ public static Completable merge(Iterable sources) { /** * Returns a Completable instance that subscribes to all sources at once and * completes only when all source Completables complete or one of them emits an error. + *

+ * *

*
Backpressure:
*
The returned {@code Completable} honors the backpressure of the downstream consumer @@ -635,6 +677,8 @@ public static Completable merge(Publisher sources) /** * Returns a Completable instance that keeps subscriptions to a limited number of sources at once and * completes only when all source Completables complete or one of them emits an error. + *

+ * *

*
Backpressure:
*
The returned {@code Completable} honors the backpressure of the downstream consumer @@ -699,6 +743,8 @@ private static Completable merge0(Publisher sources * Returns a CompletableConsumable that subscribes to all Completables in the source array and delays * any error emitted by either the sources observable or any of the inner Completables until all of * them terminate in a way or another. + *

+ * *

*
Scheduler:
*
{@code mergeArrayDelayError} does not operate by default on a particular {@link Scheduler}.
@@ -718,6 +764,8 @@ public static Completable mergeArrayDelayError(CompletableSource... sources) { * Returns a Completable that subscribes to all Completables in the source sequence and delays * any error emitted by either the sources observable or any of the inner Completables until all of * them terminate in a way or another. + *

+ * *

*
Scheduler:
*
{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.
@@ -738,6 +786,8 @@ public static Completable mergeDelayError(Iterable * Returns a Completable that subscribes to all Completables in the source sequence and delays * any error emitted by either the sources observable or any of the inner Completables until all of * them terminate in a way or another. + *

+ * *

*
Backpressure:
*
The returned {@code Completable} honors the backpressure of the downstream consumer @@ -761,6 +811,8 @@ public static Completable mergeDelayError(Publisher * the source sequence and delays any error emitted by either the sources * observable or any of the inner Completables until all of * them terminate in a way or another. + *

+ * *

*
Backpressure:
*
The returned {@code Completable} honors the backpressure of the downstream consumer @@ -782,6 +834,8 @@ public static Completable mergeDelayError(Publisher /** * Returns a Completable that never calls onError or onComplete. + *

+ * *

*
Scheduler:
*
{@code never} does not operate by default on a particular {@link Scheduler}.
@@ -796,6 +850,8 @@ public static Completable never() { /** * Returns a Completable instance that fires its onComplete event after the given delay elapsed. + *

+ * *

*
Scheduler:
*
{@code timer} does operate by default on the {@code computation} {@link Scheduler}.
@@ -813,6 +869,8 @@ public static Completable timer(long delay, TimeUnit unit) { /** * Returns a Completable instance that fires its onComplete event after the given delay elapsed * by using the supplied scheduler. + *

+ * *

*
Scheduler:
*
{@code timer} operates on the {@link Scheduler} you specify.
diff --git a/src/test/java/io/reactivex/internal/operators/completable/CompletableFromMaybeTest.java b/src/test/java/io/reactivex/internal/operators/completable/CompletableFromMaybeTest.java new file mode 100644 index 0000000000..46f62d0b7a --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/completable/CompletableFromMaybeTest.java @@ -0,0 +1,46 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.completable; + +import io.reactivex.Completable; +import io.reactivex.Maybe; +import org.junit.Test; + +public class CompletableFromMaybeTest { + @Test(expected = NullPointerException.class) + public void fromMaybeNull() { + Completable.fromMaybe(null); + } + + @Test + public void fromMaybe() { + Completable.fromMaybe(Maybe.just(1)) + .test() + .assertResult(); + } + + @Test + public void fromMaybeEmpty() { + Completable.fromMaybe(Maybe.empty()) + .test() + .assertResult(); + } + + @Test + public void fromMaybeError() { + Completable.fromMaybe(Maybe.error(new UnsupportedOperationException())) + .test() + .assertFailure(UnsupportedOperationException.class); + } +}