Skip to content

Commit

Permalink
feat(Stream): accept partially defined listeners
Browse files Browse the repository at this point in the history
Stream.addListener can now accept a listener that is partially defined. This means that it does not

require all three callbacks. One or two or all of the callbacks can be given, and it will still work

as expected.

#67
  • Loading branch information
staltz committed Oct 19, 2016
1 parent 02be36f commit e9d005d
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 117 deletions.
20 changes: 10 additions & 10 deletions src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ export interface Listener<T> {
complete: () => void;
}

export interface PartialListener<T> {
next?: (x: T) => void;
error?: (err: any) => void;
complete?: () => void;
}

export type Observable<T> = {
subscribe(listener: Listener<T>): { unsubscribe: () => void; }
}
Expand Down Expand Up @@ -1329,16 +1335,10 @@ export class Stream<T> implements InternalListener<T> {
*
* @param {Listener<T>} listener
*/
addListener(listener: Listener<T>): void {
if (typeof listener.next !== 'function'
|| typeof listener.error !== 'function'
|| typeof listener.complete !== 'function') {
throw new Error('stream.addListener() requires all three next, error, ' +
'and complete functions.');
}
(listener as InternalListener<T> & Listener<T>)._n = listener.next;
(listener as InternalListener<T> & Listener<T>)._e = listener.error;
(listener as InternalListener<T> & Listener<T>)._c = listener.complete;
addListener(listener: PartialListener<T>): void {
(listener as InternalListener<T> & Listener<T>)._n = listener.next || noop;
(listener as InternalListener<T> & Listener<T>)._e = listener.error || noop;
(listener as InternalListener<T> & Listener<T>)._c = listener.complete || noop;
this._add(listener as InternalListener<T> & Listener<T>);
}

Expand Down
20 changes: 18 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
import {Stream, MemoryStream, Listener, Producer, Operator, Observable} from './core';
export {Stream, MemoryStream, Listener, Producer, Operator, Observable};
import {
Stream,
MemoryStream,
Listener,
PartialListener,
Producer,
Operator,
Observable,
} from './core';
export {
Stream,
MemoryStream,
Listener,
PartialListener,
Producer,
Operator,
Observable,
};
export default Stream;
135 changes: 30 additions & 105 deletions tests/stream.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/// <reference path="../typings/globals/mocha/index.d.ts" />
/// <reference path="../typings/globals/node/index.d.ts" />
import xs, {Producer, Listener, Stream} from '../src/index';
import fromDiagram from '../src/extra/fromDiagram';
import * as assert from 'assert';

describe('Stream', () => {
Expand Down Expand Up @@ -352,126 +353,50 @@ describe('Stream', () => {
});

describe('addListener', () => {
it('throws a helpful error if you forget the next function', (done) => {
const stream = xs.empty();
const listener = <Listener<Number>> <any> {};
it('should accept a partial listener with just next', (done) => {
const stream = fromDiagram('--a--b----c----d---|');
const expected = ['a', 'b', 'c', 'd'];

try {
stream.addListener(listener);
} catch (e) {
assert.equal(e.message, 'stream.addListener() requires all three ' +
'next, error, and complete functions.');
done();
}
});

it('throws a helpful error if you forget the error function', (done) => {
const stream = xs.empty();
const listener = <Listener<Number>> <any> {
next: (x: any) => {}
};

try {
stream.addListener(listener);
} catch (e) {
assert.equal(e.message, 'stream.addListener() requires all three ' +
'next, error, and complete functions.');
done();
}
});

it('throws a helpful error if you forget the complete function', (done) => {
const stream = xs.empty();
const listener = <Listener<Number>> <any> {
next: (x: any) => {},
error: (err: any) => {}
let listener = {
next: (x: number) => {
assert.equal(x, expected.shift());
if (expected.length === 0) {
stream.removeListener(listener as any);
done();
}
},
};

try {
stream.addListener(listener);
} catch (e) {
assert.equal(e.message, 'stream.addListener() requires all three ' +
'next, error, and complete functions.');
done();
}
stream.addListener(listener as any);
});

it('throws a helpful error if you pass a non function value as the next function', (done) => {
const stream = xs.empty();
const listener = <Listener<Number>> <any> {
next: undefined
};

try {
stream.addListener(listener);
} catch (e) {
assert.equal(e.message, 'stream.addListener() requires all three ' +
'next, error, and complete functions.');
done();
}
});
});
it('should accept a partial listener with just error', (done) => {
const stream = fromDiagram('--a--b----c----d---#', {errorValue: 'oops'});

describe('subscribe', () => {
it('throws a helpful error if you forget the next function', (done) => {
const stream = xs.empty();
const listener = <Listener<Number>> <any> {};

try {
stream.subscribe(listener);
} catch (e) {
assert.equal(e.message, 'stream.addListener() requires all three ' +
'next, error, and complete functions.');
done();
}
});

it('throws a helpful error if you forget the error function', (done) => {
const stream = xs.empty();
const listener = <Listener<Number>> <any> {
next: (x: any) => {}
let listener = {
error: (err: any) => {
assert.equal(err, 'oops');
done();
},
};

try {
stream.subscribe(listener);
} catch (e) {
assert.equal(e.message, 'stream.addListener() requires all three ' +
'next, error, and complete functions.');
done();
}
stream.addListener(listener as any);
});

it('throws a helpful error if you forget the complete function', (done) => {
const stream = xs.empty();
const listener = <Listener<Number>> <any> {
next: (x: any) => {},
error: (err: any) => {}
};

try {
stream.subscribe(listener);
} catch (e) {
assert.equal(e.message, 'stream.addListener() requires all three ' +
'next, error, and complete functions.');
done();
}
});
it('should accept a partial listener with just complete', (done) => {
const stream = fromDiagram('--a--b----c----d---|');

it('throws a helpful error if you pass a non function value as the next function', (done) => {
const stream = xs.empty();
const listener = <Listener<Number>> <any> {
next: undefined
let listener = {
complete: () => {
done();
},
};

try {
stream.subscribe(listener);
} catch (e) {
assert.equal(e.message, 'stream.addListener() requires all three ' +
'next, error, and complete functions.');
done();
}
stream.addListener(listener as any);
});
});

describe('subscribe', () => {
it('should return a subscription', (done) => {
const stream = xs.empty();
const noop = (): void => void 0;
Expand Down

0 comments on commit e9d005d

Please sign in to comment.