Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix "init" relay logic (#36) + unified listeners + Pipe and AsyncLoopbackConnection refactoring #45

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Swarm
_reactive data sync lib: replicated model for your web app_

[![Build Status](https://img.shields.io/travis/gritzko/swarm/master.svg)](https://travis-ci.org/gritzko/swarm)
[![Build Status](https://travis-ci.org/gritzko/swarm.svg?branch=master)](https://travis-ci.org/gritzko/swarm)

[Swarm](http://swarmjs.github.io/articles/todomvc/) is an isomorphic reactive M-of-MVC library that synchronizes objects in real-time and may work offline. Swarm is perfect for implementing collaboration or continuity features in Web and mobile apps. Swarm supports complex data types by relying on its op-based CRDT base.

Expand Down Expand Up @@ -62,7 +62,7 @@ someMouse.set({x:1,y:2});
var mickey = new Mouse('Mickey');

// 4.b. ...wait for the state to arrive
mickey.on('init', function () {
mickey.onStateReady(function () {
// ...so we may touch it finally.
mickey.set({x: 3, y: 4});
});
Expand Down
121 changes: 72 additions & 49 deletions lib/AsyncLoopbackConnection.js
Original file line number Diff line number Diff line change
@@ -1,65 +1,88 @@
"use strict";

var env = require('./env');
var knownStreams = require('./env').streams;

/**
* @param {string|AsyncLoopbackConnection} url
* @constructor
*/
function AsyncLoopbackConnection(url) {
var m = url.match(/loopback:(\w+)/);
if (!m) {
throw new Error('invalid url');
}
this.id = m[1];
this.lstn = {};
this.queue = [];
if (this.id in AsyncLoopbackConnection.pipes) {
throw new Error('duplicate');
var stream = this;

var lstn = {};
var queue = [];
var id;
var paired;

Object.defineProperty(stream, 'id', { get: function () { return id; } });
stream.on = on;
stream.write = write;
stream.close = close;
stream.toString = toString;
stream._receive = receive;

if (typeof url === 'string') {
var m = url.match(/loopback:(\w+)/);
if (!m) {
throw new Error('invalid url');
}
id = m[1];
paired = new AsyncLoopbackConnection(stream);
} else if (url instanceof AsyncLoopbackConnection) {
paired = url;
id = paired.id.match(/./g).reverse().join('');

var uplink = AsyncLoopbackConnection.uplinks[paired.id];
if (!uplink) {
throw new Error('no uplink set for connection url: "' + url + '"');
}
uplink.accept(stream);
}
AsyncLoopbackConnection.pipes[this.id] = this;
var pair = this.pair();
if (pair && pair.queue.length) {
pair.write();

function on(evname, fn) {
if (evname in lstn) {
throw new Error('multiple listeners not supported');
}
lstn[evname] = fn;
}
}
AsyncLoopbackConnection.pipes = {};

AsyncLoopbackConnection.delay = function delay_for_1ms() {
return 1;
};
function receive(string) {
lstn.data && lstn.data(string);
}

env.streams.loopback = AsyncLoopbackConnection;
function write(obj) {
obj && queue.push(obj.toString());
setTimeout(function asyncWrite() {
while (queue.length) {
paired._receive(queue.shift());
}
}, AsyncLoopbackConnection.delay());
}

AsyncLoopbackConnection.prototype.pair = function () {
var pairId = this.id.match(/./g).reverse().join('');
return AsyncLoopbackConnection.pipes[pairId];
};
function close(closePaired) {
if (!closePaired) { paired.close(true); }
setTimeout(function asyncClose() {
lstn.close && lstn.close();
}, AsyncLoopbackConnection.delay());
}

AsyncLoopbackConnection.prototype.on = function (evname, fn) {
if (evname in this.lstn) {
throw new Error('multiple listeners not supported');
function toString() {
return '|' + id;
}
this.lstn[evname] = fn;
};
}
AsyncLoopbackConnection.pipes = {};

AsyncLoopbackConnection.prototype.receive = function (string) {
this.lstn.data && this.lstn.data(string);
AsyncLoopbackConnection.uplinks = {};
AsyncLoopbackConnection.registerUplink = function (url, uplink) {
var m = url.match(/loopback:(\w+)/);
if (!m) { throw new Error('invalid url'); }
AsyncLoopbackConnection.uplinks[m[1]] = uplink;
};

AsyncLoopbackConnection.prototype.write = function (obj) {
var self = this;
obj && self.queue.push(obj.toString());
setTimeout(function () {
var pair = self.pair();
if (!pair) {
return;
}
while (self.queue.length) {
pair.receive(self.queue.shift());
}
}, AsyncLoopbackConnection.delay());
AsyncLoopbackConnection.delay = function delay_for_1ms() {
return 1;
};

AsyncLoopbackConnection.prototype.close = function () {
delete AsyncLoopbackConnection.pipes[this.id];
var pair = this.pair();
pair && pair.close();
this.lstn.close && this.lstn.close();
};
knownStreams.loopback = AsyncLoopbackConnection;

module.exports = AsyncLoopbackConnection;
1 change: 0 additions & 1 deletion lib/CollectionMethodsMixin.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ module.exports = {
* @this Set|Vector
*/
onObjectEvent: function (callback) {
this._proxy.owner = this;
this._proxy.on(callback);
},

Expand Down
84 changes: 55 additions & 29 deletions lib/Host.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ var SecondPreciseClock = require('./SecondPreciseClock');
function Host(id, ms, storage) {
this.objects = {};
this.sources = {};
this.pipeReconnection = {};
this.storage = storage;
this._host = this; // :)
this._lstn = [','];
Expand Down Expand Up @@ -97,7 +98,7 @@ module.exports = Syncable.extend(Host, {
}
o = new t(typeid, undefined, this);
if (typeof(callback) === 'function') {
o.on('.init', callback);
o.onStateReady(callback);
}
}
return o;
Expand Down Expand Up @@ -166,20 +167,33 @@ module.exports = Syncable.extend(Host, {
this.addSource(spec, host);
},

off: function (spec, nothing, peer) {
off: function (spec, error, peer) {
peer.deliver(peer.spec().add(this.time(), '!').add('.reoff'), '', this);
this.removeSource(spec, peer);
this.removeSource(spec, error, peer);
},

reoff: function hostReOff(spec, nothing, peer) {
this.removeSource(spec, peer);
reoff: function hostReOff(spec, error, peer) {
this.removeSource(spec, error, peer);
}

}, // neutrals

removeSource: function (spec, peer) {
if (spec.type() !== 'Host') {
throw new Error('Host.removeSource(/NoHost)');
removeSource: function (spec, error, peer) {
if (spec.type() !== 'Host') { throw new Error('Host.removeSource(/NoHost)'); }

if (error && peer instanceof Pipe) {
// pipe reconnection on error
var pipe_opts = peer.opts;
if (pipe_opts.url) {
//reconnect delay for next disconnection
pipe_opts.errorsCount = Math.max(7, peer.errorsCount + 1);
var timeout = Math.min(30000, (pipe_opts.reconnectDelay || 1000) << pipe_opts.errorsCount);
// schedule a retry
env.debug && env.log(this, peer._id + '.scheduleReconnect', timeout);
this.pipeReconnection[peer._id] = setTimeout((function reconnectPipe() {
this.connect(pipe_opts.url, pipe_opts);
}).bind(this), timeout);
}
}

if (this.sources[peer._id] !== peer) {
Expand All @@ -189,10 +203,7 @@ module.exports = Syncable.extend(Host, {
delete this.sources[peer._id];
for (var sp in this.objects) {
var obj = this.objects[sp];
if (obj.getListenerIndex(peer, true) > -1) {
obj.off(sp, '', peer);
obj.checkUplink(sp);
}
obj.reoff('', peer);
}
},

Expand Down Expand Up @@ -282,37 +293,52 @@ module.exports = Syncable.extend(Host, {
},

// waits for handshake from stream
accept: function (stream_or_url, pipe_env) {
new Pipe(this, stream_or_url, pipe_env);
accept: function (stream, pipeOpts) {
if (typeof stream.write !== 'function') { throw new Error('stream expected'); }
env.debug && env.log(this, '.accept', stream.toString());
new Pipe(this, stream, pipeOpts);
},

// initiate handshake with peer
connect: function (stream_or_url, pipe_env) {
var pipe = new Pipe(this, stream_or_url, pipe_env);
connect: function (stream_or_url, pipeOpts) {
pipeOpts || (pipeOpts = {});
env.debug && env.log(this, '.connect', stream_or_url.toString());
if (typeof stream_or_url.write !== 'function') { // not stream
var url = stream_or_url.toString();
var sc = env.getStreamConstructor(url);
stream_or_url = new sc(url);
pipeOpts.url = url;
}
var pipe = new Pipe(this, stream_or_url, pipeOpts);
pipe.deliver(new Spec('/Host#'+this._id+'!0.on'), '', this); //this.newEventSpec
return pipe;
},

disconnect: function (id) {
for (var peer_id in this.sources) {
if (id && peer_id != id) {
continue;
if (id) {
if (id === this._id) {
return;
}
if (peer_id === this._id) {
// storage
continue;
var peer = this.sources[id];
if (peer) {
// normally, .off is sent by a downlink
peer.deliver(peer.spec().add(this.time(), '!').add('.off'));
} else if (this.pipeReconnection[id]) {
clearTimeout(this.pipeReconnection[id]);
delete this.pipeReconnection[id];
}
} else {
for (id in this.sources) {
this.disconnect(id);
}
for (id in this.pipeReconnection) {
this.disconnect(id);
}
var peer = this.sources[peer_id];
// normally, .off is sent by a downlink
peer.deliver(peer.spec().add(this.time(), '!').add('.off'));
}
},

close: function (cb) {
for(var id in this.sources) {
if (id===this._id) {continue;}
this.disconnect(id);
}
this.disconnect();
if (this.storage) {
this.storage.close(cb);
} else if (cb) {
Expand Down
23 changes: 11 additions & 12 deletions lib/Html5Client.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,28 +24,27 @@ Swarm.get = function (spec) {
var env = Swarm.env;

if (env.isWebKit || env.isGecko) {
env.log = function css_log(spec, value, replica, host) {
if (!host && replica && replica._host) {
host = replica._host;
}
env.log = function css_log(object, spec, value, src) {
var host = env.multihost && object && object._host ? '@' + object._host._id : '';
var obj = object && object.toString() || '';
var source = src && src.toString() || '';

if (value && value.constructor.name === 'Spec') {
value = value.toString();
}

console.log(
"%c%s %c%s %c%O %c%s @%c%s",
"%c%s %c%s %c%s %c%O %c%s",
"color: #ccd",
host,
"color: #888",
env.multihost ? host && host._id : '',
obj,
"color: #024; font-style: italic",
spec.toString(),
"font-style: normal; color: #042",
value,
"color: #88a",
(replica && ((replica.spec && replica.spec().toString()) || replica._id)) ||
(replica ? 'no id' : 'undef'),
"color: #ccd",
replica && replica._host && replica._host._id
//replica&&replica.spec&&(replica.spec()+
// (this._host===replica._host?'':' @'+replica._host._id)
source
);
};
}
Loading