Skip to content

Commit

Permalink
fix(operator): startWith operator accepts scheduler, multiple values
Browse files Browse the repository at this point in the history
- update startWith operator to accept multiple values as well
- update signature of startWith to accept scheduler as last parameter
- fix current micro perf test to accept scheduler properly, expand test
for multiple values
- expand test coverage
  • Loading branch information
kwonoj authored and benlesh committed Oct 6, 2015
1 parent 0b6e35e commit d1d339a
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 24 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
var RxOld = require('rx');
var RxNew = require('../../../../index');

module.exports = function (suite) {
var oldStartWithWithCurrentThreadScheduler = RxOld.Observable.of(25, RxOld.Scheduler.currentThread)
.startWith(RxOld.Scheduler.currentThread, 5, 5, 5);
var newStartWithWithCurrentThreadScheduler = RxNew.Observable.of(25, RxNew.Scheduler.immediate)
.startWith(5, 5, 5, RxNew.Scheduler.immediate);

function _next(x) { }
function _error(e) { }
function _complete() { }
return suite
.add('old startWith(fromarray) with current thread scheduler', function () {
oldStartWithWithCurrentThreadScheduler.subscribe(_next, _error, _complete);
})
.add('new startWith(fromarray) with current thread scheduler', function () {
newStartWithWithCurrentThreadScheduler.subscribe(_next, _error, _complete);
});
};
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@ var RxNew = require('../../../../index');

module.exports = function (suite) {
var oldStartWithWithCurrentThreadScheduler = RxOld.Observable.of(25, RxOld.Scheduler.currentThread)
.startWith(5, RxOld.Scheduler.currentThread);
.startWith(RxOld.Scheduler.currentThread, 5);
var newStartWithWithCurrentThreadScheduler = RxNew.Observable.of(25, RxNew.Scheduler.immediate)
.startWith(5, RxNew.Scheduler.immediate);

function _next(x) { }
function _error(e) { }
function _complete() { }
return suite
.add('old startWith with current thread scheduler', function () {
.add('old startWith(scalar) with current thread scheduler', function () {
oldStartWithWithCurrentThreadScheduler.subscribe(_next, _error, _complete);
})
.add('new startWith with current thread scheduler', function () {
.add('new startWith(scalar) with current thread scheduler', function () {
newStartWithWithCurrentThreadScheduler.subscribe(_next, _error, _complete);
});
};
};
20 changes: 20 additions & 0 deletions perf/micro/immediate-scheduler/operators/startwith-fromarray.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
var RxOld = require('rx');
var RxNew = require('../../../../index');

module.exports = function (suite) {
var oldStartWithWithImmediateScheduler = RxOld.Observable.of(25, RxOld.Scheduler.immediate)
.startWith(RxOld.Scheduler.immediate, 5, 5, 5);
var newStartWithWithImmediateScheduler = RxNew.Observable.of(25)
.startWith(5, 5, 5);

function _next(x) { }
function _error(e) { }
function _complete() { }
return suite
.add('old startWith(fromArray) with immediate scheduler', function () {
oldStartWithWithImmediateScheduler.subscribe(_next, _error, _complete);
})
.add('new startWith(fromArray) with immediate scheduler', function () {
newStartWithWithImmediateScheduler.subscribe(_next, _error, _complete);
});
};
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@ var RxNew = require('../../../../index');

module.exports = function (suite) {
var oldStartWithWithImmediateScheduler = RxOld.Observable.of(25, RxOld.Scheduler.immediate)
.startWith(5, RxOld.Scheduler.immediate);
.startWith(RxOld.Scheduler.immediate, 5);
var newStartWithWithImmediateScheduler = RxNew.Observable.of(25)
.startWith(5);

function _next(x) { }
function _error(e) { }
function _complete() { }
return suite
.add('old startWith with immediate scheduler', function () {
.add('old startWith(scalar) with immediate scheduler', function () {
oldStartWithWithImmediateScheduler.subscribe(_next, _error, _complete);
})
.add('new startWith with immediate scheduler', function () {
.add('new startWith(scalar) with immediate scheduler', function () {
newStartWithWithImmediateScheduler.subscribe(_next, _error, _complete);
});
};
};
100 changes: 86 additions & 14 deletions spec/operators/startWith-spec.js
Original file line number Diff line number Diff line change
@@ -1,20 +1,92 @@
/* globals describe, it, expect */
/* globals describe, it, expect, expectObservable, hot, rxTestScheduler */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.prototype.startWith()', function () {
it('should start an observable with given value', function (done) {
var source = 'source';
var init = 'init';
var expected = [init, source];

var i = 0;
Observable.of(source)
.startWith(init)
.subscribe(function (x) {
expect(x).toBe(expected[i++]);
}, null, function () {
done();
});
var defaultStartValue = 'x';

it('should start an observable with given value', function () {
var e1 = hot('--a--|');
var expected = 'x-a--|';

expectObservable(e1.startWith(defaultStartValue)).toBe(expected);
});

it('should start with given value and does not completes if source does not completes', function () {
var e1 = hot('----a-');
var expected = 'x---a-';

expectObservable(e1.startWith(defaultStartValue)).toBe(expected);
});

it('should start with given value and does not completes if source never emits', function () {
var e1 = Observable.never();
var expected = 'x-';

expectObservable(e1.startWith(defaultStartValue)).toBe(expected);
});

it('should start with given value and completes if source does not emits', function () {
var e1 = hot('---|');
var expected = 'x--|';

expectObservable(e1.startWith(defaultStartValue)).toBe(expected);
});

it('should start with given value and complete immediately if source is empty', function () {
var e1 = Observable.empty();
var expected = '(x|)';

expectObservable(e1.startWith(defaultStartValue)).toBe(expected);
});

it('should start with given value and source both if source emits single value', function () {
var e1 = Observable.of('a');
var expected = '(xa|)';

expectObservable(e1.startWith(defaultStartValue)).toBe(expected);
});

it('should start with given values when given value is more than one', function () {
var e1 = hot('-----a--|');
var expected = '(yz)-a--|';

expectObservable(e1.startWith('y','z')).toBe(expected);
});

it('should start with given value and raises error if source raises error', function () {
var e1 = hot('--#');
var expected = 'x-#';

expectObservable(e1.startWith(defaultStartValue)).toBe(expected, defaultStartValue);
});

it('should start with given value and raises error immediately if source throws error', function () {
var error = 'error';
var e1 = Observable.throw(error);
var expected = '(x#)';

expectObservable(e1.startWith(defaultStartValue)).toBe(expected, defaultStartValue, error);
});

it('should start with empty if given value is not specified', function () {
var e1 = hot('-a-|');
var expected = '-a-|';

expectObservable(e1.startWith(rxTestScheduler)).toBe(expected);
});

it('should accept scheduler as last argument with single value', function () {
var e1 = hot('--a--|');
var expected = 'x-a--|';

expectObservable(e1.startWith(defaultStartValue, rxTestScheduler)).toBe(expected);
});

it('should accept scheduler as last argument with multiple value', function () {
var e1 = hot('-----a--|');
var expected = '(yz)-a--|';

expectObservable(e1.startWith('y','z', rxTestScheduler)).toBe(expected);
});
});
21 changes: 19 additions & 2 deletions src/operators/startWith.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,24 @@
import Scheduler from '../Scheduler';
import Observable from '../Observable';
import ArrayObservable from '../observables/ArrayObservable';
import ScalarObservable from '../observables/ScalarObservable';
import EmptyObservable from '../observables/EmptyObservable';
import concat from './concat-static';

export default function startWith<T>(x: T): Observable<T> {
return concat(new ScalarObservable(x), this);
export default function startWith<T>(...array: (T | Scheduler)[]): Observable<T> {
let scheduler = <Scheduler>array[array.length - 1];
if (scheduler && typeof scheduler.schedule === 'function') {
array.pop();
} else {
scheduler = void 0;
}

const len = array.length;
if (len === 1) {
return concat(new ScalarObservable(array[0], scheduler), this);
} else if (len > 1) {
return concat(new ArrayObservable(array, scheduler), this);
} else {
return concat(new EmptyObservable(scheduler), this);
}
}

0 comments on commit d1d339a

Please sign in to comment.