diff --git a/perf/micro/immediate-scheduler/operators/first-predicate.js b/perf/micro/immediate-scheduler/operators/first-predicate.js new file mode 100644 index 0000000000..3479906327 --- /dev/null +++ b/perf/micro/immediate-scheduler/operators/first-predicate.js @@ -0,0 +1,23 @@ +var RxOld = require("rx"); +var RxNew = require("../../../../index"); + +module.exports = function (suite) { + var predicate = function(value, index) { + return value === 20; + }; + + var oldFirstNoArgs = RxOld.Observable.range(0, 50, RxOld.Scheduler.immediate).first(predicate); + var newFirstNoArgs = RxNew.Observable.range(0, 50).first(predicate); + + return suite + .add('old first(predicate) with immediate scheduler', function () { + oldFirstNoArgs.subscribe(_next, _error, _complete); + }) + .add('new first(predicate) with immediate scheduler', function () { + newFirstNoArgs.subscribe(_next, _error, _complete); + }); + + function _next(x) { } + function _error(e) { } + function _complete() { } +}; \ No newline at end of file diff --git a/perf/micro/immediate-scheduler/operators/first.js b/perf/micro/immediate-scheduler/operators/first.js new file mode 100644 index 0000000000..ae9a4bffe0 --- /dev/null +++ b/perf/micro/immediate-scheduler/operators/first.js @@ -0,0 +1,19 @@ +var RxOld = require("rx"); +var RxNew = require("../../../../index"); + +module.exports = function (suite) { + var oldFirstNoArgs = RxOld.Observable.range(0, 50, RxOld.Scheduler.immediate).first(); + var newFirstNoArgs = RxNew.Observable.range(0, 50).first(); + + return suite + .add('old first() with immediate scheduler', function () { + oldFirstNoArgs.subscribe(_next, _error, _complete); + }) + .add('new first() with immediate scheduler', function () { + newFirstNoArgs.subscribe(_next, _error, _complete); + }); + + function _next(x) { } + function _error(e) { } + function _complete() { } +}; \ No newline at end of file diff --git a/spec/operators/first-spec.js b/spec/operators/first-spec.js new file mode 100644 index 0000000000..d2ee91bbf9 --- /dev/null +++ b/spec/operators/first-spec.js @@ -0,0 +1,117 @@ +/* globals describe, it, expect, expectObservable, hot, cold */ +var Rx = require('../../dist/cjs/Rx'); + +describe('Observable.prototype.first()', function() { + it('should take the first value of an observable with one value', function() { + var e1 = hot('---(a|)'); + var expected = '---(a|)'; + expectObservable(e1.first()).toBe(expected) + }); + + it('should take the first value of an observable with many values', function() { + var e1 = hot('--a--^--b----c---d--|'); + var expected = '---(b|)'; + expectObservable(e1.first()).toBe(expected) + }); + + it('should error on empty', function() { + var e1 = hot('--a--^----|'); + var expected = '-----#'; + expectObservable(e1.first()).toBe(expected, null, new Rx.EmptyError()); + }); + + it('should return the default value if source observable was empty', function() { + var e1 = hot('-----^----|'); + var expected = '-----(a|)'; + expectObservable(e1.first(null, null, 'a')).toBe(expected); + }); + + it('should propagate error from the source observable', function() { + var e1 = hot('---^---#'); + var expected = '----#'; + expectObservable(e1.first()).toBe(expected) + }); + + it('should go on forever on never', function() { + var e2 = hot('--^-------'); + var expected = '--------'; + expectObservable(e2.first()).toBe(expected); + }); + + it('should return first value that matches a predicate', function() { + var e1 = hot('--a-^--b--c--a--c--|'); + var expected = '------(c|)'; + var predicate = function (value) { + return value === 'c'; + }; + expectObservable(e1.first(predicate)).toBe(expected); + }); + + it('should return first value that matches a predicate for odd numbers', function() { + var e1 = hot('--a-^--b--c--d--e--|', {a: 1, b: 2, c: 3, d: 4, e: 5}); + var expected = '------(c|)'; + var predicate = function (value) { + return value % 2 === 1; + }; + expectObservable(e1.first(predicate)).toBe(expected, {c: 3}); + }); + + it('should return first value that matches a predicate using thisArg', function() { + var e1 = hot('--a-^--b--c--d--e--|', {a: 1, b: 2, c: 3, d: 4, e: 5}); + var expected = '------(c|)'; + var predicate = function (value) { + expect(this).toEqual(42); + return value % 2 === 1; + }; + expectObservable(e1.first(predicate, 42)).toBe(expected, {c: 3}); + }); + + it('should error when no value matches the predicate', function() { + var e1 = hot('--a-^--b--c--a--c--|'); + var expected = '---------------#'; + var predicate = function (value) { + return value === 's'; + }; + expectObservable(e1.first(predicate)).toBe(expected, null, new Rx.EmptyError()); + }); + + it('should return the default value when no value matches the predicate', function() { + var e1 = hot('--a-^--b--c--a--c--|'); + var expected = '---------------(d|)'; + var predicate = function (value) { + return value === 's'; + }; + expectObservable(e1.first(predicate, null, 'd')).toBe(expected); + }); + + it('should propagate error when no value matches the predicate', function() { + var e1 = hot('--a-^--b--c--a--#'); + var expected = '------------#'; + var predicate = function (value) { + return value === 's'; + }; + expectObservable(e1.first(predicate)).toBe(expected); + }); + + it('should return first value that matches the index in the predicate', function() { + var e1 = hot('--a-^--b--c--a--c--|'); + var expected = '---------(a|)'; + var predicate = function (value, index) { + return index === 2; + }; + expectObservable(e1.first(predicate)).toBe(expected); + }); + + it('should propagate error from predicate', function() { + var e1 = hot('--a-^--b--c--d--e--|', {a: 1, b: 2, c: 3, d: 4, e: 5}); + var expected = '---------#'; + var predicate = function (value) { + if (value < 4) { + return false; + } else { + throw 'error'; + } + }; + expectObservable(e1.first(predicate)).toBe(expected, null, 'error'); + }); +}); diff --git a/src/operators/first.ts b/src/operators/first.ts new file mode 100644 index 0000000000..b6b8be85fa --- /dev/null +++ b/src/operators/first.ts @@ -0,0 +1,74 @@ +import Observable from '../Observable'; +import Operator from '../Operator'; +import Subscriber from '../Subscriber'; +import Observer from '../Observer'; + +import tryCatch from '../util/tryCatch'; +import {errorObject} from '../util/errorObject'; +import bindCallback from '../util/bindCallback'; +import EmptyError from '../util/EmptyError'; + +export default function first(predicate?: (value: T, index: number, source: Observable) => boolean, + thisArg?: any, + defaultValue?: any): Observable { + return this.lift(new FirstOperator(predicate, thisArg, defaultValue, this)); +} + +class FirstOperator implements Operator { + constructor(private predicate?: (value: T, index: number, source: Observable) => boolean, + private thisArg?: any, + private defaultValue?: any, + private source?: Observable) { + } + + call(observer: Subscriber): Subscriber { + return new FirstSubscriber( + observer, this.predicate, this.thisArg, this.defaultValue, this.source + ); + } +} + +class FirstSubscriber extends Subscriber { + private predicate: Function; + private index: number = 0; + private hasCompleted: boolean = false; + + constructor(destination: Observer, + predicate?: (value: T, index: number, source: Observable) => boolean, + private thisArg?: any, + private defaultValue?: any, + private source?: Observable) { + super(destination); + if (typeof predicate === 'function') { + this.predicate = bindCallback(predicate, thisArg, 3); + } + } + + _next(value: T) { + const destination = this.destination; + const predicate = this.predicate; + let passed: any = true; + if (predicate) { + passed = tryCatch(predicate)(value, this.index++, this.source); + if (passed === errorObject) { + destination.error(passed.e); + return; + } + } + if (passed) { + destination.next(value); + destination.complete(); + this.hasCompleted = true; + } + } + + _complete() { + const destination = this.destination; + if (!this.hasCompleted && typeof this.defaultValue !== 'undefined') { + destination.next(this.defaultValue); + destination.complete(); + } else if (!this.hasCompleted) { + destination.error(new EmptyError); + } + } +}