Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: More Completable marbles (+18), add Completable.fromMaybe #6085

Merged
merged 1 commit into from
Jul 17, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions src/main/java/io/reactivex/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,8 @@ public static Completable error(final Throwable error) {
/**
* Returns a Completable instance that runs the given Action for each subscriber and
* emits either an unchecked exception or simply completes.
* <p>
* <img width="640" height="297" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.fromAction.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromAction} does not operate by default on a particular {@link Scheduler}.</dd>
Expand All @@ -406,6 +408,8 @@ public static Completable fromAction(final Action run) {
/**
* Returns a Completable which when subscribed, executes the callable function, ignores its
* normal result and emits onError or onComplete only.
* <p>
* <img width="640" height="286" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.fromCallable.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromCallable} does not operate by default on a particular {@link Scheduler}.</dd>
Expand All @@ -423,6 +427,8 @@ public static Completable fromCallable(final Callable<?> callable) {
/**
* Returns a Completable instance that reacts to the termination of the given Future in a blocking fashion.
* <p>
* <img width="640" height="628" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.fromFuture.png" alt="">
* <p>
* Note that cancellation from any of the subscribers to this Completable will cancel the future.
* <dl>
* <dt><b>Scheduler:</b></dt>
Expand All @@ -438,9 +444,33 @@ public static Completable fromFuture(final Future<?> future) {
return fromAction(Functions.futureAction(future));
}

/**
* Returns a Completable instance that when subscribed to, subscribes to the {@code Maybe} instance and
* emits a completion event if the maybe emits {@code onSuccess}/{@code onComplete} or forwards any
* {@code onError} events.
* <p>
* <img width="640" height="235" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.fromMaybe.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromMaybe} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type of the {@link MaybeSource} element
* @param maybe the Maybe instance to subscribe to, not null
* @return the new Completable instance
* @throws NullPointerException if single is null
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Completable fromMaybe(final MaybeSource<T> maybe) {
ObjectHelper.requireNonNull(maybe, "maybe is null");
return RxJavaPlugins.onAssembly(new MaybeIgnoreElementCompletable<T>(maybe));
}

/**
* Returns a Completable instance that runs the given Runnable for each subscriber and
* emits either its exception or simply completes.
* <p>
* <img width="640" height="297" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.fromRunnable.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromRunnable} does not operate by default on a particular {@link Scheduler}.</dd>
Expand All @@ -459,6 +489,8 @@ public static Completable fromRunnable(final Runnable run) {
/**
* Returns a Completable instance that subscribes to the given Observable, ignores all values and
* emits only the terminal event.
* <p>
* <img width="640" height="414" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.fromObservable.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromObservable} does not operate by default on a particular {@link Scheduler}.</dd>
Expand All @@ -479,6 +511,8 @@ public static <T> Completable fromObservable(final ObservableSource<T> observabl
* Returns a Completable instance that subscribes to the given publisher, ignores all values and
* emits only the terminal event.
* <p>
* <img width="640" height="442" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.fromPublisher.png" alt="">
* <p>
* The {@link Publisher} must follow the
* <a href="https://github.com/reactive-streams/reactive-streams-jvm#reactive-streams">Reactive-Streams specification</a>.
* Violating the specification may result in undefined behavior.
Expand Down Expand Up @@ -513,6 +547,8 @@ public static <T> Completable fromPublisher(final Publisher<T> publisher) {
/**
* Returns a Completable instance that when subscribed to, subscribes to the Single instance and
* emits a completion event if the single emits onSuccess or forwards any onError events.
* <p>
* <img width="640" height="356" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.fromSingle.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromSingle} does not operate by default on a particular {@link Scheduler}.</dd>
Expand All @@ -532,6 +568,8 @@ public static <T> Completable fromSingle(final SingleSource<T> single) {
/**
* Returns a Completable instance that subscribes to all sources at once and
* completes only when all source Completables complete or one of them emits an error.
* <p>
* <img width="640" height="270" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.mergeArray.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code mergeArray} does not operate by default on a particular {@link Scheduler}.</dd>
Expand Down Expand Up @@ -569,6 +607,8 @@ public static Completable mergeArray(CompletableSource... sources) {
/**
* Returns a Completable instance that subscribes to all sources at once and
* completes only when all source Completables complete or one of them emits an error.
* <p>
* <img width="640" height="311" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.merge.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code merge} does not operate by default on a particular {@link Scheduler}.</dd>
Expand Down Expand Up @@ -601,6 +641,8 @@ public static Completable merge(Iterable<? extends CompletableSource> sources) {
/**
* Returns a Completable instance that subscribes to all sources at once and
* completes only when all source Completables complete or one of them emits an error.
* <p>
* <img width="640" height="336" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.merge.p.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Completable} honors the backpressure of the downstream consumer
Expand Down Expand Up @@ -635,6 +677,8 @@ public static Completable merge(Publisher<? extends CompletableSource> sources)
/**
* Returns a Completable instance that keeps subscriptions to a limited number of sources at once and
* completes only when all source Completables complete or one of them emits an error.
* <p>
* <img width="640" height="269" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.merge.pn.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Completable} honors the backpressure of the downstream consumer
Expand Down Expand Up @@ -699,6 +743,8 @@ private static Completable merge0(Publisher<? extends CompletableSource> sources
* Returns a CompletableConsumable that subscribes to all Completables in the source array and delays
* any error emitted by either the sources observable or any of the inner Completables until all of
* them terminate in a way or another.
* <p>
* <img width="640" height="430" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.mergeArrayDelayError.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code mergeArrayDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
Expand All @@ -718,6 +764,8 @@ public static Completable mergeArrayDelayError(CompletableSource... sources) {
* Returns a Completable that subscribes to all Completables in the source sequence and delays
* any error emitted by either the sources observable or any of the inner Completables until all of
* them terminate in a way or another.
* <p>
* <img width="640" height="475" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.mergeDelayError.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
Expand All @@ -738,6 +786,8 @@ public static Completable mergeDelayError(Iterable<? extends CompletableSource>
* Returns a Completable that subscribes to all Completables in the source sequence and delays
* any error emitted by either the sources observable or any of the inner Completables until all of
* them terminate in a way or another.
* <p>
* <img width="640" height="466" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.mergeDelayError.p.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Completable} honors the backpressure of the downstream consumer
Expand All @@ -761,6 +811,8 @@ public static Completable mergeDelayError(Publisher<? extends CompletableSource>
* the source sequence and delays any error emitted by either the sources
* observable or any of the inner Completables until all of
* them terminate in a way or another.
* <p>
* <img width="640" height="440" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.mergeDelayError.pn.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Completable} honors the backpressure of the downstream consumer
Expand All @@ -782,6 +834,8 @@ public static Completable mergeDelayError(Publisher<? extends CompletableSource>

/**
* Returns a Completable that never calls onError or onComplete.
* <p>
* <img width="640" height="512" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.never.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code never} does not operate by default on a particular {@link Scheduler}.</dd>
Expand All @@ -796,6 +850,8 @@ public static Completable never() {

/**
* Returns a Completable instance that fires its onComplete event after the given delay elapsed.
* <p>
* <img width="640" height="413" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.timer.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code timer} does operate by default on the {@code computation} {@link Scheduler}.</dd>
Expand All @@ -813,6 +869,8 @@ public static Completable timer(long delay, TimeUnit unit) {
/**
* Returns a Completable instance that fires its onComplete event after the given delay elapsed
* by using the supplied scheduler.
* <p>
* <img width="640" height="413" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.timer.s.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code timer} operates on the {@link Scheduler} you specify.</dd>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.completable;

import io.reactivex.Completable;
import io.reactivex.Maybe;
import org.junit.Test;

public class CompletableFromMaybeTest {
@Test(expected = NullPointerException.class)
public void fromMaybeNull() {
Completable.fromMaybe(null);
}

@Test
public void fromMaybe() {
Completable.fromMaybe(Maybe.just(1))
.test()
.assertResult();
}

@Test
public void fromMaybeEmpty() {
Completable.fromMaybe(Maybe.<Integer>empty())
.test()
.assertResult();
}

@Test
public void fromMaybeError() {
Completable.fromMaybe(Maybe.error(new UnsupportedOperationException()))
.test()
.assertFailure(UnsupportedOperationException.class);
}
}