-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(subscribe): prevent swallowing of errors when stopped #3469
Conversation
src/internal/Subscriber.ts
Outdated
* subscribers and uses the appropriate unhandled mechanism to report the | ||
* error if a stopped subscriber is found. | ||
*/ | ||
reportError(err?: any): void { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be private or protected. And ideally named with _
prefix to discourage external use.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. It's now private and has a leading _
. However, its being private means that a cast to any
is required in _trySubscribe
and in the tests.
src/internal/Subscriber.ts
Outdated
reportError(err?: any): void { | ||
let observer: PartialObserver<T> = this; | ||
while (observer) { | ||
if (observer instanceof Subscriber) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure about this check. When would this
not be the Subscriber? If for some reason it gets wrapped in a SafeSubscriber
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this
will always be a Subscriber
. It's just that observer
is the iteration variable. PartialObserver<T>
is the type of the destination
property - which is assigned to observer
during the iteration.
The iteration starts with this
so that its isStopped
property is checked. And it's all done in the one loop so that the hostReportError
call, etc. is just in the one place.
src/internal/Subscriber.ts
Outdated
while (observer) { | ||
if (observer instanceof Subscriber) { | ||
const { destination } = observer; | ||
if (observer.isStopped) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems like this could pass if someone just calls error
twice in a row on Subscriber, no?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The existing mechanism for guarding against re-enterant calls to error
should not be compromised. _reportError
is called only from within the catch
inside _trySubscribe
.
Even if a caller subverts the private
qualification (added per request above) and calls _reportError
instead of error
, a Subscriber
in the chain will still become stopped and further calls to either _reportError
or error
will not see the observer's error callback invoked.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wrong. The implementation needs to set isStopped
to true
for the subscriber at the root of the each of the subscribers in the destination chain. I'll do that and add another test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've updated the implementation so that error
is called on the chain's root subscriber. This will see calls to error
flow through the chain of subscribers and each will be marked as stopped.
Even if a particular subscriber is used as a destination for another subscriber that itself is not part of the chain, the observer's error callback for that particular subscriber will be invoked only once. So the existing mechanism for guarding against re-enterant calls to error
should not be compromised.
_reportError
is called only from within the catch
inside _trySubscribe
, but even if a caller subverts the private
qualification (added per request above) and calls _reportError
instead of error
, all of the subscribers in the chain will still become stopped and further calls to either _reportError
or error
will not see the observer's error callback invoked.
@benlesh I'm finished making changes that relate to the review's comments. |
It occurred to me that I've updated the PR to do this. |
If _trySubscribe catches an error and the sink - or one of its destinations - is stopped and the sink's error method is called, the error is swallowed - as it cannot be reported via the observer's callback. This commit adds an internal reportError method to Subscriber. The method defers to the appropriate error reporting mechanism if a stopped subscriber is found. Closes ReactiveX#3461
Actually, the |
Yes, I see what you mean. The implementation in this PR would have to use the |
My take on it is that Technically, using |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure this is the right fix, now that I'm looking at it. See my other comment.
} else { | ||
this.error(err); | ||
return; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it looks like this loop will continue to hit hostReportError over and over for every level at which a subscriber isStopped
. So if it has this.destination.destination.destination and all are stopped, the same error will be reported 3 or 4 times.
Also, I'm trying to understand the bug this is solving. The isStopped
flag is meant to guard against code calling subscriber.complete(); subscriber.error(new Error());
or calling error
twice, etc.
It seems like the real fix might be to figure out why the Subscriber is prematurely stopped in the first place. Sorry, I should have caught that in an earlier review. Perhaps I missed something in one of your earlier comments?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I should have caught that in an earlier review.
No worries. With PRs like this - where there isn't a prior, agreed-upon solution that was discussed in an issue - I'm aware that it could all be for nought. It could be the wrong approach or could just be plain wrong, etc. I probably should have mentioned that at the outset, as it's never fun having to tell someone you don't want to merge a PR.
Anyway, this PR is far from urgent.
Also, I'm trying to understand the bug this is solving.
The swallowing of the error is problem that it solves. If an error is thrown after a subscriber becomes stopped - as in #3444 - _trySubscribe
attempts to report it via which error
is pointless. The motivation for the PR was the appearance of the bug in #3444 in 5.5.6 and its disappearance in the beta. I thought that if a change to the sync error handling was made in 5.5.6 - to better report errors - it would be preferable if the beta didn't go back to swallowing them.
I guess swallowed errors are a pet peeve of mine.
It seems like the real fix might be to figure out why the Subscriber is prematurely stopped in the first place.
In #3444, the reason the subscriber became stopped was due to notifyComplete
being called by the inner subscriber. The notifyComplete
implementation called super.complete()
and stopped the subscriber. However, with a synchronous notifier there was more code that ran after that and there was a bug in that code.
It might be possible to re-arrange repeatWhen
so that becoming stopped is guaranteed to be the very last thing that occurs, but situations in which errors are swallowed could re-arise if someone were to write a wierd, user-land repeat-ish operator.
I imagine that the types of errors that this PR would expose - that would otherwise be swallowed - are very rare. I doubt there are too many operators like repeatWhen
that have to fudge with the forwarding of complete
notifications.
it looks like this loop will continue to hit
hostReportError
over and over for every level at which a subscriberisStopped
I don't believe that's the case; the return
will exit the function and, as it doesn't re-throw, there's no error to be caught by any outer subscribe
calls. Basically, there are three ways out of the loop:
- the old, sync error behaviour of re-throwing the error;
- calling
hostReportError
and returning; - calling
error
and returning.
Of those three, I'm quite sure that the last two are correct. However, I'm less certain about the first: the throw
. I think it's okay, but I will need to give it some more thought.
I think the core team will have to have a discussion about what Subscribers should do when Until then I think we can either block or close this PR, as I don't think I can land it in v6 and it would most definitely be a breaking change after that. |
Yeah, in light of #3487 - which is almost the same, but with a completed/closed subject instead of a completed/stopped subscriber - it would make a lot of sense to close this PR and institute an across-the-board policy for the reporting of errors if an observer's If the deprecated sync error handling is going to be removed in v7, calling |
#3487 was a little different, as it has to do with the specific implementation of bindCallback, but reporting errors to hostReportErrors in situations where error couldn't be called would definitely help there, yes. |
I'm really only talking about errors caught with a The suggestion is that a specific function could be called in those situations and said function could decide whether or not to report via the observer or the host. Something like this: export function reportError(err: any, observer: ErrorObserver<any>): void {
if (canReportError(observer)) {
observer.error(err);
} else {
hostReportError(err);
}
}
function canReportError(observer: ErrorObserver<any>): boolean {
const { closed, destination, isStopped } = observer as any;
if (closed || isStopped) {
return false;
} else if (destination) {
return canReportError(destination);
}
return true;
} It'd need better type checking, but that's the basic concept. Changing the behaviour of |
have the same thought like @cartant {
next: () => {
// throw some error here
},
error: (err) => {
// won't trigger, but unsubscribe parent observables
},
} |
Okay, going to close this one for now, pending further discussion. |
This thread has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs. |
Description:
If
_trySubscribe
catches an error and the sink - or one of its destinations - is stopped and the sink'serror
method is called, the error is swallowed - as it cannot be reported via the observer's callback. This commit adds an internalreportError
method toSubscriber
. The method defers to the appropriate error reporting mechanism if a stopped subscriber is found.For an example scenario of an error being swallowed due to a bug and a closed subscriber (that has not previously been notified of any error) see #3444.
Related issue (if exists): #3461