diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index 36d48b46fce..3639227a9d1 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -9713,19 +9713,19 @@ public final Flowable> groupBy(Function} with the entry value (not the key!) when an entry in this + * map has been evicted. The next source emission will bring about the completion of the evicted + * {@link GroupedFlowable}s and the arrival of an item with the same key as a completed {@link GroupedFlowable} + * will prompt the creation and emission of a new {@link GroupedFlowable} with that key. * *

A use case for specifying an {@code evictingMapFactory} is where the source is infinite and fast and * over time the number of keys grows enough to be a concern in terms of the memory footprint of the - * internal hash map containing the {@link GroupedFlowables}. + * internal hash map containing the {@link GroupedFlowable}s. * *

The map created by an {@code evictingMapFactory} must be thread-safe. * @@ -9737,7 +9737,7 @@ public final Flowable> groupBy(Function { + * .removalListener(entry -> { * try { * // emit the value not the key! * notify.accept(entry.getValue()); @@ -9747,7 +9747,7 @@ public final Flowable> groupBy(Function Flowable> groupBy(Function * Note: A {@link GroupedFlowable} will cache the items it is to emit until such time as it * is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those - * {@code GroupedPublisher}s that do not concern you. Instead, you can signal to them that they may + * {@code GroupedFlowable}s that do not concern you. Instead, you can signal to them that they may * discard their buffers by applying an operator like {@link #ignoreElements} to them. *

*
Backpressure:
- *
Both the returned and its inner {@code Publisher}s honor backpressure and the source {@code Publisher} + *
Both the returned and its inner {@code GroupedFlowable}s honor backpressure and the source {@code Publisher} * is consumed in a bounded mode (i.e., requested a fixed amount upfront and replenished based on - * downstream consumption). Note that both the returned and its inner {@code Publisher}s use + * downstream consumption). Note that both the returned and its inner {@code GroupedFlowable}s use * unbounded internal buffers and if the source {@code Publisher} doesn't honor backpressure, that may * lead to {@code OutOfMemoryError}.
*
Scheduler:
@@ -9787,9 +9787,10 @@ public final Flowable> groupBy(Function} with the entry value (not the key!) when + * an entry in this map has been evicted.. The next source emission will bring about the + * completion of the evicted {@link GroupedFlowable}s. See example above. * @param * the key type * @param @@ -9798,10 +9799,13 @@ public final Flowable> groupBy(FunctionReactiveX operators documentation: GroupBy + * + * @since 2.1.11 */ @CheckReturnValue @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) + @Beta public final Flowable> groupBy(Function keySelector, Function valueSelector, boolean delayError, int bufferSize, diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableGroupBy.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableGroupBy.java index 722853f41b4..15dc85ebd90 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableGroupBy.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableGroupBy.java @@ -53,15 +53,27 @@ public FlowableGroupBy(Flowable source, Function keyS @Override protected void subscribeActual(Subscriber> s) { - GroupBySubscriber subscriber; + + final Map> groups; + final Queue> evictedGroups; + try { - subscriber = new GroupBySubscriber(s, keySelector, valueSelector, bufferSize, delayError, mapFactory); + if (mapFactory == null) { + evictedGroups = null; + groups = new ConcurrentHashMap>(); + } else { + evictedGroups = new ConcurrentLinkedQueue>(); + Consumer evictionAction = (Consumer)(Consumer) new EvictionAction(evictedGroups); + groups = (Map>)(Map) mapFactory.apply(evictionAction); + } } catch (Exception e) { Exceptions.throwIfFatal(e); s.onSubscribe(EmptyComponent.INSTANCE); s.onError(e); return; } + GroupBySubscriber subscriber = + new GroupBySubscriber(s, keySelector, valueSelector, bufferSize, delayError, groups, evictedGroups); source.subscribe(subscriber); } @@ -96,20 +108,15 @@ public static final class GroupBySubscriber boolean outputFused; public GroupBySubscriber(Subscriber> actual, Function keySelector, - Function valueSelector, int bufferSize, boolean delayError, - Function, ? extends Map> mapFactory) throws Exception { + Function valueSelector, int bufferSize, boolean delayError, + Map> groups, Queue> evictedGroups) { this.actual = actual; this.keySelector = keySelector; this.valueSelector = valueSelector; this.bufferSize = bufferSize; this.delayError = delayError; - if (mapFactory == null) { - this.groups = new ConcurrentHashMap>(); - this.evictedGroups = null; - } else { - this.evictedGroups = new ConcurrentLinkedQueue>(); - this.groups = createMap(mapFactory, new EvictionAction(evictedGroups)); - } + this.groups = groups; + this.evictedGroups = evictedGroups; this.queue = new SpscLinkedArrayQueue>(bufferSize); } @@ -408,13 +415,6 @@ public boolean isEmpty() { return queue.isEmpty(); } } - - @SuppressWarnings("unchecked") - private static Map> createMap( - Function, ? extends Map> mapFactory, - EvictionAction evictionAction) throws Exception { - return (Map>)(Map) mapFactory.apply((Consumer) (Consumer) evictionAction); - } static final class EvictionAction implements Consumer> {