-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
2.x: fix groupBy cancellation with evicting map factory #5947
2.x: fix groupBy cancellation with evicting map factory #5947
Conversation
} else { | ||
// note that this call to `onComplete` is safe because | ||
// GroupedUnicast is designed to allow concurrent access | ||
group.onComplete(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm concerned about a race with group.onNext
where cancelled has just happened and group.onComplete
gets called above which then drains in State
. At that point group.onNext
gets called from another thread and that emission is lost. I'm guessing some significant rework might be needed to protect against this. It is a cancellation scenario so perhaps not important. Thoughts?
Codecov Report
@@ Coverage Diff @@
## 2.x #5947 +/- ##
============================================
+ Coverage 98.23% 98.35% +0.12%
- Complexity 6169 6174 +5
============================================
Files 665 665
Lines 44717 44726 +9
Branches 6205 6206 +1
============================================
+ Hits 43928 43992 +64
+ Misses 242 213 -29
+ Partials 547 521 -26
Continue to review full report at Codecov.
|
import com.google.common.cache.*; | ||
|
||
import io.reactivex.*; | ||
import org.reactivestreams.Publisher; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please don't unroll star imports.
Fixed imports |
I'm thinking now that we need to complete evicted groups in the |
I've added a bit more code to complete evicted groups in the |
public void accept(GroupedUnicast<K,V> value) { | ||
evictedGroups.offer(value); | ||
public void accept(GroupedUnicast<K,V> group) { | ||
if (!cancelled.get() && !sourceDone.get()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
insufficient race protection here, I'll fix
Excuse the delay, busy times. I'll return to this soonish. Can close the PR and reopen later if desired. |
Having a look now |
fe34c53
to
6dbce2f
Compare
6dbce2f
to
bd78a20
Compare
I've redone this PR. The test failure submitted in #5933 arose because I've also included a perf improvement (without stats!) where a volatile boolean |
Fixes #5933.
When the outer stream is cancelled there may be evicted groups present in the queue to be completed.
I've added code to complete the evicted groups in the
cancel
method as suggested by @akarnokd. To protect against a race condition where the EvictionAction is called just after the evicted groups are completed incancel
I've added acancelled
check toEvictionAction
.However, I am concerned about a race that I'll place in the comments on the code below.