diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index e6f4c9aed9..c4486b81ed 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -10063,34 +10063,150 @@ public final Single lastOrError() { } /** - * This method requires advanced knowledge about building operators; please consider + * This method requires advanced knowledge about building operators, please consider * other standard composition methods first; - * Lifts a function to the current Publisher and returns a new Publisher that when subscribed to will pass - * the values of the current Publisher through the Operator function. + * Returns a {@code Flowable} which, when subscribed to, invokes the {@link FlowableOperator#apply(Subscriber) apply(Subscriber)} method + * of the provided {@link FlowableOperator} for each individual downstream {@link Subscriber} and allows the + * insertion of a custom operator by accessing the downstream's {@link Subscriber} during this subscription phase + * and providing a new {@code Subscriber}, containing the custom operator's intended business logic, that will be + * used in the subscription process going further upstream. + *

+ * Generally, such a new {@code Subscriber} will wrap the downstream's {@code Subscriber} and forwards the + * {@code onNext}, {@code onError} and {@code onComplete} events from the upstream directly or according to the + * emission pattern the custom operator's business logic requires. In addition, such operator can intercept the + * flow control calls of {@code cancel} and {@code request} that would have travelled upstream and perform + * additional actions depending on the same business logic requirements. *

- * In other words, this allows chaining Subscribers together on a Publisher for acting on the values within - * the Publisher. - *

{@code - * Publisher.map(...).filter(...).take(5).lift(new OperatorA()).lift(new OperatorB(...)).subscribe() + * Example: + *


+     * // Step 1: Create the consumer type that will be returned by the FlowableOperator.apply():
+     * 
+     * public final class CustomSubscriber<T> implements FlowableSubscriber<T>, Subscription {
+     *
+     *     // The donstream's Subscriber that will receive the onXXX events
+     *     final Subscriber<? super String> downstream;
+     *
+     *     // The connection to the upstream source that will call this class' onXXX methods
+     *     Subscription upstream;
+     *
+     *     // The constructor takes the downstream subscriber and usually any other parameters
+     *     public CustomSubscriber(Subscriber<? super String> downstream) {
+     *         this.downstream = downstream;
+     *     }
+     *
+     *     // In the subscription phase, the upstream sends a Subscription to this class
+     *     // and subsequently this class has to send a Subscription to the downstream.
+     *     // Note that relaying the upstream's Subscription directly is not allowed in RxJava
+     *     @Override
+     *     public void onSubscribe(Subscription s) {
+     *         if (upstream != null) {
+     *             s.cancel();
+     *         } else {
+     *             upstream = s;
+     *             downstream.onSubscribe(this);
+     *         }
+     *     }
+     *
+     *     // The upstream calls this with the next item and the implementation's
+     *     // responsibility is to emit an item to the downstream based on the intended
+     *     // business logic, or if it can't do so for the particular item,
+     *     // request more from the upstream
+     *     @Override
+     *     public void onNext(T item) {
+     *         String str = item.toString();
+     *         if (str.length() < 2) {
+     *             downstream.onNext(str);
+     *         } else {
+     *             upstream.request(1);
+     *         }
+     *     }
+     *
+     *     // Some operators may handle the upstream's error while others
+     *     // could just forward it to the downstream.
+     *     @Override
+     *     public void onError(Throwable throwable) {
+     *         downstream.onError(throwable);
+     *     }
+     *
+     *     // When the upstream completes, usually the downstream should complete as well.
+     *     @Override
+     *     public void onComplete() {
+     *         downstream.onComplete();
+     *     }
+     *
+     *     // Some operators have to intercept the downstream's request calls to trigger
+     *     // the emission of queued items while others can simply forward the request
+     *     // amount as is.
+     *     @Override
+     *     public void request(long n) {
+     *         upstream.request(n);
+     *     }
+     *
+     *     // Some operators may use their own resources which should be cleaned up if
+     *     // the downstream cancels the flow before it completed. Operators without
+     *     // resources can simply forward the cancellation to the upstream.
+     *     // In some cases, a cancelled flag may be set by this method so that other parts
+     *     // of this class may detect the cancellation and stop sending events
+     *     // to the downstream.
+     *     @Override
+     *     public void cancel() {
+     *         upstream.cancel();
+     *     }
      * }
+     *
+     * // Step 2: Create a class that implements the FlowableOperator interface and
+     * //         returns the custom consumer type from above in its apply() method.
+     * //         Such class may define additional parameters to be submitted to
+     * //         the custom consumer type.
+     *
+     * final class CustomOperator<T> implements FlowableOperator<String> {
+     *     @Override
+     *     public Subscriber<? super String> apply(Subscriber<? super T> upstream) {
+     *         return new CustomSubscriber<T>(upstream);
+     *     }
+     * }
+     *
+     * // Step 3: Apply the custom operator via lift() in a flow by creating an instance of it
+     * //         or reusing an existing one.
+     *
+     * Flowable.range(5, 10)
+     * .lift(new CustomOperator<Integer>())
+     * .test()
+     * .assertResult("5", "6", "7", "8", "9");
+     * 
*

- * If the operator you are creating is designed to act on the individual items emitted by a source - * Publisher, use {@code lift}. If your operator is designed to transform the source Publisher as a whole - * (for instance, by applying a particular set of existing RxJava operators to it) use {@link #compose}. + * Creating custom operators can be complicated and it is recommended one consults the + * RxJava wiki: Writing operators page about + * the tools, requirements, rules, considerations and pitfalls of implementing them. + *

+ * Note that implementing custom operators via this {@code lift()} method adds slightly more overhead by requiring + * an additional allocation and indirection per assembled flows. Instead, extending the abstract {@code Flowable} + * class and creating a {@link FlowableTransformer} with it is recommended. + *

+ * Note also that it is not possible to stop the subscription phase in {@code lift()} as the {@code apply()} method + * requires a non-null {@code Subscriber} instance to be returned, which is then unconditionally subscribed to + * the upstream {@code Flowable}. For example, if the operator decided there is no reason to subscribe to the + * upstream source because of some optimization possibility or a failure to prepare the operator, it still has to + * return a {@code Subscriber} that should immediately cancel the upstream's {@code Subscription} in its + * {@code onSubscribe} method. Again, using a {@code FlowableTransformer} and extending the {@code Flowable} is + * a better option as {@link #subscribeActual} can decide to not subscribe to its upstream after all. *

*
Backpressure:
- *
The {@code Operator} instance provided is responsible to be backpressure-aware or - * document the fact that the consumer of the returned {@code Publisher} has to apply one of + *
The {@code Subscriber} instance returned by the {@link FlowableOperator} is responsible to be + * backpressure-aware or document the fact that the consumer of the returned {@code Publisher} has to apply one of * the {@code onBackpressureXXX} operators.
*
Scheduler:
- *
{@code lift} does not operate by default on a particular {@link Scheduler}.
+ *
{@code lift} does not operate by default on a particular {@link Scheduler}, however, the + * {@link FlowableOperator} may use a {@code Scheduler} to support its own asynchronous behavior.
*
* * @param the output value type - * @param lifter the Operator that implements the Publisher-operating function to be applied to the source - * Publisher - * @return a Flowable that is the result of applying the lifted Operator to the source Publisher - * @see RxJava wiki: Implementing Your Own Operators + * @param lifter the {@link FlowableOperator} that receives the downstream's {@code Subscriber} and should return + * a {@code Subscriber} with custom behavior to be used as the consumer for the current + * {@code Flowable}. + * @return the new Flowable instance + * @see RxJava wiki: Writing operators + * @see #compose(FlowableTransformer) */ @CheckReturnValue @BackpressureSupport(BackpressureKind.SPECIAL)