Skip to content

Commit

Permalink
Merge pull request #1536 from PradnyaBaviskar:issue418-replication
Browse files Browse the repository at this point in the history
Promisify 'PersistedModel - replication'

Close #1536
  • Loading branch information
Miroslav Bajtoš committed Aug 12, 2015
2 parents 9d776d7 + 64a1dba commit 33351e2
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 26 deletions.
6 changes: 3 additions & 3 deletions lib/persisted-model.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ var async = require('async');
var deprecated = require('depd')('loopback');
var debug = require('debug')('loopback:persisted-model');
var PassThrough = require('stream').PassThrough;
var utils = require('./utils');

module.exports = function(registry) {
var Model = registry.getModel('Model');
Expand Down Expand Up @@ -916,9 +917,7 @@ module.exports = function(registry) {
options = options || {};

var sourceModel = this;
callback = callback || function defaultReplicationCallback(err) {
if (err) throw err;
};
callback = callback || utils.createPromiseCallback();

debug('replicating %s since %s to %s since %s',
sourceModel.modelName,
Expand All @@ -944,6 +943,7 @@ module.exports = function(registry) {
var MAX_ATTEMPTS = 3;

run(1, since);
return callback.promise;

function run(attempt, since) {
debug('\titeration #%s', attempt);
Expand Down
141 changes: 118 additions & 23 deletions test/replication.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,43 +82,70 @@ describe('Replication / Change APIs', function() {
});

describe('Model.replicate(since, targetModel, options, callback)', function() {

function assertTargetModelEqualsSourceModel(conflicts, sourceModel,
targetModel, done) {
var sourceData;
var targetData;

assert(conflicts.length === 0);
async.parallel([
function(cb) {
sourceModel.find(function(err, result) {
if (err) return cb(err);
sourceData = result;
cb();
});
},
function(cb) {
targetModel.find(function(err, result) {
if (err) return cb(err);
targetData = result;
cb();
});
}
], function(err) {
if (err) return done(err);

assert.deepEqual(sourceData, targetData);
done();
});
}

it('Replicate data using the target model', function(done) {
var test = this;
var options = {};
var sourceData;
var targetData;

this.SourceModel.create({name: 'foo'}, function(err) {
if (err) return done(err);
test.SourceModel.replicate(test.startingCheckpoint, test.TargetModel,
options, function(err, conflicts) {
if (err) return done(err);
assert(conflicts.length === 0);
async.parallel([
function(cb) {
test.SourceModel.find(function(err, result) {
if (err) return cb(err);
sourceData = result;
cb();
});
},
function(cb) {
test.TargetModel.find(function(err, result) {
if (err) return cb(err);
targetData = result;
cb();
});
}
], function(err) {
if (err) return done(err);

assert.deepEqual(sourceData, targetData);
done();
});
assertTargetModelEqualsSourceModel(conflicts, test.SourceModel,
test.TargetModel, done);
});
});
});

it('Replicate data using the target model - promise variant', function(done) {
var test = this;
var options = {};

this.SourceModel.create({name: 'foo'}, function(err) {
if (err) return done(err);
test.SourceModel.replicate(test.startingCheckpoint, test.TargetModel,
options)
.then(function(conflicts) {
assertTargetModelEqualsSourceModel(conflicts, test.SourceModel,
test.TargetModel, done);
})
.catch(function(err) {
done(err);
});
});
});

it('applies "since" filter on source changes', function(done) {
async.series([
function createModelInSourceCp1(next) {
Expand Down Expand Up @@ -147,6 +174,38 @@ describe('Replication / Change APIs', function() {
], done);
});

it('applies "since" filter on source changes - promise variant', function(done) {
async.series([
function createModelInSourceCp1(next) {
SourceModel.create({ id: '1' }, next);
},
function checkpoint(next) {
SourceModel.checkpoint(next);
},
function createModelInSourceCp2(next) {
SourceModel.create({ id: '2' }, next);
},
function replicateLastChangeOnly(next) {
SourceModel.currentCheckpoint(function(err, cp) {
if (err) return done(err);
SourceModel.replicate(cp, TargetModel, {})
.then(function(next) {
done();
})
.catch(err);
});
},
function verify(next) {
TargetModel.find(function(err, list) {
if (err) return done(err);
// '1' should be skipped by replication
expect(getIds(list)).to.eql(['2']);
next();
});
}
], done);
});

it('applies "since" filter on target changes', function(done) {
// Because the "since" filter is just an optimization,
// there isn't really any observable behaviour we could
Expand All @@ -161,6 +220,23 @@ describe('Replication / Change APIs', function() {
});
});

it('applies "since" filter on target changes - promise variant', function(done) {
// Because the "since" filter is just an optimization,
// there isn't really any observable behaviour we could
// check to assert correct implementation.
var diffSince = [];
spyAndStoreSinceArg(TargetModel, 'diff', diffSince);

SourceModel.replicate(10, TargetModel, {})
.then(function() {
expect(diffSince).to.eql([10]);
done();
})
.catch(function(err) {
done(err);
});
});

it('uses different "since" value for source and target', function(done) {
var sourceSince = [];
var targetSince = [];
Expand All @@ -177,6 +253,25 @@ describe('Replication / Change APIs', function() {
});
});

it('uses different "since" value for source and target - promise variant', function(done) {
var sourceSince = [];
var targetSince = [];

spyAndStoreSinceArg(SourceModel, 'changes', sourceSince);
spyAndStoreSinceArg(TargetModel, 'diff', targetSince);

var since = { source: 1, target: 2 };
SourceModel.replicate(since, TargetModel, {})
.then(function() {
expect(sourceSince).to.eql([1]);
expect(targetSince).to.eql([2]);
done();
})
.catch(function(err) {
done(err);
});
});

it('picks up changes made during replication', function(done) {
setupRaceConditionInReplication(function(cb) {
// simulate the situation when another model is created
Expand Down

0 comments on commit 33351e2

Please sign in to comment.