Skip to content

Commit

Permalink
feat(reduce): add thisArg parameter
Browse files Browse the repository at this point in the history
Adds thisArg as third argument to have symmetry with native Array.prototype.reduce().

Relates to ReactiveX#878 and ReactiveX#996.
  • Loading branch information
luisgabriel committed Dec 8, 2015
1 parent 1298b57 commit ef890a8
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 12 deletions.
9 changes: 9 additions & 0 deletions spec/observables/ScalarObservable-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,15 @@ describe('ScalarObservable', function () {
expect(r).toBe(s);
});

it('should reduce with custom thisArg', function () {
var s = new ScalarObservable(1);
var reducer = { add: function (a, b) { return a + b; } };
var reduceFunction = function (a, x) { return this.add(a, x); };
var r = s.reduce(reduceFunction, 1, reducer);
expect(r instanceof ScalarObservable).toBe(true);
expect(r.value).toBe(2);
});

it('should return ErrorObservable if projection throws', function () {
var s = new ScalarObservable(1);
var r = s.reduce(function (a, x) { throw 'error'; }, 1);
Expand Down
17 changes: 17 additions & 0 deletions spec/operators/reduce-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,23 @@ describe('Observable.prototype.reduce()', function () {
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should reduce with custom thisArg', function () {
var e1 = hot('--a--b--c--|');
var e1subs = '^ !';
var expected = '-----------(x|)';

var reducer = {
add: function (a, b) { return a + b; }
};
var reduceFunction = function (o, x) {
return this.add(o, x);
};
var r = e1.reduce(reduceFunction, '', reducer);

expectObservable(r).toBe(expected, {x: 'abc'});
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should reduce with seed if source is empty', function () {
var e1 = hot('--a--^-------|');
var e1subs = '^ !';
Expand Down
2 changes: 1 addition & 1 deletion src/CoreOperators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ export interface CoreOperators<T> {
publishBehavior?: (value: any) => ConnectableObservable<T>;
publishReplay?: (bufferSize?: number, windowTime?: number, scheduler?: Scheduler) => ConnectableObservable<T>;
publishLast?: () => ConnectableObservable<T>;
reduce?: <R>(project: (acc: R, x: T) => R, seed?: R) => Observable<R>;
reduce?: <R>(project: (acc: R, x: T) => R, seed?: R, thisArg?: any) => Observable<R>;
repeat?: (count?: number) => Observable<T>;
retry?: (count?: number) => Observable<T>;
retryWhen?: (notifier: (errors: Observable<any>) => Observable<any>) => Observable<T>;
Expand Down
2 changes: 1 addition & 1 deletion src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ export class Observable<T> implements CoreOperators<T> {
publishBehavior: (value: any) => ConnectableObservable<T>;
publishReplay: (bufferSize?: number, windowTime?: number, scheduler?: Scheduler) => ConnectableObservable<T>;
publishLast: () => ConnectableObservable<T>;
reduce: <R>(project: (acc: R, x: T) => R, seed?: R) => Observable<R>;
reduce: <R>(project: (acc: R, x: T) => R, seed?: R, thisArg?: any) => Observable<R>;
repeat: (count?: number) => Observable<T>;
retry: (count?: number) => Observable<T>;
retryWhen: (notifier: (errors: Observable<any>) => Observable<any>) => Observable<T>;
Expand Down
4 changes: 2 additions & 2 deletions src/observable/ScalarObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,11 @@ proto.filter = function <T>(select: (x: T, ix?: number) => boolean, thisArg?: an
}
};

proto.reduce = function <T, R>(project: (acc: R, x: T) => R, seed?: R): Observable<R> {
proto.reduce = function <T, R>(project: (acc: R, x: T) => R, seed?: R, thisArg?: any): Observable<R> {
if (typeof seed === 'undefined') {
return <any>this;
}
let result = tryCatch(project)(seed, this.value);
let result = tryCatch(project).call(thisArg || this, seed, this.value);
if (result === errorObject) {
return new ErrorObservable(errorObject.e);
} else {
Expand Down
15 changes: 9 additions & 6 deletions src/operator/reduce-support.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import {errorObject} from '../util/errorObject';

export class ReduceOperator<T, R> implements Operator<T, R> {

constructor(private project: (acc: R, x: T) => R, private seed?: R) {
constructor(private project: (acc: R, x: T) => R,
private seed?: R,
private thisArg?: any) {
}

call(subscriber: Subscriber<T>): Subscriber<T> {
return new ReduceSubscriber(subscriber, this.project, this.seed);
return new ReduceSubscriber(subscriber, this.project, this.seed, this.thisArg);
}
}

Expand All @@ -18,18 +20,19 @@ export class ReduceSubscriber<T, R> extends Subscriber<T> {
acc: R;
hasSeed: boolean;
hasValue: boolean = false;
project: (acc: R, x: T) => R;

constructor(destination: Subscriber<T>, project: (acc: R, x: T) => R, seed?: R) {
constructor(destination: Subscriber<T>,
private project: (acc: R, x: T) => R,
seed?: R,
private thisArg?: any) {
super(destination);
this.acc = seed;
this.project = project;
this.hasSeed = typeof seed !== 'undefined';
}

_next(x) {
if (this.hasValue || (this.hasValue = this.hasSeed)) {
const result = tryCatch(this.project).call(this, this.acc, x);
const result = tryCatch(this.project).call(this.thisArg || this, this.acc, x);
if (result === errorObject) {
this.destination.error(errorObject.e);
} else {
Expand Down
4 changes: 2 additions & 2 deletions src/operator/reduce.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {Observable} from '../Observable';
import {ReduceOperator} from './reduce-support';

export function reduce<T, R>(project: (acc: R, x: T) => R, seed?: R): Observable<R> {
return this.lift(new ReduceOperator(project, seed));
export function reduce<T, R>(project: (acc: R, x: T) => R, seed?: R, thisArg?: any): Observable<R> {
return this.lift(new ReduceOperator(project, seed, thisArg));
}

0 comments on commit ef890a8

Please sign in to comment.