Skip to content

Commit

Permalink
2.x: Expand the documentation of the Flowable.lift() operator (#5863)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored Feb 26, 2018
1 parent 569c5ab commit 3346ff9
Showing 1 changed file with 133 additions and 17 deletions.
150 changes: 133 additions & 17 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -10063,34 +10063,150 @@ public final Single<T> lastOrError() {
}

/**
* <strong>This method requires advanced knowledge about building operators; please consider
* <strong>This method requires advanced knowledge about building operators, please consider
* other standard composition methods first;</strong>
* Lifts a function to the current Publisher and returns a new Publisher that when subscribed to will pass
* the values of the current Publisher through the Operator function.
* Returns a {@code Flowable} which, when subscribed to, invokes the {@link FlowableOperator#apply(Subscriber) apply(Subscriber)} method
* of the provided {@link FlowableOperator} for each individual downstream {@link Subscriber} and allows the
* insertion of a custom operator by accessing the downstream's {@link Subscriber} during this subscription phase
* and providing a new {@code Subscriber}, containing the custom operator's intended business logic, that will be
* used in the subscription process going further upstream.
* <p>
* Generally, such a new {@code Subscriber} will wrap the downstream's {@code Subscriber} and forwards the
* {@code onNext}, {@code onError} and {@code onComplete} events from the upstream directly or according to the
* emission pattern the custom operator's business logic requires. In addition, such operator can intercept the
* flow control calls of {@code cancel} and {@code request} that would have travelled upstream and perform
* additional actions depending on the same business logic requirements.
* <p>
* In other words, this allows chaining Subscribers together on a Publisher for acting on the values within
* the Publisher.
* <p> {@code
* Publisher.map(...).filter(...).take(5).lift(new OperatorA()).lift(new OperatorB(...)).subscribe()
* Example:
* <pre><code>
* // Step 1: Create the consumer type that will be returned by the FlowableOperator.apply():
*
* public final class CustomSubscriber&lt;T&gt; implements FlowableSubscriber&lt;T&gt;, Subscription {
*
* // The donstream's Subscriber that will receive the onXXX events
* final Subscriber&lt;? super String&gt; downstream;
*
* // The connection to the upstream source that will call this class' onXXX methods
* Subscription upstream;
*
* // The constructor takes the downstream subscriber and usually any other parameters
* public CustomSubscriber(Subscriber&lt;? super String&gt; downstream) {
* this.downstream = downstream;
* }
*
* // In the subscription phase, the upstream sends a Subscription to this class
* // and subsequently this class has to send a Subscription to the downstream.
* // Note that relaying the upstream's Subscription directly is not allowed in RxJava
* &#64;Override
* public void onSubscribe(Subscription s) {
* if (upstream != null) {
* s.cancel();
* } else {
* upstream = s;
* downstream.onSubscribe(this);
* }
* }
*
* // The upstream calls this with the next item and the implementation's
* // responsibility is to emit an item to the downstream based on the intended
* // business logic, or if it can't do so for the particular item,
* // request more from the upstream
* &#64;Override
* public void onNext(T item) {
* String str = item.toString();
* if (str.length() &lt; 2) {
* downstream.onNext(str);
* } else {
* upstream.request(1);
* }
* }
*
* // Some operators may handle the upstream's error while others
* // could just forward it to the downstream.
* &#64;Override
* public void onError(Throwable throwable) {
* downstream.onError(throwable);
* }
*
* // When the upstream completes, usually the downstream should complete as well.
* &#64;Override
* public void onComplete() {
* downstream.onComplete();
* }
*
* // Some operators have to intercept the downstream's request calls to trigger
* // the emission of queued items while others can simply forward the request
* // amount as is.
* &#64;Override
* public void request(long n) {
* upstream.request(n);
* }
*
* // Some operators may use their own resources which should be cleaned up if
* // the downstream cancels the flow before it completed. Operators without
* // resources can simply forward the cancellation to the upstream.
* // In some cases, a cancelled flag may be set by this method so that other parts
* // of this class may detect the cancellation and stop sending events
* // to the downstream.
* &#64;Override
* public void cancel() {
* upstream.cancel();
* }
* }
*
* // Step 2: Create a class that implements the FlowableOperator interface and
* // returns the custom consumer type from above in its apply() method.
* // Such class may define additional parameters to be submitted to
* // the custom consumer type.
*
* final class CustomOperator&lt;T&gt; implements FlowableOperator&lt;String&gt; {
* &#64;Override
* public Subscriber&lt;? super String&gt; apply(Subscriber&lt;? super T&gt; upstream) {
* return new CustomSubscriber&lt;T&gt;(upstream);
* }
* }
*
* // Step 3: Apply the custom operator via lift() in a flow by creating an instance of it
* // or reusing an existing one.
*
* Flowable.range(5, 10)
* .lift(new CustomOperator&lt;Integer&gt;())
* .test()
* .assertResult("5", "6", "7", "8", "9");
* </code></pre>
* <p>
* If the operator you are creating is designed to act on the individual items emitted by a source
* Publisher, use {@code lift}. If your operator is designed to transform the source Publisher as a whole
* (for instance, by applying a particular set of existing RxJava operators to it) use {@link #compose}.
* Creating custom operators can be complicated and it is recommended one consults the
* <a href="https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0">RxJava wiki: Writing operators</a> page about
* the tools, requirements, rules, considerations and pitfalls of implementing them.
* <p>
* Note that implementing custom operators via this {@code lift()} method adds slightly more overhead by requiring
* an additional allocation and indirection per assembled flows. Instead, extending the abstract {@code Flowable}
* class and creating a {@link FlowableTransformer} with it is recommended.
* <p>
* Note also that it is not possible to stop the subscription phase in {@code lift()} as the {@code apply()} method
* requires a non-null {@code Subscriber} instance to be returned, which is then unconditionally subscribed to
* the upstream {@code Flowable}. For example, if the operator decided there is no reason to subscribe to the
* upstream source because of some optimization possibility or a failure to prepare the operator, it still has to
* return a {@code Subscriber} that should immediately cancel the upstream's {@code Subscription} in its
* {@code onSubscribe} method. Again, using a {@code FlowableTransformer} and extending the {@code Flowable} is
* a better option as {@link #subscribeActual} can decide to not subscribe to its upstream after all.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The {@code Operator} instance provided is responsible to be backpressure-aware or
* document the fact that the consumer of the returned {@code Publisher} has to apply one of
* <dd>The {@code Subscriber} instance returned by the {@link FlowableOperator} is responsible to be
* backpressure-aware or document the fact that the consumer of the returned {@code Publisher} has to apply one of
* the {@code onBackpressureXXX} operators.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code lift} does not operate by default on a particular {@link Scheduler}.</dd>
* <dd>{@code lift} does not operate by default on a particular {@link Scheduler}, however, the
* {@link FlowableOperator} may use a {@code Scheduler} to support its own asynchronous behavior.</dd>
* </dl>
*
* @param <R> the output value type
* @param lifter the Operator that implements the Publisher-operating function to be applied to the source
* Publisher
* @return a Flowable that is the result of applying the lifted Operator to the source Publisher
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Implementing-Your-Own-Operators">RxJava wiki: Implementing Your Own Operators</a>
* @param lifter the {@link FlowableOperator} that receives the downstream's {@code Subscriber} and should return
* a {@code Subscriber} with custom behavior to be used as the consumer for the current
* {@code Flowable}.
* @return the new Flowable instance
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0">RxJava wiki: Writing operators</a>
* @see #compose(FlowableTransformer)
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.SPECIAL)
Expand Down

0 comments on commit 3346ff9

Please sign in to comment.