From 1435284b5e5fc8526d00eafa27bfbfefb4bd84f6 Mon Sep 17 00:00:00 2001 From: David Karnok Date: Thu, 21 Nov 2019 18:31:08 +0100 Subject: [PATCH] fix #1959 GroupedFlux fused with parallel() not replenishing properly Fix a case when the `GroupedFlowable` is consumed by a `parallel()` in fusion mode causing the source to stop replenishing items from the upstream, hanging the whole sequence. `parallel()` was slightly different from the usual queue consumers because it checks for `isEmpty` before trying to `pull` for an item. This was necessary because the rails may not be ready for more and an eager `pull` to check for emptyness would lose that item. The replenishing was done in `GroupedFlowable.pull` but a call to `GroupedFlowable.isEmpty` would not replenish. The fix is to have `isEmpty` replenish similar to when `poll` detects emptyness and replenishes. --- .../reactor/core/publisher/FluxGroupBy.java | 26 ++++++++++++------- .../core/publisher/FluxGroupByTest.java | 23 ++++++++++++++++ 2 files changed, 40 insertions(+), 9 deletions(-) diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxGroupBy.java b/reactor-core/src/main/java/reactor/core/publisher/FluxGroupBy.java index 3ae843e37f..6fce20dbfa 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxGroupBy.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxGroupBy.java @@ -733,18 +733,22 @@ public V poll() { produced++; } else { - int p = produced; - if (p != 0) { - produced = 0; - GroupByMain main = parent; - if (main != null) { - main.s.request(p); - } - } + tryReplenish(); } return v; } + void tryReplenish() { + int p = produced; + if (p != 0) { + produced = 0; + GroupByMain main = parent; + if (main != null) { + main.s.request(p); + } + } + } + @Override public int size() { return queue.size(); @@ -752,7 +756,11 @@ public int size() { @Override public boolean isEmpty() { - return queue.isEmpty(); + if (queue.isEmpty()) { + tryReplenish(); + return true; + } + return false; } @Override diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxGroupByTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxGroupByTest.java index 2820bf9469..dda46ab3f5 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxGroupByTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxGroupByTest.java @@ -29,6 +29,7 @@ import reactor.core.CoreSubscriber; import reactor.core.Fuseable; import reactor.core.Scannable; +import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; import reactor.test.publisher.FluxOperatorTest; @@ -882,4 +883,26 @@ public void scanUnicastGroupedFlux() { assertThat(test.scan(Scannable.Attr.ERROR)).isSameAs(test.error); } + @Test(timeout = 10000) + public void fusedGroupByParallel() { + int parallelism = 2; + Scheduler process = Schedulers.newParallel("process", parallelism, true); + + final long start = System.nanoTime(); + + Flux.range(0, 500_000) + .subscribeOn(Schedulers.newSingle("range", true)) + .groupBy(i -> i % 2) + .flatMap(g -> + g.key() == 0 + ? g //.hide() /* adding hide here fixes the hang issue */ + .parallel(parallelism) + .runOn(process) + .map(i -> i) + .sequential() + : g.map(i -> i) // no need to use hide + ) + .then() + .block(); + } }