Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.5 Why isn't it required to cancel the active Subscription? #317

Closed
anthonyvdotbe opened this issue Apr 7, 2016 · 7 comments
Closed

2.5 Why isn't it required to cancel the active Subscription? #317

anthonyvdotbe opened this issue Apr 7, 2016 · 7 comments
Labels

Comments

@anthonyvdotbe
Copy link
Contributor

Rule 2.5 states: A Subscriber MUST call Subscription.cancel() on the given Subscription after an onSubscribe signal if it already has an active Subscription.

Why doesn't it additionally state that the Subscriber MUST call Subscription.cancel() on the active Subscription as well?

Say I have Subscriber S, with an active Subscription for Publisher P1. Now if Publisher P2 invokes onSubscribe on S:

  • S MUST cancel the Subscription for P2 (rule 2.5)
  • but P2 is still allowed to send S an onComplete or onError signal, even though S never called request on its Subscription (rules 2.9 and 2.10)
  • now suppose P2 sends an onError signal, this causes several issues:
    • S has no way of knowing whether this signal comes from P1 or P2, so what should it do?
    • P1 may later send an onComplete/onError signal, thereby further confusing S
    • the onError signal may arrive concurrently with another signal from Publisher P1, causing thread-safety issues

So I believe that, as soon as onSubscribe is invoked on a Subscriber with an active Subscription, the Subscriber is broken and both the new and active Subscription must immediately be cancelled.

@akarnokd
Copy link
Contributor

akarnokd commented Apr 7, 2016

Even if they are both cancelled, the onError/onComplete may still fire. In addition, cancel() is best effort so normal onNext values may slip through.

In Rsc/Reactor, we choose to cancel the incoming second Subscription only because almost all operators check for cancellation before sending out any onXXX event.

In theory, you could switch the Subscriber into a failed state if a second subscription comes in, but that requires a half-serialized subscriber which adds a lot of overhead for well behaving sequences.

@viktorklang
Copy link
Contributor

@akarnokd nails the reply here.

As for backstory, in the beginning there was a proposal to have all the signals have the Subscription as a parameter, this not only meant that the Subscriber could be immutable but also you could do demuxing.

The massive downside of that proposal is given that Reactive Streams is a communications protocol, it would mean that for every element passed into onNext you'd have to transfer/include a Subscription identifier, massively augmenting the payload.

@anthonyvdotbe
Copy link
Contributor Author

Thanks, that makes sense. But then why not change rule 1.9 to something like the following?

Publisher.subscribe MUST call onSubscribe on the provided Subscriber prior to any other signals to that Subscriber. Moreover, if the Subscription was cancelled by the Subscriber during the execution of onSubscribe, it is REQUIRED that no further signals occur. Publisher.subscribe MUST return normally, except [...]

It seems like this wouldn't be too much asked of the Publisher, while at the same time guaranteeing that a Subscriber could properly fend off an intruding Publisher as explained by @akarnokd.

@viktorklang
Copy link
Contributor

How would the Publisher know that the Subscription was cancelled during the execution of onSubscribe?

@anthonyvdotbe
Copy link
Contributor Author

Maybe I'm missing something, but wouldn't the Publisher only have to check this in case it wants to send an onComplete/onError signal in its subscribe method (in all other cases the first call to request would immediately notice the Subscription was cancelled & not send any signals)? This could be done, for example, by using a decorator class such as the following:

class SubscriptionDecorator implements Subscription {

    private final Subscription delegate;
    private volatile boolean cancelled;

    SubscriptionDecorator(Subscription delegate) {
        this.delegate = delegate;
    }

    boolean isCancelled() {
        return cancelled;
    }

    public void cancel() {
        if (!cancelled) {
            delegate.cancel();
            this.cancelled = true;
        }
    }

    public void request(long n) {
        if (!cancelled) {
            delegate.request(n);
        }
    }

}

and then a Publisher which completes immediately could be implemented as:

class EmptyPublisher implements Publisher<T> {

    public void subscribe(Subscriber<? super T> s) {
        SubscriptionDecorator decorator = new SubscriptionDecorator(...);
        s.onSubscribe(decorator);
        if (!decorator.isCancelled()) {
            s.onComplete();
        }
    }

}

@akarnokd
Copy link
Contributor

Yep, that's how it usually looks. (You should swap the cancelled = true with the call to delegate to avoid reentrancy problems)

@anthonyvdotbe
Copy link
Contributor Author

Closing this issue, since the original question was answered & I've created follow-up issue #325 for the proposal to change rule 1.9.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

4 participants