-
-
Notifications
You must be signed in to change notification settings - Fork 139
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(sampleCombine): add sampleCombine extra
SampleCombine combines a source stream with multiple other streams. The result stream will emit the latest events from all input streams, but only when the source stream emits. If the source, or any input stream, throws an error, the result stream will propagate the error. If any input streams end, their final emitted value will remain in the array of any subsequent events from the result stream. The result stream will only complete upon completion of the source stream. This extra operator implements the CombineSignature core interface. Resolves #102.
- Loading branch information
1 parent
0fc6c62
commit d3aceed
Showing
4 changed files
with
321 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,155 @@ | ||
import {CombineSignature, InternalListener, Operator, Stream} from '../core'; | ||
|
||
export class SampleCombineListener<T> implements InternalListener<T> { | ||
constructor(private i: number, private p: SampleCombineOperator<any>) { | ||
p.ils[i] = this; | ||
} | ||
|
||
_n(t: T): void { | ||
if (!this.p.out) return; | ||
this.p.vals[this.i] = t; | ||
} | ||
|
||
_e(err: any): void { | ||
this.p._e(err); | ||
} | ||
|
||
_c(): void { | ||
this.p.d(this.i, this); | ||
} | ||
} | ||
|
||
export class SampleCombineOperator<T> implements Operator<T, Array<any>> { | ||
public type = 'sampleCombine'; | ||
public out: Stream<Array<any>>; | ||
public vals: Array<any> = []; | ||
public Sc: number = 0; | ||
public ils: Array<SampleCombineListener<any>> = []; | ||
|
||
constructor(public ins: Stream<T>, | ||
public streams: Array<Stream<any>>) {} | ||
|
||
_start(out: Stream<Array<any>>): void { | ||
if (!this.ins || !this.streams) { | ||
out._n([]); | ||
out._c(); | ||
} else { | ||
this.Sc = this.streams.length; | ||
this.ins._add(this); | ||
this.out = out; | ||
if (this.Sc) { | ||
for (let i = 0; i < this.Sc; i++) { | ||
this.vals[i] = undefined; | ||
this.streams[i]._add(new SampleCombineListener<any>(i, this)); | ||
} | ||
} | ||
} | ||
} | ||
|
||
_stop(): void { | ||
if (!this.ins || this.Sc) return; | ||
this.ins._remove(this); | ||
this.out = this.vals = null; | ||
for (let i = 0; i < this.Sc; i++) { | ||
this.streams[i]._remove(this.ils[i]); | ||
} | ||
} | ||
|
||
_n(t: T): void { | ||
if (!this.out) return; | ||
this.out._n([t, ...this.vals]); | ||
} | ||
|
||
_e(err: any): void { | ||
if (!this.out) return; | ||
this.out._e(err); | ||
} | ||
|
||
_c(): void { | ||
if (!this.out) return; | ||
this.out._c(); | ||
} | ||
|
||
d(i: number, l: SampleCombineListener<any>): void { | ||
this.streams[i]._remove(l); | ||
} | ||
} | ||
|
||
/** | ||
* Combines a source stream with multiple other streams. The result stream | ||
* will emit the latest events from all input streams, but only when the | ||
* source stream emits. | ||
* | ||
* If the source, or any input stream, throws an error, the result stream | ||
* will propagate the error. If any input streams end, their final emitted | ||
* value will remain in the array of any subsequent events from the result | ||
* stream. | ||
* | ||
* The result stream will only complete upon completion of the source stream. | ||
* | ||
* Marble diagram: | ||
* | ||
* ```text | ||
* --1----2-----3--------4--- | ||
* ----a-----b-----c--d------ | ||
* sampleCombine | ||
* --1?---2a----3b-------4d-- | ||
* ``` | ||
* | ||
* Examples: | ||
* | ||
* ```js | ||
* import sampleCombine from 'xstream/extra/sampleCombine' | ||
* import xs from 'xstream' | ||
* | ||
* const sampler = xs.periodic(1000).take(3) | ||
* const other = xs.periodic(100) | ||
* | ||
* const stream = sampleCombine(sampler, other) | ||
* | ||
* stream.addListener({ | ||
* next: i => console.log(i), | ||
* error: err => console.error(err), | ||
* complete: () => console.log('completed') | ||
* }) | ||
* ``` | ||
* | ||
* ```text | ||
* > [0, 8] | ||
* > [1, 18] | ||
* > [2, 28] | ||
* ``` | ||
* | ||
* ```js | ||
* import sampleCombine from 'xstream/extra/sampleCombine' | ||
* import xs from 'xstream' | ||
* | ||
* const sampler = xs.periodic(1000).take(3) | ||
* const other = xs.periodic(100).take(2) | ||
* | ||
* const stream = sampleCombine(sampler, other) | ||
* | ||
* stream.addListener({ | ||
* next: i => console.log(i), | ||
* error: err => console.error(err), | ||
* complete: () => console.log('completed') | ||
* }) | ||
* ``` | ||
* | ||
* ```text | ||
* > [0, 1] | ||
* > [1, 1] | ||
* > [2, 1] | ||
* ``` | ||
* | ||
* @param {Stream} sampler The source stream of which to sample. | ||
* @param {...Stream} streams One or more streams to combine. | ||
* @return {Stream} | ||
*/ | ||
let sampleCombine: CombineSignature; | ||
sampleCombine = function(sampler: Stream<any>, | ||
...streams: Array<Stream<any>>): Stream<Array<any>> { | ||
return new Stream<Array<any>>(new SampleCombineOperator<any>(sampler, streams)); | ||
} as CombineSignature; | ||
|
||
export default sampleCombine; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,162 @@ | ||
/// <reference path="../../typings/globals/mocha/index.d.ts" /> | ||
/// <reference path="../../typings/globals/node/index.d.ts" /> | ||
import xs, {Stream} from '../../src/index'; | ||
import sampleCombine from '../../src/extra/sampleCombine'; | ||
import * as assert from 'assert'; | ||
|
||
describe('sampleCombine (extra)', () => { | ||
it('should combine AND-style two streams together', (done) => { | ||
const stream1 = xs.periodic(100).take(3); | ||
const stream2 = xs.periodic(99).take(3); | ||
const stream = sampleCombine(stream1, stream2); | ||
let expected = [[0, 0], [1, 1], [2, 2]]; | ||
stream.addListener({ | ||
next: (x) => { | ||
const e = expected.shift(); | ||
assert.equal(x[0], e[0]); | ||
assert.equal(x[1], e[1]); | ||
}, | ||
error: done, | ||
complete: () => { | ||
assert.equal(expected.length, 0); | ||
done(); | ||
}, | ||
}); | ||
}); | ||
|
||
it('should have correct TypeScript signature', (done) => { | ||
const stream1 = xs.create<string>({ | ||
start: listener => {}, | ||
stop: () => {} | ||
}); | ||
|
||
const stream2 = xs.create<string>({ | ||
start: listener => {}, | ||
stop: () => {} | ||
}); | ||
|
||
const combined: Stream<[string, string]> = sampleCombine(stream1, stream2); | ||
done(); | ||
}); | ||
|
||
it('should complete only when the sample stream has completed', (done) => { | ||
const stream1 = xs.periodic(100).take(4); | ||
const stream2 = xs.periodic(99).take(1); | ||
const stream = sampleCombine(stream1, stream2).map(arr => arr.join('')); | ||
let expected = ['00', '10', '20', '30']; | ||
stream.addListener({ | ||
next: (x) => { | ||
assert.equal(x, expected.shift()); | ||
}, | ||
error: done, | ||
complete: () => { | ||
assert.equal(expected.length, 0); | ||
done(); | ||
}, | ||
}); | ||
}); | ||
|
||
it('should emit an empty array if combining zero streams', (done) => { | ||
const stream = sampleCombine(); | ||
|
||
stream.addListener({ | ||
next: (a) => { | ||
assert.equal(Array.isArray(a), true); | ||
assert.equal(a.length, 0); | ||
}, | ||
error: done, | ||
complete: () => { | ||
done(); | ||
}, | ||
}); | ||
}); | ||
|
||
it('should just wrap the value if combining one stream', (done) => { | ||
const source = xs.periodic(100).take(3); | ||
const stream = sampleCombine(source); | ||
let expected = [[0], [1], [2]]; | ||
|
||
stream.addListener({ | ||
next: (x) => { | ||
const e = expected.shift(); | ||
assert.equal(x[0], e[0]); | ||
}, | ||
error: done, | ||
complete: () => { | ||
assert.equal(expected.length, 0); | ||
done(); | ||
}, | ||
}); | ||
}); | ||
|
||
it('should not break future listeners when SampleCombineProducer tears down', (done) => { | ||
// --0---1--2---| innerA | ||
// ----0----1---| innerB | ||
// ----0-----1--2---| outer | ||
// ------00--11-12--| stream | ||
const innerA = xs.create<number>(); | ||
const innerB = xs.create<number>(); | ||
const outer = xs.create<number>(); | ||
const arrayInners: Array<Stream<number>> = []; | ||
const stream = outer | ||
.map(x => { | ||
return sampleCombine(...arrayInners) | ||
.map(combination => `${x}${combination.join('')}`); | ||
}) | ||
.flatten(); | ||
const expected = ['00', '11', '12']; | ||
|
||
setTimeout(() => { | ||
arrayInners.push(innerA); | ||
outer.shamefullySendNext(0); | ||
}, 100); | ||
setTimeout(() => { | ||
innerA.shamefullySendNext(0); | ||
}, 150); | ||
setTimeout(() => { | ||
innerB.shamefullySendNext(0); | ||
}, 175); | ||
setTimeout(() => { | ||
arrayInners.push(innerB); | ||
outer.shamefullySendNext(1); | ||
innerA.shamefullySendNext(1); | ||
}, 200); | ||
setTimeout(() => { | ||
innerA.shamefullySendNext(2); | ||
outer.shamefullySendNext(2); | ||
innerB.shamefullySendNext(1); | ||
}, 250); | ||
setTimeout(() => { | ||
innerA.shamefullySendComplete(); | ||
innerB.shamefullySendComplete(); | ||
outer.shamefullySendComplete(); | ||
}, 550); | ||
|
||
stream.addListener({ | ||
next: (x: string) => { | ||
assert.equal(x, expected.shift()); | ||
}, | ||
error: (err: any) => done(err), | ||
complete: () => { | ||
assert.equal(expected.length, 0); | ||
done(); | ||
}, | ||
}); | ||
}); | ||
|
||
it('should return a Stream when combining a MemoryStream with a Stream', (done) => { | ||
const input1 = xs.periodic(80).take(4).remember(); | ||
const input2 = xs.periodic(50).take(3); | ||
const stream: Stream<[number, number]> = sampleCombine(input1, input2); | ||
assert.strictEqual(stream instanceof Stream, true); | ||
done(); | ||
}); | ||
|
||
it('should return a Stream when combining a MemoryStream with a MemoryStream', (done) => { | ||
const input1 = xs.periodic(80).take(4).remember(); | ||
const input2 = xs.periodic(50).take(3).remember(); | ||
const stream: Stream<[number, number]> = sampleCombine(input1, input2); | ||
assert.strictEqual(stream instanceof Stream, true); | ||
done(); | ||
}); | ||
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters