diff --git a/spec/operators/last-spec.js b/spec/operators/last-spec.js new file mode 100644 index 0000000000..e6f80b7cb3 --- /dev/null +++ b/spec/operators/last-spec.js @@ -0,0 +1,22 @@ +/* globals describe, it, expect, expectObservable, hot, cold */ +var Rx = require('../../dist/cjs/Rx'); + +describe('Observable.prototype.last()', function(){ + it('should take the last value of an observable', function(){ + var e1 = hot('--a--^--b--c--|'); + var expected = '---------(c|)'; + expectObservable(e1.last()).toBe(expected) + }); + + it('should error on empty', function() { + var e1 = hot('--a--^----|'); + var expected = '-----#'; + expectObservable(e1.last()).toBe(expected, null, new Rx.EmptyError()); + }); + + it('should go on forever on never', function() { + var e2 = hot('--^---'); + var expected = '----'; + expectObservable(e2.last()).toBe(expected); + }); +}); \ No newline at end of file diff --git a/spec/schedulers/TestScheduler-spec.js b/spec/schedulers/TestScheduler-spec.js index 47804a8866..d6bf7b1432 100644 --- a/spec/schedulers/TestScheduler-spec.js +++ b/spec/schedulers/TestScheduler-spec.js @@ -35,6 +35,25 @@ describe('TestScheduler', function() { { frame: 150, notification: Notification.createError('omg error!') } ]); }); + + it('should default in the letter for the value if no value hash was passed', function(){ + var result = TestScheduler.parseMarbles('--a--b--c--'); + expect(result).toDeepEqual([ + { frame: 20, notification: Notification.createNext('a') }, + { frame: 50, notification: Notification.createNext('b') }, + { frame: 80, notification: Notification.createNext('c') }, + ]) + }); + + it('should handle grouped values', function() { + var result = TestScheduler.parseMarbles('---(abc)---'); + expect(result).toDeepEqual([ + { frame: 30, notification: Notification.createNext('a') }, + { frame: 30, notification: Notification.createNext('b') }, + { frame: 30, notification: Notification.createNext('c') } + ]); + }); + }); describe('createColdObservable()', function () { diff --git a/src/Observable.ts b/src/Observable.ts index 531ad37a78..1b3bbc139e 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -184,6 +184,8 @@ export default class Observable { startWith: (x: T) => Observable; debounce: (dueTime: number, scheduler?: Scheduler) => Observable; + last: (predicate?: (value: T, index:number) => boolean, thisArg?: any, defaultValue?: any) => Observable; + filter: (predicate: (x: T) => boolean, ix?: number, thisArg?: any) => Observable; distinctUntilChanged: (compare?: (x: T, y: T) => boolean, thisArg?: any) => Observable; distinctUntilKeyChanged: (key: string, compare?: (x: any, y: any) => boolean, thisArg?: any) => Observable; diff --git a/src/Rx.ts b/src/Rx.ts index 27c523912c..9dcf869dff 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -9,6 +9,7 @@ import Observable from './Observable'; import Subscriber from './Subscriber'; import Subscription from './Subscription'; import Notification from './Notification'; +import EmptyError from './util/EmptyError'; import ReplaySubject from './subjects/ReplaySubject'; import BehaviorSubject from './subjects/BehaviorSubject'; @@ -215,6 +216,10 @@ import sampleTime from './operators/sampleTime'; observableProto.sample = sample; observableProto.sampleTime = sampleTime; +import last from './operators/last'; + +observableProto.last = last; + var Scheduler = { nextTick, immediate @@ -231,5 +236,6 @@ export { ConnectableObservable, Notification, VirtualTimeScheduler, - TestScheduler + TestScheduler, + EmptyError }; diff --git a/src/operators/last.ts b/src/operators/last.ts new file mode 100644 index 0000000000..89e6b1b897 --- /dev/null +++ b/src/operators/last.ts @@ -0,0 +1,68 @@ +import Observable from '../Observable'; +import Operator from '../Operator'; +import Subscriber from '../Subscriber'; +import Observer from '../Observer'; + +import tryCatch from '../util/tryCatch'; +import {errorObject} from '../util/errorObject'; +import bindCallback from '../util/bindCallback'; +import EmptyError from '../util/EmptyError'; + +export default function last(predicate?: (value: T, index: number, source:Observable) => boolean, thisArg?: any, defaultValue?: any) : Observable { + return this.lift(new LastOperator(predicate, thisArg, defaultValue, this)); +} + +class LastOperator implements Operator { + constructor(private predicate?: (value: T, index: number, source:Observable) => boolean, private thisArg?: any, private defaultValue?: any, private source?: Observable) { + + } + + call(observer: Subscriber): Subscriber { + return new LastSubscriber(observer, this.predicate, this.thisArg, this.defaultValue, this.source); + } +} + +class LastSubscriber extends Subscriber { + private lastValue: T; + private hasValue: boolean = false; + private predicate: Function; + private index: number = 0; + + constructor(destination: Observer, predicate?: (value: T, index: number, source: Observable) => boolean, + private thisArg?: any, private defaultValue?: any, private source?: Observable) { + super(destination); + if(typeof defaultValue !== 'undefined') { + this.lastValue = defaultValue; + this.hasValue = true; + } + if(typeof predicate === 'function') { + this.predicate = bindCallback(predicate, thisArg, 3); + } + } + + _next(value: T) { + const predicate = this.predicate; + if(predicate) { + let result = tryCatch(predicate)(value, this.index++, this.source); + if(result === errorObject) { + this.destination.error(result.e); + } else if (result) { + this.lastValue = result; + this.hasValue = true; + } + } else { + this.lastValue = value; + this.hasValue = true; + } + } + + _complete() { + const destination = this.destination; + if(this.hasValue) { + destination.next(this.lastValue); + destination.complete(); + } else { + destination.error(new EmptyError); + } + } +} \ No newline at end of file diff --git a/src/schedulers/TestScheduler.ts b/src/schedulers/TestScheduler.ts index 1cce23f6a7..1db584b184 100644 --- a/src/schedulers/TestScheduler.ts +++ b/src/schedulers/TestScheduler.ts @@ -76,6 +76,8 @@ export default class TestScheduler extends VirtualTimeScheduler { let results: ({ notification: Notification, frame: number })[] = []; let subIndex = marbles.indexOf('^'); let frameOffset = subIndex === -1 ? 0 : (subIndex * -10); + let getValue = typeof values !== 'object' ? (x) => x : (x) => values[x]; + let groupStart = -1; for (let i = 0; i < len; i++) { let frame = i * 10; @@ -84,6 +86,12 @@ export default class TestScheduler extends VirtualTimeScheduler { switch (c) { case '-': break; + case '(': + groupStart = frame; + break; + case ')': + groupStart = -1; + break; case '|': notification = Notification.createComplete(); break; @@ -93,14 +101,15 @@ export default class TestScheduler extends VirtualTimeScheduler { notification = Notification.createError(errorValue || 'error'); break; default: - notification = Notification.createNext(values[c]); + notification = Notification.createNext(getValue(c)); break; } + frame += frameOffset; if (notification) { - results.push({ notification, frame }); + results.push({ notification, frame: groupStart > -1 ? groupStart : frame }); } } return results; diff --git a/src/util/EmptyError.ts b/src/util/EmptyError.ts new file mode 100644 index 0000000000..d9c5d5a58e --- /dev/null +++ b/src/util/EmptyError.ts @@ -0,0 +1,4 @@ +export default class EmptyError implements Error { + name = 'EmptyError' + message = 'no elements in sequence'; +} \ No newline at end of file