-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
switchOnMap with single returned item cancels input #3936
Comments
Hey, @yawkat ! Thanks for the detailed report. The thing is that you are attempting to use the internal product of After you've obtained the
What you do instead is not derive anything but rather capture this intermediate result and terminate the chain with it. That's not supported. However, perhaps the javadoc can be more specific here. There's also the following note:
But it follows with another risk that you are not actually running into since you complete the execution. That's when the source is actually cancelled! I propose two follow up tasks:
|
@chemicL I understand why it breaks the way it does from reading the code, but I disagree that this behavior is forbidden by the javadoc. The note about deriving the result of the lambda from the given publisher just says "strongly advised", it doesn't say it's necessary, and the javadoc makes it appear like this is because you may leave a dangling flux, which the code above does not. Also, I just tried this with |
Thanks, that's interesting, I wasn't aware.
With a whole rewrite happening in 3.4.6: And from my POV a regression is introduced in the following fix for another issue: https://github.com/reactor/reactor-core/pull/2794/files#diff-ca833c86dffdc3cb95fd2ead3626c836e9951f7b01a38163215ad483479916a3L654-R660 All in all, I think the time to report regressions between |
In Micronaut HTTP, when there is a controller like I don't need a workaround anymore, I implemented my own processor for it: https://github.com/micronaut-projects/micronaut-core/blob/126058cf5fd16e7ac565ec0e70840799f46a52a1/core-reactive/src/main/java/io/micronaut/core/async/subscriber/LazySendingSubscriber.java Just thought this might still be worth fixing. |
Thanks for the additional context. With that, I'll inline my observations about the initial example you provided as comments: Flux<String> input = Flux.just("foo", "bar");
// Mono.from causes the execution to cancel the provided Publisher once the first item is emitted
Mono<Flux<String>> nested = Mono.from(input.switchOnFirst((first, all) -> Mono.just(all)));
// block() waits for the Mono to complete -> once the first item is retrieved, the source is cancelled
// meaning that SwitchOnFirstMain is cancelled
Flux<String> item = nested.block();
System.out.println(item); // prints SwitchOnFirstMain as expected
// collectList().block() subscribes to a cancelled SwitchOnFirstMain,
// it is a bug that it doesn't terminate but waits forever.
System.out.println(item.collectList().block()); // never terminates as the item has been cancelled So essentially it's not really the effect of Let's apply the first mitigation so that the cancellation does not happen, Flux<Flux<String>> nested = Flux.from(input.switchOnFirst((first, all) -> Mono.just(all)));
// now we use blockLast to wait for the last item of the Flux
Flux<String> item = nested.blockLast(); Now, from the
And that is expected as I commented before, unless you use the input.switchOnFirst((first, all) -> Mono.just(all), false) This will successfully print:
As an alternative to using Flux<String> input = Flux.just("foo", "bar");
Mono<Flux<String>> nested =
input.switchOnFirst((first, all) -> Mono.just(all), false).single();
Flux<String> item = nested.block();
System.out.println(item);
System.out.println(item.collectList().block()); |
In a situation where `SwitchOnFirstMain` is subscribed to after the operator itself has terminated, the expectation is that either an error is propagated due to cancellation of the source or the first actual subscription succeeds. Due to a regression introduced in #2794 the subscription would hang with no termination. This change addresses the issue and the subscrption properly terminates. Related to #3936. Signed-off-by: Dariusz Jędrzejczyk <dariusz.jedrzejczyk@broadcom.com>
In a situation where `SwitchOnFirstMain` is subscribed to after the operator itself has terminated, the expectation is that either an error is propagated due to cancellation of the source or the first actual subscription succeeds. Due to a regression introduced in #2794 the subscription would hang with no termination. This change addresses the issue and the subscrption properly terminates. Related to #3936. Signed-off-by: Dariusz Jędrzejczyk <dariusz.jedrzejczyk@broadcom.com>
Pushed the fix to the hanging behaviour. With the above explanation I think the issue can be closed. @yawkat hope this helps. Please don't hesitate to open a new issue in case something else is fishy. |
Thanks! |
Expected Behavior
When a switchOnFirst function returns a single item that contains the input flux but is not derived through operators, the input flux should remain available without restriction.
Actual Behavior
The input flux is cancelled when the outer flux is cancelled (e.g. by
Mono.from
), making it unusable.Steps to Reproduce
Possible Solution
In the above example, Mono.from sees the onNext immediately, and cancels the FluxSwitchOnFirst. Cancelling FluxSwitchOnFirst however also cancels the SwitchOnFirstMain, which it should not. Cancellation should only be sent to the publisher returned by the switchOnFirst lambda (in this case the Mono.just)
Your Environment
netty
, ...): N/AThe text was updated successfully, but these errors were encountered: