Skip to content
This repository has been archived by the owner on Feb 26, 2024. It is now read-only.

Commit

Permalink
feat(rxjs): fix #830, monkey patch rxjs to make rxjs run in correct z…
Browse files Browse the repository at this point in the history
…one (#843)

* feat(rxjs): fix #830, monkey patch rxjs to make rxjs run in correct zone

* make test pass

* add karma config

* pass compile-esm

* change rxjs to umd

* change spec to pass test

* add comment, patch Observable.create

* refactor, not create a new closure _subscribe everytime

* fix set wrong object

* begin to apply observable methods, 1: bindCallback

* fix async scheduler reset zone issue

* change require to import

* still need require in rxjs.spec

* make build and dist pass test

* remove meta def

* make import work on both build and dist

* continue to add cases
  • Loading branch information
JiaLiPassion authored and mhevery committed Jul 20, 2017
1 parent ab82852 commit 1ed83d0
Show file tree
Hide file tree
Showing 18 changed files with 760 additions and 7 deletions.
10 changes: 10 additions & 0 deletions gulpfile.js
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,14 @@ gulp.task('build/sync-test.js', ['compile-esm'], function(cb) {
return generateScript('./lib/zone-spec/sync-test.ts', 'sync-test.js', false, cb);
});

gulp.task('build/rxjs.js', ['compile-esm'], function(cb) {
return generateScript('./lib/rxjs/rxjs.ts', 'zone-patch-rxjs.js', false, cb);
});

gulp.task('build/rxjs.min.js', ['compile-esm'], function(cb) {
return generateScript('./lib/rxjs/rxjs.ts', 'zone-patch-rxjs.min.js', true, cb);
});

gulp.task('build/closure.js', function() {
return gulp.src('./lib/closure/zone_externs.js')
.pipe(gulp.dest('./dist'));
Expand Down Expand Up @@ -237,6 +245,8 @@ gulp.task('build', [
'build/async-test.js',
'build/fake-async-test.js',
'build/sync-test.js',
'build/rxjs.js',
'build/rxjs.min.js',
'build/closure.js'
]);

Expand Down
3 changes: 2 additions & 1 deletion karma-base.conf.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ module.exports = function (config) {
'node_modules/systemjs/dist/system-polyfills.js',
'node_modules/systemjs/dist/system.src.js',
'node_modules/whatwg-fetch/fetch.js',
{pattern: 'node_modules/rxjs/bundles/Rx.js', watched: true, served: true, included: false},
{pattern: 'test/assets/**/*.*', watched: true, served: true, included: false},
{pattern: 'build/**/*.js.map', watched: true, served: true, included: false},
{pattern: 'build/**/*.js', watched: true, served: true, included: false}
Expand All @@ -24,7 +25,7 @@ module.exports = function (config) {
require('karma-sourcemap-loader')
],

preprocessors: {
preprocessors: {
'**/*.js': ['sourcemap']
},

Expand Down
2 changes: 2 additions & 0 deletions karma-dist.conf.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,7 @@ module.exports = function (config) {
config.files.push('dist/sync-test.js');
config.files.push('dist/task-tracking.js');
config.files.push('dist/wtf.js');
config.files.push('node_modules/rxjs/bundles/Rx.js');
config.files.push('dist/zone-patch-rxjs.js');
config.files.push('build/test/main.js');
};
234 changes: 234 additions & 0 deletions lib/rxjs/rxjs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
/**
* @license
* Copyright Google Inc. All Rights Reserved.
*
* Use of this source code is governed by an MIT-style license that can be
* found in the LICENSE file at https://angular.io/license
*/

import * as Rx from 'rxjs/Rx';

(Zone as any).__load_patch('rxjs', (global: any, Zone: ZoneType, api: any) => {
const symbol: (symbolString: string) => string = (Zone as any).__symbol__;
const subscribeSource = 'rxjs.subscribe';
const nextSource = 'rxjs.Subscriber.next';
const errorSource = 'rxjs.Subscriber.error';
const completeSource = 'rxjs.Subscriber.complete';
const unsubscribeSource = 'rxjs.Subscriber.unsubscribe';
const teardownSource = 'rxjs.Subscriber.teardownLogic';

const patchObservableInstance = function(observable: any) {
observable._zone = Zone.current;
// patch inner function this._subscribe to check
// SubscriptionZone is same with ConstuctorZone or not
if (observable._subscribe && typeof observable._subscribe === 'function' &&
!observable._originalSubscribe) {
observable._originalSubscribe = observable._subscribe;
observable._subscribe = _patchedSubscribe;
}
};

const _patchedSubscribe = function() {
const currentZone = Zone.current;
const _zone = this._zone;

const args = Array.prototype.slice.call(arguments);
const subscriber = args.length > 0 ? args[0] : undefined;
// also keep currentZone in Subscriber
// for later Subscriber.next/error/complete method
if (subscriber && !subscriber._zone) {
subscriber._zone = currentZone;
}
// _subscribe should run in ConstructorZone
// but for performance concern, we should check
// whether ConsturctorZone === Zone.current here
const tearDownLogic = _zone !== Zone.current ?
_zone.run(this._originalSubscribe, this, args, subscribeSource) :
this._originalSubscribe.apply(this, args);
if (tearDownLogic && typeof tearDownLogic === 'function') {
const patchedTearDownLogic = function() {
// tearDownLogic should also run in ConstructorZone
// but for performance concern, we should check
// whether ConsturctorZone === Zone.current here
if (_zone && _zone !== Zone.current) {
return _zone.run(tearDownLogic, this, arguments, teardownSource);
} else {
return tearDownLogic.apply(this, arguments);
}
};
return patchedTearDownLogic;
}
return tearDownLogic;
};

const patchObservable = function(Rx: any, observableType: string) {
const symbolObservable = symbol(observableType);

const Observable = Rx[observableType];
if (!Observable || Observable[symbolObservable]) {
// the subclass of Observable not loaded or have been patched
return;
}

// monkey-patch Observable to save the
// current zone as ConstructorZone
const patchedObservable: any = Rx[observableType] = function() {
Observable.apply(this, arguments);
patchObservableInstance(this);
return this;
};

patchedObservable.prototype = Observable.prototype;
patchedObservable[symbolObservable] = Observable;

Object.keys(Observable).forEach(key => {
patchedObservable[key] = Observable[key];
});

const ObservablePrototype: any = Observable.prototype;
const symbolSubscribe = symbol('subscribe');

if (!ObservablePrototype[symbolSubscribe]) {
const subscribe = ObservablePrototype[symbolSubscribe] = ObservablePrototype.subscribe;
// patch Observable.prototype.subscribe
// if SubscripitionZone is different with ConstructorZone
// we should run _subscribe in ConstructorZone and
// create sinke in SubscriptionZone,
// and tearDown should also run into ConstructorZone
Observable.prototype.subscribe = function() {
const _zone = this._zone;
const currentZone = Zone.current;

// if operator is involved, we should also
// patch the call method to save the Subscription zone
if (this.operator && _zone && _zone !== currentZone) {
const call = this.operator.call;
this.operator.call = function() {
const args = Array.prototype.slice.call(arguments);
const subscriber = args.length > 0 ? args[0] : undefined;
if (!subscriber._zone) {
subscriber._zone = currentZone;
}
return _zone.run(call, this, args, subscribeSource);
};
}
const result = subscribe.apply(this, arguments);
// the result is the subscriber sink,
// we save the current Zone here
if (!result._zone) {
result._zone = currentZone;
}
return result;
};
}

const symbolLift = symbol('lift');
if (!ObservablePrototype[symbolLift]) {
const lift = ObservablePrototype[symbolLift] = ObservablePrototype.lift;

// patch lift method to save ConstructorZone of Observable
Observable.prototype.lift = function() {
const observable = lift.apply(this, arguments);
patchObservableInstance(observable);

return observable;
};
}

const symbolCreate = symbol('create');
if (!patchedObservable[symbolCreate]) {
const create = patchedObservable[symbolCreate] = Observable.create;
// patch create method to save ConstructorZone of Observable
Rx.Observable.create = function() {
const observable = create.apply(this, arguments);
patchObservableInstance(observable);

return observable;
};
}
};

const patchSubscriber = function() {
const Subscriber = Rx.Subscriber;

const next = Subscriber.prototype.next;
const error = Subscriber.prototype.error;
const complete = Subscriber.prototype.complete;
const unsubscribe = Subscriber.prototype.unsubscribe;

// patch Subscriber.next to make sure it run
// into SubscriptionZone
Subscriber.prototype.next = function() {
const currentZone = Zone.current;
const subscriptionZone = this._zone;

// for performance concern, check Zone.current
// equal with this._zone(SubscriptionZone) or not
if (subscriptionZone && subscriptionZone !== currentZone) {
return subscriptionZone.run(next, this, arguments, nextSource);
} else {
return next.apply(this, arguments);
}
};

Subscriber.prototype.error = function() {
const currentZone = Zone.current;
const subscriptionZone = this._zone;

// for performance concern, check Zone.current
// equal with this._zone(SubscriptionZone) or not
if (subscriptionZone && subscriptionZone !== currentZone) {
return subscriptionZone.run(error, this, arguments, errorSource);
} else {
return error.apply(this, arguments);
}
};

Subscriber.prototype.complete = function() {
const currentZone = Zone.current;
const subscriptionZone = this._zone;

// for performance concern, check Zone.current
// equal with this._zone(SubscriptionZone) or not
if (subscriptionZone && subscriptionZone !== currentZone) {
return subscriptionZone.run(complete, this, arguments, completeSource);
} else {
return complete.apply(this, arguments);
}
};

Subscriber.prototype.unsubscribe = function() {
const currentZone = Zone.current;
const subscriptionZone = this._zone;

// for performance concern, check Zone.current
// equal with this._zone(SubscriptionZone) or not
if (subscriptionZone && subscriptionZone !== currentZone) {
return subscriptionZone.run(unsubscribe, this, arguments, unsubscribeSource);
} else {
return unsubscribe.apply(this, arguments);
}
};
};

const patchObservableFactoryCreator = function(obj: any, factoryName: string) {
const symbolFactory: string = symbol(factoryName);
if (obj[symbolFactory]) {
return;
}
const factoryCreator: any = obj[symbolFactory] = obj[factoryName];
obj[factoryName] = function() {
const factory: any = factoryCreator.apply(this, arguments);
return function() {
const observable = factory.apply(this, arguments);
patchObservableInstance(observable);
return observable;
};
};
};

patchObservable(Rx, 'Observable');
patchSubscriber();
patchObservableFactoryCreator(Rx.Observable, 'bindCallback');
patchObservableFactoryCreator(Rx.Observable, 'bindNodeCallback');
});
5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
"webdriver-sauce-test": "node test/webdriver/test.sauce.js",
"ws-client": "node ./test/ws-client.js",
"ws-server": "node ./test/ws-server.js",
"tsc": "tsc",
"tsc:w": "tsc -w",
"tsc": "tsc -p .",
"tsc:w": "tsc -w -p .",
"test": "npm run tsc && concurrently \"npm run tsc:w\" \"npm run ws-server\" \"npm run karma-jasmine\"",
"test:phantomjs": "npm run tsc && concurrently \"npm run tsc:w\" \"npm run ws-server\" \"npm run karma-jasmine:phantomjs\"",
"test:phantomjs-single": "concurrently \"npm run ws-server\" \"npm run karma-jasmine-phantomjs:autoclose\"",
Expand Down Expand Up @@ -87,6 +87,7 @@
"phantomjs": "^2.1.7",
"promises-aplus-tests": "^2.1.2",
"pump": "^1.0.1",
"rxjs": "^5.4.2",
"selenium-webdriver": "^3.4.0",
"systemjs": "^0.19.37",
"ts-loader": "^0.6.0",
Expand Down
3 changes: 2 additions & 1 deletion test/browser-zone-setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ import '../lib/zone-spec/proxy';
import '../lib/zone-spec/sync-test';
import '../lib/zone-spec/task-tracking';
import '../lib/zone-spec/wtf';
import '../lib/extra/cordova';
import '../lib/extra/cordova';
import '../lib/rxjs/rxjs';
3 changes: 2 additions & 1 deletion test/browser_entry_point.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ import './browser/MediaQuery.spec';
import './browser/Notification.spec';
import './mocha-patch.spec';
import './jasmine-patch.spec';
import './extra/cordova.spec';
import './extra/cordova.spec';
import './rxjs/rxjs.spec';
1 change: 1 addition & 0 deletions test/common_tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@ import './zone-spec/sync-test.spec';
import './zone-spec/fake-async-test.spec';
import './zone-spec/proxy.spec';
import './zone-spec/task-tracking.spec';
import './rxjs/rxjs.spec';

Error.stackTraceLimit = Number.POSITIVE_INFINITY;
11 changes: 11 additions & 0 deletions test/global-rxjs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/**
* @license
* Copyright Google Inc. All Rights Reserved.
*
* Use of this source code is governed by an MIT-style license that can be
* found in the LICENSE file at https://angular.io/license
*/
const globalRx: any = (window as any).Rx;
exports.Observable = globalRx.Observable;
exports.Subject = globalRx.Subject;
exports.Scheduler = globalRx.Scheduler;
9 changes: 8 additions & 1 deletion test/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,18 @@ declare const __karma__: {
__karma__.loaded = function() {};
(window as any).global = window;

System.config({defaultJSExtensions: true});
let browserPatchedPromise: any = null;
if ((window as any)[(Zone as any).__symbol__('setTimeout')]) {
System.config({
defaultJSExtensions: true,
map: {'rxjs/Rx': 'base/build/test/global-rxjs.js'},
});
browserPatchedPromise = Promise.resolve('browserPatched');
} else {
System.config({
defaultJSExtensions: true,
map: {'rxjs/Rx': 'base/node_modules/rxjs/bundles/Rx.js'},
});
// this means that Zone has not patched the browser yet, which means we must be running in
// build mode and need to load the browser patch.
browserPatchedPromise = System.import('/base/build/test/browser-zone-setup');
Expand Down
1 change: 1 addition & 0 deletions test/node_entry_point.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import '../lib/zone-spec/proxy';
import '../lib/zone-spec/sync-test';
import '../lib/zone-spec/task-tracking';
import '../lib/zone-spec/wtf';
import '../lib/rxjs/rxjs';

// Setup test environment
import './test-env-setup-jasmine';
Expand Down
Loading

0 comments on commit 1ed83d0

Please sign in to comment.