Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: fix groupBy cancellation with evicting map factory #5947

Merged

Conversation

davidmoten
Copy link
Collaborator

Fixes #5933.

When the outer stream is cancelled there may be evicted groups present in the queue to be completed.

I've added code to complete the evicted groups in the cancel method as suggested by @akarnokd. To protect against a race condition where the EvictionAction is called just after the evicted groups are completed in cancel I've added a cancelled check to EvictionAction.

However, I am concerned about a race that I'll place in the comments on the code below.

} else {
// note that this call to `onComplete` is safe because
// GroupedUnicast is designed to allow concurrent access
group.onComplete();
Copy link
Collaborator Author

@davidmoten davidmoten Apr 3, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm concerned about a race with group.onNext where cancelled has just happened and group.onComplete gets called above which then drains in State. At that point group.onNext gets called from another thread and that emission is lost. I'm guessing some significant rework might be needed to protect against this. It is a cancellation scenario so perhaps not important. Thoughts?

@codecov
Copy link

codecov bot commented Apr 3, 2018

Codecov Report

Merging #5947 into 2.x will increase coverage by 0.12%.
The diff coverage is 100%.

Impacted file tree graph

@@             Coverage Diff              @@
##                2.x    #5947      +/-   ##
============================================
+ Coverage     98.23%   98.35%   +0.12%     
- Complexity     6169     6174       +5     
============================================
  Files           665      665              
  Lines         44717    44726       +9     
  Branches       6205     6206       +1     
============================================
+ Hits          43928    43992      +64     
+ Misses          242      213      -29     
+ Partials        547      521      -26
Impacted Files Coverage Δ Complexity Δ
...x/internal/operators/flowable/FlowableGroupBy.java 95.51% <100%> (+0.11%) 3 <0> (ø) ⬇️
.../operators/observable/ObservableFlatMapSingle.java 88.8% <0%> (-6.72%) 2% <0%> (ø)
.../io/reactivex/disposables/CompositeDisposable.java 98.14% <0%> (-1.86%) 39% <0%> (-1%)
...perators/observable/ObservableMergeWithSingle.java 99.06% <0%> (-0.94%) 2% <0%> (ø)
.../operators/maybe/MaybeFlatMapIterableFlowable.java 97.54% <0%> (-0.82%) 2% <0%> (ø)
...vex/internal/operators/parallel/ParallelRunOn.java 96.61% <0%> (-0.49%) 8% <0%> (ø)
...main/java/io/reactivex/subjects/ReplaySubject.java 97.69% <0%> (-0.42%) 50% <0%> (ø)
...al/operators/observable/ObservableWindowTimed.java 90.73% <0%> (-0.28%) 4% <0%> (ø)
...ternal/operators/observable/ObservableFlatMap.java 87.22% <0%> (ø) 3% <0%> (ø) ⬇️
...va/io/reactivex/processors/MulticastProcessor.java 98.67% <0%> (+0.44%) 84% <0%> (+1%) ⬆️
... and 24 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 43ceedf...bd78a20. Read the comment docs.

@davidmoten davidmoten changed the title 2.x: fix groupBy cancellation with evicting map factory (#5933) 2.x: fix groupBy cancellation with evicting map factory Apr 3, 2018
import com.google.common.cache.*;

import io.reactivex.*;
import org.reactivestreams.Publisher;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't unroll star imports.

@davidmoten
Copy link
Collaborator Author

Fixed imports

@davidmoten
Copy link
Collaborator Author

davidmoten commented Apr 3, 2018

I'm thinking now that we need to complete evicted groups in the onError and onComplete methods as well (and deal with late arrivals to evictedGroups).

@davidmoten
Copy link
Collaborator Author

I've added a bit more code to complete evicted groups in the onError and onComplete methods as well. I'll keep pondering it a bit.

public void accept(GroupedUnicast<K,V> value) {
evictedGroups.offer(value);
public void accept(GroupedUnicast<K,V> group) {
if (!cancelled.get() && !sourceDone.get()) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

insufficient race protection here, I'll fix

@davidmoten
Copy link
Collaborator Author

Excuse the delay, busy times. I'll return to this soonish. Can close the PR and reopen later if desired.

@davidmoten
Copy link
Collaborator Author

Having a look now

@davidmoten davidmoten force-pushed the fix-group-by-eviction-cancellation branch 2 times, most recently from fe34c53 to 6dbce2f Compare May 29, 2018 04:26
@davidmoten davidmoten force-pushed the fix-group-by-eviction-cancellation branch from 6dbce2f to bd78a20 Compare May 29, 2018 04:27
@davidmoten
Copy link
Collaborator Author

I've redone this PR.

The test failure submitted in #5933 arose because groupCount needs to be decremented when groups are evicted from the map (I'm doing it when we complete evicted groups which happens before checking groupCount is 0 for the purpose of cancelling the source). We also needed to complete evictions when cancel is called as @akarnokd pointed out.

I've also included a perf improvement (without stats!) where a volatile boolean done served both as post-terminal event protection and as a memory barrier in the drain method. Splitting those two functions into boolean done and volatile boolean finished means particularly that every source onNext emission will do one less volatile read for post-terminal event protection (and onComplete and onError benefit too). There's a slight allocation cost I suppose.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants