Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Improve BehaviorProcessor JavaDoc #5778

Merged
merged 2 commits into from
Dec 27, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 90 additions & 9 deletions src/main/java/io/reactivex/processors/BehaviorProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,87 @@
* <p>
* <img width="640" height="460" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/S.BehaviorProcessor.png" alt="">
* <p>
* This processor does not have a public constructor by design; a new empty instance of this
* {@code BehaviorSubject} can be created via the {@link #create()} method and
* a new non-empty instance can be created via {@link #createDefault(Object)} (named as such to avoid
* overload resolution conflict with {@code Flowable.create} that creates a Flowable, not a {@code BehaviorProcessor}).
* <p>
* In accordance with the Reactive Streams specification (<a href="https://github.com/reactive-streams/reactive-streams-jvm#2.13">Rule 2.13</a>)
* {@code null}s are not allowed as default initial values in {@link #createDefault(Object)} or as parameters to {@link #onNext(Object)} and
* {@link #onError(Throwable)}.
* <p>
* When this {@code BehaviorProcessor} is terminated via {@link #onError(Throwable)} or {@link #onComplete()}, the
* last observed item (if any) is cleared and late {@link org.reactivestreams.Subscriber}s only receive
* the respective terminal event.
* <p>
* The {@code BehaviorProcessor} does not support clearing its cached value (to appear empty again), however, the
* effect can be achieved by using a special item and making sure {@code Subscriber}s subscribe through a
* filter whose predicate filters out this special item:
* <pre><code>
* BehaviorProcessor&lt;Integer&gt; processor = BehaviorProcessor.create();
*
* final Integer EMPTY = Integer.MIN_VALUE;
*
* Flowable&lt;Integer&gt; flowable = processor.filter(v -&gt; v != EMPTY);
*
* TestSubscriber&lt;Integer&gt; ts1 = flowable.test();
*
* processor.onNext(1);
* // this will "clear" the cache
* processor.onNext(EMPTY);
*
* TestSubscriber&lt;Integer&gt; ts2 = flowable.test();
*
* processor.onNext(2);
* processor.onComplete();
*
* // ts1 received both non-empty items
* ts1.assertResult(1, 2);
*
* // ts2 received only 2 even though the current item was EMPTY
* // when it got subscribed
* ts2.assertResult(2);
*
* // Subscribers coming after the processor was terminated receive
* // no items and only the onComplete event in this case.
* flowable.test().assertResult();
* </code></pre>
* <p>
* Even though {@code BehaviorProcessor} implements the {@code Subscriber} interface, calling
* {@code onSubscribe} is not required (<a href="https://github.com/reactive-streams/reactive-streams-jvm#2.12">Rule 2.12</a>)
* if the processor is used as a standalone source. However, calling {@code onSubscribe} is
* called after the {@code BehaviorProcessor} reached its terminal state will result in the
* given {@code Subscription} being cancelled immediately.
* <p>
* Calling {@link #onNext(Object)}, {@link #onError(Throwable)} and {@link #onComplete()}
* is still required to be serialized (called from the same thread or called non-overlappingly from different threads
* through external means of serialization). The {@link #toSerialized()} method available to all {@code FlowableProcessor}s
* provides such serialization and also protects against reentrance (i.e., when a downstream {@code Subscriber}
* consuming this processor also wants to call {@link #onNext(Object)} on this processor recursively.
* <p>
* This {@code BehaviorProcessor} supports the standard state-peeking methods {@link #hasComplete()}, {@link #hasThrowable()},
* {@link #getThrowable()} and {@link #hasSubscribers()} as well as means to read the latest observed value
* in a non-blocking and thread-safe manner via {@link #hasValue()}, {@link #getValue()},
* {@link #getValues()} or {@link #getValues(Object[])}.
* <p>
* Note that this processor signals {@code MissingBackpressureException} if a particular {@code Subscriber} is not
* ready to receive {@code onNext} events. To avoid this exception being signaled, use {@link #offer(Object)} to only
* try to emit an item when all {@code Subscriber}s have requested item(s).
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The {@code BehaviorProcessor} does not coordinate requests of its downstream {@code Subscriber}s and
* expects each individual {@code Subscriber} is ready to receive {@code onNext} items when {@link #onNext(Object)}
* is called. If a {@code Subscriber} is not ready, a {@code MissingBackpressureException} is signalled to it.
* To avoid overflowing the current {@code Subscriber}s, the conditional {@link #offer(Object)} method is available
* that returns true if any of the {@code Subscriber}s is not ready to receive {@code onNext} events. If
* there are no {@code Subscriber}s to the processor, {@code offer()} always succeeds.
* If the {@code BehaviorProcessor} is (optionally) subscribed to another {@code Publisher}, this upstream
* {@code Publisher} is consumed in an unbounded fashion (requesting {@code Long.MAX_VALUE}).</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code BehaviorProcessor} does not operate by default on a particular {@link io.reactivex.Scheduler} and
* the {@code Subscriber}s get notified on the thread the respective {@code onXXX} methods were invoked.</dd>
* </dl>
* <p>
* Example usage:
* <pre> {@code

Expand Down Expand Up @@ -94,7 +175,7 @@ public final class BehaviorProcessor<T> extends FlowableProcessor<T> {
* Creates a {@link BehaviorProcessor} without a default item.
*
* @param <T>
* the type of item the Subject will emit
* the type of item the BehaviorProcessor will emit
* @return the constructed {@link BehaviorProcessor}
*/
@CheckReturnValue
Expand All @@ -107,7 +188,7 @@ public static <T> BehaviorProcessor<T> create() {
* {@link Subscriber} that subscribes to it.
*
* @param <T>
* the type of item the Subject will emit
* the type of item the BehaviorProcessor will emit
* @param defaultValue
* the item that will be emitted first to any {@link Subscriber} as long as the
* {@link BehaviorProcessor} has not yet observed any items from its source {@code Observable}
Expand Down Expand Up @@ -266,9 +347,9 @@ public Throwable getThrowable() {
}

/**
* Returns a single value the Subject currently has or null if no such value exists.
* Returns a single value the BehaviorProcessor currently has or null if no such value exists.
* <p>The method is thread-safe.
* @return a single value the Subject currently has or null if no such value exists
* @return a single value the BehaviorProcessor currently has or null if no such value exists
*/
public T getValue() {
Object o = value.get();
Expand All @@ -279,9 +360,9 @@ public T getValue() {
}

/**
* Returns an Object array containing snapshot all values of the Subject.
* Returns an Object array containing snapshot all values of the BehaviorProcessor.
* <p>The method is thread-safe.
* @return the array containing the snapshot of all values of the Subject
* @return the array containing the snapshot of all values of the BehaviorProcessor
*/
public Object[] getValues() {
@SuppressWarnings("unchecked")
Expand All @@ -295,7 +376,7 @@ public Object[] getValues() {
}

/**
* Returns a typed array containing a snapshot of all values of the Subject.
* Returns a typed array containing a snapshot of all values of the BehaviorProcessor.
* <p>The method follows the conventions of Collection.toArray by setting the array element
* after the last value to null (if the capacity permits).
* <p>The method is thread-safe.
Expand Down Expand Up @@ -337,9 +418,9 @@ public boolean hasThrowable() {
}

/**
* Returns true if the subject has any value.
* Returns true if the BehaviorProcessor has any value.
* <p>The method is thread-safe.
* @return true if the subject has any value
* @return true if the BehaviorProcessor has any value
*/
public boolean hasValue() {
Object o = value.get();
Expand Down