Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: add ParallelFlowable.sequentialDelayError #5117

Merged
merged 3 commits into from
Feb 21, 2017

Conversation

akarnokd
Copy link
Member

@akarnokd akarnokd commented Feb 18, 2017

This PR adds the sequentialDelayError operator to ParallelFlowable that allows awaiting all 'rails' in a parallel flow to terminate normally or with the (composite) exception of the failed rail(s).

To enable this, the Flowable.parallel() operator's behavior regarding rail cancellation had to be changed. In v2.0.5 if any of the rails cancelled (maybe due to a failure, maybe due to an end consumer cancelling the entire parallel flow) the input Flowable was cancelled.

This PR alters this by requiring all rails to cancel before cancelling the input Flowable. The change permits one or multiple rails to fail and let others progress in case the new sequentialDelayError is applied as a terminal operator. The original sequential() operator still cancels all rails if one of them fails (triggering the cancellation of the input Flowable).

Note that this change may still drop and never process elements in the internal queues of the operators in the parallel flow (because the parallel processing is not implemented with work-stealing that could pick up elements from a dead queue). In order to get as many elements processed as possible, it is recommended to reduce the default prefetch on the runOn operator to a reasonable tradeoff value (between throughput and fault tolerance).

Edit

Updated the PR to make sure parallel() ignores cancelled rails when it dispatches items.

Related: #5108.

@akarnokd akarnokd added this to the 2.1 milestone Feb 18, 2017
@codecov-io
Copy link

codecov-io commented Feb 18, 2017

Codecov Report

Merging #5117 into 2.x will decrease coverage by -0.03%.
The diff coverage is 91.03%.

@@             Coverage Diff             @@
##               2.x    #5117      +/-   ##
===========================================
- Coverage       96%   95.97%   -0.03%     
- Complexity    5598     5609      +11     
===========================================
  Files          620      620              
  Lines        39699    39810     +111     
  Branches      5567     5597      +30     
===========================================
+ Hits         38113    38208      +95     
+ Misses         629      627       -2     
- Partials       957      975      +18
Impacted Files Coverage Δ Complexity Δ
...n/java/io/reactivex/parallel/ParallelFlowable.java 100% <100%> (ø) 43 <2> (+2)
...ivex/internal/operators/parallel/ParallelJoin.java 94.83% <90.26%> (-0.47%) 3 <3> (+1)
...rnal/operators/parallel/ParallelFromPublisher.java 95.73% <92.85%> (-0.84%) 4 <ø> (ø)
...al/operators/observable/ObservableSampleTimed.java 91.66% <ø> (-5%) 3% <ø> (ø)
.../internal/operators/maybe/MaybeTakeUntilMaybe.java 95.91% <ø> (-4.09%) 2% <ø> (ø)
...ernal/operators/maybe/MaybeTakeUntilPublisher.java 96% <ø> (-4%) 2% <ø> (ø)
...tivex/internal/observers/FutureSingleObserver.java 94.33% <ø> (-3.78%) 24% <ø> (-1%)
...nternal/operators/parallel/ParallelSortedJoin.java 91.3% <ø> (-3.63%) 2% <ø> (ø)
.../internal/operators/flowable/FlowableInterval.java 93.33% <ø> (-3.34%) 2% <ø> (ø)
...ex/internal/operators/maybe/MaybeTimeoutMaybe.java 95.58% <ø> (-2.95%) 2% <ø> (ø)
... and 42 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 421c5bb...ff2c2c6. Read the comment docs.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants