Skip to content

Commit

Permalink
fix(Subject.create): ensure operator property not required for Observ…
Browse files Browse the repository at this point in the history
…able subscription

Because Subject was calling Observable.prototype._subscribe internally, in a situation where a raw Observable was
passed into Subject.create() that had no operators applied to it, it would error because Observable.prototype._subscribe
assumes the existence of an operator in the operator property. This adds a check to correct that behavior in this cases

fixes #483
  • Loading branch information
benlesh committed Oct 8, 2015
1 parent 9bbf04f commit 2259de2
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 13 deletions.
20 changes: 10 additions & 10 deletions src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ import $$observable from './util/Symbol_observable';
/**
* A representation of any set of values over any amount of time. This the most basic building block
* of RxJS.
*
*
* @class Observable<T>
*/
export default class Observable<T> implements CoreOperators<T> {
source: Observable<any>;
operator: Operator<any, T>;
_isScalar: boolean = false;

/**
* @constructor
* @param {Function} subscribe the function that is
Expand All @@ -31,25 +31,25 @@ export default class Observable<T> implements CoreOperators<T> {
this._subscribe = subscribe;
}
}
// HACK: Since TypeScript inherits static properties too, we have to

// HACK: Since TypeScript inherits static properties too, we have to
// fight against TypeScript here so Subject can have a different static create signature
/**
* @static
* @method create
* @param {Function} subscribe? the subscriber function to be passed to the Observable constructor
* @returns {Observable} a new cold observable
* @description creates a new cold Observable by calling the Observable constructor
* @description creates a new cold Observable by calling the Observable constructor
*/
static create: Function = <T>(subscribe?: <R>(subscriber: Subscriber<R>) => Subscription<T>|Function|void) => {
return new Observable<T>(subscribe);
};

/**
* @method lift
* @param {Operator} operator the operator defining the operation to take on the observable
* @returns {Observable} a new observable with the Operator applied
* @description creates a new Observable, with this Observable as the source, and the passed
* @description creates a new Observable, with this Observable as the source, and the passed
* operator defined as the new observable's operator.
*/
lift<T, R>(operator: Operator<T, R>): Observable<T> {
Expand All @@ -70,7 +70,7 @@ export default class Observable<T> implements CoreOperators<T> {

/**
* @method subscribe
* @param {Observer|Function} observerOrNext (optional) either an observer defining all functions to be called,
* @param {Observer|Function} observerOrNext (optional) either an observer defining all functions to be called,
* or the first of three possible handlers, which is the handler for each value emitted from the observable.
* @param {Function} error (optional) a handler for a terminal event resulting from an error. If no error handler is provided,
* the error will be thrown as unhandled
Expand Down Expand Up @@ -116,11 +116,11 @@ export default class Observable<T> implements CoreOperators<T> {
PromiseCtor = root.Promise;
}
}

if(!PromiseCtor) {
throw new Error('no Promise impl found');
}

return new PromiseCtor<void>((resolve, reject) => {
this.subscribe(next, reject, resolve);
});
Expand Down
6 changes: 3 additions & 3 deletions src/Subject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@ const _subscriberNext = Subscriber.prototype._next;
const _subscriberError = Subscriber.prototype._error;
const _subscriberComplete = Subscriber.prototype._complete;

const _observableSubscribe = Observable.prototype._subscribe;

export default class Subject<T> extends Observable<T> implements Observer<T>, Subscription<T> {
_subscriptions: Subscription<T>[];
_unsubscribe: () => void;

static create<T>(source: Observable<T>, destination: Observer<T>): Subject<T> {
return new BidirectionalSubject(source, destination);
}
Expand Down Expand Up @@ -177,7 +176,8 @@ class BidirectionalSubject<T> extends Subject<T> {
}

_subscribe(subscriber: Subscriber<T>) {
return _observableSubscribe.call(this, subscriber);
const operator = this.operator;
return this.source._subscribe.call(this.source, operator ? operator.call(subscriber) : subscriber);
}

next(x) {
Expand Down

0 comments on commit 2259de2

Please sign in to comment.