diff --git a/src/extra/flattenConcurrently.ts b/src/extra/flattenConcurrently.ts index bb358ed..9afd51c 100644 --- a/src/extra/flattenConcurrently.ts +++ b/src/extra/flattenConcurrently.ts @@ -1,4 +1,4 @@ -import {Operator, Stream, OutSender, InternalListener} from '../index'; +import { Operator, Stream, MemoryStream, OutSender, InternalListener } from '../index'; class FCIL implements InternalListener, OutSender { constructor(public out: Stream, @@ -88,6 +88,6 @@ export class FlattenConcOperator implements Operator, T> { * * @return {Stream} */ -export default function flattenConcurrently(ins: Stream>): Stream { +export default function flattenConcurrently(ins: Stream | MemoryStream>): Stream { return new Stream(new FlattenConcOperator(ins)); } diff --git a/src/extra/flattenConcurrentlyAtMost.ts b/src/extra/flattenConcurrentlyAtMost.ts index eb34103..57033ad 100644 --- a/src/extra/flattenConcurrentlyAtMost.ts +++ b/src/extra/flattenConcurrentlyAtMost.ts @@ -1,4 +1,4 @@ -import {Operator, Stream, OutSender, InternalListener} from '../index'; +import { Operator, Stream, MemoryStream, OutSender, InternalListener } from '../index'; class FCAMIL implements InternalListener, OutSender { constructor(public out: Stream, @@ -125,8 +125,8 @@ export class FlattenConcAMOperator implements Operator, T> { * * @return {Stream} */ -export default function flattenConcurrentlyAtMost(n: number): (ins: Stream>) => Stream { - return function flattenConcAMOperator(ins: Stream>) { +export default function flattenConcurrentlyAtMost(n: number): (ins: Stream | MemoryStream>) => Stream { + return function flattenConcAMOperator(ins: Stream | MemoryStream>) { return new Stream(new FlattenConcAMOperator(n, ins)); }; } diff --git a/src/extra/flattenSequentially.ts b/src/extra/flattenSequentially.ts index bcb9abc..55986bd 100644 --- a/src/extra/flattenSequentially.ts +++ b/src/extra/flattenSequentially.ts @@ -1,4 +1,4 @@ -import {Operator, Stream, InternalListener} from '../index'; +import { Operator, Stream, MemoryStream, InternalListener } from '../index'; class FSInner implements InternalListener { constructor(private out: Stream, @@ -122,6 +122,6 @@ export class FlattenSeqOperator implements Operator, T> { * * @return {Stream} */ -export default function flattenSequentially(ins: Stream>): Stream { +export default function flattenSequentially(ins: Stream | MemoryStream>): Stream { return new Stream(new FlattenSeqOperator(ins)); } diff --git a/src/index.ts b/src/index.ts index 2147813..04deffd 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,7 +1,7 @@ import $$observable from 'symbol-observable'; const NO = {}; -function noop() {} +function noop() { } function cp(a: Array): Array { const l = a.length; @@ -93,7 +93,7 @@ function internalizeProducer(producer: Producer & Partial implements Subscription { - constructor(private _stream: Stream, private _listener: InternalListener) {} + constructor(private _stream: Stream, private _listener: InternalListener) { } unsubscribe(): void { this._stream._remove(this._listener); @@ -101,7 +101,7 @@ class StreamSub implements Subscription { } class Observer implements Listener { - constructor(private _listener: InternalListener) {} + constructor(private _listener: InternalListener) { } next(value: T) { this._listener._n(value); @@ -772,7 +772,7 @@ class Flatten implements Operator, T> { _n(s: Stream) { const u = this.out; if (u === NO) return; - const {inner, il} = this; + const { inner, il } = this; if (inner !== NO && il !== NO_IL) inner._remove(il); (this.inner = s)._add(this.il = new FlattenListener(u, this)); } @@ -1182,16 +1182,16 @@ export class Stream implements InternalListener { _hasNoSinks(x: InternalListener, trace: Array): boolean { if (trace.indexOf(x) !== -1) return true; else - if ((x as any as OutSender).out === this) - return true; else - if ((x as any as OutSender).out && (x as any as OutSender).out !== NO) - return this._hasNoSinks((x as any as OutSender).out, trace.concat(x)); else - if ((x as Stream)._ils) { - for (let i = 0, N = (x as Stream)._ils.length; i < N; i++) - if (!this._hasNoSinks((x as Stream)._ils[i], trace.concat(x))) - return false; - return true; - } else return false; + if ((x as any as OutSender).out === this) + return true; else + if ((x as any as OutSender).out && (x as any as OutSender).out !== NO) + return this._hasNoSinks((x as any as OutSender).out, trace.concat(x)); else + if ((x as Stream)._ils) { + for (let i = 0, N = (x as Stream)._ils.length; i < N; i++) + if (!this._hasNoSinks((x as Stream)._ils[i], trace.concat(x))) + return false; + return true; + } else return false; } private ctor(): typeof Stream { @@ -1251,7 +1251,7 @@ export class Stream implements InternalListener { static create(producer?: Producer): Stream { if (producer) { if (typeof producer.start !== 'function' - || typeof producer.stop !== 'function') + || typeof producer.stop !== 'function') throw new Error('producer requires both start and stop functions'); internalizeProducer(producer); // mutates the input } @@ -1285,7 +1285,7 @@ export class Stream implements InternalListener { * @return {Stream} */ static never(): Stream { - return new Stream({_start: noop, _stop: noop}); + return new Stream({ _start: noop, _stop: noop }); } /** @@ -1342,10 +1342,10 @@ export class Stream implements InternalListener { static from(input: PromiseLike | Stream | Array | Observable): Stream { if (typeof input[$$observable] === 'function') return Stream.fromObservable(input as Observable); else - if (typeof (input as PromiseLike).then === 'function') - return Stream.fromPromise(input as PromiseLike); else - if (Array.isArray(input)) - return Stream.fromArray(input); + if (typeof (input as PromiseLike).then === 'function') + return Stream.fromPromise(input as PromiseLike); else + if (Array.isArray(input)) + return Stream.fromArray(input); throw new TypeError(`Type of input to from() must be an Array, Promise, or Observable`); } @@ -1417,7 +1417,7 @@ export class Stream implements InternalListener { * @param {any} observable The observable to be converted as a stream. * @return {Stream} */ - static fromObservable(obs: {subscribe: any}): Stream { + static fromObservable(obs: { subscribe: any }): Stream { if ((obs as Stream).endWhen) return obs as Stream; const o = typeof obs[$$observable] === 'function' ? obs[$$observable]() : obs; return new Stream(new FromObservable(o)); @@ -1765,7 +1765,7 @@ export class Stream implements InternalListener { * * @return {Stream} */ - flatten(this: Stream>): Stream { + flatten(this: Stream | MemoryStream>): Stream { return new Stream(new Flatten(this)); } @@ -1895,8 +1895,8 @@ export class Stream implements InternalListener { imitate(target: Stream): void { if (target instanceof MemoryStream) throw new Error('A MemoryStream was given to imitate(), but it only ' + - 'supports a Stream. Read more about this restriction here: ' + - 'https://github.com/staltz/xstream#faq'); + 'supports a Stream. Read more about this restriction here: ' + + 'https://github.com/staltz/xstream#faq'); this._target = target; for (let ils = this._ils, N = ils.length, i = 0; i < N; i++) target._add(ils[i]); this._ils = []; @@ -2049,7 +2049,7 @@ export class MemoryStream extends Stream { } } -export {NO, NO_IL}; +export { NO, NO_IL }; const xs = Stream; type xs = Stream; export default xs;