Skip to content
This repository has been archived by the owner on Feb 26, 2024. It is now read-only.

feat(rxjs): fix #830, monkey patch rxjs to make rxjs run in correct zone #843

Merged
merged 18 commits into from
Jul 20, 2017
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion gulpfile.js
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,14 @@ gulp.task('build/sync-test.js', ['compile-esm'], function(cb) {
return generateScript('./lib/zone-spec/sync-test.ts', 'sync-test.js', false, cb);
});

gulp.task('build/rxjs.js', ['compile-esm'], function(cb) {
return generateScript('./lib/rxjs/rxjs.ts', 'zone-patch-rxjs.js', false, cb);
});

gulp.task('build/rxjs.min.js', ['compile-esm'], function(cb) {
return generateScript('./lib/rxjs/rxjs.ts', 'zone-patch-rxjs.min.js', true, cb);
});

gulp.task('build', [
'build/zone.js',
'build/zone.js.d.ts',
Expand Down Expand Up @@ -231,7 +239,9 @@ gulp.task('build', [
'build/wtf.min.js',
'build/async-test.js',
'build/fake-async-test.js',
'build/sync-test.js'
'build/sync-test.js',
'build/rxjs.js',
'build/rxjs.min.js'
]);

gulp.task('test/node', ['compile'], function(cb) {
Expand Down
3 changes: 2 additions & 1 deletion karma-base.conf.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ module.exports = function (config) {
'node_modules/systemjs/dist/system-polyfills.js',
'node_modules/systemjs/dist/system.src.js',
'node_modules/whatwg-fetch/fetch.js',
{pattern: 'node_modules/rxjs/bundles/Rx.js', watched: true, served: true, included: false},
{pattern: 'test/assets/**/*.*', watched: true, served: true, included: false},
{pattern: 'build/**/*.js.map', watched: true, served: true, included: false},
{pattern: 'build/**/*.js', watched: true, served: true, included: false}
Expand All @@ -24,7 +25,7 @@ module.exports = function (config) {
require('karma-sourcemap-loader')
],

preprocessors: {
preprocessors: {
'**/*.js': ['sourcemap']
},

Expand Down
2 changes: 2 additions & 0 deletions karma-dist.conf.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,7 @@ module.exports = function (config) {
config.files.push('dist/sync-test.js');
config.files.push('dist/task-tracking.js');
config.files.push('dist/wtf.js');
config.files.push('node_modules/rxjs/bundles/Rx.js');
config.files.push('dist/zone-patch-rxjs.js');
config.files.push('build/test/main.js');
};
234 changes: 234 additions & 0 deletions lib/rxjs/rxjs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
/**
* @license
* Copyright Google Inc. All Rights Reserved.
*
* Use of this source code is governed by an MIT-style license that can be
* found in the LICENSE file at https://angular.io/license
*/

import * as Rx from 'rxjs/Rx';

(Zone as any).__load_patch('rxjs', (global: any, Zone: ZoneType, api: any) => {
const symbol: (symbolString: string) => string = (Zone as any).__symbol__;
const subscribeSource = 'rxjs.subscribe';
const nextSource = 'rxjs.Subscriber.next';
const errorSource = 'rxjs.Subscriber.error';
const completeSource = 'rxjs.Subscriber.complete';
const unsubscribeSource = 'rxjs.Subscriber.unsubscribe';
const teardownSource = 'rxjs.Subscriber.teardownLogic';

const patchObservableInstance = function(observable: any) {
observable._zone = Zone.current;
// patch inner function this._subscribe to check
// SubscriptionZone is same with ConstuctorZone or not
if (observable._subscribe && typeof observable._subscribe === 'function' &&
!observable._originalSubscribe) {
observable._originalSubscribe = observable._subscribe;
observable._subscribe = _patchedSubscribe;
}
};

const _patchedSubscribe = function() {
const currentZone = Zone.current;
const _zone = this._zone;

const args = Array.prototype.slice.call(arguments);
const subscriber = args.length > 0 ? args[0] : undefined;
// also keep currentZone in Subscriber
// for later Subscriber.next/error/complete method
if (subscriber && !subscriber._zone) {
subscriber._zone = currentZone;
}
// _subscribe should run in ConstructorZone
// but for performance concern, we should check
// whether ConsturctorZone === Zone.current here
const tearDownLogic = _zone !== Zone.current ?
_zone.run(this._originalSubscribe, this, args, subscribeSource) :
this._originalSubscribe.apply(this, args);
if (tearDownLogic && typeof tearDownLogic === 'function') {
const patchedTearDownLogic = function() {
// tearDownLogic should also run in ConstructorZone
// but for performance concern, we should check
// whether ConsturctorZone === Zone.current here
if (_zone && _zone !== Zone.current) {
return _zone.run(tearDownLogic, this, arguments, teardownSource);
} else {
return tearDownLogic.apply(this, arguments);
}
};
return patchedTearDownLogic;
}
return tearDownLogic;
};

const patchObservable = function(Rx: any, observableType: string) {
const symbolObservable = symbol(observableType);

const Observable = Rx[observableType];
if (!Observable || Observable[symbolObservable]) {
// the subclass of Observable not loaded or have been patched
return;
}

// monkey-patch Observable to save the
// current zone as ConstructorZone
const patchedObservable: any = Rx[observableType] = function() {
Observable.apply(this, arguments);
patchObservableInstance(this);
return this;
};

patchedObservable.prototype = Observable.prototype;
patchedObservable[symbolObservable] = Observable;

Object.keys(Observable).forEach(key => {
patchedObservable[key] = Observable[key];
});

const ObservablePrototype: any = Observable.prototype;
const symbolSubscribe = symbol('subscribe');

if (!ObservablePrototype[symbolSubscribe]) {
const subscribe = ObservablePrototype[symbolSubscribe] = ObservablePrototype.subscribe;
// patch Observable.prototype.subscribe
// if SubscripitionZone is different with ConstructorZone
// we should run _subscribe in ConstructorZone and
// create sinke in SubscriptionZone,
// and tearDown should also run into ConstructorZone
Observable.prototype.subscribe = function() {
const _zone = this._zone;
const currentZone = Zone.current;

// if operator is involved, we should also
// patch the call method to save the Subscription zone
if (this.operator && _zone && _zone !== currentZone) {
const call = this.operator.call;
this.operator.call = function() {
const args = Array.prototype.slice.call(arguments);
const subscriber = args.length > 0 ? args[0] : undefined;
if (!subscriber._zone) {
subscriber._zone = currentZone;
}
return _zone.run(call, this, args, subscribeSource);
};
}
const result = subscribe.apply(this, arguments);
// the result is the subscriber sink,
// we save the current Zone here
if (!result._zone) {
result._zone = currentZone;
}
return result;
};
}

const symbolLift = symbol('lift');
if (!ObservablePrototype[symbolLift]) {
const lift = ObservablePrototype[symbolLift] = ObservablePrototype.lift;

// patch lift method to save ConstructorZone of Observable
Observable.prototype.lift = function() {
const observable = lift.apply(this, arguments);
patchObservableInstance(observable);

return observable;
};
}

const symbolCreate = symbol('create');
if (!patchedObservable[symbolCreate]) {
const create = patchedObservable[symbolCreate] = Observable.create;
// patch create method to save ConstructorZone of Observable
Rx.Observable.create = function() {
const observable = create.apply(this, arguments);
patchObservableInstance(observable);

return observable;
};
}
};

const patchSubscriber = function() {
const Subscriber = Rx.Subscriber;

const next = Subscriber.prototype.next;
const error = Subscriber.prototype.error;
const complete = Subscriber.prototype.complete;
const unsubscribe = Subscriber.prototype.unsubscribe;

// patch Subscriber.next to make sure it run
// into SubscriptionZone
Subscriber.prototype.next = function() {
const currentZone = Zone.current;
const subscriptionZone = this._zone;

// for performance concern, check Zone.current
// equal with this._zone(SubscriptionZone) or not
if (subscriptionZone && subscriptionZone !== currentZone) {
return subscriptionZone.run(next, this, arguments, nextSource);
} else {
return next.apply(this, arguments);
}
};

Subscriber.prototype.error = function() {
const currentZone = Zone.current;
const subscriptionZone = this._zone;

// for performance concern, check Zone.current
// equal with this._zone(SubscriptionZone) or not
if (subscriptionZone && subscriptionZone !== currentZone) {
return subscriptionZone.run(error, this, arguments, errorSource);
} else {
return error.apply(this, arguments);
}
};

Subscriber.prototype.complete = function() {
const currentZone = Zone.current;
const subscriptionZone = this._zone;

// for performance concern, check Zone.current
// equal with this._zone(SubscriptionZone) or not
if (subscriptionZone && subscriptionZone !== currentZone) {
return subscriptionZone.run(complete, this, arguments, completeSource);
} else {
return complete.apply(this, arguments);
}
};

Subscriber.prototype.unsubscribe = function() {
const currentZone = Zone.current;
const subscriptionZone = this._zone;

// for performance concern, check Zone.current
// equal with this._zone(SubscriptionZone) or not
if (subscriptionZone && subscriptionZone !== currentZone) {
return subscriptionZone.run(unsubscribe, this, arguments, unsubscribeSource);
} else {
return unsubscribe.apply(this, arguments);
}
};
};

const patchObservableFactoryCreator = function(obj: any, factoryName: string) {
const symbolFactory: string = symbol(factoryName);
if (obj[symbolFactory]) {
return;
}
const factoryCreator: any = obj[symbolFactory] = obj[factoryName];
obj[factoryName] = function() {
const factory: any = factoryCreator.apply(this, arguments);
return function() {
const observable = factory.apply(this, arguments);
patchObservableInstance(observable);
return observable;
};
};
};

patchObservable(Rx, 'Observable');
patchSubscriber();
patchObservableFactoryCreator(Rx.Observable, 'bindCallback');
patchObservableFactoryCreator(Rx.Observable, 'bindNodeCallback');
});
5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
"webdriver-sauce-test": "node test/webdriver/test.sauce.js",
"ws-client": "node ./test/ws-client.js",
"ws-server": "node ./test/ws-server.js",
"tsc": "tsc",
"tsc:w": "tsc -w",
"tsc": "tsc -p .",
"tsc:w": "tsc -w -p .",
"test": "npm run tsc && concurrently \"npm run tsc:w\" \"npm run ws-server\" \"npm run karma-jasmine\"",
"test:phantomjs": "npm run tsc && concurrently \"npm run tsc:w\" \"npm run ws-server\" \"npm run karma-jasmine:phantomjs\"",
"test:phantomjs-single": "concurrently \"npm run ws-server\" \"npm run karma-jasmine-phantomjs:autoclose\"",
Expand Down Expand Up @@ -87,6 +87,7 @@
"phantomjs": "^2.1.7",
"promises-aplus-tests": "^2.1.2",
"pump": "^1.0.1",
"rxjs": "^5.4.2",
"selenium-webdriver": "^3.4.0",
"systemjs": "^0.19.37",
"ts-loader": "^0.6.0",
Expand Down
3 changes: 2 additions & 1 deletion test/browser-zone-setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ import '../lib/zone-spec/proxy';
import '../lib/zone-spec/sync-test';
import '../lib/zone-spec/task-tracking';
import '../lib/zone-spec/wtf';
import '../lib/extra/cordova';
import '../lib/extra/cordova';
import '../lib/rxjs/rxjs';
3 changes: 2 additions & 1 deletion test/browser_entry_point.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ import './browser/MediaQuery.spec';
import './browser/Notification.spec';
import './mocha-patch.spec';
import './jasmine-patch.spec';
import './extra/cordova.spec';
import './extra/cordova.spec';
import './rxjs/rxjs.spec';
1 change: 1 addition & 0 deletions test/common_tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@ import './zone-spec/sync-test.spec';
import './zone-spec/fake-async-test.spec';
import './zone-spec/proxy.spec';
import './zone-spec/task-tracking.spec';
import './rxjs/rxjs.spec';

Error.stackTraceLimit = Number.POSITIVE_INFINITY;
11 changes: 11 additions & 0 deletions test/global-rxjs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/**
* @license
* Copyright Google Inc. All Rights Reserved.
*
* Use of this source code is governed by an MIT-style license that can be
* found in the LICENSE file at https://angular.io/license
*/
const globalRx: any = (window as any).Rx;
exports.Observable = globalRx.Observable;
exports.Subject = globalRx.Subject;
exports.Scheduler = globalRx.Scheduler;
9 changes: 8 additions & 1 deletion test/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,18 @@ declare const __karma__: {
__karma__.loaded = function() {};
(window as any).global = window;

System.config({defaultJSExtensions: true});
let browserPatchedPromise: any = null;
if ((window as any)[(Zone as any).__symbol__('setTimeout')]) {
System.config({
defaultJSExtensions: true,
map: {'rxjs/Rx': 'base/build/test/global-rxjs.js'},
});
browserPatchedPromise = Promise.resolve('browserPatched');
} else {
System.config({
defaultJSExtensions: true,
map: {'rxjs/Rx': 'base/node_modules/rxjs/bundles/Rx.js'},
});
// this means that Zone has not patched the browser yet, which means we must be running in
// build mode and need to load the browser patch.
browserPatchedPromise = System.import('/base/build/test/browser-zone-setup');
Expand Down
1 change: 1 addition & 0 deletions test/node_entry_point.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import '../lib/zone-spec/proxy';
import '../lib/zone-spec/sync-test';
import '../lib/zone-spec/task-tracking';
import '../lib/zone-spec/wtf';
import '../lib/rxjs/rxjs';

// Setup test environment
import './test-env-setup-jasmine';
Expand Down
Loading