Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flux hangs when GroupedFlux is used together with ParallelFlux #1959

Closed
kcieniuch-r7 opened this issue Nov 20, 2019 · 0 comments
Closed

Flux hangs when GroupedFlux is used together with ParallelFlux #1959

kcieniuch-r7 opened this issue Nov 20, 2019 · 0 comments

Comments

@kcieniuch-r7
Copy link

Problem description:

Flux hangs when GroupedFlux is used together with ParallelFlux. i.e. when we use parallel to process groups created by groupBy. See code below for exact scenario.

Expected Behavior

ParallelFlux can be used together with GroupedFlux to processes data and finish without errors.

Actual Behavior

Flux hangs and stops processing data.

Steps to Reproduce

Looks like there is unexpected interaction when GroupedFlux and ParallelFlux are used together.
(NB parallelism of nested ParallelFlux is irrelevant the issue happens with any parallelism)

Note: if we "hide" GroupedFlux identity then Flux doesn't hang.

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class GroupByIssue {

    public static void main(String[] args) throws InterruptedException {
        int parallelism = 2; // issue can be reproduced with any parallelism

        Scheduler process = Schedulers.newParallel("process", parallelism, true);

        final long start = System.nanoTime();

        Flux.range(0, 4_000_000)
            .subscribeOn(Schedulers.newSingle("range", true))
            .groupBy(i -> i % 2)
            .flatMap(g ->
                g.key() == 0
                    ? g //.hide()  /* adding hide here fixes the hang issue */
                        .parallel(parallelism)
                        .runOn(process)
                        .map(i -> i)
                        .sequential()
                    : g.map(i -> i) // no need to use hide
            )
            .doOnNext(i -> print(i))
            .then()
            .block();

        System.out.printf("elapsed: %d\n", (System.nanoTime() - start) / 1_000_000);
    }

    private static void print(int current) {
        if (current % 100000 == 0) {
            System.out.println("processed: " + current);
        }
    }
}

Environment

  • Reactor version used
    • reactor-core 3.3.0.RELEASE Dysprosium-SR1
  • JVM version
    • openjdk version "11.0.4" 2019-07-16
    • OpenJDK Runtime Environment AdoptOpenJDK (build 11.0.4+11)
    • OpenJDK 64-Bit Server VM AdoptOpenJDK (build 11.0.4+11, mixed mode)
  • OS and version
    • macOS Mojave 10.14.6
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants