Skip to content

Commit

Permalink
feat(imitate): move imitate() from MimicStream to Stream
Browse files Browse the repository at this point in the history
BREAKING CHANGE:
MimicStream and xs.createMimic() were removed entirely. The imitate() method now exists on every
Stream instance. To use the proxy stream technique, use xs.create() to create the proxy, then call
proxy.imitate(other).
  • Loading branch information
staltz committed Jun 12, 2016
1 parent 8a432b6 commit ad63372
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 123 deletions.
134 changes: 56 additions & 78 deletions src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1073,17 +1073,6 @@ export class Stream<T> implements InternalListener<T> {
return new MemoryStream<T>(<InternalProducer<T>> (<any> producer));
}

/**
* Creates a new MimicStream, which can `imitate` another Stream. Only a
* MimicStream has the `imitate()` method.
*
* @factory true
* @return {MimicStream}
*/
static createMimic<T>(): MimicStream<T> {
return new MimicStream<T>();
}

/**
* Creates a Stream that does nothing when started. It never emits any event.
*
Expand Down Expand Up @@ -1651,59 +1640,13 @@ export class Stream<T> implements InternalListener<T> {
}

/**
* Forces the Stream to emit the given value to its listeners.
*
* As the name indicates, if you use this, you are most likely doing something
* The Wrong Way. Please try to understand the reactive way before using this
* method. Use it only when you know what you are doing.
*
* @param value The "next" value you want to broadcast to all listeners of
* this Stream.
*/
shamefullySendNext(value: T) {
this._n(value);
}

/**
* Forces the Stream to emit the given error to its listeners.
*
* As the name indicates, if you use this, you are most likely doing something
* The Wrong Way. Please try to understand the reactive way before using this
* method. Use it only when you know what you are doing.
*
* @param {any} error The error you want to broadcast to all the listeners of
* this Stream.
*/
shamefullySendError(error: any) {
this._e(error);
}

/**
* Forces the Stream to emit the "completed" event to its listeners.
*
* As the name indicates, if you use this, you are most likely doing something
* The Wrong Way. Please try to understand the reactive way before using this
* method. Use it only when you know what you are doing.
*/
shamefullySendComplete() {
this._c();
}
}

export class MimicStream<T> extends Stream<T> {
constructor() {
super();
}

/**
* This method exists only on a MimicStream, which is created through
* `xs.createMimic()`. *imitate* changes this current MimicStream to behave
* like the `other` given stream.
* *imitate* changes this current Stream to emit the same events that the
* `other` given Stream does. This method returns nothing.
*
* The `imitate` method and the `MimicStream` type exist to allow one thing:
* **circular dependency of streams**. For instance, let's imagine that for
* some reason you need to create a circular dependency where stream `first$`
* depends on stream `second$` which in turn depends on `first$`:
* This method exists to allow one thing: **circular dependency of streams**.
* For instance, let's imagine that for some reason you need to create a
* circular dependency where stream `first$` depends on stream `second$`
* which in turn depends on `first$`:
*
* <!-- skip-example -->
* ```js
Expand All @@ -1714,39 +1657,35 @@ export class MimicStream<T> extends Stream<T> {
* ```
*
* However, that is invalid JavaScript, because `second$` is undefined
* on the first line. This is how a MimicStream and imitate can help solve it:
* on the first line. This is how *imitate* can help solve it:
*
* ```js
* import delay from 'xstream/extra/delay'
*
* var secondMimic$ = xs.createMimic();
* var first$ = secondMimic$.map(x => x * 10).take(3);
* var secondProxy$ = xs.create();
* var first$ = secondProxy$.map(x => x * 10).take(3);
* var second$ = first$.map(x => x + 1).startWith(1).compose(delay(100));
* secondMimic$.imitate(second$);
* secondProxy$.imitate(second$);
* ```
*
* We create `secondMimic$` before the others, so it can be used in the
* We create `secondProxy$` before the others, so it can be used in the
* declaration of `first$`. Then, after both `first$` and `second$` are
* defined, we hook `secondMimic$` with `second$` with `imitate()` to tell
* defined, we hook `secondProxy$` with `second$` with `imitate()` to tell
* that they are "the same". `imitate` will not trigger the start of any
* stream, it just binds `secondMimic$` and `second$` together.
* stream, it just binds `secondProxy$` and `second$` together.
*
* The following is an example where `imitate()` is important in Cycle.js
* applications. A parent component contains some child components. A child
* has an action stream which is given to the parent to define its state:
*
* <!-- skip-example -->
* ```js
* const childActionMimic$ = xs.createMimic();
* const parent = Parent({...sources, childAction$: childActionMimic$});
* const childActionProxy$ = xs.create();
* const parent = Parent({...sources, childAction$: childActionProxy$});
* const childAction$ = parent.state$.map(s => s.child.action$).flatten();
* childActionMimic$.imitate(childAction$);
* childActionProxy$.imitate(childAction$);
* ```
*
* The *imitate* method returns nothing. Instead, it changes the behavior of
* the current stream, making it re-emit whatever events are emitted by the
* given `other` stream.
*
* Note, though, that **`imitate()` does not support MemoryStreams**. If we
* would attempt to imitate a MemoryStream in a circular dependency, we would
* either get a race condition (where the symptom would be "nothing happens")
Expand All @@ -1759,7 +1698,7 @@ export class MimicStream<T> extends Stream<T> {
* MemoryStream, you should rework your code around `imitate()` to use a
* Stream instead. Look for the stream in the circular dependency that
* represents an event stream, and that would be a candidate for creating a
* MimicStream which then imitates the real event stream.
* proxy Stream which then imitates the target Stream.
*
* @param {Stream} other The stream to imitate on the current one. Must not be
* a MemoryStream.
Expand All @@ -1772,6 +1711,45 @@ export class MimicStream<T> extends Stream<T> {
}
other._setHIL(this);
}

/**
* Forces the Stream to emit the given value to its listeners.
*
* As the name indicates, if you use this, you are most likely doing something
* The Wrong Way. Please try to understand the reactive way before using this
* method. Use it only when you know what you are doing.
*
* @param value The "next" value you want to broadcast to all listeners of
* this Stream.
*/
shamefullySendNext(value: T) {
this._n(value);
}

/**
* Forces the Stream to emit the given error to its listeners.
*
* As the name indicates, if you use this, you are most likely doing something
* The Wrong Way. Please try to understand the reactive way before using this
* method. Use it only when you know what you are doing.
*
* @param {any} error The error you want to broadcast to all the listeners of
* this Stream.
*/
shamefullySendError(error: any) {
this._e(error);
}

/**
* Forces the Stream to emit the "completed" event to its listeners.
*
* As the name indicates, if you use this, you are most likely doing something
* The Wrong Way. Please try to understand the reactive way before using this
* method. Use it only when you know what you are doing.
*/
shamefullySendComplete() {
this._c();
}
}

export class MemoryStream<T> extends Stream<T> {
Expand Down
4 changes: 2 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
import {Stream, MemoryStream, MimicStream, Listener, Producer, Operator} from './core';
export {Stream, MemoryStream, MimicStream, Listener, Producer, Operator};
import {Stream, MemoryStream, Listener, Producer, Operator} from './core';
export {Stream, MemoryStream, Listener, Producer, Operator};
export default Stream;
72 changes: 31 additions & 41 deletions tests/mimicStream.ts → tests/operator/imitate.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,12 @@
/// <reference path="../typings/globals/mocha/index.d.ts" />
/// <reference path="../typings/globals/node/index.d.ts" />
import xs, {Producer, Listener, Stream, MemoryStream, MimicStream} from '../src/index';
import delay from '../src/extra/delay';
/// <reference path="../../typings/globals/mocha/index.d.ts" />
/// <reference path="../../typings/globals/node/index.d.ts" />
import xs, {Producer, Listener, Stream, MemoryStream} from '../../src/index';
import delay from '../../src/extra/delay';
import * as assert from 'assert';

describe('MimicStream', () => {
it('should be creatable with xs.createMimic()', (done) => {
const stream: MimicStream<number> = xs.createMimic<number>();
assert.equal(typeof stream.map, 'function');
assert.equal(typeof stream.take, 'function');
assert.equal(typeof stream.filter, 'function')
done();
});

describe('Stream.prototype.imitate', () => {
it('should be able to model a circular dependency in the stream graph', (done) => {
const fakeSecond = xs.createMimic<number>();
const fakeSecond = xs.create<number>();
const first = fakeSecond.map(x => x * 10).take(3);
const second = first.map(x => x + 1).startWith(1).compose(delay<number>(1));
fakeSecond.imitate(second);
Expand All @@ -33,7 +25,7 @@ describe('MimicStream', () => {
});

it('should broadcast the source stream to multiple listeners', (done) => {
const fakeSecond = xs.createMimic<number>();
const fakeSecond = xs.create<number>();
const first = fakeSecond.map(x => x * 10).take(3);
const second = first.map(x => x + 1).startWith(1).compose(delay<number>(100));
fakeSecond.imitate(second);
Expand Down Expand Up @@ -80,16 +72,16 @@ describe('MimicStream', () => {
});

it('should not cause leaked cyclic executions', (done) => {
const expectedMimic = [2, 4, 8, 16, 32 /* inertia due to stopping on next tick */];
const expectedProxy = [2, 4, 8, 16, 32 /* inertia due to stopping on next tick */];
const expectedResult = [2, 4, 8, 16];

const proxy$ = xs.createMimic<number>();
const proxy$ = xs.create<number>();
const source$ = proxy$.startWith(1).map(x => x * 2)
.debug(x => {
try {
assert.equal(expectedMimic.length > 0, true,
assert.equal(expectedProxy.length > 0, true,
'should be expecting the next value ' + x);
assert.equal(x, expectedMimic.shift());
assert.equal(x, expectedProxy.shift());
} catch (err) {
done(err);
}
Expand All @@ -103,7 +95,7 @@ describe('MimicStream', () => {
},
error: (err: any) => done(err),
complete: () => {
assert.equal(expectedMimic.length, 1); // still waiting for 32
assert.equal(expectedProxy.length, 1); // still waiting for 32
assert.equal(expectedResult.length, 0);
setTimeout(() => {
done();
Expand All @@ -112,30 +104,28 @@ describe('MimicStream', () => {
});
});

describe('imitate', () => {
it('should not by itself start the target stream execution', (done) => {
let nextDelivered = false;
const stream = xs.periodic(50).take(3).debug(() => {
nextDelivered = true;
});
const proxyStream = xs.createMimic<number>();

setTimeout(() => {
assert.equal(nextDelivered, false);
done();
}, 125);

proxyStream.imitate(stream);
it('should not by itself start the target stream execution', (done) => {
let nextDelivered = false;
const stream = xs.periodic(50).take(3).debug(() => {
nextDelivered = true;
});
const proxyStream = xs.create<number>();

it('should throw an error when given a MemoryStream', (done) => {
const stream = xs.periodic(50).take(3).remember();
assert.strictEqual(stream instanceof MemoryStream, true);
const proxyStream = xs.createMimic<number>();
assert.throws(() => {
proxyStream.imitate(stream);
});
setTimeout(() => {
assert.equal(nextDelivered, false);
done();
}, 125);

proxyStream.imitate(stream);
});

it('should throw an error when given a MemoryStream', (done) => {
const stream = xs.periodic(50).take(3).remember();
assert.strictEqual(stream instanceof MemoryStream, true);
const proxyStream = xs.create<number>();
assert.throws(() => {
proxyStream.imitate(stream);
});
done();
});
});
4 changes: 2 additions & 2 deletions tests/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ describe('Stream', () => {
it('should have all the core static operators', () => {
assert.equal(typeof xs.create, 'function');
assert.equal(typeof xs.createWithMemory, 'function');
assert.equal(typeof xs.createMimic, 'function');
assert.equal(typeof xs.never, 'function');
assert.equal(typeof xs.empty, 'function');
assert.equal(typeof xs.throw, 'function');
Expand Down Expand Up @@ -36,6 +35,7 @@ describe('Stream', () => {
assert.equal(typeof stream.compose, 'function');
assert.equal(typeof stream.remember, 'function');
assert.equal(typeof stream.debug, 'function');
assert.equal(typeof stream.imitate, 'function');
});

it('should be createable giving a custom producer object', (done) => {
Expand Down Expand Up @@ -349,7 +349,7 @@ describe('Stream', () => {
try {
stream.addListener(listener);
} catch (e) {
assert.equal(e.message, 'stream.addListener() requires all three ' +
assert.equal(e.message, 'stream.addListener() requires all three ' +
'next, error, and complete functions.');
done();
}
Expand Down

0 comments on commit ad63372

Please sign in to comment.