Skip to content

Commit

Permalink
2.x: Add Subject and Processor marbles (#5816)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored Jan 22, 2018
1 parent eb426fd commit bcc419c
Show file tree
Hide file tree
Showing 9 changed files with 59 additions and 13 deletions.
6 changes: 4 additions & 2 deletions src/main/java/io/reactivex/processors/AsyncProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
/**
* Processor that emits the very last value followed by a completion event or the received error
* to {@link Subscriber}s.
*
* <p>The implementation of onXXX methods are technically thread-safe but non-serialized calls
* <p>
* <img width="640" height="239" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/AsyncProcessor.png" alt="">
* <p>
* The implementation of onXXX methods are technically thread-safe but non-serialized calls
* to them may lead to undefined state in the currently subscribed Subscribers.
*
* @param <T> the value type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
* Processor that multicasts all subsequently observed items to its current {@link Subscriber}s.
*
* <p>
* <img width="640" height="405" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/S.PublishSubject.png" alt="">
* <img width="640" height="278" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/PublishProcessor.png" alt="">
*
* <p>The processor does not coordinate backpressure for its subscribers and implements a weaker onSubscribe which
* calls requests Long.MAX_VALUE from the incoming Subscriptions. This makes it possible to subscribe the PublishProcessor
Expand Down
27 changes: 25 additions & 2 deletions src/main/java/io/reactivex/processors/ReplayProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,31 @@
/**
* Replays events to Subscribers.
* <p>
* <img width="640" height="405" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/S.ReplaySubject.png" alt="">
*
* The {@code ReplayProcessor} supports the following item retainment strategies:
* <ul>
* <li>{@link #create()} and {@link #create(int)}: retains and replays all events to current and
* future {@code Subscriber}s.
* <p>
* <img width="640" height="269" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/ReplayProcessor.u.png" alt="">
* <p>
* <img width="640" height="345" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/ReplayProcessor.ue.png" alt="">
* </li>
* <li>{@link #createWithSize(int)}: retains at most the given number of items and replays only these
* latest items to new {@code Subscriber}s.
* <p>
* <img width="640" height="332" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/ReplayProcessor.n.png" alt="">
* </li>
* <li>{@link #createWithTime(long, TimeUnit, Scheduler)}: retains items no older than the specified time
* and replays them to new {@code Subscriber}s (which could mean all items age out).
* <p>
* <img width="640" height="415" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/ReplayProcessor.t.png" alt="">
* </li>
* <li>{@link #createWithTimeAndSize(long, TimeUnit, Scheduler, int)}: retaims no more than the given number of items
* which are also no older than the specified time and replays them to new {@code Subscriber}s (which could mean all items age out).
* <p>
* <img width="640" height="404" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/ReplayProcessor.nt.png" alt="">
* </li>
* </ul>
* <p>
* The ReplayProcessor can be created in bounded and unbounded mode. It can be bounded by
* size (maximum number of elements retained at most) and/or time (maximum age of elements replayed).
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/io/reactivex/subjects/AsyncSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
/**
* A Subject that emits the very last value followed by a completion event or the received error to Observers.
* <p>
* <img width="640" height="239" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/AsyncSubject.png" alt="">
* <p>
* This subject does not have a public constructor by design; a new empty instance of this
* {@code AsyncSubject} can be created via the {@link #create()} method.
* <p>
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/io/reactivex/subjects/CompletableSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
/**
* Represents a hot Completable-like source and consumer of events similar to Subjects.
* <p>
* <img width="640" height="243" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/CompletableSubject.png" alt="">
* <p>
* This subject does not have a public constructor by design; a new non-terminated instance of this
* {@code CompletableSubject} can be created via the {@link #create()} method.
* <p>
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/io/reactivex/subjects/MaybeSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
/**
* Represents a hot Maybe-like source and consumer of events similar to Subjects.
* <p>
* <img width="640" height="164" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/MaybeSubject.png" alt="">
* <p>
* This subject does not have a public constructor by design; a new non-terminated instance of this
* {@code MaybeSubject} can be created via the {@link #create()} method.
* <p>
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/reactivex/subjects/PublishSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
* A Subject that emits (multicasts) items to currently subscribed {@link Observer}s and terminal events to current
* or late {@code Observer}s.
* <p>
* <img width="640" height="405" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/S.PublishSubject.png" alt="">
* <img width="640" height="281" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/PublishSubject.png" alt="">
* <p>
* This subject does not have a public constructor by design; a new empty instance of this
* {@code PublishSubject} can be created via the {@link #create()} method.
Expand Down
27 changes: 20 additions & 7 deletions src/main/java/io/reactivex/subjects/ReplaySubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,36 @@
/**
* Replays events (in a configurable bounded or unbounded manner) to current and late {@link Observer}s.
* <p>
* <img width="640" height="405" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/S.ReplaySubject.png" alt="">
* <p>
* This subject does not have a public constructor by design; a new empty instance of this
* {@code ReplaySubject} can be created via the following {@code create} methods that
* allow specifying the retention policy for items:
* <ul>
* <li>{@link #create()} - creates an empty, unbounded {@code ReplaySubject} that
* caches all items and the terminal event it receives.</li>
* caches all items and the terminal event it receives.
* <p>
* <img width="640" height="299" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/ReplaySubject.u.png" alt="">
* <p>
* <img width="640" height="398" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/ReplaySubject.ue.png" alt="">
* </li>
* <li>{@link #create(int)} - creates an empty, unbounded {@code ReplaySubject}
* with a hint about how many <b>total</b> items one expects to retain.</li>
* with a hint about how many <b>total</b> items one expects to retain.
* </li>
* <li>{@link #createWithSize(int)} - creates an empty, size-bound {@code ReplaySubject}
* that retains at most the given number of the latest item it receives.</li>
* that retains at most the given number of the latest item it receives.
* <p>
* <img width="640" height="420" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/ReplaySubject.n.png" alt="">
* </li>
* <li>{@link #createWithTime(long, TimeUnit, Scheduler)} - creates an empty, time-bound
* {@code ReplaySubject} that retains items no older than the specified time amount.</li>
* {@code ReplaySubject} that retains items no older than the specified time amount.
* <p>
* <img width="640" height="415" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/ReplaySubject.t.png" alt="">
* </li>
* <li>{@link #createWithTimeAndSize(long, TimeUnit, Scheduler, int)} - creates an empty,
* time- and size-bound {@code ReplaySubject} that retains at most the given number
* items that are also not older than the specified time amount.</li>
* items that are also not older than the specified time amount.
* <p>
* <img width="640" height="404" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/ReplaySubject.nt.png" alt="">
* </li>
* </ul>
* <p>
* Since a {@code Subject} is conceptionally derived from the {@code Processor} type in the Reactive Streams specification,
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/io/reactivex/subjects/SingleSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
/**
* Represents a hot Single-like source and consumer of events similar to Subjects.
* <p>
* <img width="640" height="236" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/SingleSubject.png" alt="">
* <p>
* This subject does not have a public constructor by design; a new non-terminated instance of this
* {@code SingleSubject} can be created via the {@link #create()} method.
* <p>
Expand Down

0 comments on commit bcc419c

Please sign in to comment.