Flux hangs when GroupedFlux is used together with ParallelFlux, i.e. when parallel() is used to process groups created by groupBy(). See the code below for the exact scenario.
Expected Behavior
ParallelFlux can be used together with GroupedFlux to process data and finish without errors.
Actual Behavior
Flux hangs and stops processing data.
Steps to Reproduce
There appears to be an unexpected interaction when GroupedFlux and ParallelFlux are used together.
(NB: the parallelism of the nested ParallelFlux is irrelevant; the issue happens with any parallelism.)
Note: if we "hide" the GroupedFlux identity by calling hide() on the group, the Flux doesn't hang.
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class GroupByIssue {

    public static void main(String[] args) throws InterruptedException {
        int parallelism = 2; // issue can be reproduced with any parallelism
        Scheduler process = Schedulers.newParallel("process", parallelism, true);
        final long start = System.nanoTime();
        Flux.range(0, 4_000_000)
            .subscribeOn(Schedulers.newSingle("range", true))
            .groupBy(i -> i % 2)
            .flatMap(g ->
                g.key() == 0
                    ? g//.hide() /* adding hide here fixes the hang issue */
                        .parallel(parallelism)
                        .runOn(process)
                        .map(i -> i)
                        .sequential()
                    : g.map(i -> i) // no need to use hide
            )
            .doOnNext(i -> print(i))
            .then()
            .block();
        System.out.printf("elapsed: %d\n", (System.nanoTime() - start) / 1_000_000);
    }

    private static void print(int current) {
        if (current % 100000 == 0) {
            System.out.println("processed: " + current);
        }
    }
}
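For comparison, here is a minimal sketch of the variant that reportedly does not hang, with hide() applied to the group before parallel(). The class name GroupByHideWorkaround, the method countWithHide, the smaller element count and the 60-second timeout are illustrative assumptions and not part of the original reproducer; the timeout is there only so that a hang surfaces as an exception instead of blocking forever.

```java
import java.time.Duration;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

// Hypothetical class name, used only for this sketch.
public class GroupByHideWorkaround {

    // Runs the same pipeline as the reproducer, but with hide() on the
    // group that is processed in parallel, and returns how many elements
    // reached the end of the pipeline.
    static long countWithHide(int total, int parallelism) {
        Scheduler process = Schedulers.newParallel("process", parallelism, true);
        Scheduler range = Schedulers.newSingle("range", true);
        try {
            Long count = Flux.range(0, total)
                .subscribeOn(range)
                .groupBy(i -> i % 2)
                .flatMap(g ->
                    g.key() == 0
                        ? g.hide() // hides the GroupedFlux identity, as in the note above
                            .parallel(parallelism)
                            .runOn(process)
                            .map(i -> i)
                            .sequential()
                        : g.map(i -> i))
                .count()
                // Assumed timeout: block(Duration) throws IllegalStateException
                // if the pipeline does not complete in time.
                .block(Duration.ofSeconds(60));
            return count == null ? 0L : count;
        } finally {
            process.dispose();
            range.dispose();
        }
    }

    public static void main(String[] args) {
        System.out.println("processed: " + countWithHide(100_000, 2));
    }
}
```

If the workaround behaves as described, the returned count should equal the number of elements emitted by range. Removing the hide() call from this sketch gives a bounded way to check for the hang, since the timeout turns it into a failure.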
Environment