switchOnMap with single returned item cancels input #3936

Closed
yawkat opened this issue Nov 13, 2024 · 7 comments

Labels
status/invalid We don't feel this issue is valid, or the root cause was found outside of Reactor


@yawkat

yawkat commented Nov 13, 2024

Expected Behavior

When the switchOnFirst function returns a publisher of a single item that contains the input flux (rather than a publisher derived from it through operators), the input flux should remain usable without restriction.

Actual Behavior

The input flux is cancelled when the outer flux is cancelled (e.g. by Mono.from), making it unusable.

Steps to Reproduce

    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    public class SwitchOnFirstRepro { // class name is arbitrary
        public static void main(String[] args) {
            Flux<String> input = Flux.just("foo", "bar");
            Mono<Flux<String>> nested = Mono.from(input.switchOnFirst((first, all) -> Mono.just(all)));
            Flux<String> item = nested.block();
            System.out.println(item); // prints SwitchOnFirstMain as expected
            System.out.println(item.collectList().block()); // never terminates as the item has been cancelled
        }
    }

Possible Solution

In the above example, Mono.from sees the onNext immediately and cancels the FluxSwitchOnFirst. However, cancelling FluxSwitchOnFirst also cancels the SwitchOnFirstMain, which it should not. Cancellation should only be sent to the publisher returned by the switchOnFirst lambda (in this case, the Mono.just).
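The Mono.from step can be observed in isolation. This is a minimal sketch, assuming reactor-core 3.6.x on the classpath; the class, nested type, and method names are hypothetical. Mono.from keeps only the first onNext of a multi-item source and then cancels that source, and doOnCancel makes the cancellation visible:

```java
import java.util.concurrent.atomic.AtomicBoolean;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical demo: Mono.from cancels a multi-item source right after its
// first onNext, which is how FluxSwitchOnFirst gets cancelled in the report.
public class MonoFromCancelDemo {

    static final class Result {
        final String first;
        final boolean sourceCancelled;

        Result(String first, boolean sourceCancelled) {
            this.first = first;
            this.sourceCancelled = sourceCancelled;
        }
    }

    static Result run() {
        AtomicBoolean cancelled = new AtomicBoolean();
        Flux<String> source = Flux.just("foo", "bar")
                .doOnCancel(() -> cancelled.set(true)); // records the cancel signal
        String first = Mono.from(source).block();       // only the first item is kept
        return new Result(first, cancelled.get());
    }

    public static void main(String[] args) {
        Result r = run();
        System.out.println(r.first);           // foo
        System.out.println(r.sourceCancelled); // true
    }
}
```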

Your Environment

  • Reactor version(s) used: 3.6.9
  • Other relevant libraries versions (eg. netty, ...): N/A
@chemicL
Member

chemicL commented Nov 20, 2024

Hey, @yawkat !

Thanks for the detailed report. The thing is that you are attempting to use the internal product of switchOnFirst outside of its scope.

Once you've obtained the Flux<String> item from block(), the nested chain has already completed. Subscribing to its inner representation outside the scope of the lambdas you define is disallowed. As stated in the javadoc:

The whole source (including the first signal) is passed as second argument to the BiFunction and it is very strongly advised to always build upon with operators (see below).

Instead of deriving anything, your code captures this intermediate result and terminates the chain with it. That's not supported. However, perhaps the javadoc could be more specific here.

There's also the following note:

It is advised to return a Publisher derived from the original Flux in all cases

But the javadoc follows that note with a different risk, one you are not actually running into, since your execution completes. Completion is exactly when the source gets cancelled!
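For contrast, here is a sketch of the usage the javadoc recommends, where the BiFunction builds its result on top of its second argument with operators instead of handing that argument out. The class and method names and the uppercase transformation are hypothetical, chosen only for illustration:

```java
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical demo of the recommended pattern: the BiFunction derives its
// result from `all` (the whole source, first signal included) with operators.
public class DerivedSwitchOnFirstDemo {

    static List<String> upperCaseIfFirstIsFoo(String... values) {
        return Flux.just(values)
                .switchOnFirst((first, all) ->
                        first.hasValue() && "foo".equals(first.get())
                                ? all.map(String::toUpperCase) // derived via an operator
                                : all)                          // passed through unchanged
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(upperCaseIfFirstIsFoo("foo", "bar")); // [FOO, BAR]
        System.out.println(upperCaseIfFirstIsFoo("baz", "bar")); // [baz, bar]
    }
}
```

Because the returned publisher subscribes to `all` inside the operator's scope, the source is consumed while switchOnFirst is still active.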

I propose two follow up tasks:

  1. Propagate onError to the late subscriber in case of subscribing to a cancelled intermediate result (I believe this is a bug on our part).
  2. Improve the Javadoc. Not sure yet about the right wording.

@chemicL chemicL added the status/need-investigation This needs more in-depth investigation label Nov 20, 2024
@chemicL chemicL self-assigned this Nov 20, 2024
@yawkat
Author

yawkat commented Nov 20, 2024

@chemicL I understand why it breaks the way it does from reading the code, but I disagree that the javadoc forbids this behavior. The note about deriving the result of the lambda from the given publisher only says "strongly advised"; it doesn't say it's necessary, and the javadoc suggests the reason is that you might otherwise leave a dangling flux, which the code above does not do.

Also, I just tried this with io.projectreactor:reactor-core:3.3.0.RELEASE and this issue does not happen, so this is technically a regression.

@chemicL
Member

chemicL commented Nov 20, 2024

Thanks, that's interesting, I wasn't aware. 3.3.0.RELEASE was released over 5 years ago and is no longer supported in OSS. We currently support 3.6 and 3.7. I just checked, and 3.6.0 behaves the same way in this particular scenario. There have been some major changes implemented since 3.3.0.RELEASE:

With a whole rewrite happening in 3.4.6:

And from my POV, the regression was introduced in the following fix for another issue: https://github.com/reactor/reactor-core/pull/2794/files#diff-ca833c86dffdc3cb95fd2ead3626c836e9951f7b01a38163215ad483479916a3L654-R660

All in all, I think the time to report regressions between 3.3 and 3.4 is behind us. @yawkat Let's focus on the problem you are trying to solve. Can you be more specific and perhaps we can work out a way to achieve the goals you have with the current behaviour?

@yawkat
Author

yawkat commented Nov 20, 2024

In Micronaut HTTP, given a controller like Publisher<byte[]> controller() { ... }, we want special handling for the case where the Publisher immediately signals an error, while we can still send a full error response, as opposed to an error that happens after the first byte[] has been sent, when we can no longer change the response status or headers. So I tried to use switchOnFirst for that special error handling.
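One way to express that idea with switchOnFirst is to check whether the first signal is an error. This is a hypothetical sketch, not Micronaut's implementation (which ended up using a custom subscriber): it uses Flux<String> in place of Publisher<byte[]>, and the "error response" string stands in for building a real error response. The class and method names are illustrative:

```java
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical sketch (not Micronaut's implementation): react differently to an
// error that is the very first signal, while nothing has been sent yet.
public class EarlyErrorDemo {

    static Flux<String> withEarlyErrorHandling(Flux<String> body) {
        return body.switchOnFirst((first, all) -> {
            if (first.isOnError()) {
                // Nothing sent yet: a full error response (status, headers) is still possible.
                return Flux.just("error response: " + first.getThrowable().getMessage());
            }
            // First signal is a value or completion: stream the body as-is. A later
            // error can no longer change the response status or headers.
            return all;
        });
    }

    static List<String> earlyError() {
        return withEarlyErrorHandling(Flux.error(new IllegalStateException("boom")))
                .collectList()
                .block();
    }

    static List<String> normalBody() {
        return withEarlyErrorHandling(Flux.just("chunk1", "chunk2"))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(earlyError()); // [error response: boom]
        System.out.println(normalBody()); // [chunk1, chunk2]
    }
}
```

Note that this variant keeps the returned publisher inside the operator's scope, so it does not hit the late-subscription problem discussed in this issue.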

I don't need a workaround anymore; I implemented my own processor for it: https://github.com/micronaut-projects/micronaut-core/blob/126058cf5fd16e7ac565ec0e70840799f46a52a1/core-reactive/src/main/java/io/micronaut/core/async/subscriber/LazySendingSubscriber.java

Just thought this might still be worth fixing.

@chemicL
Member

chemicL commented Nov 21, 2024

Thanks for the additional context. With that, I'll inline my observations about the initial example you provided as comments:

        Flux<String> input = Flux.just("foo", "bar");

        // Mono.from causes the execution to cancel the provided Publisher once the first item is emitted
        Mono<Flux<String>> nested = Mono.from(input.switchOnFirst((first, all) -> Mono.just(all)));

        // block() waits for the Mono to complete -> once the first item is retrieved, the source is cancelled
        // meaning that SwitchOnFirstMain is cancelled
        Flux<String> item = nested.block();
        System.out.println(item); // prints SwitchOnFirstMain as expected

        // collectList().block() subscribes to a cancelled SwitchOnFirstMain,
        // it is a bug that it doesn't terminate but waits forever.
        System.out.println(item.collectList().block()); // never terminates as the item has been cancelled

So essentially, the hang is not really caused by SwitchOnFirstMain being unusable outside the scope of the lambda, as I initially thought.

Let's apply the first mitigation so that the cancellation does not happen, switching from Mono<Flux<String>> to Flux<Flux<String>>:

        Flux<Flux<String>> nested = Flux.from(input.switchOnFirst((first, all) -> Mono.just(all)));
        // now we use blockLast to wait for the last item of the Flux
        Flux<String> item = nested.blockLast();

Now, calling collectList().block() results in the following:

Exception in thread "main" java.util.concurrent.CancellationException: FluxSwitchOnFirst has already been cancelled
	at reactor.core.publisher.FluxSwitchOnFirst$AbstractSwitchOnFirstMain.subscribe(FluxSwitchOnFirst.java:724)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4576)
	at reactor.core.publisher.Mono.block(Mono.java:1778)
	at reactor.core.publisher.Flux.main(Flux.java:138)
	Suppressed: java.lang.Exception: #block terminated with an error

That is expected, as I commented before, unless you use the switchOnFirst overload that accepts a boolean cancelSourceOnComplete:

        input.switchOnFirst((first, all) -> Mono.just(all), false)

This will successfully print:

[foo, bar]
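The two outcomes above can be put side by side in one self-contained program. This is a sketch assuming reactor-core 3.6.x; the class and method names are hypothetical, and the CancellationException expected for the default overload is taken from the stack trace above, so other versions may behave differently. Capturing the inner Flux and subscribing to it late is shown only to illustrate the behaviour, not as a recommended pattern:

```java
import java.util.List;
import java.util.concurrent.CancellationException;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical demo: subscribe late to the Flux captured from switchOnFirst,
// once with cancelSourceOnComplete = true (the default) and once with false.
public class CancelSourceOnCompleteDemo {

    static List<String> lateCollect(boolean cancelSourceOnComplete) {
        Flux<String> input = Flux.just("foo", "bar");
        // A Flux outer chain plus blockLast() avoids the early cancel done by Mono.from.
        Flux<String> item = input
                .switchOnFirst((first, all) -> Mono.just(all), cancelSourceOnComplete)
                .blockLast();
        // Late subscription to the captured inner Flux, after the outer chain completed.
        return item.collectList().block();
    }

    public static void main(String[] args) {
        try {
            lateCollect(true); // same as the default switchOnFirst(fn) overload
        } catch (CancellationException e) {
            System.out.println("cancelSourceOnComplete=true: " + e.getMessage());
        }
        System.out.println("cancelSourceOnComplete=false: " + lateCollect(false));
    }
}
```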

As an alternative to switching from Mono to Flux, consider using .single():

        Flux<String> input = Flux.just("foo", "bar");
        Mono<Flux<String>> nested =
                input.switchOnFirst((first, all) -> Mono.just(all), false).single();
        Flux<String> item = nested.block();
        System.out.println(item);
        System.out.println(item.collectList().block());
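The reason .single() avoids the problem is that it waits for the source to complete instead of cancelling it after the first onNext, as Mono.from does, while still failing with an IndexOutOfBoundsException if a second item arrives. A small hypothetical demonstration on a plain two-item Flux (class and method names are illustrative):

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical demo: Mono.from takes the first item and cancels the rest,
// while single() waits for completion and insists on exactly one item.
public class SingleVsMonoFromDemo {

    static String viaMonoFrom() {
        return Mono.from(Flux.just("a", "b")).block(); // first item; "b" is never emitted
    }

    static String viaSingle() {
        return Flux.just("a", "b").single().block();   // errors: more than one item
    }

    public static void main(String[] args) {
        System.out.println(viaMonoFrom()); // a
        try {
            viaSingle();
        } catch (IndexOutOfBoundsException e) {
            System.out.println("single() rejected the second item: " + e.getMessage());
        }
    }
}
```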

chemicL added a commit that referenced this issue Nov 21, 2024
In a situation where `SwitchOnFirstMain` is subscribed to after the
operator itself has terminated, the expectation is that either an error
is propagated due to cancellation of the source or the first actual
subscription succeeds. Due to a regression introduced in #2794 the
subscription would hang with no termination. This change addresses the
issue and the subscription properly terminates.

Related to #3936.

Signed-off-by: Dariusz Jędrzejczyk <dariusz.jedrzejczyk@broadcom.com>
@chemicL chemicL added status/invalid We don't feel this issue is valid, or the root cause was found outside of Reactor and removed status/need-investigation This needs more in-depth investigation labels Nov 21, 2024
@chemicL
Member

chemicL commented Nov 21, 2024

Pushed the fix for the hanging behaviour. With the above explanation, I think the issue can be closed. @yawkat, hope this helps. Please don't hesitate to open a new issue in case something else is fishy.

@chemicL chemicL closed this as completed Nov 21, 2024
@yawkat
Author

yawkat commented Nov 21, 2024

Thanks!
