Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: add Flowable.parallel() and parallel operators #4974

Merged
merged 2 commits into from
Jan 18, 2017

Conversation

akarnokd
Copy link
Member

@akarnokd akarnokd commented Jan 8, 2017

This PR adds the parallel() method to Flowable which opens up a sub-DSL with parallel operations. (Note that only a few operators make sense in a parallel settings.)

This parallel sub-DSL is not limited to computation tasks as it allows specifying the parallelism and the Scheduler to run the parallel 'rails'. For example, you can have parallel downloads that block:

Flowable.range(1, 100)
.parallel(10)
.runOn(Schedulers.io())
.map(v -> httpClient.blockingGet("http://server/item/" + v))
.sequential()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(...);

@akarnokd akarnokd added this to the 2.1 milestone Jan 8, 2017
@codecov-io
Copy link

codecov-io commented Jan 8, 2017

Current coverage is 94.94% (diff: 74.69%)

Merging #4974 into 2.x will decrease coverage by 0.59%

@@                2.x      #4974   diff @@
==========================================
  Files           592        609     +17   
  Lines         37969      39186   +1217   
  Methods           0          0           
  Messages          0          0           
  Branches       5752       5968    +216   
==========================================
+ Hits          36273      37204    +931   
- Misses          741        955    +214   
- Partials        955       1027     +72   

Powered by Codecov. Last update cd45675...14111b6

@akarnokd
Copy link
Member Author

akarnokd commented Jan 8, 2017

I'll restore the +95% coverage in a separate PR.

@benjchristensen
Copy link
Member

I have a use case that could benefit from this depending on how it is implemented. I don't see the API for ParallelFlowable.merge(Flowable<Flowable<T>> flowables) however, which is what I'd need and have to manually do.

Let me describe the type of parallel processing and see if your goal of ParallelFlowable matches it.

100s of network connections, each spread across n event loops (say 16). The semantic behavior is to merge the 100s of connections into a single stream, then do groupBy on all of them, and on each GroupedFlowable then does a scan. With normal Flowable this is bad, as it takes the 16 threads and synchronizes them all, even though each source Flowable is on one of the 16 threads, and each output GroupedObservable can be processed concurrently again on those 16 threads.

In theory, a ParallelFlowable.merge(sourceFlowables).groupBy(...).scan(...) could allow the merge to support concurrent onNext and then ParallelFlowable.groupBy could re-emit a normal Flowable where scan works sequentially again.

Is this the type of thing you want ParallelFlowable to enable?

@akarnokd
Copy link
Member Author

akarnokd commented Jan 9, 2017

@benjchristensen No. ParallelFlowable optimizes for a fixed parallelism level with round-robin dispatch and round-robin join. The closest thing is the parallelStream() operator in Java 8 for computation-intensive tasks. Your case has an unknown number of inner sources to merge and an unknown number of groups that could appear.

@benjchristensen
Copy link
Member

Too bad. Maybe someday I'll get around to making a "ConcurrentFlowable" happen ... but it's been on my todo list for 3 years, so not counting on it :-)

@akarnokd
Copy link
Member Author

@JakeWharton Do you want to review this or if not, do you at least willing to accept it into RxJava 2?

Copy link
Contributor

@artem-zinnatullin artem-zinnatullin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Few nits.

return;
}

int n = subscribers.length;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same in other similar places would be good.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't do those unless the variable has to be accessed from an inner class.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In long methods reader has to spend extra time to check that it's not modified anywhere, but ok

/**
* Flattens the generated Publishers on each rail.
*
* @param <T> the input value type
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: naming generic type parameters Input and Output would remove such comments and make code slightly more readable. Or I and O as a reference to common I/O abbr.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an established naming pattern with other generic types of RxJava.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, "just saying"

@@ -0,0 +1,51 @@
/**
* Copyright 2016 Netflix, Inc.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(c) 2016-present, RxJava Contributors here and in all other files.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll update the PR.

} else {
SimpleQueue<T> q = inner.getQueue();

// FIXME overflow handling
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Signal MBE? When do you plan to resolve FIXME? It'll lead to silently dropped values…

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding it right now.

} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
RxJavaPlugins.onError(ex);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add return to avoid request in case of error?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FlowableDoOnLifecycle doesn't return either. There is no good way to report an error and not inject a lot of overhead. See strict().

* times as this ParallelFlowable's parallelism level is.
* <p>
* No assumptions are made about the Scheduler's parallelism level,
* if the Scheduler's parallelism level is lwer than the ParallelFlowable's,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lower

* times as this ParallelFlowable's parallelism level is.
* <p>
* No assumptions are made about the Scheduler's parallelism level,
* if the Scheduler's parallelism level is lwer than the ParallelFlowable's,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lower

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixing.

@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@CheckReturnValue
public final Flowable<T> sequential() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be a verb: sequentize()/etc to be consistent with other operators (which are verbs mostly).

Btw, reading chains like:

Flowable.range(1, 100)
.parallel(10)
.runOn(Schedulers.io())
.map(v -> httpClient.blockingGet("http://server/item/" + v))
.sequential()

feels strange because sequential after parallel looks like an operator that disables parallelization of the chain (of course it can't, but I dunno, it just reads strange to me).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tis naming matches Java 8 Stream's parallel() and sequential() operators.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, though JDK is not the best example of good naming.

* @return the new Px instance
*/
@CheckReturnValue
public final Flowable<T> sorted(Comparator<? super T> comparator) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sort?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Matches the naming of Flowable.sorted().

@artem-zinnatullin
Copy link
Contributor

What about tests and benchmark comparisons with parallelization that you can achieve at the moment, using existing RxJava apis?

* and dispatches the upstream items to them in a round-robin fashion.
* <p>
* Note that the rails don't execute in parallel on their own and one needs to
* apply {@link ParallelFlowable#runOn(Scheduler)} to specify the Scheduler where
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about remove runOn and add Scheduler as a parameter to parallel()?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is the same logic as with regular factory methods such as just, range, fromIterable don't take a Scheduler, plus you can apply multiple runOn's on a sequence at different stages. For example create a pipeline with stages of parallelism=2 and 3 stages in total.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

plus you can apply multiple runOn's on a sequence at different stages

Ah, that's nice, got it.

@akarnokd
Copy link
Member Author

I've added a benchmark and here are the results (i7 4770K, Windows 7 x64, Java 8u112):

image

Raw data

Clearly, parallel has lower overhead than flatMap-based, 1 element parallelism.

image

Comparing against groupBy, the benefits manifest with longer per-item computation but groupBy looks odd: in each compute/parallelism setup the numbers are really close to each other as if there wasn't actual parallel execution with groupBy. I have to investigate that further.

Copy link
Collaborator

@davidmoten davidmoten left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, I like it.

requested.addAndGet(-e);
}

int w = get();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't seen this optimization before with missed (calling get before addAndGet). Is this particular to the ParallelJoin use case or do you expect to start applying it elsewhere too?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are a couple of places which uses this pattern: range, observeOn, fromArray.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not going to apply them eagerly because it is another local variable/register to worry about when there are lots of other locals in deeper/user code.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok thanks

@akarnokd
Copy link
Member Author

Updated the groupBy benchmark. I forgot that v was constant and thus the group expression didn't create 1..4 groups. New results (i7 4790, Windows 7 x64, Java 8u112):

image

For smaller computation, parallel has less overhead. For longer computation, they are roughly next to each other. Parallel uses round-robin collection whereas flatMap collects from a source as long as it can.

@akarnokd akarnokd merged commit 6c88036 into ReactiveX:2.x Jan 18, 2017
@akarnokd akarnokd deleted the ParallelFlowable branch January 18, 2017 22:24
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants