Skip to content

Commit

Permalink
fix(IteratorObservable): get new iterator for each subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
kwonoj committed Jun 15, 2017
1 parent 3003614 commit 1ea94c2
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 11 deletions.
18 changes: 18 additions & 0 deletions spec/observables/IteratorObservable-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,24 @@ describe('IteratorObservable', () => {
);
});

it('should get new iterator for each subscription', () => {
const expected = [
Rx.Notification.createNext(10),
Rx.Notification.createNext(20),
Rx.Notification.createComplete()
];

const e1 = IteratorObservable.create<number>(new Int32Array([10, 20])).observeOn(rxTestScheduler);

let v1, v2: Array<Rx.Notification<any>>;
e1.materialize().toArray().subscribe((x) => v1 = x);
e1.materialize().toArray().subscribe((x) => v2 = x);

rxTestScheduler.flush();
expect(v1).to.deep.equal(expected);
expect(v2).to.deep.equal(expected);
});

it('should finalize generators if the subscription ends', () => {
const iterator = {
finalized: false,
Expand Down
17 changes: 6 additions & 11 deletions src/observable/IteratorObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import { Subscriber } from '../Subscriber';
* @hide true
*/
export class IteratorObservable<T> extends Observable<T> {
private iterator: any;

static create<T>(iterator: any, scheduler?: IScheduler): IteratorObservable<T> {
return new IteratorObservable(iterator, scheduler);
}
Expand Down Expand Up @@ -45,20 +43,20 @@ export class IteratorObservable<T> extends Observable<T> {
(<any> this).schedule(state);
}

constructor(iterator: any, private scheduler?: IScheduler) {
constructor(private readonly iteratorObject: any, private scheduler?: IScheduler) {
super();

if (iterator == null) {
if (iteratorObject == null) {
throw new Error('iterator cannot be null.');
} else if (!iteratorObject[Symbol_iterator]) {
throw new TypeError('object is not iterable');
}

this.iterator = getIterator(iterator);
}

protected _subscribe(subscriber: Subscriber<T>): TeardownLogic {

let index = 0;
const { iterator, scheduler } = this;
const { scheduler } = this;
const iterator = getIterator(this.iteratorObject);

if (scheduler) {
return scheduler.schedule(IteratorObservable.dispatch, 0, {
Expand Down Expand Up @@ -126,9 +124,6 @@ function getIterator(obj: any) {
if (!i && obj.length !== undefined) {
return new ArrayIterator(obj);
}
if (!i) {
throw new TypeError('object is not iterable');
}
return obj[Symbol_iterator]();
}

Expand Down

0 comments on commit 1ea94c2

Please sign in to comment.