Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.x: Add X.fromSupplier() #6529

Merged
merged 2 commits into from
Jun 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions src/main/java/io/reactivex/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,8 @@ public static Completable fromAction(final Action run) {
* </dl>
* @param callable the callable instance to execute for each subscriber
* @return the new Completable instance
* @see #defer(Supplier)
* @see #fromSupplier(Supplier)
*/
@CheckReturnValue
@NonNull
Expand Down Expand Up @@ -609,6 +611,36 @@ public static <T> Completable fromSingle(final SingleSource<T> single) {
return RxJavaPlugins.onAssembly(new CompletableFromSingle<T>(single));
}

/**
* Returns a Completable which when subscribed, executes the supplier function, ignores its
* normal result and emits onError or onComplete only.
* <p>
* <img width="640" height="286" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.fromCallable.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromSupplier} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd> If the {@link Supplier} throws an exception, the respective {@link Throwable} is
* delivered to the downstream via {@link CompletableObserver#onError(Throwable)},
* except when the downstream has disposed this {@code Completable} source.
* In this latter case, the {@code Throwable} is delivered to the global error handler via
* {@link RxJavaPlugins#onError(Throwable)} as an {@link io.reactivex.exceptions.UndeliverableException UndeliverableException}.
* </dd>
* </dl>
* @param supplier the Supplier instance to execute for each subscriber
* @return the new Completable instance
* @see #defer(Supplier)
* @see #fromCallable(Callable)
* @since 3.0.0
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static Completable fromSupplier(final Supplier<?> supplier) {
ObjectHelper.requireNonNull(supplier, "supplier is null");
return RxJavaPlugins.onAssembly(new CompletableFromSupplier(supplier));
}

/**
* Returns a Completable instance that subscribes to all sources at once and
* completes only when all source Completables complete or one of them emits an error.
Expand Down
42 changes: 42 additions & 0 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -2087,6 +2087,7 @@ public static <T> Flowable<T> fromArray(T... items) {
* the type of the item emitted by the Publisher
* @return a Flowable whose {@link Subscriber}s' subscriptions trigger an invocation of the given function
* @see #defer(Supplier)
* @see #fromSupplier(Supplier)
* @since 2.0
*/
@CheckReturnValue
Expand Down Expand Up @@ -2331,6 +2332,47 @@ public static <T> Flowable<T> fromPublisher(final Publisher<? extends T> source)
return RxJavaPlugins.onAssembly(new FlowableFromPublisher<T>(source));
}

/**
* Returns a Flowable that, when a Subscriber subscribes to it, invokes a supplier function you specify and then
* emits the value returned from that function.
* <p>
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/fromCallable.png" alt="">
* <p>
* This allows you to defer the execution of the function you specify until a Subscriber subscribes to the
* Publisher. That is to say, it makes the function "lazy."
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromSupplier} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd> If the {@link Supplier} throws an exception, the respective {@link Throwable} is
* delivered to the downstream via {@link Subscriber#onError(Throwable)},
* except when the downstream has canceled this {@code Flowable} source.
* In this latter case, the {@code Throwable} is delivered to the global error handler via
* {@link RxJavaPlugins#onError(Throwable)} as an {@link io.reactivex.exceptions.UndeliverableException UndeliverableException}.
* </dd>
* </dl>
*
* @param supplier
* a function, the execution of which should be deferred; {@code fromSupplier} will invoke this
* function only when a Subscriber subscribes to the Publisher that {@code fromSupplier} returns
* @param <T>
* the type of the item emitted by the Publisher
* @return a Flowable whose {@link Subscriber}s' subscriptions trigger an invocation of the given function
* @see #defer(Supplier)
* @see #fromCallable(Callable)
* @since 3.0.0
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Flowable<T> fromSupplier(Supplier<? extends T> supplier) {
ObjectHelper.requireNonNull(supplier, "supplier is null");
return RxJavaPlugins.onAssembly(new FlowableFromSupplier<T>(supplier));
}

/**
* Returns a cold, synchronous, stateless and backpressure-aware generator of values.
* <p>
Expand Down
47 changes: 47 additions & 0 deletions src/main/java/io/reactivex/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,8 @@ public static <T> Maybe<T> fromSingle(SingleSource<T> singleSource) {
* @param <T>
* the type of the item emitted by the {@link Maybe}.
* @return a new Maybe instance
* @see #defer(Supplier)
* @see #fromSupplier(Supplier)
*/
@CheckReturnValue
@NonNull
Expand Down Expand Up @@ -865,6 +867,51 @@ public static <T> Maybe<T> fromRunnable(final Runnable run) {
return RxJavaPlugins.onAssembly(new MaybeFromRunnable<T>(run));
}

/**
* Returns a {@link Maybe} that invokes the given {@link Supplier} for each individual {@link MaybeObserver} that
* subscribes and emits the resulting non-null item via {@code onSuccess} while
* considering a {@code null} result from the {@code Supplier} as indication for valueless completion
* via {@code onComplete}.
* <p>
* This operator allows you to defer the execution of the given {@code Supplier} until a {@code MaybeObserver}
* subscribes to the returned {@link Maybe}. In other terms, this source operator evaluates the given
* {@code Supplier} "lazily".
* <p>
* Note that the {@code null} handling of this operator differs from the similar source operators in the other
* {@link io.reactivex base reactive classes}. Those operators signal a {@code NullPointerException} if the value returned by their
* {@code Supplier} is {@code null} while this {@code fromSupplier} considers it to indicate the
* returned {@code Maybe} is empty.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromSupplier} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>Any non-fatal exception thrown by {@link Supplier#get()} will be forwarded to {@code onError},
* except if the {@code MaybeObserver} disposed the subscription in the meantime. In this latter case,
* the exception is forwarded to the global error handler via
* {@link io.reactivex.plugins.RxJavaPlugins#onError(Throwable)} wrapped into a
* {@link io.reactivex.exceptions.UndeliverableException UndeliverableException}.
* Fatal exceptions are rethrown and usually will end up in the executing thread's
* {@link java.lang.Thread.UncaughtExceptionHandler#uncaughtException(Thread, Throwable)} handler.</dd>
* </dl>
*
* @param supplier
* a {@link Supplier} instance whose execution should be deferred and performed for each individual
* {@code MaybeObserver} that subscribes to the returned {@link Maybe}.
* @param <T>
* the type of the item emitted by the {@link Maybe}.
* @return a new Maybe instance
* @see #defer(Supplier)
* @see #fromCallable(Callable)
* @since 3.0.0
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Maybe<T> fromSupplier(@NonNull final Supplier<? extends T> supplier) {
ObjectHelper.requireNonNull(supplier, "supplier is null");
return RxJavaPlugins.onAssembly(new MaybeFromSupplier<T>(supplier));
}

/**
* Returns a {@code Maybe} that emits a specified item.
* <p>
Expand Down
38 changes: 38 additions & 0 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1798,6 +1798,7 @@ public static <T> Observable<T> fromArray(T... items) {
* the type of the item emitted by the ObservableSource
* @return an Observable whose {@link Observer}s' subscriptions trigger an invocation of the given function
* @see #defer(Supplier)
* @see #fromSupplier(Supplier)
* @since 2.0
*/
@CheckReturnValue
Expand Down Expand Up @@ -2021,6 +2022,43 @@ public static <T> Observable<T> fromPublisher(Publisher<? extends T> publisher)
return RxJavaPlugins.onAssembly(new ObservableFromPublisher<T>(publisher));
}

/**
* Returns an Observable that, when an observer subscribes to it, invokes a supplier function you specify and then
* emits the value returned from that function.
* <p>
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/fromCallable.png" alt="">
* <p>
* This allows you to defer the execution of the function you specify until an observer subscribes to the
* ObservableSource. That is to say, it makes the function "lazy."
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromSupplier} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd> If the {@link Supplier} throws an exception, the respective {@link Throwable} is
* delivered to the downstream via {@link Observer#onError(Throwable)},
* except when the downstream has disposed this {@code Observable} source.
* In this latter case, the {@code Throwable} is delivered to the global error handler via
* {@link RxJavaPlugins#onError(Throwable)} as an {@link io.reactivex.exceptions.UndeliverableException UndeliverableException}.
* </dd>
* </dl>
* @param supplier
* a function, the execution of which should be deferred; {@code fromSupplier} will invoke this
* function only when an observer subscribes to the ObservableSource that {@code fromSupplier} returns
* @param <T>
* the type of the item emitted by the ObservableSource
* @return an Observable whose {@link Observer}s' subscriptions trigger an invocation of the given function
* @see #defer(Supplier)
* @see #fromCallable(Callable)
* @since 3.0.0
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> fromSupplier(Supplier<? extends T> supplier) {
ObjectHelper.requireNonNull(supplier, "supplier is null");
return RxJavaPlugins.onAssembly(new ObservableFromSupplier<T>(supplier));
}

/**
* Returns a cold, synchronous and stateless generator of values.
* <p>
Expand Down
40 changes: 40 additions & 0 deletions src/main/java/io/reactivex/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,8 @@ public static <T> Single<T> error(final Throwable exception) {
* @param <T>
* the type of the item emitted by the {@link Single}.
* @return a {@link Single} whose {@link SingleObserver}s' subscriptions trigger an invocation of the given function.
* @see #defer(Supplier)
* @see #fromSupplier(Supplier)
*/
@CheckReturnValue
@NonNull
Expand Down Expand Up @@ -811,6 +813,44 @@ public static <T> Single<T> fromObservable(ObservableSource<? extends T> observa
return RxJavaPlugins.onAssembly(new ObservableSingleSingle<T>(observableSource, null));
}

/**
* Returns a {@link Single} that invokes passed supplierfunction and emits its result
* for each new SingleObserver that subscribes.
* <p>
* Allows you to defer execution of passed function until SingleObserver subscribes to the {@link Single}.
* It makes passed function "lazy".
* Result of the function invocation will be emitted by the {@link Single}.
* <p>
* <img width="640" height="467" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.fromCallable.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromSupplier} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd> If the {@link Supplier} throws an exception, the respective {@link Throwable} is
* delivered to the downstream via {@link SingleObserver#onError(Throwable)},
* except when the downstream has disposed this {@code Single} source.
* In this latter case, the {@code Throwable} is delivered to the global error handler via
* {@link RxJavaPlugins#onError(Throwable)} as an {@link io.reactivex.exceptions.UndeliverableException UndeliverableException}.
* </dd>
* </dl>
*
* @param supplier
* function which execution should be deferred, it will be invoked when SingleObserver will subscribe to the {@link Single}.
* @param <T>
* the type of the item emitted by the {@link Single}.
* @return a {@link Single} whose {@link SingleObserver}s' subscriptions trigger an invocation of the given function.
* @see #defer(Supplier)
* @see #fromCallable(Callable)
* @since 3.0.0
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Single<T> fromSupplier(final Supplier<? extends T> supplier) {
ObjectHelper.requireNonNull(supplier, "supplier is null");
return RxJavaPlugins.onAssembly(new SingleFromSupplier<T>(supplier));
}

/**
* Returns a {@code Single} that emits a specified item.
* <p>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.completable;

import io.reactivex.*;
import io.reactivex.disposables.*;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.Supplier;
import io.reactivex.plugins.RxJavaPlugins;

/**
* Call a Supplier for each incoming CompletableObserver and signal completion or the thrown exception.
* @since 3.0.0
*/
public final class CompletableFromSupplier extends Completable {

final Supplier<?> supplier;

public CompletableFromSupplier(Supplier<?> supplier) {
this.supplier = supplier;
}

@Override
protected void subscribeActual(CompletableObserver observer) {
Disposable d = Disposables.empty();
observer.onSubscribe(d);
try {
supplier.get();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
if (!d.isDisposed()) {
observer.onError(e);
} else {
RxJavaPlugins.onError(e);
}
return;
}
if (!d.isDisposed()) {
observer.onComplete();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@

import io.reactivex.Flowable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.Supplier;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.subscriptions.DeferredScalarSubscription;
import io.reactivex.plugins.RxJavaPlugins;

public final class FlowableFromCallable<T> extends Flowable<T> implements Callable<T> {
public final class FlowableFromCallable<T> extends Flowable<T> implements Supplier<T> {
final Callable<? extends T> callable;
public FlowableFromCallable(Callable<? extends T> callable) {
this.callable = callable;
Expand Down Expand Up @@ -51,7 +52,7 @@ public void subscribeActual(Subscriber<? super T> s) {
}

@Override
public T call() throws Exception {
public T get() throws Throwable {
return ObjectHelper.requireNonNull(callable.call(), "The callable returned a null value");
}
}
Loading