Skip to content

Commit

Permalink
2.x: Add finite requirement to various collector operators
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Feb 17, 2018
1 parent 12c0e30 commit 444b9a5
Show file tree
Hide file tree
Showing 2 changed files with 158 additions and 33 deletions.
96 changes: 80 additions & 16 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -6706,12 +6706,16 @@ public final <U> Flowable<U> cast(final Class<U> clazz) {
}

/**
* Collects items emitted by the source Publisher into a single mutable data structure and returns
* Collects items emitted by the finite source Publisher into a single mutable data structure and returns
* a Single that emits this structure.
* <p>
* <img width="640" height="330" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/collect.png" alt="">
* <p>
* This is a simplified version of {@code reduce} that does not need to return the state on each pass.
* <p>
* Note that this operator requires the upstream to signal {@code onComplete} for the accumulator to
* be emitted. Sources that are infinite and never complete will never emit anything through this
* operator and an infinite source may lead to a fatal {@code OutOfMemoryError}.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>This operator does not support backpressure because by intent it will receive all values and reduce
Expand Down Expand Up @@ -6740,12 +6744,16 @@ public final <U> Single<U> collect(Callable<? extends U> initialItemSupplier, Bi
}

/**
* Collects items emitted by the source Publisher into a single mutable data structure and returns
* Collects items emitted by the finite source Publisher into a single mutable data structure and returns
* a Single that emits this structure.
* <p>
* <img width="640" height="330" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/collect.png" alt="">
* <p>
* This is a simplified version of {@code reduce} that does not need to return the state on each pass.
* <p>
* Note that this operator requires the upstream to signal {@code onComplete} for the accumulator to
* be emitted. Sources that are infinite and never complete will never emit anything through this
* operator and an infinite source may lead to a fatal {@code OutOfMemoryError}.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>This operator does not support backpressure because by intent it will receive all values and reduce
Expand Down Expand Up @@ -11065,7 +11073,7 @@ public final Flowable<T> rebatchRequests(int n) {
/**
* Returns a Maybe that applies a specified accumulator function to the first item emitted by a source
* Publisher, then feeds the result of that function along with the second item emitted by the source
* Publisher into the same function, and so on until all items have been emitted by the source Publisher,
* Publisher into the same function, and so on until all items have been emitted by the finite source Publisher,
* and emits the final result from the final call to your function as its sole item.
* <p>
* If the source is empty, a {@code NoSuchElementException} is signalled.
Expand All @@ -11075,6 +11083,10 @@ public final Flowable<T> rebatchRequests(int n) {
* This technique, which is called "reduce" here, is sometimes called "aggregate," "fold," "accumulate,"
* "compress," or "inject" in other programming contexts. Groovy, for instance, has an {@code inject} method
* that does a similar operation on lists.
* <p>
* Note that this operator requires the upstream to signal {@code onComplete} for the accumulator to
* be emitted. Sources that are infinite and never complete will never emit anything through this
* operator and an infinite source may lead to a fatal {@code OutOfMemoryError}.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure of its downstream consumer and consumes the
Expand Down Expand Up @@ -11103,7 +11115,7 @@ public final Maybe<T> reduce(BiFunction<T, T, T> reducer) {
* Returns a Single that applies a specified accumulator function to the first item emitted by a source
* Publisher and a specified seed value, then feeds the result of that function along with the second item
* emitted by a Publisher into the same function, and so on until all items have been emitted by the
* source Publisher, emitting the final result from the final call to your function as its sole item.
* finite source Publisher, emitting the final result from the final call to your function as its sole item.
* <p>
* <img width="640" height="325" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/reduceSeed.png" alt="">
* <p>
Expand All @@ -11128,6 +11140,10 @@ public final Maybe<T> reduce(BiFunction<T, T, T> reducer) {
*
* source.reduceWith(() -&gt; new ArrayList&lt;&gt;(), (list, item) -&gt; list.add(item)));
* </code></pre>
* <p>
* Note that this operator requires the upstream to signal {@code onComplete} for the accumulated list to
* be emitted. Sources that are infinite and never complete will never emit anything through this
* operator and an infinite source may lead to a fatal {@code OutOfMemoryError}.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure of its downstream consumer and consumes the
Expand Down Expand Up @@ -11161,14 +11177,18 @@ public final <R> Single<R> reduce(R seed, BiFunction<R, ? super T, R> reducer) {
* Returns a Single that applies a specified accumulator function to the first item emitted by a source
* Publisher and a seed value derived from calling a specified seedSupplier, then feeds the result
* of that function along with the second item emitted by a Publisher into the same function, and so on until
* all items have been emitted by the source Publisher, emitting the final result from the final call to your
* all items have been emitted by the finite source Publisher, emitting the final result from the final call to your
* function as its sole item.
* <p>
* <img width="640" height="325" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/reduceSeed.png" alt="">
* <p>
* This technique, which is called "reduce" here, is sometimes called "aggregate," "fold," "accumulate,"
* "compress," or "inject" in other programming contexts. Groovy, for instance, has an {@code inject} method
* that does a similar operation on lists.
* <p>
* Note that this operator requires the upstream to signal {@code onComplete} for the accumulated list to
* be emitted. Sources that are infinite and never complete will never emit anything through this
* operator and an infinite source may lead to a fatal {@code OutOfMemoryError}.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure of its downstream consumer and consumes the
Expand Down Expand Up @@ -15092,12 +15112,16 @@ public final <U extends Collection<? super T>> Single<U> toList(Callable<U> coll
}

/**
* Returns a Single that emits a single HashMap containing all items emitted by the source Publisher,
* Returns a Single that emits a single HashMap containing all items emitted by the finite source Publisher,
* mapped by the keys returned by a specified {@code keySelector} function.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/toMap.png" alt="">
* <p>
* If more than one source item maps to the same key, the HashMap will contain the latest of those items.
* <p>
* Note that this operator requires the upstream to signal {@code onComplete} for the accumulated map to
* be emitted. Sources that are infinite and never complete will never emit anything through this
* operator and an infinite source may lead to a fatal {@code OutOfMemoryError}.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream and consumes the source {@code Publisher} in an
Expand All @@ -15123,12 +15147,16 @@ public final <K> Single<Map<K, T>> toMap(final Function<? super T, ? extends K>

/**
* Returns a Single that emits a single HashMap containing values corresponding to items emitted by the
* source Publisher, mapped by the keys returned by a specified {@code keySelector} function.
* finite source Publisher, mapped by the keys returned by a specified {@code keySelector} function.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/toMap.png" alt="">
* <p>
* If more than one source item maps to the same key, the HashMap will contain a single entry that
* corresponds to the latest of those items.
* <p>
* Note that this operator requires the upstream to signal {@code onComplete} for the accumulated map to
* be emitted. Sources that are infinite and never complete will never emit anything through this
* operator and an infinite source may lead to a fatal {@code OutOfMemoryError}.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream and consumes the source {@code Publisher} in an
Expand Down Expand Up @@ -15158,9 +15186,13 @@ public final <K, V> Single<Map<K, V>> toMap(final Function<? super T, ? extends

/**
* Returns a Single that emits a single Map, returned by a specified {@code mapFactory} function, that
* contains keys and values extracted from the items emitted by the source Publisher.
* contains keys and values extracted from the items emitted by the finite source Publisher.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/toMap.png" alt="">
* <p>
* Note that this operator requires the upstream to signal {@code onComplete} for the accumulated map to
* be emitted. Sources that are infinite and never complete will never emit anything through this
* operator and an infinite source may lead to a fatal {@code OutOfMemoryError}.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream and consumes the source {@code Publisher} in an
Expand Down Expand Up @@ -15194,9 +15226,13 @@ public final <K, V> Single<Map<K, V>> toMap(final Function<? super T, ? extends

/**
* Returns a Single that emits a single HashMap that contains an ArrayList of items emitted by the
* source Publisher keyed by a specified {@code keySelector} function.
* finite source Publisher keyed by a specified {@code keySelector} function.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/toMultiMap.png" alt="">
* <p>
* Note that this operator requires the upstream to signal {@code onComplete} for the accumulated map to
* be emitted. Sources that are infinite and never complete will never emit anything through this
* operator and an infinite source may lead to a fatal {@code OutOfMemoryError}.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>This operator does not support backpressure as by intent it is requesting and buffering everything.</dd>
Expand All @@ -15223,10 +15259,14 @@ public final <K> Single<Map<K, Collection<T>>> toMultimap(Function<? super T, ?

/**
* Returns a Single that emits a single HashMap that contains an ArrayList of values extracted by a
* specified {@code valueSelector} function from items emitted by the source Publisher, keyed by a
* specified {@code valueSelector} function from items emitted by the finite source Publisher, keyed by a
* specified {@code keySelector} function.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/toMultiMap.png" alt="">
* <p>
* Note that this operator requires the upstream to signal {@code onComplete} for the accumulated map to
* be emitted. Sources that are infinite and never complete will never emit anything through this
* operator and an infinite source may lead to a fatal {@code OutOfMemoryError}.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream and consumes the source {@code Publisher} in an
Expand Down Expand Up @@ -15257,9 +15297,13 @@ public final <K, V> Single<Map<K, Collection<V>>> toMultimap(Function<? super T,
/**
* Returns a Single that emits a single Map, returned by a specified {@code mapFactory} function, that
* contains a custom collection of values, extracted by a specified {@code valueSelector} function from
* items emitted by the source Publisher, and keyed by the {@code keySelector} function.
* items emitted by the finite source Publisher, and keyed by the {@code keySelector} function.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/toMultiMap.png" alt="">
* <p>
* Note that this operator requires the upstream to signal {@code onComplete} for the accumulated map to
* be emitted. Sources that are infinite and never complete will never emit anything through this
* operator and an infinite source may lead to a fatal {@code OutOfMemoryError}.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream and consumes the source {@code Publisher} in an
Expand Down Expand Up @@ -15300,9 +15344,13 @@ public final <K, V> Single<Map<K, Collection<V>>> toMultimap(
/**
* Returns a Single that emits a single Map, returned by a specified {@code mapFactory} function, that
* contains an ArrayList of values, extracted by a specified {@code valueSelector} function from items
* emitted by the source Publisher and keyed by the {@code keySelector} function.
* emitted by the finite source Publisher and keyed by the {@code keySelector} function.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/toMultiMap.png" alt="">
* <p>
* Note that this operator requires the upstream to signal {@code onComplete} for the accumulated map to
* be emitted. Sources that are infinite and never complete will never emit anything through this
* operator and an infinite source may lead to a fatal {@code OutOfMemoryError}.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream and consumes the source {@code Publisher} in an
Expand Down Expand Up @@ -15354,7 +15402,7 @@ public final Observable<T> toObservable() {
}

/**
* Returns a Single that emits a list that contains the items emitted by the source Publisher, in a
* Returns a Single that emits a list that contains the items emitted by the finite source Publisher, in a
* sorted order. Each item emitted by the Publisher must implement {@link Comparable} with respect to all
* other items in the sequence.
*
Expand All @@ -15363,6 +15411,10 @@ public final Observable<T> toObservable() {
* sequence is terminated with a {@link ClassCastException}.
* <p>
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/toSortedList.png" alt="">
* <p>
* Note that this operator requires the upstream to signal {@code onComplete} for the accumulated list to
* be emitted. Sources that are infinite and never complete will never emit anything through this
* operator and an infinite source may lead to a fatal {@code OutOfMemoryError}.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream and consumes the source {@code Publisher} in an
Expand All @@ -15382,10 +15434,14 @@ public final Single<List<T>> toSortedList() {
}

/**
* Returns a Single that emits a list that contains the items emitted by the source Publisher, in a
* Returns a Single that emits a list that contains the items emitted by the finite source Publisher, in a
* sorted order based on a specified comparison function.
* <p>
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/toSortedList.f.png" alt="">
* <p>
* Note that this operator requires the upstream to signal {@code onComplete} for the accumulated list to
* be emitted. Sources that are infinite and never complete will never emit anything through this
* operator and an infinite source may lead to a fatal {@code OutOfMemoryError}.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream and consumes the source {@code Publisher} in an
Expand All @@ -15410,10 +15466,14 @@ public final Single<List<T>> toSortedList(final Comparator<? super T> comparator
}

/**
* Returns a Single that emits a list that contains the items emitted by the source Publisher, in a
* Returns a Single that emits a list that contains the items emitted by the finite source Publisher, in a
* sorted order based on a specified comparison function.
* <p>
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/toSortedList.f.png" alt="">
* <p>
* Note that this operator requires the upstream to signal {@code onComplete} for the accumulated list to
* be emitted. Sources that are infinite and never complete will never emit anything through this
* operator and an infinite source may lead to a fatal {@code OutOfMemoryError}.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream and consumes the source {@code Publisher} in an
Expand Down Expand Up @@ -15441,7 +15501,7 @@ public final Single<List<T>> toSortedList(final Comparator<? super T> comparator
}

/**
* Returns a Flowable that emits a list that contains the items emitted by the source Publisher, in a
* Returns a Flowable that emits a list that contains the items emitted by the finite source Publisher, in a
* sorted order. Each item emitted by the Publisher must implement {@link Comparable} with respect to all
* other items in the sequence.
*
Expand All @@ -15450,6 +15510,10 @@ public final Single<List<T>> toSortedList(final Comparator<? super T> comparator
* sequence is terminated with a {@link ClassCastException}.
* <p>
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/toSortedList.png" alt="">
* <p>
* Note that this operator requires the upstream to signal {@code onComplete} for the accumulated list to
* be emitted. Sources that are infinite and never complete will never emit anything through this
* operator and an infinite source may lead to a fatal {@code OutOfMemoryError}.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream and consumes the source {@code Publisher} in an
Expand Down
Loading

0 comments on commit 444b9a5

Please sign in to comment.