Skip to content
This repository has been archived by the owner on Feb 26, 2024. It is now read-only.

feat(rxjs): fix #830, monkey patch rxjs to make rxjs run in correct zone #843

Merged
merged 18 commits into from
Jul 20, 2017
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion gulpfile.js
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,14 @@ gulp.task('build/sync-test.js', ['compile-esm'], function(cb) {
return generateScript('./lib/zone-spec/sync-test.ts', 'sync-test.js', false, cb);
});

gulp.task('build/rxjs.js', ['compile-esm'], function(cb) {
return generateScript('./lib/rxjs/rxjs.ts', 'zone-patch-rxjs.js', false, cb);
});

gulp.task('build/rxjs.min.js', ['compile-esm'], function(cb) {
return generateScript('./lib/rxjs/rxjs.ts', 'zone-patch-rxjs.min.js', true, cb);
});

gulp.task('build', [
'build/zone.js',
'build/zone.js.d.ts',
Expand Down Expand Up @@ -231,7 +239,9 @@ gulp.task('build', [
'build/wtf.min.js',
'build/async-test.js',
'build/fake-async-test.js',
'build/sync-test.js'
'build/sync-test.js',
'build/rxjs.js',
'build/rxjs.min.js'
]);

gulp.task('test/node', ['compile'], function(cb) {
Expand Down
3 changes: 2 additions & 1 deletion karma-base.conf.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ module.exports = function (config) {
'node_modules/systemjs/dist/system-polyfills.js',
'node_modules/systemjs/dist/system.src.js',
'node_modules/whatwg-fetch/fetch.js',
'node_modules/rxjs/bundles/Rx.js',
{pattern: 'test/assets/**/*.*', watched: true, served: true, included: false},
{pattern: 'build/**/*.js.map', watched: true, served: true, included: false},
{pattern: 'build/**/*.js', watched: true, served: true, included: false}
Expand All @@ -24,7 +25,7 @@ module.exports = function (config) {
require('karma-sourcemap-loader')
],

preprocessors: {
preprocessors: {
'**/*.js': ['sourcemap']
},

Expand Down
1 change: 1 addition & 0 deletions karma-build.conf.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ module.exports = function (config) {
config.files.push('build/lib/zone.js');
config.files.push('build/lib/common/promise.js');
config.files.push('build/lib/common/error-rewrite.js');
config.files.push('build/lib/rxjs/rxjs.js');
config.files.push('build/test/main.js');
};
1 change: 1 addition & 0 deletions karma-dist.conf.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@ module.exports = function (config) {
config.files.push('dist/sync-test.js');
config.files.push('dist/task-tracking.js');
config.files.push('dist/wtf.js');
config.files.push('dist/zone-patch-rxjs.js');
config.files.push('build/test/main.js');
};
244 changes: 244 additions & 0 deletions lib/rxjs/rxjs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
/**
* @license
* Copyright Google Inc. All Rights Reserved.
*
* Use of this source code is governed by an MIT-style license that can be
* found in the LICENSE file at https://angular.io/license
*/

declare let define: any;
(function(root: any, factory: (Rx: any) => any) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator Author

@JiaLiPassion JiaLiPassion Jul 19, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed it into import * as Rx from 'rxjs/Rx'

if (typeof exports === 'object' && typeof module !== 'undefined') {
// Node, CommonJS-like
module.exports = factory(require('rxjs'));
} else if (typeof define === 'function' && define.amd) {
// AMD
define(['rxjs'], factory);
} else {
factory(root.Rx);
}
}(typeof window !== 'undefined' && window || typeof self !== 'undefined' && self || global,
(Rx: any) => {
'use strict';
Zone.__load_patch('rxjs', (global: any, Zone: ZoneType, api: _ZonePrivate) => {
const subscribeSource = 'rxjs.subscribe';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of these consts only nextSource seems to be used.

Copy link
Collaborator Author

@JiaLiPassion JiaLiPassion Jul 19, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have modified this one.

const nextSource = 'rxjs.Subscriber.next';
const errorSource = 'rxjs.Subscriber.error';
const completeSource = 'rxjs.Subscriber.complete';
const unsubscribeSource = 'rxjs.Subscriber.unsubscribe';

const patchObservableInstance = function(observable: any) {
observable._zone = Zone.current;
// patch inner function this._subscribe to check
// SubscriptionZone is same with ConstuctorZone or not
if (observable._subscribe && typeof observable._subscribe === 'function' &&
!observable._originalSubscribe) {
observable._originalSubscribe = observable._subscribe;
observable._subscribe = _patchedSubscribe;
}
};

const _patchedSubscribe = function() {
const currentZone = Zone.current;
const _zone = this._zone;

const args = Array.prototype.slice.call(arguments);
const subscriber = args.length > 0 ? args[0] : undefined;
// also keep currentZone in Subscriber
// for later Subscriber.next/error/complete method
if (subscriber && !subscriber._zone) {
subscriber._zone = currentZone;
}
// _subscribe should run in ConstructorZone
// but for performance concern, we should check
// whether ConsturctorZone === Zone.current here
const tearDownLogic = _zone !== Zone.current ?
_zone.run(this._originalSubscribe, this, args) :
this._originalSubscribe.apply(this, args);
if (tearDownLogic && typeof tearDownLogic === 'function') {
const patchedTearDownLogic = function() {
// tearDownLogic should also run in ConstructorZone
// but for performance concern, we should check
// whether ConsturctorZone === Zone.current here
if (_zone && _zone !== Zone.current) {
return _zone.run(tearDownLogic, this, arguments);
} else {
return tearDownLogic.apply(this, arguments);
}
};
return patchedTearDownLogic;
}
return tearDownLogic;
};

const patchObservable = function(Rx: any, observableType: string) {
const symbol = Zone.__symbol__(observableType);
if (Rx[symbol]) {
// has patched
return;
}

const Observable = Rx[symbol] = Rx[observableType];
if (!Observable) {
// the subclass of Observable not loaded
return;
}

// monkey-patch Observable to save the
// current zone as ConstructorZone
const patchedObservable: any = Rx[observableType] = function() {
Observable.apply(this, arguments);
patchObservableInstance(this);
return this;
};

patchedObservable.prototype = Observable.prototype;
Object.keys(Observable).forEach(key => {
patchedObservable[key] = Observable[key];
});

const ObservablePrototype: any = Observable.prototype;
const symbolSubscribe = Zone.__symbol__('subscribe');

if (!ObservablePrototype[symbolSubscribe]) {
const subscribe = ObservablePrototype[symbolSubscribe] = ObservablePrototype.subscribe;
// patch Observable.prototype.subscribe
// if SubscripitionZone is different with ConstructorZone
// we should run _subscribe in ConstructorZone and
// create sinke in SubscriptionZone,
// and tearDown should also run into ConstructorZone
Observable.prototype.subscribe = function() {
const _zone = this._zone;
const currentZone = Zone.current;

// if operator is involved, we should also
// patch the call method to save the Subscription zone
if (this.operator && _zone && _zone !== currentZone) {
const call = this.operator.call;
this.operator.call = function() {
const args = Array.prototype.slice.call(arguments);
const subscriber = args.length > 0 ? args[0] : undefined;
if (!subscriber._zone) {
subscriber._zone = currentZone;
}
return _zone.run(call, this, args);
};
}
const result = subscribe.apply(this, arguments);
// the result is the subscriber sink,
// we save the current Zone here
result._zone = currentZone;
return result;
};
}

const symbolLift = Zone.__symbol__('lift');
if (!ObservablePrototype[symbolLift]) {
const lift = ObservablePrototype[symbolLift] = ObservablePrototype.lift;

// patch lift method to save ConstructorZone of Observable
Observable.prototype.lift = function() {
const observable = lift.apply(this, arguments);
patchObservableInstance(observable);

return observable;
};
}

const symbolCreate = Zone.__symbol__('create');
if (!Observable[symbolCreate]) {
const create = Observable[symbolCreate] = Observable.create;
// patch create method to save ConstructorZone of Observable
Rx.Observable.create = function() {
const observable = create.apply(this, arguments);
patchObservableInstance(observable);

return observable;
};
}
};

const patchSubscriber = function() {
const Subscriber = Rx.Subscriber;

const next = Subscriber.prototype.next;
const error = Subscriber.prototype.error;
const complete = Subscriber.prototype.complete;
const unsubscribe = Subscriber.prototype.unsubscribe;

// patch Subscriber.next to make sure it run
// into SubscriptionZone
Subscriber.prototype.next = function() {
const currentZone = Zone.current;
const subscriptionZone = this._zone;

// for performance concern, check Zone.current
// equal with this._zone(SubscriptionZone) or not
if (subscriptionZone && subscriptionZone !== currentZone) {
return subscriptionZone.run(next, this, arguments, nextSource);
} else {
return next.apply(this, arguments);
}
};

Subscriber.prototype.error = function() {
const currentZone = Zone.current;
const subscriptionZone = this._zone;

// for performance concern, check Zone.current
// equal with this._zone(SubscriptionZone) or not
if (subscriptionZone && subscriptionZone !== currentZone) {
return subscriptionZone.run(error, this, arguments, nextSource);
} else {
return error.apply(this, arguments);
}
};

Subscriber.prototype.complete = function() {
const currentZone = Zone.current;
const subscriptionZone = this._zone;

// for performance concern, check Zone.current
// equal with this._zone(SubscriptionZone) or not
if (subscriptionZone && subscriptionZone !== currentZone) {
return subscriptionZone.run(complete, this, arguments, nextSource);
} else {
return complete.apply(this, arguments);
}
};

Subscriber.prototype.unsubscribe = function() {
const currentZone = Zone.current;
const subscriptionZone = this._zone;

// for performance concern, check Zone.current
// equal with this._zone(SubscriptionZone) or not
if (subscriptionZone && subscriptionZone !== currentZone) {
return subscriptionZone.run(unsubscribe, this, arguments, nextSource);
} else {
return unsubscribe.apply(this, arguments);
}
};
};

const patchObservableFactoryCreator = function(obj: any, factoryName: string) {
const symbol = Zone.__symbol__(factoryName);
if (obj[symbol]) {
return;
}
const factoryCreator: any = obj[symbol] = obj[factoryName];
obj[factoryName] = function() {
const factory: any = factoryCreator.apply(this, arguments);
return function() {
const observable = factory.apply(this, arguments);
patchObservableInstance(observable);
return observable;
};
};
};

patchObservable(Rx, 'Observable');
patchSubscriber();
patchObservableFactoryCreator(Rx.Observable, 'bindCallback');
});
}));
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
"phantomjs": "^2.1.7",
"promises-aplus-tests": "^2.1.2",
"pump": "^1.0.1",
"rxjs": "^5.4.2",
"selenium-webdriver": "^3.4.0",
"systemjs": "^0.19.37",
"ts-loader": "^0.6.0",
Expand Down
3 changes: 2 additions & 1 deletion test/browser_entry_point.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ import './browser/MediaQuery.spec';
import './browser/Notification.spec';
import './mocha-patch.spec';
import './jasmine-patch.spec';
import './extra/cordova.spec';
import './extra/cordova.spec';
import './rxjs/rxjs.spec';
1 change: 1 addition & 0 deletions test/common_tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@ import './zone-spec/sync-test.spec';
import './zone-spec/fake-async-test.spec';
import './zone-spec/proxy.spec';
import './zone-spec/task-tracking.spec';
import './rxjs/rxjs.spec';

Error.stackTraceLimit = Number.POSITIVE_INFINITY;
3 changes: 2 additions & 1 deletion test/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ declare const __karma__: {
__karma__.loaded = function() {};
(window as any).global = window;

System.config({defaultJSExtensions: true});
System.config(
{defaultJSExtensions: true, map: {'rxjs/Rx': 'base/node_modules/rxjs/bundles/Rx.js'}});
let browserPatchedPromise: any = null;
if ((window as any)[(Zone as any).__symbol__('setTimeout')]) {
browserPatchedPromise = Promise.resolve('browserPatched');
Expand Down
1 change: 1 addition & 0 deletions test/node_entry_point.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import '../lib/zone-spec/proxy';
import '../lib/zone-spec/sync-test';
import '../lib/zone-spec/task-tracking';
import '../lib/zone-spec/wtf';
import '../lib/rxjs/rxjs';

// Setup test environment
import './test-env-setup-jasmine';
Expand Down
Loading