Skip to content

Commit

Permalink
fix(scan): accumulator passes current index
Browse files Browse the repository at this point in the history
closes #1614
  • Loading branch information
kwonoj committed Apr 23, 2016
1 parent d1f0fd6 commit a3ec896
Showing 2 changed files with 35 additions and 10 deletions.
25 changes: 25 additions & 0 deletions spec/operators/scan-spec.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import {expect} from 'chai';
import * as Rx from '../../dist/cjs/Rx';
declare const {hot, cold, asDiagram, expectObservable, expectSubscriptions};

@@ -176,4 +177,28 @@ describe('Observable.prototype.scan', () => {
expectObservable(source, unsub).toBe(expected, values);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should pass current index to accumulator', () => {
const values = {
a: 1, b: 3, c: 5,
x: 1, y: 4, z: 9
};
let idx = [0, 1, 2];

const e1 = hot('--a--b--c--|', values);
const e1subs = '^ !';
const expected = '--x--y--z--|';

const scanFunction = (o: number, value: number, index: number) => {
expect(index).to.equal(idx.shift());
return o + value;
};

const scan = e1.scan(scanFunction, 0).finally(() => {
expect(idx).to.be.empty;
});

expectObservable(scan).toBe(expected, values);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});
});
20 changes: 10 additions & 10 deletions src/operator/scan.ts
Original file line number Diff line number Diff line change
@@ -32,23 +32,23 @@ import {Subscriber} from '../Subscriber';
* @see {@link mergeScan}
* @see {@link reduce}
*
* @param {function(acc: R, value: T): R} accumulator The accumulator function
* called on each source value.
* @param {function(acc: R, value: T, index: number): R} accumulator
* The accumulator function called on each source value.
* @param {T|R} [seed] The initial accumulation value.
* @return {Observable<R>} An observable of the accumulated values.
* @method scan
* @owner Observable
*/
export function scan<T, R>(accumulator: (acc: R, value: T) => R, seed?: T | R): Observable<R> {
export function scan<T, R>(accumulator: (acc: R, value: T, index: number) => R, seed?: T | R): Observable<R> {
return this.lift(new ScanOperator(accumulator, seed));
}

export interface ScanSignature<T> {
<R>(accumulator: (acc: R, value: T) => R, seed?: T | R): Observable<R>;
<R>(accumulator: (acc: R, value: T, index: number) => R, seed?: T | R): Observable<R>;
}

class ScanOperator<T, R> implements Operator<T, R> {
constructor(private accumulator: (acc: R, value: T) => R, private seed?: T | R) {
constructor(private accumulator: (acc: R, value: T, index: number) => R, private seed?: T | R) {
}

call(subscriber: Subscriber<R>, source: any): any {
@@ -62,6 +62,8 @@ class ScanOperator<T, R> implements Operator<T, R> {
* @extends {Ignored}
*/
class ScanSubscriber<T, R> extends Subscriber<T> {
private index: number = 0;
private accumulatorSet: boolean = false;
private _seed: T | R;

get seed(): T | R {
@@ -73,12 +75,9 @@ class ScanSubscriber<T, R> extends Subscriber<T> {
this._seed = value;
}

private accumulatorSet: boolean = false;

constructor(destination: Subscriber<R>, private accumulator: (acc: R, value: T) => R, seed?: T|R) {
constructor(destination: Subscriber<R>, private accumulator: (acc: R, value: T, index: number) => R, seed?: T|R) {
super(destination);
this.seed = seed;
this.accumulator = accumulator;
this.accumulatorSet = typeof seed !== 'undefined';
}

@@ -92,9 +91,10 @@ class ScanSubscriber<T, R> extends Subscriber<T> {
}

private _tryNext(value: T): void {
const index = this.index++;
let result: any;
try {
result = this.accumulator(<R>this.seed, value);
result = this.accumulator(<R>this.seed, value, index);
} catch (err) {
this.destination.error(err);
}

0 comments on commit a3ec896

Please sign in to comment.