2.x: Flowable.groupBy add overload with evicting map factory #5860

Merged
merged 1 commit into from
Feb 22, 2018

Conversation

davidmoten
Collaborator

As per discussion in #5763, this PR adds an overload for Flowable.groupBy that specifies an evictingMapFactory.

An example of usage taken from the new javadoc:

Function<Consumer<Object>, Map<Integer, Object>> evictingMapFactory = 
   notify ->
       CacheBuilder
         .newBuilder() 
         .maximumSize(3)
         .removalListener(entry -> { 
              try {
                  // emit the value not the key!
                  notify.accept(entry.getValue());
              } catch (Exception e) {
                  throw new RuntimeException(e);
              }
            })
         .<Integer, Object> build()
         .asMap();
         
 // Emit 1000 items but ensure that the
 // internal map never has more than 3 items in it           
 Flowable
   .range(1, 1000)
   // note that number of keys is 10
   .groupBy(x -> x % 10, x-> x, true, 16, evictingMapFactory)
   .flatMap(g -> g)
   .forEach(System.out::println);

Note that I based this operator on the 1.x version which I think may have a bug that goes as far as the signature of that method. The eviction consumer should not be given a key but rather the value from the map. I'll pursue the 1.x issue after dealing with this.
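For readers without Guava at hand, the "emit the value, not the key" contract can be sketched with the JDK alone. This is only an illustration under stated assumptions: it uses `java.util.function.Function` and `Consumer` in place of the RxJava interfaces, and the class name `EvictingMapSketch` and helper `boundedMapFactory` are hypothetical, not part of this PR.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

public class EvictingMapSketch {

    // Hypothetical helper (not RxJava API): returns a factory for a size-bounded,
    // synchronized map that hands each evicted VALUE (not the key) to the consumer.
    static <K> Function<Consumer<Object>, Map<K, Object>> boundedMapFactory(final int maxSize) {
        return onEvict -> Collections.synchronizedMap(
            new LinkedHashMap<K, Object>(16, 0.75f, false) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<K, Object> eldest) {
                    if (size() > maxSize) {
                        // notify with the value, mirroring entry.getValue() in the Guava example
                        onEvict.accept(eldest.getValue());
                        return true; // drop the eldest (first inserted) entry
                    }
                    return false;
                }
            });
    }

    public static void main(String[] args) {
        List<Object> evicted = new ArrayList<>();
        Map<Integer, Object> map =
            EvictingMapSketch.<Integer>boundedMapFactory(3).apply(evicted::add);

        for (int key = 0; key < 5; key++) {
            map.put(key, "group-" + key);
        }

        System.out.println(map.keySet()); // [2, 3, 4]
        System.out.println(evicted);      // [group-0, group-1]
    }
}
```

The map is wrapped in `Collections.synchronizedMap` because the javadoc in this PR requires the map produced by the factory to be thread-safe; whether a coarse lock is adequate for a given workload is a separate question.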

@codecov

codecov bot commented Feb 21, 2018

Codecov Report

Merging #5860 into 2.x will increase coverage by 0.04%.
The diff coverage is 100%.

@@             Coverage Diff              @@
##                2.x    #5860      +/-   ##
============================================
+ Coverage     96.53%   96.58%   +0.04%     
- Complexity     5857     5858       +1     
============================================
  Files           646      646              
  Lines         42608    42640      +32     
  Branches       5906     5910       +4     
============================================
+ Hits          41132    41183      +51     
+ Misses          578      550      -28     
- Partials        898      907       +9
| Impacted Files | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| ...x/internal/operators/flowable/FlowableGroupBy.java | 95.4% <100%> (+0.38%) | 3 <2> (+1) | ⬆️ |
| src/main/java/io/reactivex/Flowable.java | 100% <100%> (ø) | 533 <1> (+1) | ⬆️ |
| .../io/reactivex/internal/functions/ObjectHelper.java | 94.73% <0%> (-5.27%) | 20% <0%> (-1%) | |
| ...nternal/operators/parallel/ParallelReduceFull.java | 91.17% <0%> (-3.93%) | 2% <0%> (ø) | |
| .../internal/operators/flowable/FlowableInterval.java | 94.44% <0%> (-2.78%) | 3% <0%> (ø) | |
| ...tivex/internal/schedulers/TrampolineScheduler.java | 92.3% <0%> (-2.57%) | 6% <0%> (ø) | |
| ...ternal/operators/completable/CompletableMerge.java | 96.42% <0%> (-2.39%) | 2% <0%> (ø) | |
| ...ernal/operators/flowable/FlowableFromIterable.java | 94.11% <0%> (-2.14%) | 5% <0%> (ø) | |
| ...nternal/operators/observable/ObservableWindow.java | 98% <0%> (-2%) | 3% <0%> (ø) | |
| ...a/io/reactivex/internal/queue/MpscLinkedQueue.java | 98.03% <0%> (-1.97%) | 17% <0%> (-1%) | |

... and 24 more

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 0f73283...991efdb.

@akarnokd akarnokd added this to the 2.2 milestone Feb 21, 2018
}

/**
* Groups the items emitted by a {@code Publisher} according to a specified criterion, and emits these
* grouped items as {@link GroupedFlowable}s. The emitted {@code GroupedPublisher} allows only a single
Member

GroupedPublisher -> GroupedFlowable

* grouped items as {@link GroupedFlowable}s. The emitted {@code GroupedPublisher} allows only a single
* {@link Subscriber} during its lifetime and if this {@code Subscriber} cancels before the
* source terminates, the next emission by the source having the same key will trigger a new
* {@code GroupedPublisher} emission. The {@code evictingMapFactory} is used to create a map that will
Member

GroupedPublisher -> GroupedFlowable

*
* <p>A use case for specifying an {@code evictingMapFactory} is where the source is infinite and fast and
* over time the number of keys grows enough to be a concern in terms of the memory footprint of the
* internal hash map containing the {@link GroupedFlowables}.
Member

{@link GroupedFlowable}s

* CacheBuilder
* .newBuilder()
* .maximumSize(3)
* .removalListener(entry -> {
Member

-> should be written as -&gt;

* <p>
* <em>Note:</em> A {@link GroupedFlowable} will cache the items it is to emit until such time as it
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
* {@code GroupedPublisher}s that do not concern you. Instead, you can signal to them that they may
Member

GroupedPublisher -> GroupedFlowable

* discard their buffers by applying an operator like {@link #ignoreElements} to them.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>Both the returned and its inner {@code Publisher}s honor backpressure and the source {@code Publisher}
Member

Publisher -> GroupedFlowable

* <dt><b>Backpressure:</b></dt>
* <dd>Both the returned and its inner {@code Publisher}s honor backpressure and the source {@code Publisher}
* is consumed in a bounded mode (i.e., requested a fixed amount upfront and replenished based on
* downstream consumption). Note that both the returned and its inner {@code Publisher}s use
Member

Publisher -> GroupedFlowable

* @param evictingMapFactory
* The factory used to create a map that will be used by the implementation to hold the
* {@link GroupedFlowable}s. The evicting map created by this factory must notify when a
* value in this map has been evicted. The next source emission will bring about the
Member

How? By calling the provided input Consumer<Object> with something?

this.actual = actual;
this.keySelector = keySelector;
this.valueSelector = valueSelector;
this.bufferSize = bufferSize;
this.delayError = delayError;
this.groups = new ConcurrentHashMap<Object, GroupedUnicast<K, V>>();
if (mapFactory == null) {
this.groups = new ConcurrentHashMap<Object, GroupedUnicast<K, V>>();
Member

I'm not a fan of this structure. These could be constructor arguments prepared before the GroupBySubscriber gets initialized. Then the try-catch covers only the non-null case and the use of GroupBySubscriber can look like before:

source.subscribe(new GroupBySubscriber<T, K, V>(s, keySelector, valueSelector, bufferSize, delayError, groups, evictedGroups));
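One way to read this suggestion is sketched below with JDK types only. It is an assumption-laden illustration, not the code that was merged: `GroupMapSetupSketch`, `Prepared`, `prepare` and `notifyOnRemoveFactory` are hypothetical names, `java.util.function` interfaces stand in for the RxJava ones, and a factory failure is simply rethrown here rather than routed to the subscriber.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;
import java.util.function.Function;

public class GroupMapSetupSketch {

    // Hypothetical holder for the two collaborators passed to the subscriber's constructor.
    static final class Prepared<G> {
        final Map<Object, G> groups;
        final Queue<G> evictedGroups; // null when no factory was supplied

        Prepared(Map<Object, G> groups, Queue<G> evictedGroups) {
            this.groups = groups;
            this.evictedGroups = evictedGroups;
        }
    }

    // Builds the map (and eviction queue) up front, so only the factory call sits in try-catch.
    @SuppressWarnings("unchecked")
    static <G> Prepared<G> prepare(Function<Consumer<Object>, Map<Object, Object>> mapFactory) {
        if (mapFactory == null) {
            // default case: plain concurrent map, nothing to evict
            return new Prepared<>(new ConcurrentHashMap<Object, G>(), null);
        }
        final Queue<G> evictedGroups = new ConcurrentLinkedQueue<>();
        final Map<Object, G> groups;
        try {
            // the map reports evicted values; they are queued for later completion
            groups = (Map<Object, G>) (Map<Object, ?>) mapFactory.apply(
                value -> evictedGroups.offer((G) value));
        } catch (RuntimeException e) {
            throw new IllegalStateException("evictingMapFactory failed", e);
        }
        return new Prepared<>(groups, evictedGroups);
    }

    // Demo-only factory: treats every explicit remove() as an eviction.
    static Function<Consumer<Object>, Map<Object, Object>> notifyOnRemoveFactory() {
        return onEvict -> new ConcurrentHashMap<Object, Object>() {
            @Override
            public Object remove(Object key) {
                Object value = super.remove(key);
                if (value != null) {
                    onEvict.accept(value);
                }
                return value;
            }
        };
    }

    public static void main(String[] args) {
        Prepared<String> prepared = prepare(notifyOnRemoveFactory());
        prepared.groups.put(1, "group-1");
        prepared.groups.remove(1);
        System.out.println(prepared.evictedGroups.peek()); // group-1
    }
}
```

With the preparation pulled out like this, the subscriber itself would only need to receive `groups` and `evictedGroups`, which matches the constructor call shown above.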

@@ -371,6 +408,27 @@ public boolean isEmpty() {
return queue.isEmpty();
}
}

@SuppressWarnings("unchecked")
private static <K,V> Map<Object, GroupedUnicast<K, V>> createMap(
Member

Not sure why this is a separate method. Also please avoid private.

davidmoten added a commit to davidmoten/RxJava that referenced this pull request Feb 21, 2018
@davidmoten
Collaborator Author

Thanks @akarnokd. I've addressed your comments and also added @Beta and @since 2.1.11.

* completion of the evicted {@link GroupedFlowable}s. See example using Guava above.
* {@link GroupedFlowable}s. The evicting map created by this factory must
* notify the provided {@code Consumer<Object>} with the entry value (not the key!) when
* an entry in this map has been evicted.. The next source emission will bring about the
Member

Two dots.

@@ -9798,10 +9799,13 @@ public final Disposable forEachWhile(final Predicate<? super T> onNext, final Co
* unique key value and each of which emits those items from the source Publisher that share that
* key value
* @see <a href="http://reactivex.io/documentation/operators/groupby.html">ReactiveX operators documentation: GroupBy</a>
*
* @since 2.1.11
Member

This should fit into 2.1.10, assuming @vanniktech or @artem-zinnatullin approves.

Collaborator

@vanniktech vanniktech left a comment

Looking good. Let's get this into 2.1.10

@@ -43,6 +43,7 @@ apply plugin: "me.champeau.gradle.jmh"
apply plugin: "com.github.hierynomus.license"
apply plugin: "com.jfrog.bintray"
apply plugin: "com.jfrog.artifactory"
apply plugin: "eclipse"
Collaborator

why do we need the eclipse plugin?

Collaborator Author

This is so I can run ./gradlew eclipse for import into Eclipse without using the Gradle plugin. It's obviously not required for the general build but is a convenience for me. I can remove it if it has any negative effects.

Collaborator

Works for me.

build.gradle Outdated
@@ -55,7 +56,7 @@ def reactiveStreamsVersion = "1.0.2"
def mockitoVersion = "2.1.0"
def jmhLibVersion = "1.19"
def testNgVersion = "6.11"

def guavaVersion = "19.0"
Collaborator

Latest version is 24.0

Collaborator Author

Yes I know but 19.0 will do. I don't plan to keep this test dependency up to date. No point as it is for demonstration purposes only.

*
* <p>The map created by an {@code evictingMapFactory} must be thread-safe.
*
* <p>An example of an {@code evictingMapFactory} using <a href="https://google.github.io/guava/releases/19.0/api/docs/com/google/common/cache/CacheBuilder.html">CacheBuilder</a> from the Guava library is below:
Collaborator

Probably we should reference a newer version link

Collaborator Author

@davidmoten davidmoten Feb 21, 2018

I'm not that keen to track this against the latest Guava release in future, but I can go to v24 if required.

Collaborator Author

have moved to 24.0

* Flowable
* .range(1, 1000)
* // note that number of keys is 10
* .groupBy(x -&gt; x % 10, x-&gt; x, true, 16, evictingMapFactory)
Collaborator

we probably want x -&gt; x here to be consistent with the first parameter

Collaborator Author

@davidmoten davidmoten Feb 21, 2018

don't understand that ??

Collaborator

x-&gt; x becomes x-> x
x -&gt; x becomes x -> x which is then more readable and also consistent with the first parameter

* <p>
* <em>Note:</em> A {@link GroupedFlowable} will cache the items it is to emit until such time as it
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
* {@code GroupedFlowable}s that do not concern you. Instead, you can signal to them that they may
Collaborator

Instead, you can signal to them that they may sounds very weird to me. Can we rephrase it somehow?

Collaborator Author

This is a legacy comment that occurs in every groupBy overload. Perhaps we could leave the rework of that phrase to another PR? I agree that the paragraph is a bit clunky and could probably do with explicit examples, like "don't filter on GroupedFlowable keys (.filter(g -> g.getKey() > 10)), do blah instead".

Collaborator

Then leave as is.

@davidmoten
Collaborator Author

I've

  • updated the guava dependency to 24.0-jre
  • updated the CacheBuilder link to point to Guava 24.0 javadoc
  • set the since version to 2.1.10

@davidmoten
Collaborator Author

Added that missing space to javadoc (x -> x) and squashed commits. Thanks for the review @vanniktech.

Collaborator

@vanniktech vanniktech left a comment

:shipit:
