-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GroupBy backpressure fix #3428
GroupBy backpressure fix #3428
Conversation
The following test starts breaking with this branch. It prints out 1 to 128 then stops and does not print anything else (apparently no progress is made). In branch 1.x this continues to print past 128. range(1, RxRingBuffer.SIZE *4).repeat()
.groupBy((d) -> d)
.flatMap((go) ->
go.doOnNext(System.out::println)
.subscribeOn(Schedulers.io())
)
.subscribe(); |
This is because the lack of horizontal unboundedness of flatMap and the bug in groupBy together made it work in 1.0.14. If I run this code with the 2.x fix and add 1024 as the max concurrency value, it prints lots of values indefinitely. |
To avoid the conflicts with ReactiveX#3428
public void call(GroupedObservable<Object, Integer> g) { | ||
g.subscribe(); | ||
} | ||
}) // this will request Long.MAX_VALUE |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this comment meant for the inner observable? The outer observable gets a request of 128.
627c658
to
6caf9c6
Compare
I've updated the comments. |
👍 |
This is a backport of the 2.x GroupBy operator which solves #3425.
One unit test in OperatorRetryTest had to be altered a bit. I believe
the original code relied on a GroupBy behavior which caused the bug in
#3425.