Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: add resilient versions of parallel map(), filter() & doOnNext() #5202

Merged
merged 1 commit into from
Mar 23, 2017

Conversation

akarnokd
Copy link
Member

This PR adds 2 new overloads to ParallelFlowable operators map, filter and doOnNext to enable per item error handling in case the main function fails with some exception.

Flowable.range(0, 2)
.parallel(1)
.map(v -> 1 / v, ParallelFailureHandling.SKIP)
.sequential()
.test()
.assertResult(1);

The new ParallelFailureHandling has some default enumeration values to handle the common cases. In addition, the BiFunction overload allows bounded retries and/or conditional handling of failures.

Related: #5128.

@akarnokd akarnokd added this to the 2.1 milestone Mar 20, 2017
@codecov
Copy link

codecov bot commented Mar 20, 2017

Codecov Report

Merging #5202 into 2.x will increase coverage by 0.15%.
The diff coverage is 98.42%.

@@             Coverage Diff             @@
##                2.x   #5202      +/-   ##
===========================================
+ Coverage     95.94%   96.1%   +0.15%     
- Complexity     5677    5709      +32     
===========================================
  Files           621     625       +4     
  Lines         40611   40991     +380     
  Branches       5632    5675      +43     
===========================================
+ Hits          38963   39393     +430     
+ Misses          666     627      -39     
+ Partials        982     971      -11
Impacted Files Coverage Δ Complexity Δ
...ex/internal/operators/parallel/ParallelFilter.java 97.59% <100%> (ø) 6 <0> (ø) ⬇️
...n/java/io/reactivex/parallel/ParallelFlowable.java 100% <100%> (ø) 49 <6> (+6) ⬆️
...io/reactivex/parallel/ParallelFailureHandling.java 100% <100%> (ø) 3 <3> (?)
...internal/operators/parallel/ParallelFilterTry.java 98.26% <98.26%> (ø) 6 <6> (?)
...ternal/operators/parallel/ParallelDoOnNextTry.java 98.33% <98.33%> (ø) 6 <6> (?)
...ex/internal/operators/parallel/ParallelMapTry.java 98.33% <98.33%> (ø) 6 <6> (?)
...erators/completable/CompletableConcatIterable.java 95.91% <0%> (-4.09%) 2% <0%> (ø)
...reactivex/internal/operators/single/SingleAmb.java 96.36% <0%> (-3.64%) 9% <0%> (-1%)
...perators/flowable/FlowableSequenceEqualSingle.java 96.61% <0%> (-3.39%) 3% <0%> (ø)
...ternal/operators/flowable/FlowableSubscribeOn.java 94.91% <0%> (-3.39%) 2% <0%> (ø)
... and 46 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 21a7a05...dc08894. Read the comment docs.

@AarjavP
Copy link

AarjavP commented Jun 6, 2018

Sorry if this is not the correct place for this question, however it's relevant to the above feature.
I would like to have an error handler/on error callback but continue the processing, or specify the number of retries.
Can the ParallelFailureHandling enum be made into an interface instead? I see the enum instances are all used in the same way in all the tryOnNext methods of the 3 operators (filter, map, and doOnNext). The existing enums can maybe 'built-in' handler instances. Something like:

built-ins:

.map(v -> 1 / v, ParallelFailureHandling.SKIP) //static instance

some customizable built-ins:

.map(v -> 1 / v, ParallelFailureHandling.retry(5)) //retry 5 times before failing

user defined:

.map(v -> 1 / v, (retries, err) -> { log.error(err); return ParrallelFailureHandling.Result.SKIP; })

The return values could be SKIP/IGNORE, RETRY, STOP, default(val), or error(ex)

Or perhaps this is already do able using existing api?

@akarnokd
Copy link
Member Author

akarnokd commented Jun 6, 2018

Please always check the available operators first before asking for a feature: http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/parallel/ParallelFlowable.html#map-io.reactivex.functions.Function-io.reactivex.functions.BiFunction-

The enums are just convenience implementations of the BiFunction variants of the operators.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants