-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor(catchError): simplify implementation #5666
Conversation
1bfea4d
to
e944f45
Compare
src/internal/operators/catchError.ts
Outdated
} | ||
try { | ||
const nextSource = from(selector(err, catchError(selector)(source))); | ||
subscription = nextSource.subscribe(subscriber); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm reasonably confident that I could write a test to make this fail. What would happen if a source emitted an error
notification synchronously upon subscription? It looks to me like the subscription
to nextSource
would be overwritten by the return value from the outer subscribe
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What would happen if a source emitted an error notification synchronously upon subscription?
OMG! You are right! I can't believe that I felt for that one 😅. I just fixed it, thanks!
However, I'm still puzzled by the fact that this test was passing 🤔:
it("should stop listening to a synchronous observable when unsubscribed", () => {
const sideEffects: number[] = [];
const synchronousObservable = concat(
defer(() => {
sideEffects.push(1);
return of(1);
}),
defer(() => {
sideEffects.push(2);
return of(2);
}),
defer(() => {
sideEffects.push(3);
return of(3);
})
);
throwError(new Error("Some error"))
.pipe(
catchError(() => synchronousObservable),
takeWhile(x => x != 2) // unsubscribe at the second side-effect
)
.subscribe(() => {
/* noop */
});
expect(sideEffects).to.deep.equal([1, 2]);
});
I want to look into that because I think that it's important to have a test that catches that mistake. Thanks @cartant !
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can't believe it... Even the following test passes with my original implementation 🤯
it(
'should call the unsubscribe function of the catched observable when the source throws synchronously',
(done: MochaDone) =>~
{
let clean = false;
const testCleanup$ = new Observable(() => () => {
clean = true;
})
const subscription = throwError(new Error('Some error')).pipe(
catchError(() => defer(() => testCleanup$)),
).subscribe()
expect(clean).to.equal(false);
setTimeout(() => {
subscription.unsubscribe()
expect(clean).to.equal(true);
done()
}, 100)
});
What's even more surprising is that in order to understand what's happening I tested all the tests against the following implementation and all the tests pass (including the one above):
export function catchError<T, O extends ObservableInput<any>>(
selector: (err: any, caught: Observable<T>) => O
): OperatorFunction<T, T | ObservedValueOf<O>> {
return (source: Observable<T>) =>
lift(source, function(this: Subscriber<T>, source: Observable<T>) {
// notice how we are not returning anything
source.subscribe(
new CatchErrorSubscriber(this, err => {
try {
from(selector(err, catchError(selector)(source))).subscribe(this);
} catch (selectorErr) {
this.error(selectorErr);
}
})
);
});
}
I think that this has to do with the fact that with that ^^ implementation when we subscribe we are creating a new instance of CatchErrorSubscriber
.
In fact, I'm going to push these changes into this branch, just to make sure that the CI passes for real and that this is not an issue with my local env.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I actually hate that test with the concat(defer, defer, defer)
. It should probably be just a plain observable with a loop, similar to "firehose" tests @cartant recently added.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I actually hate that test with the concat(defer, defer, defer). It should probably be just a plain observable with a loop, similar to "firehose" tests @cartant recently added.
Totally agree. In fact I'm pretty sure that test is redundant because unless I'm mistaken the "firehose" test is already covering that case, right?
da5d72a
to
364567a
Compare
src/internal/operators/catchError.ts
Outdated
return subscription; | ||
return () => { | ||
subscription!.unsubscribe(); | ||
subscription = sourceSubscription = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FTR I'm setting these subscriptions to null out of "tradition"/"superstition". I would really appreciate it if someone could explain to me why this is necessary in order to avoid memory-leaks 🙏
I guess that it must have to do with the lift
util? I mean, if we were doing new Observable
instead, then would this be necessary in order to avoid memory-leaks?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Subscriptions are often subscribers, and historically, our subscribers are these OOP monstrosities that have a lot of state living in properties. There's this practice throughout the library mostly to make sure things that are no longer being used get orphaned for GC. That said, some of it is probably superstition. I haven't examined this example too closely.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@josepot Regarding the nulling out, the shareReplay
refactor/simplification you did a while ago touched on code where the nulling was important - as the subscription was not held on a per-subscriber basis. Nulling out is far less of an issue - as Ben has mentioned - when the reference is per-subscriber.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you both @benlesh and @cartant for taking the time to answer this question, and I apologize if I framed the question in a way that it sounded like I was being judgemental. I understand that it's better to be cautious than to be sorry.
Nulling out is far less of an issue - as Ben has mentioned - when the reference is per-subscriber.
That's what I thought, and yep, in the case of the shareReplay
I could see why the cleanup was important in there. Once again: thank you both for the detailed explanation, much, much appreciated.
1c04b34
to
432488c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we can merge this.
The problem with the new implementation is that, in the synchronous case, finalize
will now happen after everything else... for example:
const source = new Observable(subscriber => {
for (let i = 0; !subscriber.closed && i < 10; i++) {
subscriber.next(i);
}
subscriber.complete();
});
let i = 0;
source.pipe(
tap(n => {
if (n > 2) { throw 'bad!' }
}),
finalize(() => console.log('finalized')),
catchError((err, caught) => {
if (i++ < 3) {
return caught;
}
throw err;
})
)
.subscribe({
next: console.log,
error: console.log
});
SHOULD log:
0
1
2
finalized
0
1
2
finalized
0
1
2
finalized
0
1
2
bad!
finalized
But with your code, I suspect that it will look like:
0
1
2
0
1
2
0
1
2
0
1
2
bad!
finalized
finalized
finalized
finalized
I'm not sure if there's a test that covers this though, which might be a better add. Along with the other test you've added.
Nice! Thanks for spotting that @benlesh , I will add a test for that. |
432488c
to
eaa5367
Compare
I've addressed the comments. There are 2 commits in this PR:
It goes without saying that I don't mind at all dropping the second commit if you prefer the current implantation. It's just a suggestion: if you like it "yay!", if you don't I totally get it, of course 🙂 . Also, maybe it would be better to create a separate PR just for the improvements of the tests? |
5893948
to
3634261
Compare
src/internal/operators/catchError.ts
Outdated
} | ||
try { | ||
nextSource = from(selector(err, catchError(selector)(source))); | ||
tryToSwapSubscription(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's another interesting semantic difference here. Any error that occurs in tryToSwapSubscription()
will be caught and sent down the error
channel... I'm not sure what this could mean. In the event that someone is using the deprecated sync error throwing (set in by config
), it's possible that they have a synchronous error thrown during this subscription... I'm not sure what the behavior in this case here would be.. (And I'm equally unsure what the behavior would be in the previous code).
I guess I would expect it to send down the error channel as you do here, so this is probably correct. It's just something to think about.
Probably nothing we should be concerned with, I suppose. But I wanted to note the concern.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! We're getting closer.
src/internal/operators/catchError.ts
Outdated
|
||
return subscription; | ||
return () => subscription.unsubscribe(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should just return the subscription
here, TBH. In master, we only just recently stopped wrapping these sorts of functions in Subscriptions. But either way, you're wrapping a subscription in a function, which isn't necessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right! I thought that it was necessary because the subscription is being reassigned on the tryToSwapSubscription
function. However, now that I have a closer look at it, that reassignment is not necessary. Nice!
expect(clean).to.equal(true); | ||
}); | ||
|
||
it('should not alter the order of the teardown logic of the subscription chain', () => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test is a little too clever.
I'd prefer that things were more clearly named. For example, you're leveraging i
in that for loop, but from the name it's not clear what the intended use of i
is, or why it lives outside of the for
declaration. You have to study the code to realize its part of the test, because you're throwing if you "make it through the loop". That test is probably unnecessary BTW.
Basically just an of(1, 2, 3, 4, 5, 6, 7)
or something would probably be fine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, I got a bit lazy and I just made a test out of your comment 🙈 I will improve that, for sure. Thanks!
That test is probably unnecessary BTW.
I don't know, I think that it doesn't hurt. With my previous implementation all the tests were passing, but this test would have failed... Let me try to make it more explicit and understandable.
19b3ee4
to
818fc97
Compare
I've addressed the comments. Once again, thank you both for the reviews. I'm learning a lot! |
a7e9cd3
to
e42249a
Compare
LGTM... I'm going to bump this to @cartant since he reviewed this once earlier as well. |
I'm going to block this, for the moment. I started reviewing it and AFAICT, there is a problem with this bit. I cannot see how |
e42249a
to
f68afea
Compare
You are right. It is more confusing. I agree, my apologies.
I made that change when I was addressing this comment. I remember that when I saw that comment I thought: "did Ben realize that I'm swapping the subscription? He probably missed that, because if I just return In retrospective I should have asked for further clarification on that comment. I just put the code back to what it was before I addressed that comment. I think that implementation is a bit simpler and clearer, but this is something very subjective. So, if you don't see it like that, I totally understand it. |
f68afea
to
1ef63e2
Compare
Oh... yup, @cartant has a good catch there. I think I'm inclined to agree with him that this implementation is also more confusing than the previous implementation. In particular, the bit about the handling of synchronous unsubscription and errors is a bit clearer with what we currently have in master... although, if I were a better developer, I'd have left more comments about why that is there. 🤔 I'm sorry, @josepot... this was good work, it really was. And I'm very glad you explored a different avenue of implementing this operator, but I can't see a reason we'd merge this at this time. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No changes requested... just removing approval so I doesn't show up in my merge list.
No worries at all. I'm closing this. Thank you both! If you want me to open a different PR that includes just the commit with the tests, let me know it. |
Much respect, @josepot. Really. ❤️ |
Description:
After I saw #5665 I decided to have a look at the implementation of
catchError
, and when I did I thought that it could be simplified. So, I wrote a simpler implementation and with this implementation all the tests pass andon top of that it also fixes the issue described on #5665I guess that I should also write a test that reproduces the issue described on #5665, but I couldn't come up with a good description for a test like that 🤔. Maybe tomorrow I will be more inspired 😄Related issue (if exists):
#5665
EDIT
It turns out that the the issue described on #5665 was fixed by #5627. So, this PR doesn't fix anything. However, it does simplify the code quite a bit IMO.