Skip to content

Commit

Permalink
2.x: groupBy add overload with evicting map factory
Browse files Browse the repository at this point in the history
  • Loading branch information
davidmoten committed Feb 21, 2018
1 parent 0f73283 commit dac4f9a
Show file tree
Hide file tree
Showing 4 changed files with 412 additions and 7 deletions.
4 changes: 3 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ apply plugin: "me.champeau.gradle.jmh"
apply plugin: "com.github.hierynomus.license"
apply plugin: "com.jfrog.bintray"
apply plugin: "com.jfrog.artifactory"
apply plugin: "eclipse"

sourceCompatibility = JavaVersion.VERSION_1_6
targetCompatibility = JavaVersion.VERSION_1_6
Expand All @@ -55,7 +56,7 @@ def reactiveStreamsVersion = "1.0.2"
def mockitoVersion = "2.1.0"
def jmhLibVersion = "1.19"
def testNgVersion = "6.11"

def guavaVersion = "19.0"
// --------------------------------------

repositories {
Expand All @@ -73,6 +74,7 @@ dependencies {

testImplementation "org.reactivestreams:reactive-streams-tck:$reactiveStreamsVersion"
testImplementation "org.testng:testng:$testNgVersion"
testImplementation "com.google.guava:guava:$guavaVersion"
}

javadoc {
Expand Down
105 changes: 104 additions & 1 deletion src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -9708,9 +9708,112 @@ public final <K, V> Flowable<GroupedFlowable<K, V>> groupBy(Function<? super T,
ObjectHelper.requireNonNull(valueSelector, "valueSelector is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");

return RxJavaPlugins.onAssembly(new FlowableGroupBy<T, K, V>(this, keySelector, valueSelector, bufferSize, delayError));
return RxJavaPlugins.onAssembly(new FlowableGroupBy<T, K, V>(this, keySelector, valueSelector, bufferSize, delayError, null));
}

/**
* Groups the items emitted by a {@code Publisher} according to a specified criterion, and emits these
* grouped items as {@link GroupedFlowable}s. The emitted {@code GroupedFlowable} allows only a single
* {@link Subscriber} during its lifetime and if this {@code Subscriber} cancels before the
* source terminates, the next emission by the source having the same key will trigger a new
* {@code GroupedFlowable} emission. The {@code evictingMapFactory} is used to create a map that will
* be used to hold the {@link GroupedFlowable}s by key. The evicting map created by this factory must
* notify when a value in this map has been evicted. The next source emission will bring about the
* completion of the evicted {@link GroupedFlowable}s and the arrival of an item with the same key as a
* completed {@link GroupedFlowable} will prompt the creation and emission of a new {@link GroupedFlowable}
* with that key.
*
* <p>A use case for specifying an {@code evictingMapFactory} is where the source is infinite and fast and
* over time the number of keys grows enough to be a concern in terms of the memory footprint of the
* internal hash map containing the {@link GroupedFlowable}s.
*
* <p>The map created by an {@code evictingMapFactory} must be thread-safe.
*
* <p>An example of an {@code evictingMapFactory} using <a href="https://google.github.io/guava/releases/19.0/api/docs/com/google/common/cache/CacheBuilder.html">CacheBuilder</a> from the Guava library is below:
*
* <pre>
* Function&lt;Consumer&lt;Object&gt;, Map&lt;Integer, Object&gt;&gt; evictingMapFactory =
* notify -&gt;
* CacheBuilder
* .newBuilder()
* .maximumSize(3)
* .removalListener(entry -&gt; {
* try {
* // emit the value not the key!
* notify.accept(entry.getValue());
* } catch (Exception e) {
* throw new RuntimeException(e);
* }
* })
* .&lt;Integer, Object&gt; build()
* .asMap();
*
* // Emit 1000 items but ensure that the
* // internal map never has more than 3 items in it
* Flowable
* .range(1, 1000)
* // note that number of keys is 10
* .groupBy(x -&gt; x % 10, x -&gt; x, true, 16, evictingMapFactory)
* .flatMap(g -&gt; g)
* .forEach(System.out::println);
* </pre>
*
* <p>
* <img width="640" height="360" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/groupBy.png" alt="">
* <p>
* <em>Note:</em> A {@link GroupedFlowable} will cache the items it is to emit until such time as it
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
* {@code GroupedFlowable}s that do not concern you. Instead, you can signal to them that they may
* discard their buffers by applying an operator like {@link #ignoreElements} to them.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>Both the returned and its inner {@code Publisher}s honor backpressure and the source {@code Publisher}
* is consumed in a bounded mode (i.e., requested a fixed amount upfront and replenished based on
* downstream consumption). Note that both the returned and its inner {@code Publisher}s use
* unbounded internal buffers and if the source {@code Publisher} doesn't honor backpressure, that <em>may</em>
* lead to {@code OutOfMemoryError}.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code groupBy} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param keySelector
* a function that extracts the key for each item
* @param valueSelector
* a function that extracts the return element for each item
* @param delayError
* if true, the exception from the current Flowable is delayed in each group until that specific group emitted
* the normal values; if false, the exception bypasses values in the groups and is reported immediately.
* @param bufferSize
* the hint for how many {@link GroupedFlowable}s and elements in each {@link GroupedFlowable} should be buffered
* @param evictingMapFactory
* The factory used to create a map that will be used by the implementation to hold the
* {@link GroupedFlowable}s. The evicting map created by this factory must notify when a
* value in this map has been evicted. The next source emission will bring about the
* completion of the evicted {@link GroupedFlowable}s. See example using Guava above.
* @param <K>
* the key type
* @param <V>
* the element type
* @return a {@code Publisher} that emits {@link GroupedFlowable}s, each of which corresponds to a
* unique key value and each of which emits those items from the source Publisher that share that
* key value
* @see <a href="http://reactivex.io/documentation/operators/groupby.html">ReactiveX operators documentation: GroupBy</a>
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public final <K, V> Flowable<GroupedFlowable<K, V>> groupBy(
        Function<? super T, ? extends K> keySelector,
        Function<? super T, ? extends V> valueSelector,
        boolean delayError,
        int bufferSize,
        Function<? super Consumer<Object>, ? extends Map<K, Object>> evictingMapFactory) {
    // Validate every argument up front; the first violated check decides which exception is raised.
    ObjectHelper.requireNonNull(keySelector, "keySelector is null");
    ObjectHelper.requireNonNull(valueSelector, "valueSelector is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    ObjectHelper.requireNonNull(evictingMapFactory, "evictingMapFactory is null");

    // The operator's constructor takes bufferSize before delayError, the reverse of this method's order.
    Flowable<GroupedFlowable<K, V>> grouped = new FlowableGroupBy<T, K, V>(
            this, keySelector, valueSelector, bufferSize, delayError, evictingMapFactory);

    // Route the assembled operator through the plugin assembly hook before returning it.
    return RxJavaPlugins.onAssembly(grouped);
}

/**
* Returns a Flowable that correlates two Publishers when they overlap in time and groups the results.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
package io.reactivex.internal.operators.flowable;

import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.*;

import org.reactivestreams.*;
Expand All @@ -23,30 +25,44 @@
import io.reactivex.annotations.Nullable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.flowables.GroupedFlowable;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.queue.SpscLinkedArrayQueue;
import io.reactivex.internal.subscriptions.*;
import io.reactivex.internal.util.BackpressureHelper;
import io.reactivex.internal.util.EmptyComponent;
import io.reactivex.plugins.RxJavaPlugins;

public final class FlowableGroupBy<T, K, V> extends AbstractFlowableWithUpstream<T, GroupedFlowable<K, V>> {
final Function<? super T, ? extends K> keySelector;
final Function<? super T, ? extends V> valueSelector;
final int bufferSize;
final boolean delayError;
final Function<? super Consumer<Object>, ? extends Map<K, Object>> mapFactory;

// Captures the operator configuration in final fields.
// mapFactory may be null: the groupBy overload without an evicting map passes null for it,
// and GroupBySubscriber's constructor checks for null before using the factory.
// NOTE(review): two constructor signatures appear back to back here; this looks like a diff
// rendering where the first (five-argument) line is the removed one — confirm.
public FlowableGroupBy(Flowable<T> source, Function<? super T, ? extends K> keySelector, Function<? super T, ? extends V> valueSelector, int bufferSize, boolean delayError) {
public FlowableGroupBy(Flowable<T> source, Function<? super T, ? extends K> keySelector, Function<? super T, ? extends V> valueSelector,
int bufferSize, boolean delayError, Function<? super Consumer<Object>, ? extends Map<K, Object>> mapFactory) {
super(source);
this.keySelector = keySelector;
this.valueSelector = valueSelector;
this.bufferSize = bufferSize;
this.delayError = delayError;
this.mapFactory = mapFactory;
}

// Creates the GroupBySubscriber for the given downstream and subscribes it to the source.
// If creating the subscriber throws, the downstream receives onSubscribe followed by onError instead.
@Override
protected void subscribeActual(Subscriber<? super GroupedFlowable<K, V>> s) {
// NOTE(review): this direct subscribe uses the five-argument constructor and duplicates the
// subscribe at the end of the method; presumably it is the removed line of the diff — confirm.
source.subscribe(new GroupBySubscriber<T, K, V>(s, keySelector, valueSelector, bufferSize, delayError));
GroupBySubscriber<T, K, V> subscriber;
try {
// The GroupBySubscriber constructor is declared with "throws Exception" because it calls the map factory.
subscriber = new GroupBySubscriber<T, K, V>(s, keySelector, valueSelector, bufferSize, delayError, mapFactory);
} catch (Exception e) {
// Rethrow fatal errors rather than routing them to the subscriber.
Exceptions.throwIfFatal(e);
// onSubscribe is delivered before onError; EmptyComponent.INSTANCE stands in as the Subscription.
s.onSubscribe(EmptyComponent.INSTANCE);
s.onError(e);
return;
}
source.subscribe(subscriber);
}

public static final class GroupBySubscriber<T, K, V>
Expand All @@ -62,6 +78,7 @@ public static final class GroupBySubscriber<T, K, V>
final boolean delayError;
final Map<Object, GroupedUnicast<K, V>> groups;
final SpscLinkedArrayQueue<GroupedFlowable<K, V>> queue;
final Queue<GroupedUnicast<K, V>> evictedGroups;

static final Object NULL_KEY = new Object();

Expand All @@ -78,13 +95,21 @@ public static final class GroupBySubscriber<T, K, V>

boolean outputFused;

// Stores the operator parameters and sets up the group map and the output queue.
// Declared with "throws Exception" because createMap, which invokes the user's map factory, is.
// NOTE(review): two signatures and two assignments to "groups" appear here; this looks like a diff
// rendering where the five-argument signature and the unconditional ConcurrentHashMap assignment
// are the removed lines — confirm.
public GroupBySubscriber(Subscriber<? super GroupedFlowable<K, V>> actual, Function<? super T, ? extends K> keySelector, Function<? super T, ? extends V> valueSelector, int bufferSize, boolean delayError) {
public GroupBySubscriber(Subscriber<? super GroupedFlowable<K, V>> actual, Function<? super T, ? extends K> keySelector,
Function<? super T, ? extends V> valueSelector, int bufferSize, boolean delayError,
Function<? super Consumer<Object>, ? extends Map<K, Object>> mapFactory) throws Exception {
this.actual = actual;
this.keySelector = keySelector;
this.valueSelector = valueSelector;
this.bufferSize = bufferSize;
this.delayError = delayError;
this.groups = new ConcurrentHashMap<Object, GroupedUnicast<K, V>>();
if (mapFactory == null) {
// No factory supplied: use a plain ConcurrentHashMap and no eviction queue.
this.groups = new ConcurrentHashMap<Object, GroupedUnicast<K, V>>();
this.evictedGroups = null;
} else {
// The EvictionAction passed to the factory appends each value it is notified with to this queue;
// onNext polls the queue and completes those groups.
this.evictedGroups = new ConcurrentLinkedQueue<GroupedUnicast<K, V>>();
this.groups = createMap(mapFactory, new EvictionAction<K, V>(evictedGroups));
}
// bufferSize is passed to the SpscLinkedArrayQueue that holds groups awaiting emission.
this.queue = new SpscLinkedArrayQueue<GroupedFlowable<K, V>>(bufferSize);
}

Expand Down Expand Up @@ -144,6 +169,13 @@ public void onNext(T t) {
}

group.onNext(v);

if (evictedGroups != null) {
GroupedUnicast<K, V> evictedGroup;
while ((evictedGroup = evictedGroups.poll()) != null) {
evictedGroup.onComplete();
}
}

if (newGroup) {
q.offer(group);
Expand All @@ -161,7 +193,9 @@ public void onError(Throwable t) {
g.onError(t);
}
groups.clear();

if (evictedGroups != null) {
evictedGroups.clear();
}
error = t;
done = true;
drain();
Expand All @@ -174,6 +208,9 @@ public void onComplete() {
g.onComplete();
}
groups.clear();
if (evictedGroups != null) {
evictedGroups.clear();
}
done = true;
drain();
}
Expand Down Expand Up @@ -371,6 +408,27 @@ public boolean isEmpty() {
return queue.isEmpty();
}
}

/**
 * Calls the supplied factory with the eviction callback and returns the map it produces,
 * re-typed as the group map used by the subscriber.
 * <p>
 * The factory is declared in terms of {@code Object} values and an {@code Object} consumer,
 * so both the callback and the resulting map are adapted with unchecked casts.
 *
 * @param mapFactory the factory that builds the map from an eviction callback
 * @param evictionAction the callback handed to the factory
 * @param <K> the key type
 * @param <V> the group value type
 * @return whatever map the factory returned, viewed as {@code Map<Object, GroupedUnicast<K, V>>}
 * @throws Exception any exception raised by the factory call
 */
@SuppressWarnings("unchecked")
private static <K, V> Map<Object, GroupedUnicast<K, V>> createMap(
        Function<? super Consumer<Object>, ? extends Map<K, Object>> mapFactory,
        EvictionAction<K, V> evictionAction) throws Exception {
    // Step 1: present the typed callback as the Consumer<Object> the factory signature asks for.
    Consumer<?> erasedCallback = evictionAction;
    Consumer<Object> notifier = (Consumer<Object>) erasedCallback;

    // Step 2: let the factory build its map.
    Map<K, Object> userMap = mapFactory.apply(notifier);

    // Step 3: re-type the map in two hops, through a wildcard view, to the internal signature.
    Map<Object, ?> widened = (Map<Object, ?>) userMap;
    return (Map<Object, GroupedUnicast<K, V>>) widened;
}

/**
 * Eviction callback: each value passed to {@link #accept} is appended to the queue
 * given at construction time.
 *
 * @param <K> the key type of the queued groups
 * @param <V> the value type of the queued groups
 */
static final class EvictionAction<K, V> implements Consumer<GroupedUnicast<K, V>> {

    /** Destination queue for the values this callback is notified with. */
    final Queue<GroupedUnicast<K, V>> evictedGroups;

    EvictionAction(Queue<GroupedUnicast<K, V>> sink) {
        evictedGroups = sink;
    }

    /** Offers the given value to the queue; the result of {@code offer} is not inspected. */
    @Override
    public void accept(GroupedUnicast<K, V> evicted) {
        this.evictedGroups.offer(evicted);
    }
}

static final class GroupedUnicast<K, T> extends GroupedFlowable<K, T> {

Expand Down
Loading

0 comments on commit dac4f9a

Please sign in to comment.