Skip to content

Commit

Permalink
feat(sampleCombine): add sampleCombine extra
Browse files Browse the repository at this point in the history
SampleCombine combines a source stream with multiple other streams.
The result stream will emit the latest events from all input streams,
but only when the source stream emits.

If the source, or any input stream, throws an error, the result stream
will propagate the error. If any input streams end, their final emitted
value will remain in the array of any subsequent events from the result
stream. The result stream will only complete upon completion of the
source stream.

This extra operator implements the CombineSignature core interface.

Resolves #102.
  • Loading branch information
Christian Johns authored and xtianjohns committed Sep 23, 2016
1 parent 0fc6c62 commit d3aceed
Show file tree
Hide file tree
Showing 4 changed files with 321 additions and 1 deletion.
4 changes: 3 additions & 1 deletion markdown/footer.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ A: Read this [blog post](http://staltz.com/why-we-built-xstream.html) on the top

**Q: What is the equivalent of [`withLatestFrom`](http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#instance-method-withLatestFrom) in xstream?**

`withLatestFrom` is implemented as an extra named [`sampleCombine`](https://github.com/staltz/xstream/blob/master/EXTRA_DOCS.md#sampleCombine). It may also be composed with existing operators:

<!-- skip-example -->
```js
A.withLatestFrom(B, (a, b) => a + b)
```

can be achieved in *xstream* with
can be composed in *xstream* with

```js
B.map(b => A.map(a => a + b)).flatten()
Expand Down
155 changes: 155 additions & 0 deletions src/extra/sampleCombine.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
import {CombineSignature, InternalListener, Operator, Stream} from '../core';

export class SampleCombineListener<T> implements InternalListener<T> {
constructor(private i: number, private p: SampleCombineOperator<any>) {
p.ils[i] = this;
}

_n(t: T): void {
if (!this.p.out) return;
this.p.vals[this.i] = t;
}

_e(err: any): void {
this.p._e(err);
}

_c(): void {
this.p.d(this.i, this);
}
}

export class SampleCombineOperator<T> implements Operator<T, Array<any>> {
public type = 'sampleCombine';
public out: Stream<Array<any>>;
public vals: Array<any> = [];
public Sc: number = 0;
public ils: Array<SampleCombineListener<any>> = [];

constructor(public ins: Stream<T>,
public streams: Array<Stream<any>>) {}

_start(out: Stream<Array<any>>): void {
if (!this.ins || !this.streams) {
out._n([]);
out._c();
} else {
this.Sc = this.streams.length;
this.ins._add(this);
this.out = out;
if (this.Sc) {
for (let i = 0; i < this.Sc; i++) {
this.vals[i] = undefined;
this.streams[i]._add(new SampleCombineListener<any>(i, this));
}
}
}
}

_stop(): void {
if (!this.ins || this.Sc) return;
this.ins._remove(this);
this.out = this.vals = null;
for (let i = 0; i < this.Sc; i++) {
this.streams[i]._remove(this.ils[i]);
}
}

_n(t: T): void {
if (!this.out) return;
this.out._n([t, ...this.vals]);
}

_e(err: any): void {
if (!this.out) return;
this.out._e(err);
}

_c(): void {
if (!this.out) return;
this.out._c();
}

d(i: number, l: SampleCombineListener<any>): void {
this.streams[i]._remove(l);
}
}

/**
* Combines a source stream with multiple other streams. The result stream
* will emit the latest events from all input streams, but only when the
* source stream emits.
*
* If the source, or any input stream, throws an error, the result stream
* will propagate the error. If any input streams end, their final emitted
* value will remain in the array of any subsequent events from the result
* stream.
*
* The result stream will only complete upon completion of the source stream.
*
* Marble diagram:
*
* ```text
* --1----2-----3--------4---
* ----a-----b-----c--d------
* sampleCombine
* --1?---2a----3b-------4d--
* ```
*
* Examples:
*
* ```js
* import sampleCombine from 'xstream/extra/sampleCombine'
* import xs from 'xstream'
*
* const sampler = xs.periodic(1000).take(3)
* const other = xs.periodic(100)
*
* const stream = sampleCombine(sampler, other)
*
* stream.addListener({
* next: i => console.log(i),
* error: err => console.error(err),
* complete: () => console.log('completed')
* })
* ```
*
* ```text
* > [0, 8]
* > [1, 18]
* > [2, 28]
* ```
*
* ```js
* import sampleCombine from 'xstream/extra/sampleCombine'
* import xs from 'xstream'
*
* const sampler = xs.periodic(1000).take(3)
* const other = xs.periodic(100).take(2)
*
* const stream = sampleCombine(sampler, other)
*
* stream.addListener({
* next: i => console.log(i),
* error: err => console.error(err),
* complete: () => console.log('completed')
* })
* ```
*
* ```text
* > [0, 1]
* > [1, 1]
* > [2, 1]
* ```
*
* @param {Stream} sampler The source stream of which to sample.
* @param {...Stream} streams One or more streams to combine.
* @return {Stream}
*/
let sampleCombine: CombineSignature;
sampleCombine = function(sampler: Stream<any>,
...streams: Array<Stream<any>>): Stream<Array<any>> {
return new Stream<Array<any>>(new SampleCombineOperator<any>(sampler, streams));
} as CombineSignature;

export default sampleCombine;
162 changes: 162 additions & 0 deletions tests/extra/sampleCombine.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/// <reference path="../../typings/globals/mocha/index.d.ts" />
/// <reference path="../../typings/globals/node/index.d.ts" />
import xs, {Stream} from '../../src/index';
import sampleCombine from '../../src/extra/sampleCombine';
import * as assert from 'assert';

describe('sampleCombine (extra)', () => {
it('should combine AND-style two streams together', (done) => {
const stream1 = xs.periodic(100).take(3);
const stream2 = xs.periodic(99).take(3);
const stream = sampleCombine(stream1, stream2);
let expected = [[0, 0], [1, 1], [2, 2]];
stream.addListener({
next: (x) => {
const e = expected.shift();
assert.equal(x[0], e[0]);
assert.equal(x[1], e[1]);
},
error: done,
complete: () => {
assert.equal(expected.length, 0);
done();
},
});
});

it('should have correct TypeScript signature', (done) => {
const stream1 = xs.create<string>({
start: listener => {},
stop: () => {}
});

const stream2 = xs.create<string>({
start: listener => {},
stop: () => {}
});

const combined: Stream<[string, string]> = sampleCombine(stream1, stream2);
done();
});

it('should complete only when the sample stream has completed', (done) => {
const stream1 = xs.periodic(100).take(4);
const stream2 = xs.periodic(99).take(1);
const stream = sampleCombine(stream1, stream2).map(arr => arr.join(''));
let expected = ['00', '10', '20', '30'];
stream.addListener({
next: (x) => {
assert.equal(x, expected.shift());
},
error: done,
complete: () => {
assert.equal(expected.length, 0);
done();
},
});
});

it('should emit an empty array if combining zero streams', (done) => {
const stream = sampleCombine();

stream.addListener({
next: (a) => {
assert.equal(Array.isArray(a), true);
assert.equal(a.length, 0);
},
error: done,
complete: () => {
done();
},
});
});

it('should just wrap the value if combining one stream', (done) => {
const source = xs.periodic(100).take(3);
const stream = sampleCombine(source);
let expected = [[0], [1], [2]];

stream.addListener({
next: (x) => {
const e = expected.shift();
assert.equal(x[0], e[0]);
},
error: done,
complete: () => {
assert.equal(expected.length, 0);
done();
},
});
});

it('should not break future listeners when SampleCombineProducer tears down', (done) => {
// --0---1--2---| innerA
// ----0----1---| innerB
// ----0-----1--2---| outer
// ------00--11-12--| stream
const innerA = xs.create<number>();
const innerB = xs.create<number>();
const outer = xs.create<number>();
const arrayInners: Array<Stream<number>> = [];
const stream = outer
.map(x => {
return sampleCombine(...arrayInners)
.map(combination => `${x}${combination.join('')}`);
})
.flatten();
const expected = ['00', '11', '12'];

setTimeout(() => {
arrayInners.push(innerA);
outer.shamefullySendNext(0);
}, 100);
setTimeout(() => {
innerA.shamefullySendNext(0);
}, 150);
setTimeout(() => {
innerB.shamefullySendNext(0);
}, 175);
setTimeout(() => {
arrayInners.push(innerB);
outer.shamefullySendNext(1);
innerA.shamefullySendNext(1);
}, 200);
setTimeout(() => {
innerA.shamefullySendNext(2);
outer.shamefullySendNext(2);
innerB.shamefullySendNext(1);
}, 250);
setTimeout(() => {
innerA.shamefullySendComplete();
innerB.shamefullySendComplete();
outer.shamefullySendComplete();
}, 550);

stream.addListener({
next: (x: string) => {
assert.equal(x, expected.shift());
},
error: (err: any) => done(err),
complete: () => {
assert.equal(expected.length, 0);
done();
},
});
});

it('should return a Stream when combining a MemoryStream with a Stream', (done) => {
const input1 = xs.periodic(80).take(4).remember();
const input2 = xs.periodic(50).take(3);
const stream: Stream<[number, number]> = sampleCombine(input1, input2);
assert.strictEqual(stream instanceof Stream, true);
done();
});

it('should return a Stream when combining a MemoryStream with a MemoryStream', (done) => {
const input1 = xs.periodic(80).take(4).remember();
const input2 = xs.periodic(50).take(3).remember();
const stream: Stream<[number, number]> = sampleCombine(input1, input2);
assert.strictEqual(stream instanceof Stream, true);
done();
});
});
1 change: 1 addition & 0 deletions tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
"src/extra/fromDiagram.ts",
"src/extra/fromEvent.ts",
"src/extra/pairwise.ts",
"src/extra/sampleCombine.ts",
"src/extra/split.ts",
"src/extra/tween.ts",
"src/index.ts"
Expand Down

0 comments on commit d3aceed

Please sign in to comment.