Skip to content

Commit

Permalink
feat(operator): add dematerialize operator
Browse files Browse the repository at this point in the history
- add dematerialize operator
- add marble test case for materialize, dematerialize operator

closes ReactiveX#391, ReactiveX#475
  • Loading branch information
kwonoj committed Oct 7, 2015
1 parent c1a4994 commit a97ef0b
Show file tree
Hide file tree
Showing 6 changed files with 174 additions and 32 deletions.
84 changes: 84 additions & 0 deletions spec/operators/dematerialize-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/* globals describe, it, expect, expectObservable, hot */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;
var Notification = Rx.Notification;

describe('Observable.prototype.dematerialize()', function () {
it('should dematerialize a happy stream', function () {
var values = {
a: Notification.createNext('w'),
b: Notification.createNext('x'),
c: Notification.createNext('y'),
d: Notification.createComplete()
};

var e1 = hot('--a--b--c--d--|', values);
var expected = '--w--x--y--|';

expectObservable(e1.dematerialize()).toBe(expected);
});

it('should dematerialize a sad stream', function () {
var values = {
a: Notification.createNext('w'),
b: Notification.createNext('x'),
c: Notification.createNext('y'),
d: Notification.createError('error')
};

var e1 = hot('--a--b--c--d--|', values);
var expected = '--w--x--y--#';

expectObservable(e1.dematerialize()).toBe(expected);
});

it('should dematerialize stream does not completes', function () {
var e1 = hot('------');
var expected = '-';

expectObservable(e1.dematerialize()).toBe(expected);
});

it('should dematerialize stream never completes', function () {
var e1 = Observable.never();
var expected = '-';

expectObservable(e1.dematerialize()).toBe(expected);
});

it('should dematerialize stream does not emit', function () {
var e1 = hot('----|');
var expected = '----|)';

expectObservable(e1.dematerialize()).toBe(expected);
});

it('should dematerialize empty stream', function () {
var e1 = Observable.empty();
var expected = '|';

expectObservable(e1.dematerialize()).toBe(expected);
});

it('should dematerialize stream throws', function () {
var error = 'error';
var e1 = hot('(x|)', {x: Notification.createError(error)});
var expected = '#';

expectObservable(e1.dematerialize()).toBe(expected, null, error);
});

it('should dematerialize and completes when stream compltes with complete notification', function () {
var e1 = hot('----(a|)', { a: Notification.createComplete() });
var expected = '----|';

expectObservable(e1.dematerialize()).toBe(expected);
});

it('should dematerialize and completes when stream emits complete notification', function () {
var e1 = hot('----a--|', { a: Notification.createComplete() });
var expected = '----|';

expectObservable(e1.dematerialize()).toBe(expected);
});
});
91 changes: 59 additions & 32 deletions spec/operators/materialize-spec.js
Original file line number Diff line number Diff line change
@@ -1,43 +1,70 @@
/* globals describe, it, expect */
/* globals describe, it, expect, expectObservable, hot */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;
var Notification = Rx.Notification;

describe('Observable.prototype.materialize()', function () {
it('should materialize a happy stream', function () {
var expected = [
Notification.createNext(1),
Notification.createNext(2),
Notification.createNext(3),
Notification.createComplete()
];

Observable.of(1, 2, 3)
.materialize()
.subscribe(function (n) {
expect(n instanceof Notification).toBe(true);
expect(n).toEqual(expected.shift());
});
var e1 = hot('--a--b--c--|');
var expected = '--w--x--y--(z|)';

var expectedValue = {
w: Notification.createNext('a'),
x: Notification.createNext('b'),
y: Notification.createNext('c'),
z: Notification.createComplete()
};

expectObservable(e1.materialize()).toBe(expected, expectedValue);
});

it('should materialize a sad stream', function () {
var expected = [
Notification.createNext(1),
Notification.createNext(2),
Notification.createNext(3),
Notification.createError('booooo')
];

Observable.of(1, 2, 3, 4)
.map(function (x) {
if (x === 4) {
throw 'booooo';
}
return x;
})
.materialize()
.subscribe(function (n) {
expect(n).toEqual(expected.shift());
});
var e1 = hot('--a--b--c--#');
var expected = '--w--x--y--(z|)';

var expectedValue = {
w: Notification.createNext('a'),
x: Notification.createNext('b'),
y: Notification.createNext('c'),
z: Notification.createError('error')
};

expectObservable(e1.materialize()).toBe(expected, expectedValue);
});

it('should materialize stream does not completes', function () {
var e1 = hot('------');
var expected = '-';

expectObservable(e1.materialize()).toBe(expected);
});

it('should materialize stream never completes', function () {
var e1 = Observable.never();
var expected = '-';

expectObservable(e1.materialize()).toBe(expected);
});

it('should materialize stream does not emit', function () {
var e1 = hot('----|');
var expected = '----(x|)';

expectObservable(e1.materialize()).toBe(expected, { x: Notification.createComplete() });
});

it('should materialize empty stream', function () {
var e1 = Observable.empty();
var expected = '(x|)';

expectObservable(e1.materialize()).toBe(expected, { x: Notification.createComplete() });
});

it('should materialize stream throws', function () {
var error = 'error';
var e1 = Observable.throw(error);
var expected = '(x|)';

expectObservable(e1.materialize()).toBe(expected, { x: Notification.createError(error) });
});
});
1 change: 1 addition & 0 deletions src/CoreOperators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export interface CoreOperators<T> {
concatMap?: <R>(project: ((x: T, ix: number) => Observable<any>), projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable<R>;
concatMapTo?: <R>(observable: Observable<any>, projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable<R>;
count?: () => Observable<number>;
dematerialize?: () => Observable<any>;
debounce?: <R>(dueTime: number, scheduler?: Scheduler) => Observable<R>;
defaultIfEmpty?: <T, R>(defaultValue: R) => Observable<T>|Observable<R>;
delay?: <T>(delay: number, scheduler?: Scheduler) => Observable<T>;
Expand Down
3 changes: 3 additions & 0 deletions src/Rx.KitchenSink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ observableProto.concatMapTo = concatMapTo;
import count from './operators/count';
observableProto.count = count;

import dematerialize from './operators/dematerialize';
observableProto.dematerialize = dematerialize;

import debounce from './operators/debounce';
observableProto.debounce = debounce;

Expand Down
3 changes: 3 additions & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ observableProto.concatMapTo = concatMapTo;
import count from './operators/count';
observableProto.count = count;

import dematerialize from './operators/dematerialize';
observableProto.dematerialize = dematerialize;

import debounce from './operators/debounce';
observableProto.debounce = debounce;

Expand Down
24 changes: 24 additions & 0 deletions src/operators/dematerialize.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import Operator from '../Operator';
import Observer from '../Observer';
import Subscriber from '../Subscriber';
import Notification from '../Notification';

export default function dematerialize<T>() {
return this.lift(new DeMaterializeOperator());
}

class DeMaterializeOperator<T extends Notification<any>, R> implements Operator<T, R> {
call(subscriber: Subscriber<R>) {
return new DeMaterializeSubscriber(subscriber);
}
}

class DeMaterializeSubscriber<T extends Notification<any>> extends Subscriber<T> {
constructor(destination: Subscriber<any>) {
super(destination);
}

_next(value: T) {
value.observe(this.destination);
}
}

0 comments on commit a97ef0b

Please sign in to comment.