2.x: Fix Observable.flatMap scalar maxConcurrency overflow #5900
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Since
Observable
is not backpressured, theflatMap
of it has to manage the buffering of inner sources so that only a limited number of them are active at the same time. However, when most outstanding inner sources were scalar (just()
is such a source) but the drain loop was busy, the operator overflow its bounded scalar queue (as it is supposed to be holding at most maxConcurrency scalar items), causing anIllegalStateException
.The PR fixes this corner case by making sure the
tryScalarEmit
returns false if it had to queue up the scalar, which in turn prevents the next inner source to be subscribed to until the queued item is cleared. In addition, the terminal state check has to include the buffer holding the remaining inner sources: being done, having an empty scalar queue and having no active inner observers is just not enough.Flowable.flatMap
is not affected as it uses backpressure to ensure only a limited number of sources or scalars are mapped in.Originally reported in the chat of this StackOverflow question: