Skip to content

Commit

Permalink
feat(get, set, call): support both Rx4 and Rx5+ formats
Browse files Browse the repository at this point in the history
In order to support backward compatibility with other falcor toolsets, both Rx4 style and modern observable style signatures must be supported. This still returns an Rx5 observable, but it may be subscribed to and disposed in the Rx4 style
  • Loading branch information
benlesh committed Jan 10, 2017
1 parent d28d5b6 commit 25cc985
Show file tree
Hide file tree
Showing 6 changed files with 235 additions and 6 deletions.
5 changes: 3 additions & 2 deletions src/router/call.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ var collapse = pathUtils.collapse;
var Observable = require('../RouterRx.js').Observable;
var MaxPathsExceededError = require('../errors/MaxPathsExceededError');
var getPathsCount = require('./getPathsCount');
var rxNewToRxNewAndOld = require('../run/conversion/rxNewToRxNewAndOld');

/**
* Performs the call mutation. If a call is unhandled, IE throws error, then
Expand All @@ -18,7 +19,7 @@ module.exports = function routerCall(callPath, args,
refPathsArg, thisPathsArg) {
var router = this;

return Observable.defer(function() {
return rxNewToRxNewAndOld(Observable.defer(function() {

var refPaths = normalizePathSets(refPathsArg || []);
var thisPaths = normalizePathSets(thisPathsArg || []);
Expand Down Expand Up @@ -79,5 +80,5 @@ module.exports = function routerCall(callPath, args,
})
.do(null, function errorHookHandler(err) {
router._errorHook(callPath, err);
});
}));
};
5 changes: 3 additions & 2 deletions src/router/get.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ var Observable = require('../RouterRx.js').Observable;
var mCGRI = require('./../run/mergeCacheAndGatherRefsAndInvalidations');
var MaxPathsExceededError = require('../errors/MaxPathsExceededError');
var getPathsCount = require('./getPathsCount');
var rxNewToRxNewAndOld = require('../run/conversion/rxNewToRxNewAndOld');

/**
* The router get function
Expand All @@ -15,7 +16,7 @@ module.exports = function routerGet(paths) {

var router = this;

return Observable.defer(function() {
return rxNewToRxNewAndOld(Observable.defer(function() {

var jsongCache = {};
var action = runGetAction(router, jsongCache);
Expand Down Expand Up @@ -68,5 +69,5 @@ module.exports = function routerGet(paths) {
map(function(jsonGraphEnvelope) {
return materialize(router, normPS, jsonGraphEnvelope);
});
});
}));
};
5 changes: 3 additions & 2 deletions src/router/set.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ var collapse = pathUtils.collapse;
var mCGRI = require('./../run/mergeCacheAndGatherRefsAndInvalidations');
var MaxPathsExceededError = require('../errors/MaxPathsExceededError');
var getPathsCount = require('./getPathsCount');
var rxNewToRxNewAndOld = require('../run/conversion/rxNewToRxNewAndOld');

/**
* @returns {Observable.<JSONGraph>}
Expand All @@ -24,7 +25,7 @@ module.exports = function routerSet(jsonGraph) {

var router = this;

return Observable.defer(function() {
return rxNewToRxNewAndOld(Observable.defer(function() {
var jsongCache = {};
var action = runSetAction(router, jsonGraph, jsongCache);
jsonGraph.paths = normalizePathSets(jsonGraph.paths);
Expand Down Expand Up @@ -152,5 +153,5 @@ module.exports = function routerSet(jsonGraph) {
map(function(jsonGraphEnvelope) {
return materialize(router, jsonGraph.paths, jsonGraphEnvelope);
});
});
}));
};
57 changes: 57 additions & 0 deletions src/run/conversion/rxNewToRxNewAndOld.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// WHY NOT BOTH?
module.exports = function rxNewToRxNewAndOld(rxNewObservable) {
var _subscribe = rxNewObservable.subscribe;

rxNewObservable.subscribe = function (observerOrNextFn, errFn, compFn) {
var subscription;
switch (typeof observerOrNextFn) {
case 'function':
subscription = _subscribe.call(this,
observerOrNextFn, errFn, compFn);
break;
case 'object':
var observer = observerOrNextFn;
if (typeof observerOrNextFn.onNext === 'function') {
// old observer!
observer = {
next: function (x) {
var destination = this.destination;
destination.onNext(x);
},
error: function (err) {
var destination = this.destination;
if (destination.onError) {
destination.onError(err);
}
},
complete: function () {
var destination = this.destination;
if (destination.onCompleted) {
destination.onCompleted();
}
},
destination: observerOrNextFn
}
}
subscription = _subscribe.call(this, observer);
break;
case 'undefined':
subscription = _subscribe.call(this);
break;
default:
throw new TypeError('cannot subscribe to observable with ' +
'type ' + typeof observerOrNextFn);
}

var _unsubscribe = subscription.unsubscribe;

subscription.unsubscribe = subscription.dispose = function () {
this.isDisposed = true;
_unsubscribe.call(subscription);
};

return subscription;
}

return rxNewObservable;
}
42 changes: 42 additions & 0 deletions test/unit/core/get.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,48 @@ describe('Get', function() {
subscribe(noOp, done, done);
});

it('should return observables consumable in an Rx4 and under format', function() {

var router = new R([{
route: 'videos.falsey',
get: function(path) {
return Observable.of({
value: false,
path: ['videos', 'falsey']
});
}
}]);

var completed = false;
var results = [];

var source = router.get([['videos', 'falsey']]);
var sub = source .subscribe({
onNext: function (x) {
results.push(x);
},
onError: function () {
throw new Error('this should not be reached');
},
onCompleted: function () {
completed = true;
}
});

expect(sub.dispose).to.be.a('function');
expect(sub.unsubscribe).to.be.a('function');
expect(completed).to.equal(true);
expect(results).to.deep.equal([
{
jsonGraph: {
videos: {
falsey: false
}
}
}
]);
});

it('should not return empty atoms for a false path value', function(done) {

var router = new R([{
Expand Down
127 changes: 127 additions & 0 deletions test/unit/internal/rxNewToRxNewAndOld.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
var Observable = require('rxjs/Rx').Observable;
var rxNewToRxNewAndOld =
require('../../../../src/run/conversion/rxNewToRxNewAndOld');
var chai = require('chai');
var expect = chai.expect;

describe('rxNewToRxNewAndOld', function () {
it('should work with "old" observers', function () {
var source = Observable.of(1, 2, 3);
var results = [];

var sub = rxNewToRxNewAndOld(source).subscribe({
onNext: function (x) {
results.push(x);
},
onError: function (err) {
throw err;
},
onCompleted: function () {
results.push('done');
}
});

expect(sub.dispose).to.be.a('function');
expect(sub.unsubscribe).to.be.a('function');
expect(results).to.deep.equal([1, 2, 3, 'done']);
});

it('should work with "new" observers', function () {
var source = Observable.of(1, 2, 3);
var results = [];

var sub = rxNewToRxNewAndOld(source).subscribe({
next: function (x) {
results.push(x);
},
error: function (err) {
throw err;
},
complete: function () {
results.push('done');
}
});

expect(sub.dispose).to.be.a('function');
expect(sub.unsubscribe).to.be.a('function');
expect(results).to.deep.equal([1, 2, 3, 'done']);
});

it('should work with three functions', function () {
var source = Observable.of(1, 2, 3);
var results = [];

var sub = rxNewToRxNewAndOld(source).subscribe(
function (x) {
results.push(x);
},
function (err) {
throw err;
},
function () {
results.push('done');
}
);

expect(sub.dispose).to.be.a('function');
expect(sub.unsubscribe).to.be.a('function');
expect(results).to.deep.equal([1, 2, 3, 'done']);
});

it('should work with no arguments', function () {
var source = Observable.of(1, 2, 3);
var results = [];

var sub = rxNewToRxNewAndOld(source).subscribe();

expect(sub.dispose).to.be.a('function');
expect(sub.unsubscribe).to.be.a('function');
expect(results).to.deep.equal([1, 2, 3, 'done']);
});

it('should unsubscribe with `dispose`', function () {
var source = Observable.of('hello');
var results = [];

var sub = rxNewToRxNewAndOld(source).subscribe(
function (x) {
results.push(x);
},
function (err) {
throw err;
},
function () {
results.push('done');
}
);

sub.dispose();

expect(sub.closed).to.be(true);
expect(sub.isDisposed).to.be(true);
expect(results).to.deep.equal([]);
});

it('should unsubscribe with `unsubscribe`', function () {
var source = Observable.of('hello');
var results = [];

var sub = rxNewToRxNewAndOld(source).subscribe(
function (x) {
results.push(x);
},
function (err) {
throw err;
},
function () {
results.push('done');
}
);

sub.unsubscribe();

expect(sub.closed).to.be(true);
expect(sub.isDisposed).to.be(true);
expect(results).to.deep.equal([]);
});
})

0 comments on commit 25cc985

Please sign in to comment.