diff --git a/lib/rxjs/rxjs.ts b/lib/rxjs/rxjs.ts index 8a76d0ce8..d2681a496 100644 --- a/lib/rxjs/rxjs.ts +++ b/lib/rxjs/rxjs.ts @@ -27,30 +27,17 @@ declare let define: any; const completeSource = 'rxjs.Subscriber.complete'; const unsubscribeSource = 'rxjs.Subscriber.unsubscribe'; - const Observable = Rx.Observable; - - // monkey-patch Observable to save the - // current zone as ConstructorZone - Rx.Observable = function() { - Observable.apply(this, arguments); - this._zone = Zone.current; - + const patchObservableInstance = function(observable: any) { + observable._zone = Zone.current; // patch inner function this._subscribe to check // SubscriptionZone is same with ConstuctorZone or not - if (this._subscribe && typeof this._subscribe === 'function' && !this._originalSubscribe) { - this._originalSubscribe = this._subscribe; - this._subscribe = _patchedSubscribe; + if (observable._subscribe && typeof observable._subscribe === 'function' && + !observable._originalSubscribe) { + observable._originalSubscribe = observable._subscribe; + observable._subscribe = _patchedSubscribe; } - - return this; }; - Rx.Observable.prototype = Observable.prototype; - - const subscribe = Observable.prototype.subscribe; - const lift = Observable.prototype.lift; - const create = Observable.create; - const _patchedSubscribe = function() { const currentZone = Zone.current; const _zone = this._zone; @@ -84,124 +71,173 @@ declare let define: any; return tearDownLogic; }; - // patch Observable.prototype.subscribe - // if SubscripitionZone is different with ConstructorZone - // we should run _subscribe in ConstructorZone and - // create sinke in SubscriptionZone, - // and tearDown should also run into ConstructorZone - Observable.prototype.subscribe = function() { - const _zone = this._zone; - const currentZone = Zone.current; + const patchObservable = function(Rx: any, observableType: string) { + const symbol = Zone.__symbol__(observableType); + if (Rx[symbol]) { + // has patched + return; + } + + const Observable = Rx[symbol] = Rx[observableType]; + if (!Observable) { + return; + } - // if operator is involved, we should also - // patch the call method to save the Subscription zone - if (this.operator && _zone && _zone !== currentZone) { - const call = this.operator.call; - this.operator.call = function() { - const args = Array.prototype.slice.call(arguments); - const subscriber = args.length > 0 ? args[0] : undefined; - if (!subscriber._zone) { - subscriber._zone = currentZone; + // monkey-patch Observable to save the + // current zone as ConstructorZone + const patchedObservable: any = Rx[observableType] = function() { + Observable.apply(this, arguments); + patchObservableInstance(this); + return this; + }; + + patchedObservable.prototype = Observable.prototype; + Object.keys(Observable).forEach(key => { + patchedObservable[key] = Observable[key]; + }); + + const ObservablePrototype: any = Observable.prototype; + const symbolSubscribe = Zone.__symbol__('subscribe'); + + if (!ObservablePrototype[symbolSubscribe]) { + const subscribe = ObservablePrototype[symbolSubscribe] = ObservablePrototype.subscribe; + // patch Observable.prototype.subscribe + // if SubscripitionZone is different with ConstructorZone + // we should run _subscribe in ConstructorZone and + // create sinke in SubscriptionZone, + // and tearDown should also run into ConstructorZone + Observable.prototype.subscribe = function() { + const _zone = this._zone; + const currentZone = Zone.current; + + // if operator is involved, we should also + // patch the call method to save the Subscription zone + if (this.operator && _zone && _zone !== currentZone) { + const call = this.operator.call; + this.operator.call = function() { + const args = Array.prototype.slice.call(arguments); + const subscriber = args.length > 0 ? args[0] : undefined; + if (!subscriber._zone) { + subscriber._zone = currentZone; + } + return _zone.run(call, this, args); + }; } - return _zone.run(call, this, args); + const result = subscribe.apply(this, arguments); + // the result is the subscriber sink, + // we save the current Zone here + result._zone = currentZone; + return result; }; } - const result = subscribe.apply(this, arguments); - // the result is the subscriber sink, - // we save the current Zone here - result._zone = currentZone; - return result; - }; - // patch lift method to save ConstructorZone of Observable - Observable.prototype.lift = function() { - const observable = lift.apply(this, arguments); - observable._zone = Zone.current; - // patch inner function this._subscribe to check - // SubscriptionZone is same with ConstuctorZone or not - if (observable._subscribe && typeof observable._subscribe === 'function' && - !observable._originalSubscribe) { - observable._originalSubscribe = observable._subscribe; - observable._subscribe = _patchedSubscribe; - } + const symbolLift = Zone.__symbol__('lift'); + if (!ObservablePrototype[symbolLift]) { + const lift = ObservablePrototype[symbolLift] = ObservablePrototype.lift; - return observable; - }; + // patch lift method to save ConstructorZone of Observable + Observable.prototype.lift = function() { + const observable = lift.apply(this, arguments); + patchObservableInstance(observable); - // patch create method to save ConstructorZone of Observable - Rx.Observable.create = function() { - const observable = create.apply(this, arguments); - observable._zone = Zone.current; - // patch inner function this._subscribe to check - // SubscriptionZone is same with ConstuctorZone or not - if (observable._subscribe && typeof observable._subscribe === 'function' && - !observable._originalSubscribe) { - observable._originalSubscribe = observable._subscribe; - observable._subscribe = _patchedSubscribe; + return observable; + }; } - return observable; - }; - - const Subscriber = Rx.Subscriber; - - const next = Subscriber.prototype.next; - const error = Subscriber.prototype.error; - const complete = Subscriber.prototype.complete; - const unsubscribe = Subscriber.prototype.unsubscribe; + const symbolCreate = Zone.__symbol__('create'); + if (!Observable[symbolCreate]) { + const create = Observable[symbolCreate] = Observable.create; + // patch create method to save ConstructorZone of Observable + Rx.Observable.create = function() { + const observable = create.apply(this, arguments); + patchObservableInstance(observable); - // patch Subscriber.next to make sure it run - // into SubscriptionZone - Subscriber.prototype.next = function() { - const currentZone = Zone.current; - const subscriptionZone = this._zone; - - // for performance concern, check Zone.current - // equal with this._zone(SubscriptionZone) or not - if (subscriptionZone && subscriptionZone !== currentZone) { - return subscriptionZone.run(next, this, arguments, nextSource); - } else { - return next.apply(this, arguments); + return observable; + }; } }; - Subscriber.prototype.error = function() { - const currentZone = Zone.current; - const subscriptionZone = this._zone; - - // for performance concern, check Zone.current - // equal with this._zone(SubscriptionZone) or not - if (subscriptionZone && subscriptionZone !== currentZone) { - return subscriptionZone.run(error, this, arguments, nextSource); - } else { - return error.apply(this, arguments); - } + const patchSubscriber = function() { + const Subscriber = Rx.Subscriber; + + const next = Subscriber.prototype.next; + const error = Subscriber.prototype.error; + const complete = Subscriber.prototype.complete; + const unsubscribe = Subscriber.prototype.unsubscribe; + + // patch Subscriber.next to make sure it run + // into SubscriptionZone + Subscriber.prototype.next = function() { + const currentZone = Zone.current; + const subscriptionZone = this._zone; + + // for performance concern, check Zone.current + // equal with this._zone(SubscriptionZone) or not + if (subscriptionZone && subscriptionZone !== currentZone) { + return subscriptionZone.run(next, this, arguments, nextSource); + } else { + return next.apply(this, arguments); + } + }; + + Subscriber.prototype.error = function() { + const currentZone = Zone.current; + const subscriptionZone = this._zone; + + // for performance concern, check Zone.current + // equal with this._zone(SubscriptionZone) or not + if (subscriptionZone && subscriptionZone !== currentZone) { + return subscriptionZone.run(error, this, arguments, nextSource); + } else { + return error.apply(this, arguments); + } + }; + + Subscriber.prototype.complete = function() { + const currentZone = Zone.current; + const subscriptionZone = this._zone; + + // for performance concern, check Zone.current + // equal with this._zone(SubscriptionZone) or not + if (subscriptionZone && subscriptionZone !== currentZone) { + return subscriptionZone.run(complete, this, arguments, nextSource); + } else { + return complete.apply(this, arguments); + } + }; + + Subscriber.prototype.unsubscribe = function() { + const currentZone = Zone.current; + const subscriptionZone = this._zone; + + // for performance concern, check Zone.current + // equal with this._zone(SubscriptionZone) or not + if (subscriptionZone && subscriptionZone !== currentZone) { + return subscriptionZone.run(unsubscribe, this, arguments, nextSource); + } else { + return unsubscribe.apply(this, arguments); + } + }; }; - Subscriber.prototype.complete = function() { - const currentZone = Zone.current; - const subscriptionZone = this._zone; - - // for performance concern, check Zone.current - // equal with this._zone(SubscriptionZone) or not - if (subscriptionZone && subscriptionZone !== currentZone) { - return subscriptionZone.run(complete, this, arguments, nextSource); - } else { - return complete.apply(this, arguments); + const patchObservableFactoryCreator = function(obj: any, factoryName: string) { + const symbol = Zone.__symbol__(factoryName); + if (obj[symbol]) { + return; } + const factoryCreator: any = obj[symbol] = obj[factoryName]; + obj[factoryName] = function() { + const factory: any = factoryCreator.apply(this, arguments); + return function() { + const observable = factory.apply(this, arguments); + patchObservableInstance(observable); + return observable; + }; + }; }; - Subscriber.prototype.unsubscribe = function() { - const currentZone = Zone.current; - const subscriptionZone = this._zone; - - // for performance concern, check Zone.current - // equal with this._zone(SubscriptionZone) or not - if (subscriptionZone && subscriptionZone !== currentZone) { - return subscriptionZone.run(unsubscribe, this, arguments, nextSource); - } else { - return unsubscribe.apply(this, arguments); - } - }; + patchObservable(Rx, 'Observable'); + patchSubscriber(); + patchObservableFactoryCreator(Rx.Observable, 'bindCallback'); }); })); \ No newline at end of file diff --git a/test/rxjs/rxjs.spec.ts b/test/rxjs/rxjs.spec.ts index fffc360cb..e480e9467 100644 --- a/test/rxjs/rxjs.spec.ts +++ b/test/rxjs/rxjs.spec.ts @@ -6,14 +6,12 @@ * found in the LICENSE file at https://angular.io/license */ -let rxjs; +let rxjs: any; if (typeof window !== 'undefined') { rxjs = (window as any).Rx; } else if (typeof exports === 'object' && typeof module !== undefined) { - rxjs = require('rxjs'); + rxjs = require('rxjs/Rx'); } -const Observable = rxjs.Observable; -const Subscriber = rxjs.Subscriber; /** * The point of these tests, is to ensure that all callbacks execute in the Zone which was active @@ -33,16 +31,15 @@ describe('Zone interaction', () => { const constructorZone: Zone = Zone.current.fork({name: 'Constructor Zone'}); const subscriptionZone: Zone = Zone.current.fork({name: 'Subscription Zone'}); let subscriber: any = null; - const observable: any = - constructorZone.run(() => new Observable((_subscriber: any) => { - subscriber = _subscriber; - log.push('setup'); - expect(Zone.current.name).toEqual(constructorZone.name); - return () => { - expect(Zone.current.name).toEqual(constructorZone.name); - log.push('cleanup'); - }; - })); + const observable: any = constructorZone.run(() => new rxjs.Observable((_subscriber: any) => { + subscriber = _subscriber; + log.push('setup'); + expect(Zone.current.name).toEqual(constructorZone.name); + return () => { + expect(Zone.current.name).toEqual(constructorZone.name); + log.push('cleanup'); + }; + })); subscriptionZone.run( () => observable.subscribe( () => { @@ -75,20 +72,19 @@ describe('Zone interaction', () => { const rootZone: Zone = Zone.current; const constructorZone: Zone = Zone.current.fork({name: 'Constructor Zone'}); const subscriptionZone: Zone = Zone.current.fork({name: 'Subscription Zone'}); - const observable: any = - constructorZone.run(() => new Observable((subscriber: any) => { - // Execute the `next`/`complete` in different zone, and assert that - // correct zone - // is restored. - rootZone.run(() => { - subscriber.next('MyValue'); - subscriber.complete(); - }); - return () => { - expect(Zone.current.name).toEqual(constructorZone.name); - log.push('cleanup'); - }; - })); + const observable: any = constructorZone.run(() => new rxjs.Observable((subscriber: any) => { + // Execute the `next`/`complete` in different zone, and assert that + // correct zone + // is restored. + rootZone.run(() => { + subscriber.next('MyValue'); + subscriber.complete(); + }); + return () => { + expect(Zone.current.name).toEqual(constructorZone.name); + log.push('cleanup'); + }; + })); subscriptionZone.run( () => observable.subscribe( @@ -111,52 +107,7 @@ describe('Zone interaction', () => { const constructorZone: Zone = Zone.current.fork({name: 'Constructor Zone'}); const operatorZone: Zone = Zone.current.fork({name: 'Operator Zone'}); const subscriptionZone: Zone = Zone.current.fork({name: 'Subscription Zone'}); - let observable: any = - constructorZone.run(() => new Observable((subscriber: any) => { - // Execute the `next`/`complete` in different zone, and assert that - // correct zone - // is restored. - rootZone.run(() => { - subscriber.next('MyValue'); - subscriber.complete(); - }); - return () => { - expect(Zone.current.name).toEqual(constructorZone.name); - log.push('cleanup'); - }; - })); - - observable = operatorZone.run(() => observable.map((value: any) => { - expect(Zone.current.name).toEqual(operatorZone.name); - log.push('map: ' + value); - return value; - })); - - subscriptionZone.run( - () => observable.subscribe( - () => { - expect(Zone.current.name).toEqual(subscriptionZone.name); - log.push('next'); - }, - (e: any) => { - expect(Zone.current.name).toEqual(subscriptionZone.name); - log.push('error: ' + e); - }, - () => { - expect(Zone.current.name).toEqual(subscriptionZone.name); - log.push('complete'); - })); - - expect(log).toEqual(['map: MyValue', 'next', 'complete', 'cleanup']); - }); - - it('should run operators in the zone of declaration with Observable.create', () => { - const log: string[] = []; - const rootZone: Zone = Zone.current; - const constructorZone: Zone = Zone.current.fork({name: 'Constructor Zone'}); - const operatorZone: Zone = Zone.current.fork({name: 'Operator Zone'}); - const subscriptionZone: Zone = Zone.current.fork({name: 'Subscription Zone'}); - let observable: any = constructorZone.run(() => Observable.create((subscriber: any) => { + let observable: any = constructorZone.run(() => new rxjs.Observable((subscriber: any) => { // Execute the `next`/`complete` in different zone, and assert that // correct zone // is restored. @@ -194,5 +145,97 @@ describe('Zone interaction', () => { expect(log).toEqual(['map: MyValue', 'next', 'complete', 'cleanup']); }); + it('should run subscribe in zone of declaration with Observable.create', () => { + const log: string[] = []; + const constructorZone: Zone = Zone.current.fork({name: 'Constructor Zone'}); + let observable: any = constructorZone.run(() => rxjs.Observable.create((subscriber: any) => { + subscriber.next(1); + subscriber.complete(); + return () => { + expect(Zone.current.name).toEqual(constructorZone.name); + log.push('cleanup'); + }; + })); + observable.subscribe(() => { + log.push('next'); + }); + + expect(log).toEqual(['next', 'cleanup']); + }); + + it('should run in the zone when subscribe is called to the same Subject', () => { + const log: string[] = []; + const constructorZone: Zone = Zone.current.fork({name: 'Constructor Zone'}); + const subscriptionZone1: Zone = Zone.current.fork({name: 'Subscription Zone 1'}); + const subscriptionZone2: Zone = Zone.current.fork({name: 'Subscription Zone 2'}); + + let subject: any; + + constructorZone.run(() => { + subject = new rxjs.Subject(); + }); + + let subscription1: any; + let subscription2: any; + + subscriptionZone1.run(() => { + subscription1 = subject.subscribe( + () => { + expect(Zone.current.name).toEqual(subscriptionZone1.name); + log.push('next1'); + }, + () => {}, + () => { + expect(Zone.current.name).toEqual(subscriptionZone1.name); + log.push('complete1'); + }); + }); + + subscriptionZone2.run(() => { + subscription2 = subject.subscribe( + () => { + expect(Zone.current.name).toEqual(subscriptionZone2.name); + log.push('next2'); + }, + () => {}, + () => { + expect(Zone.current.name).toEqual(subscriptionZone2.name); + log.push('complete2'); + }); + }); + + subject.next(1); + subject.complete(); + + expect(log).toEqual(['next1', 'next2', 'complete1', 'complete2']); + }); + + it('bindCallback func callback should run in the correct zone', () => { + let log: string[] = []; + const constructorZone: Zone = Zone.current.fork({name: 'Constructor Zone'}); + const triggerZone: Zone = Zone.current.fork({name: 'Trigger Zone'}); + const subscriptionZone: Zone = Zone.current.fork({name: 'Subscription Zone'}); + + let func: any; + let boundFunc: any; + let observable: any; + + constructorZone.run(() => { + func = function(arg0: any, callback: Function) { + expect(Zone.current.name).toEqual(constructorZone.name); + callback(arg0); + }; + boundFunc = rxjs.Observable.bindCallback(func); + observable = boundFunc('test'); + }); + + subscriptionZone.run(() => { + observable.subscribe((arg: any) => { + log.push('next' + arg); + }); + }); + + expect(log).toEqual(['nexttest']); + }); }); \ No newline at end of file