Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Fix Observable.flatMap scalar maxConcurrency overflow #5900

Merged

Conversation

akarnokd
Copy link
Member

@akarnokd akarnokd commented Mar 7, 2018

Since Observable is not backpressured, the flatMap of it has to manage the buffering of inner sources so that only a limited number of them are active at the same time. However, when most outstanding inner sources were scalar (just() is such a source) but the drain loop was busy, the operator overflow its bounded scalar queue (as it is supposed to be holding at most maxConcurrency scalar items), causing an IllegalStateException.

The PR fixes this corner case by making sure the tryScalarEmit returns false if it had to queue up the scalar, which in turn prevents the next inner source to be subscribed to until the queued item is cleared. In addition, the terminal state check has to include the buffer holding the remaining inner sources: being done, having an empty scalar queue and having no active inner observers is just not enough.

Flowable.flatMap is not affected as it uses backpressure to ensure only a limited number of sources or scalars are mapped in.

Originally reported in the chat of this StackOverflow question:


AndroidRuntime: FATAL EXCEPTION: RxCachedThreadScheduler-1 
Process: com.hackerli.girl, PID: 6030 
java.lang.IllegalStateException: Scalar queue full?! 
at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.tryEmitScalar(ObservableFlatMap.java:250) 
at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.subscribeInner(ObservableFlatMap.java:146) 
at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.drainLoop(ObservableFlatMap.java:475) 
at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.drain(ObservableFlatMap.java:323) 
at io.reactivex.internal.operators.observable.ObservableFlatMap$InnerObserver.onComplete(ObservableFlatMap.java:579) 
at io.reactivex.internal.observers.BasicFuseableObserver.onComplete(BasicFuseableObserver.java:119) 
at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeOnObserver.onComplete(ObservableSubscribeOn.java:68) 
at retrofit2.adapter.rxjava2.BodyObservable$BodyObserver.onComplete(BodyObservable.java:66) 
at retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:47)
at io.reactivex.Observable.subscribe(Observable.java:11442) 
at retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(BodyObservable.java:34) 
at io.reactivex.Observable.subscribe(Observable.java:11442) 
at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96) 
at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:571) 
at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66) 
at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57) 
at java.util.concurrent.FutureTask.run(FutureTask.java:237) 
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:269) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1113) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:588) 
at java.lang.Thread.run(Thread.java:818)

@akarnokd akarnokd added this to the 2.2 milestone Mar 7, 2018
@codecov
Copy link

codecov bot commented Mar 7, 2018

Codecov Report

Merging #5900 into 2.x will decrease coverage by 0.01%.
The diff coverage is 88.88%.

Impacted file tree graph

@@             Coverage Diff             @@
##                2.x   #5900      +/-   ##
===========================================
- Coverage     97.92%   97.9%   -0.02%     
- Complexity     5984    5988       +4     
===========================================
  Files           655     655              
  Lines         43863   43872       +9     
  Branches       6076    6078       +2     
===========================================
+ Hits          42953   42954       +1     
- Misses          281     288       +7     
- Partials        629     630       +1
Impacted Files Coverage Δ Complexity Δ
...ternal/operators/observable/ObservableFlatMap.java 86.58% <88.88%> (+0.39%) 3 <0> (ø) ⬇️
...l/operators/observable/ObservableFlatMapMaybe.java 84.96% <0%> (-5.89%) 2% <0%> (ø)
.../io/reactivex/internal/functions/ObjectHelper.java 94.73% <0%> (-5.27%) 20% <0%> (-1%)
...nternal/operators/observable/ObservableCreate.java 92.17% <0%> (-5.22%) 2% <0%> (ø)
...tivex/internal/schedulers/TrampolineScheduler.java 96.1% <0%> (-2.6%) 6% <0%> (ø)
...ava/io/reactivex/processors/BehaviorProcessor.java 96.86% <0%> (-2.25%) 60% <0%> (ø)
...perators/single/SingleFlatMapIterableFlowable.java 96.66% <0%> (-1.67%) 2% <0%> (ø)
...vex/internal/operators/flowable/FlowableCache.java 93.24% <0%> (-1.36%) 7% <0%> (ø)
...perators/mixed/ObservableSwitchMapCompletable.java 98.93% <0%> (-1.07%) 2% <0%> (ø)
...rnal/operators/mixed/ObservableConcatMapMaybe.java 99.17% <0%> (-0.83%) 2% <0%> (ø)
... and 15 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 2edea6b...eb9cc47. Read the comment docs.

@akarnokd akarnokd merged commit 3aae12e into ReactiveX:2.x Mar 7, 2018
@akarnokd akarnokd deleted the ObservableFlatMapScalarOverflowFix branch March 7, 2018 17:15
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants