From d6345dc9283ade1d28afa3507b769108f5929dd0 Mon Sep 17 00:00:00 2001 From: Dave Moten Date: Thu, 22 Feb 2018 23:24:20 +1100 Subject: [PATCH] 2.x: groupBy add overload with evicting map factory (#5860) --- build.gradle | 4 +- src/main/java/io/reactivex/Flowable.java | 109 +++++++- .../operators/flowable/FlowableGroupBy.java | 68 ++++- .../flowable/FlowableGroupByTest.java | 242 ++++++++++++++++++ 4 files changed, 416 insertions(+), 7 deletions(-) diff --git a/build.gradle b/build.gradle index 8cafd79d9d..ce96b1baef 100644 --- a/build.gradle +++ b/build.gradle @@ -43,6 +43,7 @@ apply plugin: "me.champeau.gradle.jmh" apply plugin: "com.github.hierynomus.license" apply plugin: "com.jfrog.bintray" apply plugin: "com.jfrog.artifactory" +apply plugin: "eclipse" sourceCompatibility = JavaVersion.VERSION_1_6 targetCompatibility = JavaVersion.VERSION_1_6 @@ -55,7 +56,7 @@ def reactiveStreamsVersion = "1.0.2" def mockitoVersion = "2.1.0" def jmhLibVersion = "1.19" def testNgVersion = "6.11" - +def guavaVersion = "24.0-jre" // -------------------------------------- repositories { @@ -73,6 +74,7 @@ dependencies { testImplementation "org.reactivestreams:reactive-streams-tck:$reactiveStreamsVersion" testImplementation "org.testng:testng:$testNgVersion" + testImplementation "com.google.guava:guava:$guavaVersion" } javadoc { diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index a80732ef0d..195577d526 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -9708,9 +9708,116 @@ public final Flowable> groupBy(Function(this, keySelector, valueSelector, bufferSize, delayError)); + return RxJavaPlugins.onAssembly(new FlowableGroupBy(this, keySelector, valueSelector, bufferSize, delayError, null)); } + /** + * Groups the items emitted by a {@code Publisher} according to a specified criterion, and emits these + * grouped items as {@link GroupedFlowable}s. The emitted {@code GroupedFlowable} allows only a single + * {@link Subscriber} during its lifetime and if this {@code Subscriber} cancels before the + * source terminates, the next emission by the source having the same key will trigger a new + * {@code GroupedPublisher} emission. The {@code evictingMapFactory} is used to create a map that will + * be used to hold the {@link GroupedFlowable}s by key. The evicting map created by this factory must + * notify the provided {@code Consumer} with the entry value (not the key!) when an entry in this + * map has been evicted. The next source emission will bring about the completion of the evicted + * {@link GroupedFlowable}s and the arrival of an item with the same key as a completed {@link GroupedFlowable} + * will prompt the creation and emission of a new {@link GroupedFlowable} with that key. + * + *

A use case for specifying an {@code evictingMapFactory} is where the source is infinite and fast and + * over time the number of keys grows enough to be a concern in terms of the memory footprint of the + * internal hash map containing the {@link GroupedFlowable}s. + * + *

The map created by an {@code evictingMapFactory} must be thread-safe. + * + *

An example of an {@code evictingMapFactory} using CacheBuilder from the Guava library is below: + * + *

+     * Function<Consumer<Object>, Map<Integer, Object>> evictingMapFactory = 
+     *   notify ->
+     *       CacheBuilder
+     *         .newBuilder() 
+     *         .maximumSize(3)
+     *         .removalListener(entry -> {
+     *              try {
+     *                  // emit the value not the key!
+     *                  notify.accept(entry.getValue());
+     *              } catch (Exception e) {
+     *                  throw new RuntimeException(e);
+     *              }
+     *            })
+     *         .<Integer, Object> build()
+     *         .asMap();
+     *
+     * // Emit 1000 items but ensure that the
+     * // internal map never has more than 3 items in it           
+     * Flowable
+     *   .range(1, 1000)
+     *   // note that number of keys is 10
+     *   .groupBy(x -> x % 10, x -> x, true, 16, evictingMapFactory)
+     *   .flatMap(g -> g)
+     *   .forEach(System.out::println);
+     * 
+ * + *

+ * + *

+ * Note: A {@link GroupedFlowable} will cache the items it is to emit until such time as it + * is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those + * {@code GroupedFlowable}s that do not concern you. Instead, you can signal to them that they may + * discard their buffers by applying an operator like {@link #ignoreElements} to them. + *

+ *
Backpressure:
+ *
Both the returned and its inner {@code GroupedFlowable}s honor backpressure and the source {@code Publisher} + * is consumed in a bounded mode (i.e., requested a fixed amount upfront and replenished based on + * downstream consumption). Note that both the returned and its inner {@code GroupedFlowable}s use + * unbounded internal buffers and if the source {@code Publisher} doesn't honor backpressure, that may + * lead to {@code OutOfMemoryError}.
+ *
Scheduler:
+ *
{@code groupBy} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param keySelector + * a function that extracts the key for each item + * @param valueSelector + * a function that extracts the return element for each item + * @param delayError + * if true, the exception from the current Flowable is delayed in each group until that specific group emitted + * the normal values; if false, the exception bypasses values in the groups and is reported immediately. + * @param bufferSize + * the hint for how many {@link GroupedFlowable}s and element in each {@link GroupedFlowable} should be buffered + * @param evictingMapFactory + * The factory used to create a map that will be used by the implementation to hold the + * {@link GroupedFlowable}s. The evicting map created by this factory must + * notify the provided {@code Consumer} with the entry value (not the key!) when + * an entry in this map has been evicted. The next source emission will bring about the + * completion of the evicted {@link GroupedFlowable}s. See example above. + * @param + * the key type + * @param + * the element type + * @return a {@code Publisher} that emits {@link GroupedFlowable}s, each of which corresponds to a + * unique key value and each of which emits those items from the source Publisher that share that + * key value + * @see ReactiveX operators documentation: GroupBy + * + * @since 2.1.10 + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + @Beta + public final Flowable> groupBy(Function keySelector, + Function valueSelector, + boolean delayError, int bufferSize, + Function, ? extends Map> evictingMapFactory) { + ObjectHelper.requireNonNull(keySelector, "keySelector is null"); + ObjectHelper.requireNonNull(valueSelector, "valueSelector is null"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); + ObjectHelper.requireNonNull(evictingMapFactory, "evictingMapFactory is null"); + + return RxJavaPlugins.onAssembly(new FlowableGroupBy(this, keySelector, valueSelector, bufferSize, delayError, evictingMapFactory)); + } + /** * Returns a Flowable that correlates two Publishers when they overlap in time and groups the results. *

diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableGroupBy.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableGroupBy.java index a89a47ff72..15dc85ebd9 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableGroupBy.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableGroupBy.java @@ -14,7 +14,9 @@ package io.reactivex.internal.operators.flowable; import java.util.Map; +import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.*; import org.reactivestreams.*; @@ -23,11 +25,13 @@ import io.reactivex.annotations.Nullable; import io.reactivex.exceptions.Exceptions; import io.reactivex.flowables.GroupedFlowable; +import io.reactivex.functions.Consumer; import io.reactivex.functions.Function; import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.internal.queue.SpscLinkedArrayQueue; import io.reactivex.internal.subscriptions.*; import io.reactivex.internal.util.BackpressureHelper; +import io.reactivex.internal.util.EmptyComponent; import io.reactivex.plugins.RxJavaPlugins; public final class FlowableGroupBy extends AbstractFlowableWithUpstream> { @@ -35,18 +39,42 @@ public final class FlowableGroupBy extends AbstractFlowableWithUpstream final Function valueSelector; final int bufferSize; final boolean delayError; + final Function, ? extends Map> mapFactory; - public FlowableGroupBy(Flowable source, Function keySelector, Function valueSelector, int bufferSize, boolean delayError) { + public FlowableGroupBy(Flowable source, Function keySelector, Function valueSelector, + int bufferSize, boolean delayError, Function, ? extends Map> mapFactory) { super(source); this.keySelector = keySelector; this.valueSelector = valueSelector; this.bufferSize = bufferSize; this.delayError = delayError; + this.mapFactory = mapFactory; } @Override protected void subscribeActual(Subscriber> s) { - source.subscribe(new GroupBySubscriber(s, keySelector, valueSelector, bufferSize, delayError)); + + final Map> groups; + final Queue> evictedGroups; + + try { + if (mapFactory == null) { + evictedGroups = null; + groups = new ConcurrentHashMap>(); + } else { + evictedGroups = new ConcurrentLinkedQueue>(); + Consumer evictionAction = (Consumer)(Consumer) new EvictionAction(evictedGroups); + groups = (Map>)(Map) mapFactory.apply(evictionAction); + } + } catch (Exception e) { + Exceptions.throwIfFatal(e); + s.onSubscribe(EmptyComponent.INSTANCE); + s.onError(e); + return; + } + GroupBySubscriber subscriber = + new GroupBySubscriber(s, keySelector, valueSelector, bufferSize, delayError, groups, evictedGroups); + source.subscribe(subscriber); } public static final class GroupBySubscriber @@ -62,6 +90,7 @@ public static final class GroupBySubscriber final boolean delayError; final Map> groups; final SpscLinkedArrayQueue> queue; + final Queue> evictedGroups; static final Object NULL_KEY = new Object(); @@ -78,13 +107,16 @@ public static final class GroupBySubscriber boolean outputFused; - public GroupBySubscriber(Subscriber> actual, Function keySelector, Function valueSelector, int bufferSize, boolean delayError) { + public GroupBySubscriber(Subscriber> actual, Function keySelector, + Function valueSelector, int bufferSize, boolean delayError, + Map> groups, Queue> evictedGroups) { this.actual = actual; this.keySelector = keySelector; this.valueSelector = valueSelector; this.bufferSize = bufferSize; this.delayError = delayError; - this.groups = new ConcurrentHashMap>(); + this.groups = groups; + this.evictedGroups = evictedGroups; this.queue = new SpscLinkedArrayQueue>(bufferSize); } @@ -144,6 +176,13 @@ public void onNext(T t) { } group.onNext(v); + + if (evictedGroups != null) { + GroupedUnicast evictedGroup; + while ((evictedGroup = evictedGroups.poll()) != null) { + evictedGroup.onComplete(); + } + } if (newGroup) { q.offer(group); @@ -161,7 +200,9 @@ public void onError(Throwable t) { g.onError(t); } groups.clear(); - + if (evictedGroups != null) { + evictedGroups.clear(); + } error = t; done = true; drain(); @@ -174,6 +215,9 @@ public void onComplete() { g.onComplete(); } groups.clear(); + if (evictedGroups != null) { + evictedGroups.clear(); + } done = true; drain(); } @@ -372,6 +416,20 @@ public boolean isEmpty() { } } + static final class EvictionAction implements Consumer> { + + final Queue> evictedGroups; + + EvictionAction(Queue> evictedGroups) { + this.evictedGroups = evictedGroups; + } + + @Override + public void accept(GroupedUnicast value) { + evictedGroups.offer(value); + } + } + static final class GroupedUnicast extends GroupedFlowable { final State state; diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableGroupByTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableGroupByTest.java index afbe8e1108..e5497193cc 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableGroupByTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableGroupByTest.java @@ -17,14 +17,20 @@ import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; +import java.io.IOException; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import io.reactivex.subjects.PublishSubject; import org.junit.Test; import org.mockito.Mockito; import org.reactivestreams.*; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; + import io.reactivex.*; import io.reactivex.exceptions.TestException; import io.reactivex.flowables.GroupedFlowable; @@ -1842,4 +1848,240 @@ public Publisher apply(GroupedFlowable g) throws Exce .test() .assertResult(1); } + + @Test + public void mapFactoryThrows() { + final IOException ex = new IOException("boo"); + Function, Map> evictingMapFactory = // + new Function, Map>() { + + @Override + public Map apply(final Consumer notify) throws Exception { + throw ex; + } + }; + Flowable.just(1) + .groupBy(Functions.identity(), Functions.identity(), true, 16, evictingMapFactory) + .test() + .assertNoValues() + .assertError(ex); + } + + @Test + public void mapFactoryExpiryCompletesGroupedFlowable() { + final List completed = new CopyOnWriteArrayList(); + Function, Map> evictingMapFactory = createEvictingMapFactorySynchronousOnly(1); + PublishSubject subject = PublishSubject.create(); + TestSubscriber ts = subject.toFlowable(BackpressureStrategy.BUFFER) + .groupBy(Functions.identity(), Functions.identity(), true, 16, evictingMapFactory) + .flatMap(addCompletedKey(completed)) + .test(); + subject.onNext(1); + subject.onNext(2); + subject.onNext(3); + ts.assertValues(1, 2, 3) + .assertNotTerminated(); + assertEquals(Arrays.asList(1, 2), completed); + //ensure coverage of the code that clears the evicted queue + subject.onComplete(); + ts.assertComplete(); + ts.assertValueCount(3); + } + + private static final Function mod5 = new Function() { + + @Override + public Integer apply(Integer n) throws Exception { + return n % 5; + } + }; + + @Test + public void mapFactoryWithExpiringGuavaCacheDemonstrationCodeForUseInJavadoc() { + //javadoc will be a version of this using lambdas and without assertions + final List completed = new CopyOnWriteArrayList(); + //size should be less than 5 to notice the effect + Function, Map> evictingMapFactory = createEvictingMapFactoryGuava(3); + int numValues = 1000; + TestSubscriber ts = + Flowable.range(1, numValues) + .groupBy(mod5, Functions.identity(), true, 16, evictingMapFactory) + .flatMap(addCompletedKey(completed)) + .test() + .assertComplete(); + ts.assertValueCount(numValues); + //the exact eviction behaviour of the guava cache is not specified so we make some approximate tests + assertTrue(completed.size() > numValues *0.9); + } + + @Test + public void mapFactoryEvictionQueueClearedOnErrorCoverageOnly() { + Function, Map> evictingMapFactory = createEvictingMapFactorySynchronousOnly(1); + PublishSubject subject = PublishSubject.create(); + TestSubscriber ts = subject + .toFlowable(BackpressureStrategy.BUFFER) + .groupBy(Functions.identity(), Functions.identity(), true, 16, evictingMapFactory) + .flatMap(new Function, Publisher>() { + @Override + public Publisher apply(GroupedFlowable g) throws Exception { + return g; + } + }) + .test(); + RuntimeException ex = new RuntimeException(); + //ensure coverage of the code that clears the evicted queue + subject.onError(ex); + ts.assertNoValues() + .assertError(ex); + } + + private static Function, Publisher> addCompletedKey( + final List completed) { + return new Function, Publisher>() { + @Override + public Publisher apply(final GroupedFlowable g) throws Exception { + return g.doOnComplete(new Action() { + @Override + public void run() throws Exception { + completed.add(g.getKey()); + } + }); + } + }; + } + + //not thread safe + private static final class SingleThreadEvictingHashMap implements Map { + + private final List list = new ArrayList(); + private final Map map = new HashMap(); + private final int maxSize; + private final Consumer evictedListener; + + SingleThreadEvictingHashMap(int maxSize, Consumer evictedListener) { + this.maxSize = maxSize; + this.evictedListener = evictedListener; + } + + @Override + public int size() { + return map.size(); + } + + @Override + public boolean isEmpty() { + return map.isEmpty(); + } + + @Override + public boolean containsKey(Object key) { + return map.containsKey(key); + } + + @Override + public boolean containsValue(Object value) { + return map.containsValue(value); + } + + @Override + public V get(Object key) { + return map.get(key); + } + + @Override + public V put(K key, V value) { + list.remove(key); + V v; + if (maxSize > 0 && list.size() == maxSize) { + //remove first + K k = list.get(0); + list.remove(0); + v = map.get(k); + } else { + v = null; + } + list.add(key); + V result = map.put(key, value); + if (v != null) { + try { + evictedListener.accept(v); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + return result; + } + + @Override + public V remove(Object key) { + return map.remove(key); + } + + @Override + public void putAll(Map m) { + map.putAll(m); + } + + @Override + public void clear() { + map.clear(); + } + + @Override + public Set keySet() { + return map.keySet(); + } + + @Override + public Collection values() { + return map.values(); + } + + @Override + public Set> entrySet() { + return map.entrySet(); + } + } + + private static Function, Map> createEvictingMapFactoryGuava(final int maxSize) { + Function, Map> evictingMapFactory = // + new Function, Map>(){ + + @Override + public Map apply(final Consumer notify) throws Exception { + return CacheBuilder.newBuilder() // + .maximumSize(maxSize) // + .removalListener(new RemovalListener() { + @Override + public void onRemoval(RemovalNotification notification) { + try { + notify.accept(notification.getValue()); + } catch (Exception e) { + throw new RuntimeException(e); + } + }}) + . build() + .asMap(); + }}; + return evictingMapFactory; + } + + private static Function, Map> createEvictingMapFactorySynchronousOnly(final int maxSize) { + Function, Map> evictingMapFactory = // + new Function, Map>(){ + + @Override + public Map apply(final Consumer notify) throws Exception { + return new SingleThreadEvictingHashMap(maxSize, new Consumer() { + @Override + public void accept(Object object) { + try { + notify.accept(object); + } catch (Exception e) { + throw new RuntimeException(e); + } + }}); + }}; + return evictingMapFactory; + } }