diff --git a/spec/observables/from-spec.js b/spec/observables/from-spec.js index 1357038a6a..c98f1f9069 100644 --- a/spec/observables/from-spec.js +++ b/spec/observables/from-spec.js @@ -1,6 +1,8 @@ -/* globals describe, it, expect */ +/* globals describe, it, expect, Symbol */ var Rx = require('../../dist/cjs/Rx'); +var Promise = require('promise'); var Observable = Rx.Observable; +var $$iterator = require('../../dist/cjs/util/Symbol_iterator').default; describe('Observable.from', function () { it('should enumerate an Array', function (done) { @@ -10,4 +12,55 @@ describe('Observable.from', function () { expect(x).toBe(expected[i++]) }, null, done); }, 300); + + it('should handle a promise', function (done) { + var promise = Promise.resolve('pinky swear'); + + Observable.from(promise).subscribe(function (x) { + expect(x).toBe('pinky swear'); + }, null, done); + }); + + it('should handle an "observableque" object', function (done) { + var observablesque = {}; + observablesque[Symbol.observer] = function (observer) { + observer.next('test'); + observer.complete(); + }; + + Observable.from(observablesque).subscribe(function (x) { + expect(x).toBe('test'); + }, null, done); + }); + + it('should handle a string', function (done) { + var expected = ['a', 'b', 'c']; + Observable.from('abc').subscribe(function (x) { + expect(x).toBe(expected.shift()); + }, null, done); + }); + + it('should handle any iterable thing', function (done) { + var iterable = {}; + var iteratorResults = [ + { value: 'one', done: false }, + { value: 'two', done: false }, + { done: true } + ]; + var expected = ['one', 'two']; + + expect(Symbol.iterator).toBe($$iterator); + + iterable[Symbol.iterator] = function () { + return { + next: function() { + return iteratorResults.shift(); + } + } + }; + + Observable.from(iterable).subscribe(function (x) { + expect(x).toBe(expected.shift()); + }, null, done); + }); }); \ No newline at end of file diff --git a/src/Observable.ts b/src/Observable.ts index cc56b1c40d..61f89c6bf2 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -74,7 +74,7 @@ export default class Observable { // TODO: convert this to an `abstract` class in TypeScript 1.6. static defer: (observableFactory: () => Observable) => Observable; - static from: (iterable: any, project?: (x?: any, i?: number) => T, thisArg?: any, scheduler?: Scheduler) => Observable; + static from: (iterable: any, scheduler?: Scheduler) => Observable; static fromArray: (array: T[], scheduler?: Scheduler) => Observable; static fromEvent: (element: any, eventName: string, selector: (...args:Array) => T) => Observable; static fromEventPattern: (addHandler: (handler:Function)=>void, removeHandler: (handler:Function) => void, selector?: (...args:Array) => T) => Observable; diff --git a/src/Rx.ts b/src/Rx.ts index 89e454cf06..e09db1b4d1 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -15,7 +15,6 @@ import EmptyObservable from './observables/EmptyObservable'; import ErrorObservable from './observables/ErrorObservable'; import InfiniteObservable from './observables/InfiniteObservable'; import IntervalObservable from './observables/IntervalObservable'; -import IteratorObservable from './observables/IteratorObservable'; import PromiseObservable from './observables/PromiseObservable'; import RangeObservable from './observables/RangeObservable'; import ScalarObservable from './observables/ScalarObservable'; @@ -23,9 +22,10 @@ import TimerObservable from './observables/TimerObservable'; import FromEventPatternObservable from './observables/FromEventPatternObservable'; import FromEventObservable from './observables/FromEventObservable'; import ForkJoinObservable from './observables/ForkJoinObservable'; +import FromObservable from './observables/FromObservable'; Observable.defer = DeferObservable.create; -Observable.from = IteratorObservable.create; +Observable.from = FromObservable.create; Observable.fromArray = ArrayObservable.create; Observable.fromPromise = PromiseObservable.create; Observable.of = ArrayObservable.of; diff --git a/src/observables/FromObservable.ts b/src/observables/FromObservable.ts new file mode 100644 index 0000000000..04f71eb35b --- /dev/null +++ b/src/observables/FromObservable.ts @@ -0,0 +1,43 @@ +import PromiseObservable from './PromiseObservable'; +import IteratorObservable from'./IteratorObservable'; +import ArrayObservable from './ArrayObservable'; + +import isArray from '../util/isArray'; +import isPromise from '../util/isPromise'; +import isObservable from '../util/isObservable'; +import Scheduler from '../Scheduler'; +import $$observer from '../util/Symbol_observer'; +import Observable from '../Observable'; +import Subscriber from '../Subscriber'; +import { ObserveOnSubscriber } from '../operators/observeOn'; + +export default class FromObservable extends Observable { + constructor(private observablesque: any, private scheduler: Scheduler) { + super(null); + } + + static create(observablesque: any, scheduler: Scheduler = Scheduler.immediate): Observable { + if (isArray(observablesque)) { + return new ArrayObservable(observablesque, scheduler); + } else if (isPromise(observablesque)) { + return new PromiseObservable(observablesque, scheduler); + } else if (isObservable(observablesque)) { + if(observablesque instanceof Observable) { + return observablesque; + } + return new FromObservable(observablesque, scheduler); + } else { + return new IteratorObservable(observablesque, null, null, scheduler); + } + } + + _subscribe(subscriber: Subscriber) { + const observablesque = this.observablesque; + const scheduler = this.scheduler; + if(scheduler === Scheduler.immediate) { + return this.observablesque[$$observer](subscriber); + } else { + return this.observablesque[$$observer](new ObserveOnSubscriber(subscriber, scheduler, 0)); + } + } +} \ No newline at end of file diff --git a/src/observables/IteratorObservable.ts b/src/observables/IteratorObservable.ts index fd21f3957b..3a707db2f9 100644 --- a/src/observables/IteratorObservable.ts +++ b/src/observables/IteratorObservable.ts @@ -2,7 +2,7 @@ import Scheduler from '../Scheduler'; import Observable from '../Observable'; import {root} from '../util/root'; -import $iterator$ from '../util/Symbol_iterator'; +import $$iterator from '../util/Symbol_iterator'; import tryCatch from '../util/tryCatch'; import {errorObject} from '../util/errorObject'; @@ -108,7 +108,7 @@ class StringIterator { private idx: number = 0, private len: number = str.length) { } - [$iterator$]() { return (this); } + [$$iterator]() { return (this); } next() { return this.idx < this.len ? { done: false, @@ -125,7 +125,7 @@ class ArrayIterator { private idx: number = 0, private len: number = toLength(arr)) { } - [$iterator$]() { return this; } + [$$iterator]() { return this; } next() { return this.idx < this.len ? { done: false, @@ -138,7 +138,7 @@ class ArrayIterator { } function getIterator(o) { - const i = o[$iterator$]; + const i = o[$$iterator]; if (!i && typeof o === 'string') { return new StringIterator(o); } @@ -148,7 +148,7 @@ function getIterator(o) { if (!i) { throw new TypeError('Object is not iterable'); } - return o[$iterator$](); + return o[$$iterator](); } function toLength(o) { diff --git a/src/observables/PromiseObservable.ts b/src/observables/PromiseObservable.ts index f5d71c521d..48b6b6ff01 100644 --- a/src/observables/PromiseObservable.ts +++ b/src/observables/PromiseObservable.ts @@ -1,29 +1,42 @@ import Observable from '../Observable'; import Subscriber from '../Subscriber'; +import Scheduler from '../Scheduler'; +import Subscription from '../Subscription'; export default class PromiseObservable extends Observable { - static create(promise: Promise) { - return new PromiseObservable(promise); + static create(promise: Promise, scheduler: Scheduler = Scheduler.immediate) { + return new PromiseObservable(promise, scheduler); } - constructor(protected promise: Promise) { + constructor(private promise: Promise, private scheduler: Scheduler) { super(); } _subscribe(subscriber: Subscriber) { - this.promise.then( - (x) => { - if(!subscriber.isUnsubscribed) { - subscriber.next(x); + const scheduler = this.scheduler; + const promise = this.promise; + + if (scheduler === Scheduler.immediate) { + promise.then(value => { + subscriber.next(value); subscriber.complete(); - } - }, - (e) => { - if(!subscriber.isUnsubscribed) { - subscriber.error(e); - } - } - ); + }, + err => subscriber.error(err)); + } else { + let subscription = new Subscription(); + promise.then(value => subscription.add(scheduler.schedule(0, { value, subscriber }, dispatchNext)), + err => subscription.add(scheduler.schedule(0, { err, subscriber }, dispatchError))); + return subscription; + } } } + +function dispatchNext({ value, subscriber }) { + subscriber.next(value); + subscriber.complete(); +} + +function dispatchError({ err, subscriber }) { + subscriber.error(err); +} diff --git a/src/util/Symbol_iterator.ts b/src/util/Symbol_iterator.ts index e06ea76dab..5783d36431 100644 --- a/src/util/Symbol_iterator.ts +++ b/src/util/Symbol_iterator.ts @@ -16,7 +16,7 @@ if (!root.Symbol.iterator) { } } -export default root.Symbol.dispose; +export default root.Symbol.iterator; // // Shim in iterator support // export var $iterator$ = (typeof Symbol === 'function' && Symbol.iterator) || '_es6shim_iterator_'; diff --git a/src/util/isArray.ts b/src/util/isArray.ts new file mode 100644 index 0000000000..09bdc628a6 --- /dev/null +++ b/src/util/isArray.ts @@ -0,0 +1,3 @@ +const isArray = Array.isArray || (x => x && typeof x.length === 'number'); + +export default isArray; \ No newline at end of file diff --git a/src/util/isObservable.ts b/src/util/isObservable.ts new file mode 100644 index 0000000000..227b0459d0 --- /dev/null +++ b/src/util/isObservable.ts @@ -0,0 +1,5 @@ +import $$observer from './Symbol_observer'; + +export default function isObservable(x): boolean { + return x && typeof x[$$observer] === 'function'; +} \ No newline at end of file diff --git a/src/util/isPromise.ts b/src/util/isPromise.ts new file mode 100644 index 0000000000..903c5b13d2 --- /dev/null +++ b/src/util/isPromise.ts @@ -0,0 +1,3 @@ +export default function isPromise(x): boolean { + return x && typeof x.then === 'function'; +} \ No newline at end of file