Skip to content

Commit

Permalink
feat(core): implement basic Observable interop.
Browse files Browse the repository at this point in the history
Add Stream.from(observable)
Add Stream.prototype.subscribe(observer)
Add Stream[Symbol.observable]
  • Loading branch information
TylorS committed Sep 15, 2016
1 parent 00601a6 commit 8fe7069
Showing 1 changed file with 104 additions and 0 deletions.
104 changes: 104 additions & 0 deletions src/core.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {Promise} from 'es6-promise';
import $$observable from 'symbol-observable';

const NO = {};
function noop() {}
Expand Down Expand Up @@ -56,6 +57,8 @@ export interface Listener<T> {
complete: () => void;
}

export type FromInput<T> = Promise<T> | Stream<T> | Array<T>

// mutates the input
function internalizeProducer<T>(producer: Producer<T>) {
(producer as InternalProducer<T> & Producer<T>)._start =
Expand Down Expand Up @@ -1060,6 +1063,42 @@ export class TakeOperator<T> implements Operator<T, T> {
}
}

class ObservableProducer<T> implements InternalProducer<T> {
public type = 'fromObservable';
public ins: any;
public out: Stream<T>;
private _unsusbcribe: () => void;

constructor (observable: any) {
this.ins = observable;
}

_start (out: Stream<T>) {
this.out = out;
this._unsusbcribe = this.ins.subscribe(new ObservableListener(out));
}

_stop () {
this._unsusbcribe();
}
}

class ObservableListener<T> implements Listener<T> {
constructor (private _listener: InternalListener<T>) {}

next (value: T) {
this._listener._n(value);
}

error (err: any) {
this._listener._e(err);
}

complete () {
this._listener._c();
}
}

export class Stream<T> implements InternalListener<T> {
public _prod: InternalProducer<T>;
protected _ils: Array<InternalListener<T>>; // 'ils' = Internal listeners
Expand Down Expand Up @@ -1223,6 +1262,33 @@ export class Stream<T> implements InternalListener<T> {
this._remove(listener as InternalListener<T> & Listener<T>);
}

/**
* Adds a Listener to the Stream returning a Subscription to remove that
* listener.
*
* @param {Listener<T>} listener
* @returns {Subscription<T>}
*
* @memberOf Stream
*/
subscribe(listener: Listener<T>): Subscription<T> {
this.addListener(listener);

return new Subscription<T>(this, listener);
}

/**
* Add interop between most.js and RxJS 5
*
* @returns
*
* @memberOf Stream
*
*/
[$$observable] (): Stream<T> {
return this;
}

/**
* Creates a new Stream given a Producer.
*
Expand Down Expand Up @@ -1318,6 +1384,25 @@ export class Stream<T> implements InternalListener<T> {
});
}

/**
* Creates a string from an array, promise, or an Observable.
*
* @factory true
* @param {Array|Promise|Stream} input The input to make a stream from.
* @return {Stream}
*/
static from<T>(input: FromInput<T>): Stream<T> {
if (typeof input[$$observable] === 'function') {
return Stream.fromObservable<T>(input);
} else if (typeof (input as Promise<T>).then === 'function') {
return Stream.fromPromise<T>(input as Promise<T>);
} else if (Array.isArray(input)) {
return Stream.fromArray<T>(input);
}

throw new TypeError(`Type of input to from() must be an Array, Promise, or Observable`);
}

/**
* Creates a Stream that immediately emits the arguments that you give to
* *of*, then completes.
Expand Down Expand Up @@ -1378,6 +1463,17 @@ export class Stream<T> implements InternalListener<T> {
return new Stream<T>(new FromPromiseProducer<T>(promise));
}

/**
* Converts an Observable into a Stream.
*
* @factory true
* @param {any} observable The observable to be converted as a stream.
* @return {Stream}
*/
static fromObservable<T>(observable: any): Stream<T> {
return new Stream<T>(new ObservableProducer(observable));
}

/**
* Creates a stream that periodically emits incremental numbers, every
* `period` milliseconds.
Expand Down Expand Up @@ -2015,4 +2111,12 @@ export class MemoryStream<T> extends Stream<T> {
}
}

class Subscription<T> {
constructor (private _stream: Stream<T>, private _listener: Listener<T>) {}

unsubscribe (): void {
this._stream.removeListener(this._listener);
}
}

export default Stream;

0 comments on commit 8fe7069

Please sign in to comment.