Skip to content

Commit

Permalink
fix(flatten): support TypeScript 3.4
Browse files Browse the repository at this point in the history
fix typings for later TS versions (3.4) (#278)
  • Loading branch information
wclr authored and staltz committed May 4, 2019
1 parent 152633f commit 1325bbf
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 32 deletions.
4 changes: 2 additions & 2 deletions src/extra/flattenConcurrently.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {Operator, Stream, OutSender, InternalListener} from '../index';
import { Operator, Stream, MemoryStream, OutSender, InternalListener } from '../index';

class FCIL<T> implements InternalListener<T>, OutSender<T> {
constructor(public out: Stream<T>,
Expand Down Expand Up @@ -88,6 +88,6 @@ export class FlattenConcOperator<T> implements Operator<Stream<T>, T> {
*
* @return {Stream}
*/
export default function flattenConcurrently<T>(ins: Stream<Stream<T>>): Stream<T> {
export default function flattenConcurrently<T>(ins: Stream<Stream<T> | MemoryStream<T>>): Stream<T> {
return new Stream<T>(new FlattenConcOperator(ins));
}
6 changes: 3 additions & 3 deletions src/extra/flattenConcurrentlyAtMost.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {Operator, Stream, OutSender, InternalListener} from '../index';
import { Operator, Stream, MemoryStream, OutSender, InternalListener } from '../index';

class FCAMIL<T> implements InternalListener<T>, OutSender<T> {
constructor(public out: Stream<T>,
Expand Down Expand Up @@ -125,8 +125,8 @@ export class FlattenConcAMOperator<T> implements Operator<Stream<T>, T> {
*
* @return {Stream}
*/
export default function flattenConcurrentlyAtMost<T>(n: number): (ins: Stream<Stream<T>>) => Stream<T> {
return function flattenConcAMOperator(ins: Stream<Stream<T>>) {
export default function flattenConcurrentlyAtMost<T>(n: number): (ins: Stream<Stream<T> | MemoryStream<T>>) => Stream<T> {
return function flattenConcAMOperator(ins: Stream<Stream<T> | MemoryStream<T>>) {
return new Stream<T>(new FlattenConcAMOperator(n, ins));
};
}
4 changes: 2 additions & 2 deletions src/extra/flattenSequentially.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {Operator, Stream, InternalListener} from '../index';
import { Operator, Stream, MemoryStream, InternalListener } from '../index';

class FSInner<T> implements InternalListener<T> {
constructor(private out: Stream<T>,
Expand Down Expand Up @@ -122,6 +122,6 @@ export class FlattenSeqOperator<T> implements Operator<Stream<T>, T> {
*
* @return {Stream}
*/
export default function flattenSequentially<T>(ins: Stream<Stream<T>>): Stream<T> {
export default function flattenSequentially<T>(ins: Stream<Stream<T> | MemoryStream<T>>): Stream<T> {
return new Stream<T>(new FlattenSeqOperator(ins));
}
50 changes: 25 additions & 25 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import $$observable from 'symbol-observable';

const NO = {};
function noop() {}
function noop() { }

function cp<T>(a: Array<T>): Array<T> {
const l = a.length;
Expand Down Expand Up @@ -93,15 +93,15 @@ function internalizeProducer<T>(producer: Producer<T> & Partial<InternalProducer
}

class StreamSub<T> implements Subscription {
constructor(private _stream: Stream<T>, private _listener: InternalListener<T>) {}
constructor(private _stream: Stream<T>, private _listener: InternalListener<T>) { }

unsubscribe(): void {
this._stream._remove(this._listener);
}
}

class Observer<T> implements Listener<T> {
constructor(private _listener: InternalListener<T>) {}
constructor(private _listener: InternalListener<T>) { }

next(value: T) {
this._listener._n(value);
Expand Down Expand Up @@ -772,7 +772,7 @@ class Flatten<T> implements Operator<Stream<T>, T> {
_n(s: Stream<T>) {
const u = this.out;
if (u === NO) return;
const {inner, il} = this;
const { inner, il } = this;
if (inner !== NO && il !== NO_IL) inner._remove(il);
(this.inner = s)._add(this.il = new FlattenListener(u, this));
}
Expand Down Expand Up @@ -1182,16 +1182,16 @@ export class Stream<T> implements InternalListener<T> {
_hasNoSinks(x: InternalListener<any>, trace: Array<any>): boolean {
if (trace.indexOf(x) !== -1)
return true; else
if ((x as any as OutSender<any>).out === this)
return true; else
if ((x as any as OutSender<any>).out && (x as any as OutSender<any>).out !== NO)
return this._hasNoSinks((x as any as OutSender<any>).out, trace.concat(x)); else
if ((x as Stream<any>)._ils) {
for (let i = 0, N = (x as Stream<any>)._ils.length; i < N; i++)
if (!this._hasNoSinks((x as Stream<any>)._ils[i], trace.concat(x)))
return false;
return true;
} else return false;
if ((x as any as OutSender<any>).out === this)
return true; else
if ((x as any as OutSender<any>).out && (x as any as OutSender<any>).out !== NO)
return this._hasNoSinks((x as any as OutSender<any>).out, trace.concat(x)); else
if ((x as Stream<any>)._ils) {
for (let i = 0, N = (x as Stream<any>)._ils.length; i < N; i++)
if (!this._hasNoSinks((x as Stream<any>)._ils[i], trace.concat(x)))
return false;
return true;
} else return false;
}

private ctor(): typeof Stream {
Expand Down Expand Up @@ -1251,7 +1251,7 @@ export class Stream<T> implements InternalListener<T> {
static create<T>(producer?: Producer<T>): Stream<T> {
if (producer) {
if (typeof producer.start !== 'function'
|| typeof producer.stop !== 'function')
|| typeof producer.stop !== 'function')
throw new Error('producer requires both start and stop functions');
internalizeProducer(producer); // mutates the input
}
Expand Down Expand Up @@ -1285,7 +1285,7 @@ export class Stream<T> implements InternalListener<T> {
* @return {Stream}
*/
static never(): Stream<any> {
return new Stream<any>({_start: noop, _stop: noop});
return new Stream<any>({ _start: noop, _stop: noop });
}

/**
Expand Down Expand Up @@ -1342,10 +1342,10 @@ export class Stream<T> implements InternalListener<T> {
static from<T>(input: PromiseLike<T> | Stream<T> | Array<T> | Observable<T>): Stream<T> {
if (typeof input[$$observable] === 'function')
return Stream.fromObservable<T>(input as Observable<T>); else
if (typeof (input as PromiseLike<T>).then === 'function')
return Stream.fromPromise<T>(input as PromiseLike<T>); else
if (Array.isArray(input))
return Stream.fromArray<T>(input);
if (typeof (input as PromiseLike<T>).then === 'function')
return Stream.fromPromise<T>(input as PromiseLike<T>); else
if (Array.isArray(input))
return Stream.fromArray<T>(input);

throw new TypeError(`Type of input to from() must be an Array, Promise, or Observable`);
}
Expand Down Expand Up @@ -1417,7 +1417,7 @@ export class Stream<T> implements InternalListener<T> {
* @param {any} observable The observable to be converted as a stream.
* @return {Stream}
*/
static fromObservable<T>(obs: {subscribe: any}): Stream<T> {
static fromObservable<T>(obs: { subscribe: any }): Stream<T> {
if ((obs as Stream<T>).endWhen) return obs as Stream<T>;
const o = typeof obs[$$observable] === 'function' ? obs[$$observable]() : obs;
return new Stream<T>(new FromObservable(o));
Expand Down Expand Up @@ -1765,7 +1765,7 @@ export class Stream<T> implements InternalListener<T> {
*
* @return {Stream}
*/
flatten<R>(this: Stream<Stream<R>>): Stream<R> {
flatten<R>(this: Stream<Stream<R> | MemoryStream<R>>): Stream<R> {
return new Stream<R>(new Flatten(this));
}

Expand Down Expand Up @@ -1895,8 +1895,8 @@ export class Stream<T> implements InternalListener<T> {
imitate(target: Stream<T>): void {
if (target instanceof MemoryStream)
throw new Error('A MemoryStream was given to imitate(), but it only ' +
'supports a Stream. Read more about this restriction here: ' +
'https://github.com/staltz/xstream#faq');
'supports a Stream. Read more about this restriction here: ' +
'https://github.com/staltz/xstream#faq');
this._target = target;
for (let ils = this._ils, N = ils.length, i = 0; i < N; i++) target._add(ils[i]);
this._ils = [];
Expand Down Expand Up @@ -2049,7 +2049,7 @@ export class MemoryStream<T> extends Stream<T> {
}
}

export {NO, NO_IL};
export { NO, NO_IL };
const xs = Stream;
type xs<T> = Stream<T>;
export default xs;

0 comments on commit 1325bbf

Please sign in to comment.