-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
2.x: Flowable.groupBy add overload with evicting map factory #5860
Conversation
Codecov Report
@@ Coverage Diff @@
## 2.x #5860 +/- ##
============================================
+ Coverage 96.53% 96.58% +0.04%
- Complexity 5857 5858 +1
============================================
Files 646 646
Lines 42608 42640 +32
Branches 5906 5910 +4
============================================
+ Hits 41132 41183 +51
+ Misses 578 550 -28
- Partials 898 907 +9
Continue to review full report at Codecov.
|
} | ||
|
||
/** | ||
* Groups the items emitted by a {@code Publisher} according to a specified criterion, and emits these | ||
* grouped items as {@link GroupedFlowable}s. The emitted {@code GroupedPublisher} allows only a single |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
GroupedPublisher
-> GroupedFlowable
* grouped items as {@link GroupedFlowable}s. The emitted {@code GroupedPublisher} allows only a single | ||
* {@link Subscriber} during its lifetime and if this {@code Subscriber} cancels before the | ||
* source terminates, the next emission by the source having the same key will trigger a new | ||
* {@code GroupedPublisher} emission. The {@code evictingMapFactory} is used to create a map that will |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
GroupedPublisher
-> GroupedFlowable
* | ||
* <p>A use case for specifying an {@code evictingMapFactory} is where the source is infinite and fast and | ||
* over time the number of keys grows enough to be a concern in terms of the memory footprint of the | ||
* internal hash map containing the {@link GroupedFlowables}. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
{@link GroupedFlowable}s
* CacheBuilder | ||
* .newBuilder() | ||
* .maximumSize(3) | ||
* .removalListener(entry -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
->
-> ->
* <p> | ||
* <em>Note:</em> A {@link GroupedFlowable} will cache the items it is to emit until such time as it | ||
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those | ||
* {@code GroupedPublisher}s that do not concern you. Instead, you can signal to them that they may |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
GroupedPublisher
-> GroupedFlowable
* discard their buffers by applying an operator like {@link #ignoreElements} to them. | ||
* <dl> | ||
* <dt><b>Backpressure:</b></dt> | ||
* <dd>Both the returned and its inner {@code Publisher}s honor backpressure and the source {@code Publisher} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Publisher
-> GroupedFlowable
* <dt><b>Backpressure:</b></dt> | ||
* <dd>Both the returned and its inner {@code Publisher}s honor backpressure and the source {@code Publisher} | ||
* is consumed in a bounded mode (i.e., requested a fixed amount upfront and replenished based on | ||
* downstream consumption). Note that both the returned and its inner {@code Publisher}s use |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Publisher
-> GroupedFlowable
* @param evictingMapFactory | ||
* The factory used to create a map that will be used by the implementation to hold the | ||
* {@link GroupedFlowable}s. The evicting map created by this factory must notify when a | ||
* value in this map has been evicted. The next source emission will bring about the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How? By calling the provided input Consumer<Object>
with something?
this.actual = actual; | ||
this.keySelector = keySelector; | ||
this.valueSelector = valueSelector; | ||
this.bufferSize = bufferSize; | ||
this.delayError = delayError; | ||
this.groups = new ConcurrentHashMap<Object, GroupedUnicast<K, V>>(); | ||
if (mapFactory == null) { | ||
this.groups = new ConcurrentHashMap<Object, GroupedUnicast<K, V>>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not a fan of this structure. These could be constructor arguments prepared before the GroupBySubscriber
gets initialized. So the try-catch will only cover the non-null case only and the use of GroupBySubscriber
can look like before:
source.subscribe(new GroupBySubscriber<T, K, V>(s, keySelector, valueSelector, bufferSize, delayError, groups, evictedGroups));
@@ -371,6 +408,27 @@ public boolean isEmpty() { | |||
return queue.isEmpty(); | |||
} | |||
} | |||
|
|||
@SuppressWarnings("unchecked") | |||
private static <K,V> Map<Object, GroupedUnicast<K, V>> createMap( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure why this is a separate method. Also please avoid private
.
Thanks @akarnokd. I've addressed your comments and also added |
* completion of the evicted {@link GroupedFlowable}s. See example using Guava above. | ||
* {@link GroupedFlowable}s. The evicting map created by this factory must | ||
* notify the provided {@code Consumer<Object>} with the entry value (not the key!) when | ||
* an entry in this map has been evicted.. The next source emission will bring about the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Two dots.
@@ -9798,10 +9799,13 @@ public final Disposable forEachWhile(final Predicate<? super T> onNext, final Co | |||
* unique key value and each of which emits those items from the source Publisher that share that | |||
* key value | |||
* @see <a href="http://reactivex.io/documentation/operators/groupby.html">ReactiveX operators documentation: GroupBy</a> | |||
* | |||
* @since 2.1.11 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should fit into 2.1.10, assuming @vanniktech or @artem-zinnatullin approves.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking good. Let's get this into 2.1.10
@@ -43,6 +43,7 @@ apply plugin: "me.champeau.gradle.jmh" | |||
apply plugin: "com.github.hierynomus.license" | |||
apply plugin: "com.jfrog.bintray" | |||
apply plugin: "com.jfrog.artifactory" | |||
apply plugin: "eclipse" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need the eclipse plugin?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is so I can run ./gradlew eclipse
for import into Eclipse without using gradle plugin. It's not required for the general build obviously but is a convenience for me. Can remove if has any negative effects.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Works for me.
build.gradle
Outdated
@@ -55,7 +56,7 @@ def reactiveStreamsVersion = "1.0.2" | |||
def mockitoVersion = "2.1.0" | |||
def jmhLibVersion = "1.19" | |||
def testNgVersion = "6.11" | |||
|
|||
def guavaVersion = "19.0" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Latest version is 24.0
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes I know but 19.0 will do. I don't plan to keep this test dependency up to date. No point as it is for demonstration purposes only.
* | ||
* <p>The map created by an {@code evictingMapFactory} must be thread-safe. | ||
* | ||
* <p>An example of an {@code evictingMapFactory} using <a href="https://google.github.io/guava/releases/19.0/api/docs/com/google/common/cache/CacheBuilder.html">CacheBuilder</a> from the Guava library is below: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably we should reference a newer version link
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not that keen to track this against latest release of guava in the future but can go to v24 if required.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
have moved to 24.0
* Flowable | ||
* .range(1, 1000) | ||
* // note that number of keys is 10 | ||
* .groupBy(x -> x % 10, x-> x, true, 16, evictingMapFactory) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we probably want x -> x
here to be consistent with the first parameter
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't understand that ??
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
x-> x
becomes x-> x
x -> x
becomes x -> x
which is then more readable and also consistent with the first parameter
* <p> | ||
* <em>Note:</em> A {@link GroupedFlowable} will cache the items it is to emit until such time as it | ||
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those | ||
* {@code GroupedFlowable}s that do not concern you. Instead, you can signal to them that they may |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead, you can signal to them that they may
sounds very weird to me. Can we rephrase it somehow?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is a legacy comment that occurs in every groupBy
overload. Perhaps we could leave rework of that phrase to another PR? I agree that the para is a bit clunky and could probably do with explicit examples like don't filter on GroupedFlowable keys (.filter(g -> g.getKey() > 10)
) do blah instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then leave as is.
I've
|
9a0fdd5
to
991efdb
Compare
Added that missing space to javadoc ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As per discussion in #5763, this PR adds an overload for
Flowable.groupBy
that specifies anevictingMapFactory
.An example of usage taken from the new javadoc:
Note that I based this operator on the 1.x version which I think may have a bug that goes as far as the signature of that method. The eviction consumer should not be given a key but rather the value from the map. I'll pursue the 1.x issue after dealing with this.