Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dematerialize): add dematerialize operator #479

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions spec/operators/dematerialize-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/* globals describe, it, expect, expectObservable, hot */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;
var Notification = Rx.Notification;

describe('Observable.prototype.dematerialize()', function () {
it('should dematerialize a happy stream', function () {
var values = {
a: Notification.createNext('w'),
b: Notification.createNext('x'),
c: Notification.createNext('y'),
d: Notification.createComplete()
};

var e1 = hot('--a--b--c--d--|', values);
var expected = '--w--x--y--|';

expectObservable(e1.dematerialize()).toBe(expected);
});

it('should dematerialize a sad stream', function () {
var values = {
a: Notification.createNext('w'),
b: Notification.createNext('x'),
c: Notification.createNext('y'),
d: Notification.createError('error')
};

var e1 = hot('--a--b--c--d--|', values);
var expected = '--w--x--y--#';

expectObservable(e1.dematerialize()).toBe(expected);
});

it('should dematerialize stream does not completes', function () {
var e1 = hot('------');
var expected = '-';

expectObservable(e1.dematerialize()).toBe(expected);
});

it('should dematerialize stream never completes', function () {
var e1 = Observable.never();
var expected = '-';

expectObservable(e1.dematerialize()).toBe(expected);
});

it('should dematerialize stream does not emit', function () {
var e1 = hot('----|');
var expected = '----|)';

expectObservable(e1.dematerialize()).toBe(expected);
});

it('should dematerialize empty stream', function () {
var e1 = Observable.empty();
var expected = '|';

expectObservable(e1.dematerialize()).toBe(expected);
});

it('should dematerialize stream throws', function () {
var error = 'error';
var e1 = hot('(x|)', {x: Notification.createError(error)});
var expected = '#';

expectObservable(e1.dematerialize()).toBe(expected, null, error);
});

it('should dematerialize and completes when stream compltes with complete notification', function () {
var e1 = hot('----(a|)', { a: Notification.createComplete() });
var expected = '----|';

expectObservable(e1.dematerialize()).toBe(expected);
});

it('should dematerialize and completes when stream emits complete notification', function () {
var e1 = hot('----a--|', { a: Notification.createComplete() });
var expected = '----|';

expectObservable(e1.dematerialize()).toBe(expected);
});
});
91 changes: 59 additions & 32 deletions spec/operators/materialize-spec.js
Original file line number Diff line number Diff line change
@@ -1,43 +1,70 @@
/* globals describe, it, expect */
/* globals describe, it, expect, expectObservable, hot */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;
var Notification = Rx.Notification;

describe('Observable.prototype.materialize()', function () {
it('should materialize a happy stream', function () {
var expected = [
Notification.createNext(1),
Notification.createNext(2),
Notification.createNext(3),
Notification.createComplete()
];

Observable.of(1, 2, 3)
.materialize()
.subscribe(function (n) {
expect(n instanceof Notification).toBe(true);
expect(n).toEqual(expected.shift());
});
var e1 = hot('--a--b--c--|');
var expected = '--w--x--y--(z|)';

var expectedValue = {
w: Notification.createNext('a'),
x: Notification.createNext('b'),
y: Notification.createNext('c'),
z: Notification.createComplete()
};

expectObservable(e1.materialize()).toBe(expected, expectedValue);
});

it('should materialize a sad stream', function () {
var expected = [
Notification.createNext(1),
Notification.createNext(2),
Notification.createNext(3),
Notification.createError('booooo')
];

Observable.of(1, 2, 3, 4)
.map(function (x) {
if (x === 4) {
throw 'booooo';
}
return x;
})
.materialize()
.subscribe(function (n) {
expect(n).toEqual(expected.shift());
});
var e1 = hot('--a--b--c--#');
var expected = '--w--x--y--(z|)';

var expectedValue = {
w: Notification.createNext('a'),
x: Notification.createNext('b'),
y: Notification.createNext('c'),
z: Notification.createError('error')
};

expectObservable(e1.materialize()).toBe(expected, expectedValue);
});

it('should materialize stream does not completes', function () {
var e1 = hot('------');
var expected = '-';

expectObservable(e1.materialize()).toBe(expected);
});

it('should materialize stream never completes', function () {
var e1 = Observable.never();
var expected = '-';

expectObservable(e1.materialize()).toBe(expected);
});

it('should materialize stream does not emit', function () {
var e1 = hot('----|');
var expected = '----(x|)';

expectObservable(e1.materialize()).toBe(expected, { x: Notification.createComplete() });
});

it('should materialize empty stream', function () {
var e1 = Observable.empty();
var expected = '(x|)';

expectObservable(e1.materialize()).toBe(expected, { x: Notification.createComplete() });
});

it('should materialize stream throws', function () {
var error = 'error';
var e1 = Observable.throw(error);
var expected = '(x|)';

expectObservable(e1.materialize()).toBe(expected, { x: Notification.createError(error) });
});
});
1 change: 1 addition & 0 deletions src/CoreOperators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export interface CoreOperators<T> {
concatMap?: <R>(project: ((x: T, ix: number) => Observable<any>), projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable<R>;
concatMapTo?: <R>(observable: Observable<any>, projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable<R>;
count?: () => Observable<number>;
dematerialize?: () => Observable<any>;
debounce?: <R>(dueTime: number, scheduler?: Scheduler) => Observable<R>;
defaultIfEmpty?: <T, R>(defaultValue: R) => Observable<T>|Observable<R>;
delay?: <T>(delay: number, scheduler?: Scheduler) => Observable<T>;
Expand Down
3 changes: 3 additions & 0 deletions src/Rx.KitchenSink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ observableProto.concatMapTo = concatMapTo;
import count from './operators/count';
observableProto.count = count;

import dematerialize from './operators/dematerialize';
observableProto.dematerialize = dematerialize;

import debounce from './operators/debounce';
observableProto.debounce = debounce;

Expand Down
3 changes: 3 additions & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ observableProto.concatMapTo = concatMapTo;
import count from './operators/count';
observableProto.count = count;

import dematerialize from './operators/dematerialize';
observableProto.dematerialize = dematerialize;

import debounce from './operators/debounce';
observableProto.debounce = debounce;

Expand Down
24 changes: 24 additions & 0 deletions src/operators/dematerialize.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import Operator from '../Operator';
import Observer from '../Observer';
import Subscriber from '../Subscriber';
import Notification from '../Notification';

export default function dematerialize<T>() {
return this.lift(new DeMaterializeOperator());
}

class DeMaterializeOperator<T extends Notification<any>, R> implements Operator<T, R> {
call(subscriber: Subscriber<R>) {
return new DeMaterializeSubscriber(subscriber);
}
}

class DeMaterializeSubscriber<T extends Notification<any>> extends Subscriber<T> {
constructor(destination: Subscriber<any>) {
super(destination);
}

_next(value: T) {
value.observe(this.destination);
}
}