Skip to content

Commit

Permalink
fix(bufferTime): handle closing context when synchronously unsubscribed
Browse files Browse the repository at this point in the history
closes #1763
  • Loading branch information
kwonoj committed Jun 12, 2016
1 parent 0a6c4e8 commit 4ce4433
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 3 deletions.
16 changes: 16 additions & 0 deletions spec/operators/bufferTime-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -284,4 +284,20 @@ describe('Observable.prototype.bufferTime', () => {
expectObservable(result).toBe(expected, values);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should not throw when subscription synchronously unsubscribed after emit', () => {
const e1 = hot('---a---b---c---d---e---f---g-----|');
const subs = '^ !';
const t = time( '----------|');
const expected = '----------w---------(x|)';
const values = {
w: ['a', 'b'],
x: ['c', 'd', 'e']
};

const result = e1.bufferTime(t, null, Number.POSITIVE_INFINITY, rxTestScheduler).take(2);

expectObservable(result).toBe(expected, values);
expectSubscriptions(e1.subscriptions).toBe(subs);
});
});
10 changes: 7 additions & 3 deletions src/operator/bufferTime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -183,15 +183,19 @@ class BufferTimeSubscriber<T> extends Subscriber<T> {
}

openContext(): Context<T> {
let context: Context<T> = new Context<T>();
const context: Context<T> = new Context<T>();
this.contexts.push(context);
return context;
}

closeContext(context: Context<T>) {
this.destination.next(context.buffer);
const contexts = this.contexts;
contexts.splice(contexts.indexOf(context), 1);

const spliceIndex = contexts ? contexts.indexOf(context) : -1;
if (spliceIndex >= 0) {
contexts.splice(contexts.indexOf(context), 1);
}
}
}

Expand All @@ -203,8 +207,8 @@ function dispatchBufferTimeSpanOnly(state: any) {
subscriber.closeContext(prevContext);
}

state.context = subscriber.openContext();
if (!subscriber.isUnsubscribed) {
state.context = subscriber.openContext();
state.context.closeAction = (<any>this).schedule(state, state.bufferTimeSpan);
}
}
Expand Down

0 comments on commit 4ce4433

Please sign in to comment.