Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix typings for later TS versions (3.4) #278

Merged
merged 1 commit into from
May 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/extra/flattenConcurrently.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {Operator, Stream, OutSender, InternalListener} from '../index';
import { Operator, Stream, MemoryStream, OutSender, InternalListener } from '../index';

class FCIL<T> implements InternalListener<T>, OutSender<T> {
constructor(public out: Stream<T>,
Expand Down Expand Up @@ -88,6 +88,6 @@ export class FlattenConcOperator<T> implements Operator<Stream<T>, T> {
*
* @return {Stream}
*/
export default function flattenConcurrently<T>(ins: Stream<Stream<T>>): Stream<T> {
export default function flattenConcurrently<T>(ins: Stream<Stream<T> | MemoryStream<T>>): Stream<T> {
return new Stream<T>(new FlattenConcOperator(ins));
}
6 changes: 3 additions & 3 deletions src/extra/flattenConcurrentlyAtMost.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {Operator, Stream, OutSender, InternalListener} from '../index';
import { Operator, Stream, MemoryStream, OutSender, InternalListener } from '../index';

class FCAMIL<T> implements InternalListener<T>, OutSender<T> {
constructor(public out: Stream<T>,
Expand Down Expand Up @@ -125,8 +125,8 @@ export class FlattenConcAMOperator<T> implements Operator<Stream<T>, T> {
*
* @return {Stream}
*/
export default function flattenConcurrentlyAtMost<T>(n: number): (ins: Stream<Stream<T>>) => Stream<T> {
return function flattenConcAMOperator(ins: Stream<Stream<T>>) {
export default function flattenConcurrentlyAtMost<T>(n: number): (ins: Stream<Stream<T> | MemoryStream<T>>) => Stream<T> {
return function flattenConcAMOperator(ins: Stream<Stream<T> | MemoryStream<T>>) {
return new Stream<T>(new FlattenConcAMOperator(n, ins));
};
}
4 changes: 2 additions & 2 deletions src/extra/flattenSequentially.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {Operator, Stream, InternalListener} from '../index';
import { Operator, Stream, MemoryStream, InternalListener } from '../index';

class FSInner<T> implements InternalListener<T> {
constructor(private out: Stream<T>,
Expand Down Expand Up @@ -122,6 +122,6 @@ export class FlattenSeqOperator<T> implements Operator<Stream<T>, T> {
*
* @return {Stream}
*/
export default function flattenSequentially<T>(ins: Stream<Stream<T>>): Stream<T> {
export default function flattenSequentially<T>(ins: Stream<Stream<T> | MemoryStream<T>>): Stream<T> {
return new Stream<T>(new FlattenSeqOperator(ins));
}
50 changes: 25 additions & 25 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import $$observable from 'symbol-observable';

const NO = {};
function noop() {}
function noop() { }

function cp<T>(a: Array<T>): Array<T> {
const l = a.length;
Expand Down Expand Up @@ -93,15 +93,15 @@ function internalizeProducer<T>(producer: Producer<T> & Partial<InternalProducer
}

class StreamSub<T> implements Subscription {
constructor(private _stream: Stream<T>, private _listener: InternalListener<T>) {}
constructor(private _stream: Stream<T>, private _listener: InternalListener<T>) { }

unsubscribe(): void {
this._stream._remove(this._listener);
}
}

class Observer<T> implements Listener<T> {
constructor(private _listener: InternalListener<T>) {}
constructor(private _listener: InternalListener<T>) { }

next(value: T) {
this._listener._n(value);
Expand Down Expand Up @@ -772,7 +772,7 @@ class Flatten<T> implements Operator<Stream<T>, T> {
_n(s: Stream<T>) {
const u = this.out;
if (u === NO) return;
const {inner, il} = this;
const { inner, il } = this;
if (inner !== NO && il !== NO_IL) inner._remove(il);
(this.inner = s)._add(this.il = new FlattenListener(u, this));
}
Expand Down Expand Up @@ -1182,16 +1182,16 @@ export class Stream<T> implements InternalListener<T> {
_hasNoSinks(x: InternalListener<any>, trace: Array<any>): boolean {
if (trace.indexOf(x) !== -1)
return true; else
if ((x as any as OutSender<any>).out === this)
return true; else
if ((x as any as OutSender<any>).out && (x as any as OutSender<any>).out !== NO)
return this._hasNoSinks((x as any as OutSender<any>).out, trace.concat(x)); else
if ((x as Stream<any>)._ils) {
for (let i = 0, N = (x as Stream<any>)._ils.length; i < N; i++)
if (!this._hasNoSinks((x as Stream<any>)._ils[i], trace.concat(x)))
return false;
return true;
} else return false;
if ((x as any as OutSender<any>).out === this)
return true; else
if ((x as any as OutSender<any>).out && (x as any as OutSender<any>).out !== NO)
return this._hasNoSinks((x as any as OutSender<any>).out, trace.concat(x)); else
if ((x as Stream<any>)._ils) {
for (let i = 0, N = (x as Stream<any>)._ils.length; i < N; i++)
if (!this._hasNoSinks((x as Stream<any>)._ils[i], trace.concat(x)))
return false;
return true;
} else return false;
}

private ctor(): typeof Stream {
Expand Down Expand Up @@ -1251,7 +1251,7 @@ export class Stream<T> implements InternalListener<T> {
static create<T>(producer?: Producer<T>): Stream<T> {
if (producer) {
if (typeof producer.start !== 'function'
|| typeof producer.stop !== 'function')
|| typeof producer.stop !== 'function')
throw new Error('producer requires both start and stop functions');
internalizeProducer(producer); // mutates the input
}
Expand Down Expand Up @@ -1285,7 +1285,7 @@ export class Stream<T> implements InternalListener<T> {
* @return {Stream}
*/
static never(): Stream<any> {
return new Stream<any>({_start: noop, _stop: noop});
return new Stream<any>({ _start: noop, _stop: noop });
}

/**
Expand Down Expand Up @@ -1342,10 +1342,10 @@ export class Stream<T> implements InternalListener<T> {
static from<T>(input: PromiseLike<T> | Stream<T> | Array<T> | Observable<T>): Stream<T> {
if (typeof input[$$observable] === 'function')
return Stream.fromObservable<T>(input as Observable<T>); else
if (typeof (input as PromiseLike<T>).then === 'function')
return Stream.fromPromise<T>(input as PromiseLike<T>); else
if (Array.isArray(input))
return Stream.fromArray<T>(input);
if (typeof (input as PromiseLike<T>).then === 'function')
return Stream.fromPromise<T>(input as PromiseLike<T>); else
if (Array.isArray(input))
return Stream.fromArray<T>(input);

throw new TypeError(`Type of input to from() must be an Array, Promise, or Observable`);
}
Expand Down Expand Up @@ -1417,7 +1417,7 @@ export class Stream<T> implements InternalListener<T> {
* @param {any} observable The observable to be converted as a stream.
* @return {Stream}
*/
static fromObservable<T>(obs: {subscribe: any}): Stream<T> {
static fromObservable<T>(obs: { subscribe: any }): Stream<T> {
if ((obs as Stream<T>).endWhen) return obs as Stream<T>;
const o = typeof obs[$$observable] === 'function' ? obs[$$observable]() : obs;
return new Stream<T>(new FromObservable(o));
Expand Down Expand Up @@ -1765,7 +1765,7 @@ export class Stream<T> implements InternalListener<T> {
*
* @return {Stream}
*/
flatten<R>(this: Stream<Stream<R>>): Stream<R> {
flatten<R>(this: Stream<Stream<R> | MemoryStream<R>>): Stream<R> {
return new Stream<R>(new Flatten(this));
}

Expand Down Expand Up @@ -1895,8 +1895,8 @@ export class Stream<T> implements InternalListener<T> {
imitate(target: Stream<T>): void {
if (target instanceof MemoryStream)
throw new Error('A MemoryStream was given to imitate(), but it only ' +
'supports a Stream. Read more about this restriction here: ' +
'https://github.com/staltz/xstream#faq');
'supports a Stream. Read more about this restriction here: ' +
'https://github.com/staltz/xstream#faq');
this._target = target;
for (let ils = this._ils, N = ils.length, i = 0; i < N; i++) target._add(ils[i]);
this._ils = [];
Expand Down Expand Up @@ -2049,7 +2049,7 @@ export class MemoryStream<T> extends Stream<T> {
}
}

export {NO, NO_IL};
export { NO, NO_IL };
const xs = Stream;
type xs<T> = Stream<T>;
export default xs;