Skip to content

Commit

Permalink
feat(replaceError): implement replaceError(), wrap code with try-catch
Browse files Browse the repository at this point in the history
Wrap all user-provided code (like project in map or predicate in filter) with a try-catch to send
out the error to the listener, and implement the replaceError() operator.
  • Loading branch information
Andre Medeiros committed Apr 9, 2016
1 parent 566746a commit ffa5976
Show file tree
Hide file tree
Showing 19 changed files with 379 additions and 23 deletions.
22 changes: 14 additions & 8 deletions src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {DropOperator} from './operator/DropOperator';
import {DebugOperator} from './operator/DebugOperator';
import {FoldOperator} from './operator/FoldOperator';
import {LastOperator} from './operator/LastOperator';
import {ReplaceErrorOperator} from './operator/ReplaceErrorOperator';
import {StartWithOperator} from './operator/StartWithOperator';
import {EndWhenOperator} from './operator/EndWhenOperator';
import {FlattenOperator} from './operator/FlattenOperator';
Expand Down Expand Up @@ -100,6 +101,7 @@ export class Stream<T> implements InternalListener<T> {
}

_x(): void { // tear down logic, after error or complete
if (this._ils.length === 0) return;
if (this._prod) this._prod._stop();
this._ils = [];
}
Expand Down Expand Up @@ -136,20 +138,20 @@ export class Stream<T> implements InternalListener<T> {
}
}

static never(): Stream<void> {
return new Stream<void>({_start: noop, _stop: noop});
static never(): Stream<any> {
return new Stream<any>({_start: noop, _stop: noop});
}

static empty(): Stream<void> {
return new Stream<void>({
_start(il: InternalListener<void>) { il._c(); },
static empty(): Stream<any> {
return new Stream<any>({
_start(il: InternalListener<any>) { il._c(); },
_stop: noop,
});
}

static throw(err: any): Stream<void> {
return new Stream<void>({
_start(il: InternalListener<void>) { il._e(err); },
static throw(err: any): Stream<any> {
return new Stream<any>({
_start(il: InternalListener<any>) { il._e(err); },
_stop: noop,
});
}
Expand Down Expand Up @@ -212,6 +214,10 @@ export class Stream<T> implements InternalListener<T> {
return new Stream<R>(new FoldOperator(accumulate, init, this));
}

replaceError(replace: (err: any) => Stream<T>): Stream<T> {
return new Stream<T>(new ReplaceErrorOperator(replace, this));
}

flatten<R, T extends Stream<R>>(): T {
return <T> new Stream<R>(this._prod instanceof MapOperator ?
new MapFlattenOperator(<MapOperator<R, Stream<R>>> <any> this._prod) :
Expand Down
14 changes: 9 additions & 5 deletions src/factory/CombineProducer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,18 @@ export class Proxy<T> implements InternalListener<T> {

_n(t: T): void {
const prod = this.prod;
const vals = prod.vals;
prod.hasVal[this.i] = true;
prod.vals[this.i] = t;
vals[this.i] = t;
if (!prod.ready) {
prod.up();
}
if (prod.ready) {
prod.out._n(invoke(prod.project, prod.vals));
try {
prod.out._n(invoke(prod.project, vals));
} catch (e) {
prod.out._e(e);
}
}
}

Expand All @@ -94,10 +99,9 @@ export class CombineProducer<R> implements InternalProducer<R> {
public ready: boolean = false;
public hasVal: Array<boolean>;
public vals: Array<any>;
private streams: Array<Stream<any>>;

constructor(public project: CombineProjectFunction, streams: Array<Stream<any>>) {
this.streams = streams;
constructor(public project: CombineProjectFunction,
public streams: Array<Stream<any>>) {
this.vals = new Array(streams.length);
this.hasVal = new Array(streams.length);
this.ac = streams.length;
Expand Down
8 changes: 6 additions & 2 deletions src/operator/DebugOperator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {Stream} from '../Stream';
export class DebugOperator<T> implements Operator<T, T> {
private out: Stream<T> = null;

constructor(public spy: (t: T) => void = null,
constructor(public spy: (t: T) => any = null,
public ins: Stream<T>) {
}

Expand All @@ -19,7 +19,11 @@ export class DebugOperator<T> implements Operator<T, T> {

_n(t: T) {
if (this.spy) {
this.spy(t);
try {
this.spy(t);
} catch (e) {
this.out._e(e);
}
} else {
console.log(t);
}
Expand Down
6 changes: 5 additions & 1 deletion src/operator/FilterOperator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ export class FilterOperator<T> implements Operator<T, T> {
}

_n(t: T) {
if (this.predicate(t)) this.out._n(t);
try {
if (this.predicate(t)) this.out._n(t);
} catch (e) {
this.out._e(e);
}
}

_e(err: any) {
Expand Down
6 changes: 5 additions & 1 deletion src/operator/FoldOperator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ export class FoldOperator<T, R> implements Operator<T, R> {
}

_n(t: T) {
this.out._n(this.acc = this.f(this.acc, t));
try {
this.out._n(this.acc = this.f(this.acc, t));
} catch (e) {
this.out._e(e);
}
}

_e(err: any) {
Expand Down
6 changes: 5 additions & 1 deletion src/operator/MapFlattenConcOperator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@ export class MapFlattenConcOperator<T> implements InternalProducer<T>, InternalL

_n(v: T) {
this.active++;
this.mapOp.project(v)._add(new Inner(this.out, this));
try {
this.mapOp.project(v)._add(new Inner(this.out, this));
} catch (e) {
this.out._e(e);
}
}

_e(err: any) {
Expand Down
6 changes: 5 additions & 1 deletion src/operator/MapFlattenOperator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ export class MapFlattenOperator<T> implements InternalProducer<T>, InternalListe

_n(v: T) {
this.cut();
(this.curr = this.mapOp.project(v))._add(this.inner = new Inner(this.out, this));
try {
(this.curr = this.mapOp.project(v))._add(this.inner = new Inner(this.out, this));
} catch (e) {
this.out._e(e);
}
}

_e(err: any) {
Expand Down
6 changes: 5 additions & 1 deletion src/operator/MapOperator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ export class MapOperator<T, R> implements Operator<T, R> {
}

_n(t: T) {
this.out._n(this.project(t));
try {
this.out._n(this.project(t));
} catch (e) {
this.out._e(e);
}
}

_e(err: any) {
Expand Down
37 changes: 37 additions & 0 deletions src/operator/ReplaceErrorOperator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import {Operator} from '../Operator';
import {Stream} from '../Stream';
import {empty} from '../utils/empty';

export class ReplaceErrorOperator<T> implements Operator<T, T> {
private out: Stream<T> = <Stream<T>> empty;

constructor(public fn: (err: any) => Stream<T>,
public ins: Stream<T>) {
}

_start(out: Stream<T>): void {
this.out = out;
this.ins._add(this);
}

_stop(): void {
this.ins._remove(this);
}

_n(t: T) {
this.out._n(t);
}

_e(err: any) {
try {
this.ins._remove(this);
(this.ins = this.fn(err))._add(this);
} catch (e) {
this.out._e(e);
}
}

_c() {
this.out._c();
}
}
19 changes: 19 additions & 0 deletions tests/factory/combine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,23 @@ describe('xs.combine', () => {
},
});
});

it('should propagate user mistakes in project as errors', (done) => {
const stream1 = xs.interval(30).take(1);
const stream2 = xs.interval(50).take(4);
const stream = xs.combine(
(x, y) => <number> <any> (<string> <any> x).toLowerCase(),
stream1, stream2
);
stream.addListener({
next: () => done('next should not be called'),
error: (err) => {
assert.strictEqual(err.message, 'x.toLowerCase is not a function');
done();
},
complete: () => {
done('complete should not be called');
},
});
});
});
19 changes: 19 additions & 0 deletions tests/operator/combine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,23 @@ describe('Stream.prototype.combine', () => {
},
});
});

it('should propagate user mistakes in project as errors', (done) => {
const source = xs.interval(30).take(1);
const other = xs.interval(50).take(4);
const stream = source.combine(
(x, y) => <number> <any> (<string> <any> x).toLowerCase(),
other
);
stream.addListener({
next: () => done('next should not be called'),
error: (err) => {
assert.strictEqual(err.message, 'x.toLowerCase is not a function');
done();
},
complete: () => {
done('complete should not be called');
},
});
});
});
17 changes: 17 additions & 0 deletions tests/operator/debug.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,21 @@ describe('Stream.prototype.debug', () => {
};
stream.addListener(listener);
});

it('should propagate user mistakes in spy as errors', (done) => {
const source = xs.interval(30).take(1);
const stream = source.debug(
x => <number> <any> (<string> <any> x).toLowerCase()
);
stream.addListener({
next: () => done('next should not be called'),
error: (err) => {
assert.strictEqual(err.message, 'x.toLowerCase is not a function');
done();
},
complete: () => {
done('complete should not be called');
},
});
});
});
2 changes: 1 addition & 1 deletion tests/operator/drop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import * as assert from 'assert';

describe('Stream.prototype.drop', () => {
it('should allow specifying max amount to drop from input stream', (done) => {
const stream = xs.interval(50).drop(4)
const stream = xs.interval(50).drop(4);
const expected = [4, 5, 6];
let listener = {
next: (x: number) => {
Expand Down
17 changes: 17 additions & 0 deletions tests/operator/filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,21 @@ describe('Stream.prototype.filter', () => {
};
stream.addListener(listener);
});

it('should propagate user mistakes in predicate as errors', (done) => {
const source = xs.interval(30).take(1);
const stream = source.filter(
x => (<string> <any> x).toLowerCase() === 'a'
);
stream.addListener({
next: () => done('next should not be called'),
error: (err) => {
assert.strictEqual(err.message, 'x.toLowerCase is not a function');
done();
},
complete: () => {
done('complete should not be called');
},
});
});
});
25 changes: 23 additions & 2 deletions tests/operator/flatten.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ describe('Stream.prototype.flatten', () => {
describe('with map', () => {
it('should expand each interval event with 3 sync events', (done) => {
const stream = xs.interval(100).take(3)
.map(i => xs.of(1 + i, 2 + i, 3 + i))
.flatten();
.map(i => xs.of(1 + i, 2 + i, 3 + i))
.flatten();
const expected = [1, 2, 3, 2, 3, 4, 3, 4, 5];
const listener = {
next: (x: number) => {
Expand Down Expand Up @@ -92,5 +92,26 @@ describe('Stream.prototype.flatten', () => {
};
stream.addListener(listener);
});

it('should propagate user mistakes in project as errors', (done) => {
const source = xs.interval(30).take(1);
const stream = source.map(
x => {
const y = (<string> <any> x).toLowerCase();
return xs.of(y);
}
).flatten();

stream.addListener({
next: () => done('next should not be called'),
error: (err) => {
assert.strictEqual(err.message, 'x.toLowerCase is not a function');
done();
},
complete: () => {
done('complete should not be called');
},
});
});
});
});
21 changes: 21 additions & 0 deletions tests/operator/flattenConcurrently.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,5 +91,26 @@ describe('Stream.prototype.flattenConcurrently', () => {
};
stream.addListener(listener);
});

it('should propagate user mistakes in project as errors', (done) => {
const source = xs.interval(30).take(1);
const stream = source.map(
x => {
const y = (<string> <any> x).toLowerCase();
return xs.of(y);
}
).flattenConcurrently();

stream.addListener({
next: () => done('next should not be called'),
error: (err) => {
assert.strictEqual(err.message, 'x.toLowerCase is not a function');
done();
},
complete: () => {
done('complete should not be called');
},
});
});
});
});
Loading

0 comments on commit ffa5976

Please sign in to comment.