diff --git a/spec/operators/dematerialize-spec.js b/spec/operators/dematerialize-spec.js new file mode 100644 index 00000000000..029c1a2c8a5 --- /dev/null +++ b/spec/operators/dematerialize-spec.js @@ -0,0 +1,84 @@ +/* globals describe, it, expect, expectObservable, hot */ +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; +var Notification = Rx.Notification; + +describe('Observable.prototype.dematerialize()', function () { + it('should dematerialize a happy stream', function () { + var values = { + a: Notification.createNext('w'), + b: Notification.createNext('x'), + c: Notification.createNext('y'), + d: Notification.createComplete() + }; + + var e1 = hot('--a--b--c--d--|', values); + var expected = '--w--x--y--|'; + + expectObservable(e1.dematerialize()).toBe(expected); + }); + + it('should dematerialize a sad stream', function () { + var values = { + a: Notification.createNext('w'), + b: Notification.createNext('x'), + c: Notification.createNext('y'), + d: Notification.createError('error') + }; + + var e1 = hot('--a--b--c--d--|', values); + var expected = '--w--x--y--#'; + + expectObservable(e1.dematerialize()).toBe(expected); + }); + + it('should dematerialize stream does not completes', function () { + var e1 = hot('------'); + var expected = '-'; + + expectObservable(e1.dematerialize()).toBe(expected); + }); + + it('should dematerialize stream never completes', function () { + var e1 = Observable.never(); + var expected = '-'; + + expectObservable(e1.dematerialize()).toBe(expected); + }); + + it('should dematerialize stream does not emit', function () { + var e1 = hot('----|'); + var expected = '----|)'; + + expectObservable(e1.dematerialize()).toBe(expected); + }); + + it('should dematerialize empty stream', function () { + var e1 = Observable.empty(); + var expected = '|'; + + expectObservable(e1.dematerialize()).toBe(expected); + }); + + it('should dematerialize stream throws', function () { + var error = 'error'; + var e1 = hot('(x|)', {x: Notification.createError(error)}); + var expected = '#'; + + expectObservable(e1.dematerialize()).toBe(expected, null, error); + }); + + it('should dematerialize and completes when stream compltes with complete notification', function () { + var e1 = hot('----(a|)', { a: Notification.createComplete() }); + var expected = '----|'; + + expectObservable(e1.dematerialize()).toBe(expected); + }); + + it('should dematerialize and completes when stream emits complete notification', function () { + var e1 = hot('----a--|', { a: Notification.createComplete() }); + var expected = '----|'; + + expectObservable(e1.dematerialize()).toBe(expected); + }); +}); \ No newline at end of file diff --git a/spec/operators/materialize-spec.js b/spec/operators/materialize-spec.js index 7af9159d53f..5db46ccdad1 100644 --- a/spec/operators/materialize-spec.js +++ b/spec/operators/materialize-spec.js @@ -1,43 +1,70 @@ -/* globals describe, it, expect */ +/* globals describe, it, expect, expectObservable, hot */ var Rx = require('../../dist/cjs/Rx'); var Observable = Rx.Observable; var Notification = Rx.Notification; describe('Observable.prototype.materialize()', function () { it('should materialize a happy stream', function () { - var expected = [ - Notification.createNext(1), - Notification.createNext(2), - Notification.createNext(3), - Notification.createComplete() - ]; - - Observable.of(1, 2, 3) - .materialize() - .subscribe(function (n) { - expect(n instanceof Notification).toBe(true); - expect(n).toEqual(expected.shift()); - }); + var e1 = hot('--a--b--c--|'); + var expected = '--w--x--y--(z|)'; + + var expectedValue = { + w: Notification.createNext('a'), + x: Notification.createNext('b'), + y: Notification.createNext('c'), + z: Notification.createComplete() + }; + + expectObservable(e1.materialize()).toBe(expected, expectedValue); }); it('should materialize a sad stream', function () { - var expected = [ - Notification.createNext(1), - Notification.createNext(2), - Notification.createNext(3), - Notification.createError('booooo') - ]; - - Observable.of(1, 2, 3, 4) - .map(function (x) { - if (x === 4) { - throw 'booooo'; - } - return x; - }) - .materialize() - .subscribe(function (n) { - expect(n).toEqual(expected.shift()); - }); + var e1 = hot('--a--b--c--#'); + var expected = '--w--x--y--(z|)'; + + var expectedValue = { + w: Notification.createNext('a'), + x: Notification.createNext('b'), + y: Notification.createNext('c'), + z: Notification.createError('error') + }; + + expectObservable(e1.materialize()).toBe(expected, expectedValue); + }); + + it('should materialize stream does not completes', function () { + var e1 = hot('------'); + var expected = '-'; + + expectObservable(e1.materialize()).toBe(expected); + }); + + it('should materialize stream never completes', function () { + var e1 = Observable.never(); + var expected = '-'; + + expectObservable(e1.materialize()).toBe(expected); + }); + + it('should materialize stream does not emit', function () { + var e1 = hot('----|'); + var expected = '----(x|)'; + + expectObservable(e1.materialize()).toBe(expected, { x: Notification.createComplete() }); + }); + + it('should materialize empty stream', function () { + var e1 = Observable.empty(); + var expected = '(x|)'; + + expectObservable(e1.materialize()).toBe(expected, { x: Notification.createComplete() }); + }); + + it('should materialize stream throws', function () { + var error = 'error'; + var e1 = Observable.throw(error); + var expected = '(x|)'; + + expectObservable(e1.materialize()).toBe(expected, { x: Notification.createError(error) }); }); }); \ No newline at end of file diff --git a/src/CoreOperators.ts b/src/CoreOperators.ts index db2feaeb28d..7e8918b6ecc 100644 --- a/src/CoreOperators.ts +++ b/src/CoreOperators.ts @@ -18,6 +18,7 @@ export interface CoreOperators { concatMap?: (project: ((x: T, ix: number) => Observable), projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable; concatMapTo?: (observable: Observable, projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable; count?: () => Observable; + dematerialize?: () => Observable; debounce?: (dueTime: number, scheduler?: Scheduler) => Observable; defaultIfEmpty?: (defaultValue: R) => Observable|Observable; delay?: (delay: number, scheduler?: Scheduler) => Observable; diff --git a/src/Rx.KitchenSink.ts b/src/Rx.KitchenSink.ts index 1489f0713b9..5dc59a67858 100644 --- a/src/Rx.KitchenSink.ts +++ b/src/Rx.KitchenSink.ts @@ -108,6 +108,9 @@ observableProto.concatMapTo = concatMapTo; import count from './operators/count'; observableProto.count = count; +import dematerialize from './operators/dematerialize'; +observableProto.dematerialize = dematerialize; + import debounce from './operators/debounce'; observableProto.debounce = debounce; diff --git a/src/Rx.ts b/src/Rx.ts index 32db411b1d0..75e4f0d74a5 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -97,6 +97,9 @@ observableProto.concatMapTo = concatMapTo; import count from './operators/count'; observableProto.count = count; +import dematerialize from './operators/dematerialize'; +observableProto.dematerialize = dematerialize; + import debounce from './operators/debounce'; observableProto.debounce = debounce; diff --git a/src/operators/dematerialize.ts b/src/operators/dematerialize.ts new file mode 100644 index 00000000000..85122e3b091 --- /dev/null +++ b/src/operators/dematerialize.ts @@ -0,0 +1,24 @@ +import Operator from '../Operator'; +import Observer from '../Observer'; +import Subscriber from '../Subscriber'; +import Notification from '../Notification'; + +export default function dematerialize() { + return this.lift(new DeMaterializeOperator()); +} + +class DeMaterializeOperator, R> implements Operator { + call(subscriber: Subscriber) { + return new DeMaterializeSubscriber(subscriber); + } +} + +class DeMaterializeSubscriber> extends Subscriber { + constructor(destination: Subscriber) { + super(destination); + } + + _next(value: T) { + value.observe(this.destination); + } +}