Skip to content

Commit

Permalink
fix(Subscription): add will return Subscription that removes itse…
Browse files Browse the repository at this point in the history
…lf when unsubscribed

There was a problem where subscriptions added to parent subscriptions would not remove themselves when unsubscribed, This meant that when Actions (which are subscriptions) were unsubscribed, they would still hang out in the _subscriptions list and live in memory when we didn't want them to

related #2244
  • Loading branch information
benlesh committed Jan 3, 2017
1 parent 6922b16 commit 375d4a5
Showing 3 changed files with 45 additions and 20 deletions.
14 changes: 9 additions & 5 deletions spec/Subscription-spec.ts
Original file line number Diff line number Diff line change
@@ -112,15 +112,19 @@ describe('Subscription', () => {
expect(isCalled).to.equal(true);
});

it('Should returns the passed one if passed a unsubscribed AnonymousSubscription', () => {
const sub = new Subscription();
it('Should wrap the AnonymousSubscription and return a subscription that unsubscribes and removes it when unsubbed', () => {
const sub: any = new Subscription();
let called = false;
const arg = {
isUnsubscribed: true,
unsubscribe: () => undefined,
unsubscribe: () => called = true,
};
const ret = sub.add(arg);

expect(ret).to.equal(arg);
expect(called).to.equal(false);
expect(sub._subscriptions.length).to.equal(1);
ret.unsubscribe();
expect(called).to.equal(true);
expect(sub._subscriptions.length).to.equal(0);
});

it('Should returns the passed one if passed a AnonymousSubscription having not function `unsubscribe` member', () => {
35 changes: 29 additions & 6 deletions src/Subscription.ts
Original file line number Diff line number Diff line change
@@ -40,6 +40,8 @@ export class Subscription implements ISubscription {
*/
public closed: boolean = false;

private _subscriptions: ISubscription[];

/**
* @param {function(): void} [unsubscribe] A function describing how to
* perform the disposal of resources when the `unsubscribe` method is called.
@@ -74,7 +76,10 @@ export class Subscription implements ISubscription {
let trial = tryCatch(_unsubscribe).call(this);
if (trial === errorObject) {
hasErrors = true;
(errors = errors || []).push(errorObject.e);
errors = errors || (
errorObject.e instanceof UnsubscriptionError ?
flattenUnsubscriptionErrors(errorObject.e.errors) : [errorObject.e]
);
}
}

@@ -92,7 +97,7 @@ export class Subscription implements ISubscription {
errors = errors || [];
let err = errorObject.e;
if (err instanceof UnsubscriptionError) {
errors = errors.concat(err.errors);
errors = errors.concat(flattenUnsubscriptionErrors(err.errors));
} else {
errors.push(err);
}
@@ -140,18 +145,20 @@ export class Subscription implements ISubscription {
sub = new Subscription(<(() => void) > teardown);
case 'object':
if (sub.closed || typeof sub.unsubscribe !== 'function') {
break;
return sub;
} else if (this.closed) {
sub.unsubscribe();
} else {
((<any> this)._subscriptions || ((<any> this)._subscriptions = [])).push(sub);
return sub;
}
break;
default:
throw new Error('unrecognized teardown ' + teardown + ' added to Subscription.');
}

return sub;
const childSub = new ChildSubscription(sub, this);
this._subscriptions = this._subscriptions || [];
this._subscriptions.push(childSub);
return childSub;
}

/**
@@ -179,3 +186,19 @@ export class Subscription implements ISubscription {
}
}
}

export class ChildSubscription extends Subscription {
constructor(private _innerSub: ISubscription, private _parent: Subscription) {
super();
}

_unsubscribe() {
const { _innerSub, _parent } = this;
_parent.remove(this);
_innerSub.unsubscribe();
}
}

function flattenUnsubscriptionErrors(errors: any[]) {
return errors.reduce((errs, err) => errs.concat((err instanceof UnsubscriptionError) ? err.errors : err), []);
}
16 changes: 7 additions & 9 deletions src/scheduler/VirtualTimeScheduler.ts
Original file line number Diff line number Diff line change
@@ -54,15 +54,13 @@ export class VirtualAction<T> extends AsyncAction<T> {
}

public schedule(state?: T, delay: number = 0): Subscription {
return !this.id ?
super.schedule(state, delay) : (
// If an action is rescheduled, we save allocations by mutating its state,
// pushing it to the end of the scheduler queue, and recycling the action.
// But since the VirtualTimeScheduler is used for testing, VirtualActions
// must be immutable so they can be inspected later.
<VirtualAction<T>> this.add(
new VirtualAction<T>(this.scheduler, this.work))
).schedule(state, delay);
if (!this.id) {
return super.schedule(state, delay);
}

const action = new VirtualAction(this.scheduler, this.work);
this.add(action);
return action.schedule(state, delay);
}

protected requestAsyncId(scheduler: VirtualTimeScheduler, id?: any, delay: number = 0): any {

0 comments on commit 375d4a5

Please sign in to comment.