Skip to content

Commit

Permalink
address ReactiveX#5860 comments
Browse files Browse the repository at this point in the history
  • Loading branch information
davidmoten committed Feb 21, 2018
1 parent dac4f9a commit 5d9dee5
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 32 deletions.
32 changes: 18 additions & 14 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -9713,19 +9713,19 @@ public final <K, V> Flowable<GroupedFlowable<K, V>> groupBy(Function<? super T,

/**
* Groups the items emitted by a {@code Publisher} according to a specified criterion, and emits these
* grouped items as {@link GroupedFlowable}s. The emitted {@code GroupedPublisher} allows only a single
* grouped items as {@link GroupedFlowable}s. The emitted {@code GroupedFlowable} allows only a single
* {@link Subscriber} during its lifetime and if this {@code Subscriber} cancels before the
* source terminates, the next emission by the source having the same key will trigger a new
* {@code GroupedPublisher} emission. The {@code evictingMapFactory} is used to create a map that will
* be used to hold the {@link GroupedFlowable}s by key. The evicting map created by this factory must
* notify when a value in this map has been evicted. The next source emission will bring about the
* completion of the evicted {@link GroupedFlowable}s and the arrival of an item with the same key as a
* completed {@link GroupedFlowable} will prompt the creation and emission of a new {@link GroupedFlowable}
* with that key.
* notify the provided {@code Consumer<Object>} with the entry value (not the key!) when an entry in this
* map has been evicted. The next source emission will bring about the completion of the evicted
* {@link GroupedFlowable}s and the arrival of an item with the same key as a completed {@link GroupedFlowable}
* will prompt the creation and emission of a new {@link GroupedFlowable} with that key.
*
* <p>A use case for specifying an {@code evictingMapFactory} is where the source is infinite and fast and
* over time the number of keys grows enough to be a concern in terms of the memory footprint of the
* internal hash map containing the {@link GroupedFlowables}.
* internal hash map containing the {@link GroupedFlowable}s.
*
* <p>The map created by an {@code evictingMapFactory} must be thread-safe.
*
Expand All @@ -9737,7 +9737,7 @@ public final <K, V> Flowable<GroupedFlowable<K, V>> groupBy(Function<? super T,
* CacheBuilder
* .newBuilder()
* .maximumSize(3)
* .removalListener(entry -> {
* .removalListener(entry -&gt; {
* try {
* // emit the value not the key!
* notify.accept(entry.getValue());
Expand All @@ -9747,7 +9747,7 @@ public final <K, V> Flowable<GroupedFlowable<K, V>> groupBy(Function<? super T,
* })
* .&lt;Integer, Object&gt; build()
* .asMap();
*
*
* // Emit 1000 items but ensure that the
* // internal map never has more than 3 items in it
* Flowable
Expand All @@ -9763,13 +9763,13 @@ public final <K, V> Flowable<GroupedFlowable<K, V>> groupBy(Function<? super T,
* <p>
* <em>Note:</em> A {@link GroupedFlowable} will cache the items it is to emit until such time as it
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
* {@code GroupedPublisher}s that do not concern you. Instead, you can signal to them that they may
* {@code GroupedFlowable}s that do not concern you. Instead, you can signal to them that they may
* discard their buffers by applying an operator like {@link #ignoreElements} to them.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>Both the returned and its inner {@code Publisher}s honor backpressure and the source {@code Publisher}
* <dd>Both the returned and its inner {@code GroupedFlowable}s honor backpressure and the source {@code Publisher}
* is consumed in a bounded mode (i.e., requested a fixed amount upfront and replenished based on
* downstream consumption). Note that both the returned and its inner {@code Publisher}s use
* downstream consumption). Note that both the returned and its inner {@code GroupedFlowable}s use
* unbounded internal buffers and if the source {@code Publisher} doesn't honor backpressure, that <em>may</em>
* lead to {@code OutOfMemoryError}.</dd>
* <dt><b>Scheduler:</b></dt>
Expand All @@ -9787,9 +9787,10 @@ public final <K, V> Flowable<GroupedFlowable<K, V>> groupBy(Function<? super T,
* the hint for how many {@link GroupedFlowable}s and element in each {@link GroupedFlowable} should be buffered
* @param evictingMapFactory
* The factory used to create a map that will be used by the implementation to hold the
* {@link GroupedFlowable}s. The evicting map created by this factory must notify when a
* value in this map has been evicted. The next source emission will bring about the
* completion of the evicted {@link GroupedFlowable}s. See example using Guava above.
* {@link GroupedFlowable}s. The evicting map created by this factory must
* notify the provided {@code Consumer<Object>} with the entry value (not the key!) when
* an entry in this map has been evicted.. The next source emission will bring about the
* completion of the evicted {@link GroupedFlowable}s. See example above.
* @param <K>
* the key type
* @param <V>
Expand All @@ -9798,10 +9799,13 @@ public final <K, V> Flowable<GroupedFlowable<K, V>> groupBy(Function<? super T,
* unique key value and each of which emits those items from the source Publisher that share that
* key value
* @see <a href="http://reactivex.io/documentation/operators/groupby.html">ReactiveX operators documentation: GroupBy</a>
*
* @since 2.1.11
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@Beta
public final <K, V> Flowable<GroupedFlowable<K, V>> groupBy(Function<? super T, ? extends K> keySelector,
Function<? super T, ? extends V> valueSelector,
boolean delayError, int bufferSize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,27 @@ public FlowableGroupBy(Flowable<T> source, Function<? super T, ? extends K> keyS

@Override
protected void subscribeActual(Subscriber<? super GroupedFlowable<K, V>> s) {
GroupBySubscriber<T, K, V> subscriber;

final Map<Object, GroupedUnicast<K, V>> groups;
final Queue<GroupedUnicast<K, V>> evictedGroups;

try {
subscriber = new GroupBySubscriber<T, K, V>(s, keySelector, valueSelector, bufferSize, delayError, mapFactory);
if (mapFactory == null) {
evictedGroups = null;
groups = new ConcurrentHashMap<Object, GroupedUnicast<K, V>>();
} else {
evictedGroups = new ConcurrentLinkedQueue<GroupedUnicast<K, V>>();
Consumer<Object> evictionAction = (Consumer<Object>)(Consumer<?>) new EvictionAction<K, V>(evictedGroups);
groups = (Map<Object, GroupedUnicast<K,V>>)(Map<Object, ?>) mapFactory.apply(evictionAction);
}
} catch (Exception e) {
Exceptions.throwIfFatal(e);
s.onSubscribe(EmptyComponent.INSTANCE);
s.onError(e);
return;
}
GroupBySubscriber<T, K, V> subscriber =
new GroupBySubscriber<T, K, V>(s, keySelector, valueSelector, bufferSize, delayError, groups, evictedGroups);
source.subscribe(subscriber);
}

Expand Down Expand Up @@ -96,20 +108,15 @@ public static final class GroupBySubscriber<T, K, V>
boolean outputFused;

public GroupBySubscriber(Subscriber<? super GroupedFlowable<K, V>> actual, Function<? super T, ? extends K> keySelector,
Function<? super T, ? extends V> valueSelector, int bufferSize, boolean delayError,
Function<? super Consumer<Object>, ? extends Map<K, Object>> mapFactory) throws Exception {
Function<? super T, ? extends V> valueSelector, int bufferSize, boolean delayError,
Map<Object, GroupedUnicast<K, V>> groups, Queue<GroupedUnicast<K, V>> evictedGroups) {
this.actual = actual;
this.keySelector = keySelector;
this.valueSelector = valueSelector;
this.bufferSize = bufferSize;
this.delayError = delayError;
if (mapFactory == null) {
this.groups = new ConcurrentHashMap<Object, GroupedUnicast<K, V>>();
this.evictedGroups = null;
} else {
this.evictedGroups = new ConcurrentLinkedQueue<GroupedUnicast<K, V>>();
this.groups = createMap(mapFactory, new EvictionAction<K, V>(evictedGroups));
}
this.groups = groups;
this.evictedGroups = evictedGroups;
this.queue = new SpscLinkedArrayQueue<GroupedFlowable<K, V>>(bufferSize);
}

Expand Down Expand Up @@ -408,13 +415,6 @@ public boolean isEmpty() {
return queue.isEmpty();
}
}

@SuppressWarnings("unchecked")
private static <K,V> Map<Object, GroupedUnicast<K, V>> createMap(
Function<? super Consumer<Object>, ? extends Map<K, Object>> mapFactory,
EvictionAction<K, V> evictionAction) throws Exception {
return (Map<Object, GroupedUnicast<K,V>>)(Map<Object, ?>) mapFactory.apply((Consumer<Object>) (Consumer<?>) evictionAction);
}

static final class EvictionAction<K, V> implements Consumer<GroupedUnicast<K,V>> {

Expand Down

0 comments on commit 5d9dee5

Please sign in to comment.