-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(delay): fix memory leak #3605
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This PR probably isn't the right solution.
- The issue it's referencing isn't quite refined enough to be sure it's an issue with delay.
- Making changes to the VirtualTimeScheduler in this case is a big no-no, as that can effect almost all tests in our test suite. :)
- The fix that is here may have issues when simultaneous delays are scheduled.
@@ -24,7 +24,15 @@ export class VirtualTimeScheduler extends AsyncScheduler { | |||
const {actions, maxFrames} = this; | |||
let error: any, action: AsyncAction<any>; | |||
|
|||
while ((action = actions.shift()) && (this.frame = action.delay) <= maxFrames) { | |||
while (actions.length > 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To fix delay, we shouldn't have done anything to the VirtualTimeScheduler, changes to the virtual scheduler will affect all marble tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change has nothing to do with the fix. It's just I wanted to test that the last action is correctly unsubscribed and when calling scheduler.flush() it removed the action that wasn't executed
actions.length === 1
maxFrames === 60
action.delay === 80
(action = actions.shift()) && (this.frame = action.delay) <= maxFrames
true && false
// no execution
actions.length === 0 // same as if action would execute
src/operators/delay.ts
Outdated
@@ -79,7 +79,7 @@ interface DelayState<T> { | |||
*/ | |||
class DelaySubscriber<T> extends Subscriber<T> { | |||
private queue: Array<DelayMessage<T>> = []; | |||
private active: boolean = false; | |||
private nextAction: Subscription = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i'm suspicious this will have issues in situations where more than one delayed action is dispatched synchronously. For example of(1, 2, 3).pipe(delay(10))
or the like. You'd have to keep more than one subscription for each scheduled action.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The original code would only schedule one action. this.active
would be set to true
and additional messages would be buffered until dispatch. On dispatch the queue would be processed... etc. I hope I am interpreting it correctly...
I think this code does the same except it doesn't register unsubscription logic that is the cause of the leak.
@benlesh I'll review this later today. If |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, it looks like you've found a bug. Thanks for the work you've put into this.
I can see how your fix works, but I think there is a simpler alternative.
The leak appears to arise from the action being added the the DelaySubscriber
's subscriptions without ever being removed. And you have fixed this by storing the subscription (the action) in a property - which means you also have to implement unsubscribe
to take this into account.
IMO, the fix should be a one-liner: just add source.remove(this);
after source.active = false;
in DelaySubscriber#dispatch
.
And you should not need to modify the VirtualTimeScheduler
to test this. A test something like this should do it:
let subscriber: any = null;
let subscribeSpy: any = null;
const counts: number[] = [];
const duration = time('-|');
const result = timer(duration).pipe(
repeatWhen(notifications => {
const delayed = notifications.pipe(delay(duration));
subscribeSpy = sinon.spy(delayed["source"], "subscribe");
return delayed;
}),
skip(1),
take(5),
tap(() => {
const [[subscriber]] = subscribeSpy.args;
counts.push(subscriber._subscriptions.length);
})
);
expectObservable(result).toBe(/* work out the expected diagram */);
expect(counts).to.deep.equal([1, 1, 1, 1, 1]);
And, ideally, the (failing) test should be added in a separate commit that precedes the fix.
src/operators/delay.ts
Outdated
@@ -96,7 +96,8 @@ class DelaySubscriber<T> extends Subscriber<T> { | |||
const delay = Math.max(0, queue[0].time - scheduler.now()); | |||
this.schedule(state, delay); | |||
} else { | |||
source.active = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the fix is a one-liner and source.remove(this);
should be added here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, I've totally missed that. Will fix the PR. Thanks.
Thank you, @cartant, for taking the time to review this in more detail. |
51995e0
to
0ced8f3
Compare
f6810fa
to
06db858
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. However, I've suggested a couple of minor changes.
I should have suggested this.unsubscribe()
be used in the first place - apologies for the hassle - but it wasn't until I was trying to come up with an accurate test description for the test that I realised that this.unsubscribe()
was more idiomatic and was the way to go. (The behaviour will be the same, as the implementation of Subscription#unsubscribe
will remove the subscription from its parent.)
src/internal/operators/delay.ts
Outdated
@@ -92,6 +92,7 @@ class DelaySubscriber<T> extends Subscriber<T> { | |||
const delay = Math.max(0, queue[0].time - scheduler.now()); | |||
this.schedule(state, delay); | |||
} else { | |||
source.remove(this); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've changed my mind with this. I think this.unsubscribe()
instead of source.remove(this)
would be better. The effected behaviour will be identical, but the former is more idiomatic.
spec/operators/delay-spec.ts
Outdated
@@ -143,4 +146,33 @@ describe('Observable.prototype.delay', () => { | |||
|
|||
expectObservable(result).toBe(expected); | |||
}); | |||
|
|||
it('should unsubscribe scheduled action when result is unsubscribed explicitly', () => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One minor nit: the description doesn't actually describe the problem. The leak happens because the actions are not removed/unsubscribed from the subscriber's internal subscriptions after the action is executed. I think a more accurate description would be something like should unsubscribe scheduled actions after execution
.
The actions will be unsubscribed when the subscriber unsubscribes; it's just that they will accumulate up until that happens.
06db858
to
8afb492
Compare
@cartant All should be fixed now. I've learned a lot finding and fixing this bug. Thanks for your time. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. LGTM.
Thank you, @ubnt-michals! (and thanks for the review @cartant ) |
* test(delay): failing test * fix(delay): fix memory leak
* test(delay): failing test * fix(delay): fix memory leak
This thread has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs. |
Description:
Hopefully fix leaking memory for delay operator
EDIT:
I had to bend the VirtualTimeScheduler to not
shift
the last action in order to properly test this. This could be potential breaking change if left as it is. I am not sure how to test the case without the change though. Any help is appreciated.Related issue (if exists):
#3604