From a6ec5166e8a6ae66fb8d26bc33392d561d461c88 Mon Sep 17 00:00:00 2001 From: Martynas Bagdonas Date: Thu, 6 Jul 2017 19:28:24 +0300 Subject: [PATCH 1/2] Combined update --- .travis.yml | 10 +-- config/default.json | 4 +- lib/controller.js | 126 +++++++++++++++++------------ lib/listener.js | 189 ++++++++++++++++++++----------------------- lib/plugins/index.js | 25 +++++- lib/subscription.js | 2 +- lib/sync.js | 2 +- makefile | 2 + 8 files changed, 194 insertions(+), 166 deletions(-) diff --git a/.travis.yml b/.travis.yml index 54739d7..94689f8 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,9 +1,9 @@ language: node_js node_js: - - "5" + - "8" + - "6" - "4" - - "0.12" sudo: false cache: @@ -13,12 +13,8 @@ cache: matrix: fast_finish: true -before_install: - # For 0.12 without harmony - - npm install gnode - script: "make test-travis" -after_script: "npm install coveralls@2.10.0 && cat ./coverage/lcov.info | coveralls" +after_script: "npm install coveralls@2.12.0 && cat ./coverage/lcov.info | coveralls" notifications: email: diff --git a/config/default.json b/config/default.json index b1722b9..f83fdf0 100644 --- a/config/default.json +++ b/config/default.json @@ -32,8 +32,8 @@ }, "plugins": [ - "./logger", - "./fs" + "logger", + "fs" ], "redis": { diff --git a/lib/controller.js b/lib/controller.js index 4489a99..4e8703b 100644 --- a/lib/controller.js +++ b/lib/controller.js @@ -12,6 +12,7 @@ var trace = require('debug')('arkivo:trace'); var B = require('bluebird'); var config = require('./config').controller; +var zotero = require('./zotero'); var common = require('./common'); var extend = common.extend; @@ -50,10 +51,9 @@ function Controller(options) { id: subscription.id, title: 'Zotero Stream-issued Synchronization Request' }, this.options.delay, this.options.attempts); - }.bind(this)) - - .on('error', fail('uncaught listener error')); + }.bind(this)); + this.subscriptionsInSyncQueue = {}; /** * The default job types to process. @@ -100,18 +100,25 @@ Controller.prototype.subscribe = function (data, job) { report(job, ['subscribing to "%s"...', subscription.url]); - return subscription - .save() - - .tap(function (s) { + // Theoretically the validated url can become invalid + // when stream-server tries to create subscription. + // But in practice this shouldn't be a problem. + return validateUrl(data.url, data.key) + .then(function () { + return subscription.save(); + }) + .then(function () { + if (this.options.listen) + this.listener.add(subscription); + // If sync job would finish faster than stream-server + // would add the new subscription, this could result in missed updates, + // but each new stream-server subscription triggers another sync, + // therefore this shouldn't happen. this.notify('sync', { - id: s.id, + id: subscription.id, skip: data.skip }, 0, this.options.attempts); - - if (this.options.listen) - return this.listener.add(s); - + return subscription; }.bind(this)); }; @@ -144,7 +151,7 @@ Controller.prototype.unsubscribe = function (data, job) { .call('destroy') .tap(function (s) { - if (this.options.listen) return this.listener.remove(s); + if (this.options.listen) this.listener.remove(s); }.bind(this)); }; @@ -177,6 +184,8 @@ Controller.prototype.synchronize = function (data, job) { var lock = this.lock; + delete this.subscriptionsInSyncQueue[data.id]; + if (data.all) { report(job, ['loading all subscriptions...']); subscriptions = Subscription.all(); @@ -266,6 +275,17 @@ Controller.prototype.sync = Controller.prototype.synchronize; * @return {Promise} */ Controller.prototype.notify = function (name, data, delay, attempts) { + + // This is a workaround to job deduplication which is not supported + // by Kue. We just skip the job if its sync id is in queue. + // This doesn't help if Arkivo is restarted. + if (name === 'sync') { + if (this.subscriptionsInSyncQueue[data.id]) { + return B.resolve(null); + } + this.subscriptionsInSyncQueue[data.id] = true; + } + return new B(function (resolve, reject) { var job = q.jobs.create(name, data); @@ -348,39 +368,21 @@ Controller.prototype.start = function (jobs) { var workers = this.options.workers; var listener = this.listener; - // Initially sync all subscriptions; - // then start processing new or interrupted jobs! - var subscriptions = this - .sync({ all: true }) - .catch(fail('initial synchronization failed')); + jobs.forEach(function (type) { + debug('listening for "%s" requests...', type); + q.jobs.process(type, workers, handler); + }); - subscriptions - .finally(function resume() { - - jobs.forEach(function (type) { - debug('listening for "%s" requests...', type); - q.jobs.process(type, workers, handler); - }); + q.interrupted().each(function (job) { + debug('re-sheduling interrupted job %s#%d...', job.type, job.id); + job.inactive(); + }); - q.interrupted().each(function (job) { - debug('re-sheduling interrupted job %s#%d...', job.type, job.id); - job.inactive(); - }); - }); - - - // Connect to Zotero Stream API and start - // listening when initial sync is done. + // Connect to Zotero Stream API and start listening if (this.options.listen) { - listener - .start() - .once('connected', function () { - debug('listener connected successfully'); - - subscriptions - .then(function (s) { if (s.length) return listener.add(s); }); - }); - + listener.start(); + var subscriptions = Subscription.all(); + subscriptions.tap(function (s) { if (s.length) listener.add(s); }); } return this; @@ -452,16 +454,38 @@ function progressor(job, total) { }; } -function fail(message) { - return function failed(error) { - debug('%s: %s', message, error.message); - trace(error.stack); - - throw error; - }; -} +// function fail(message) { +// return function failed(error) { +// debug('%s: %s', message, error.message); +// trace(error.stack); +// +// throw error; +// }; +// } function listify(i) { return [i]; } +function validateUrl(url, key) { + return new B(function (resolve, reject) { + if (!(/^(\/?(users|groups)\/\d+\/?(publications\/items|items))/).test(url)) { + return reject(new Error('Invalid URL')); + } + + var headers = {}; + if (key) { + headers['Zotero-API-Key'] = key; + } + zotero.request({ + method: 'get', + path: url, + headers: headers + }, null, function (error, resp) { + // This directly returns all dataserver errors + if (error) return reject(error); + resolve(resp); + }); + }); +} + // --- Exports --- module.exports = Controller; diff --git a/lib/listener.js b/lib/listener.js index fdce672..8dc74c3 100644 --- a/lib/listener.js +++ b/lib/listener.js @@ -14,7 +14,6 @@ var common = require('./common'); var extend = common.extend; var index = common.findIndex; var pick = common.pick; -var pluck = common.pluck; /** @module arkivo */ @@ -32,148 +31,132 @@ function Listener(options) { this.options = extend({}, config, options); this.current = []; - this.pending = []; + this.created = {}; } inherits(Listener, EventEmitter); -Listener.prototype.subscribed = function (subscriptions, errors) { - var i, ii, j, jj, key, subscription; - - if (subscriptions) { +Listener.prototype.connected = function () { + this.created = {}; + var data = this.current.map(toData); + if (data.length) { + this.stream.subscribe(data); + } +}; - for (i = 0, ii = subscriptions.length; i < ii; ++i) { - subscription = subscriptions[i]; - key = subscription.apiKey; +Listener.prototype.subscribed = function (subscriptions) { + // We always get all existing subscribed topic here, instead, + // maybe we should only receive what was actually subscribed + // in this stream-server response. + var i, j, k, s, key, subscription; - for (j = 0, jj = subscription.topics.length; j < jj; ++j) - this.resolve(key, subscription.topics[j]); - } + for (i = 0; i < subscriptions.length; i++) { + subscription = subscriptions[i]; + key = subscription.apiKey; - } + for (j = 0; j < subscription.topics.length; j++) { + var topic = subscription.topics[j]; - if (errors) { + if (!this.created[topic]) { + this.created[topic] = true; + debug('topic %s created...', topic); - for (i = 0, ii = errors.length; i < ii; ++i) - this.reject.apply(this, pluck(errors[i], 'apiKey', 'topic', 'error')); + for (k = 0; k < this.current.length; k++) { + s = this.current[k]; + if (s.topic === topic && s.key === key) { + this.emit('updated', s); + } + } + } else { + debug('topic %s already exists...', topic); + } + } } }; Listener.prototype.updated = function (data) { - var i, ii, s; - var predicate = by(pick(data, 'apiKey', 'topic')); - debug('topic %s updated...', data.topic); + for (var i = 0; i < this.current.length; i++) { + var s = this.current[i]; - for (i = 0, ii = this.current.length; i < ii; ++i) { - s = this.current[i]; - - if (predicate(s)) + if (s.topic === data.topic) { this.emit('updated', s); + } } - - return this; -}; - -Listener.prototype.resolve = function (key, topic) { - var s; - var data; - - while ((s = remove(this.pending, { key: key, topic: topic }))) { - debug('[%s] listening for updates of %s...', s.id, topic); - - data = pick(s, 'id', 'key', 'topic'); - - s.resolve(data); - - this.current.push(data); - this.emit('added', data); - } - - return this; -}; - -Listener.prototype.reject = function (key, topic, reason) { - var s, error; - - while ((s = remove(this.pending, { key: key, topic: topic }))) { - debug('[%s] failed to subscribe %s: %s', s.id, topic, reason); - - error = new Error(reason); - error.data = pick(s, 'id', 'key', 'topic'); - - s.reject(error); - } - return this; }; -Listener.prototype.register = function (subscription) { - var data = pick(subscription, 'id', 'key', 'topic'); - - var promise = new B(function (resolve, reject) { - data.resolve = resolve; - data.reject = reject; - }); - - this.pending.push(data); - - return promise; +Listener.prototype.error = function (error) { + debug('stream error: %s', error.message); }; Listener.prototype.add = function (subscriptions) { - var self = this; - var register = this.register.bind(this); - assert(subscriptions); + assert(this.stream); - return new B(function (resolve, reject) { - assert(self.stream); + if (!Array.isArray(subscriptions)) + subscriptions = [subscriptions]; - if (!Array.isArray(subscriptions)) - subscriptions = [subscriptions]; + assert(subscriptions.length); + debug('adding %d subscription(s)...', subscriptions.length); - assert(subscriptions.length); - debug('adding %d subscription(s)...', subscriptions.length); + var data = []; - var data = subscriptions.map(toData); + for (var i = 0; i < subscriptions.length; i++) { + var subscription = subscriptions[i]; - self.stream.subscribe(data, function (error) { - if (error) return reject(error); + // This, already existing subscriptions' filtering logic, + // can go to zotero-api-node. + var found = false; + for (var j = 0; j < this.current.length; j++) { + var s = this.current[j]; + if (s.topic === subscription.topic && s.key === subscription.key) { + found = true; + break; + } + } - B.all(subscriptions.map(register)).then(resolve, reject); - }); - }); + if (!found) { + debug('subscribing %s to stream', subscription.topic); + data.push(toData(subscription)); + } else { + debug('skipping subscribing %s to stream', subscription.topic); + } + + this.current.push(pick(subscription, 'id', 'key', 'topic')); + } + + if (data.length) { + this.stream.subscribe(data); + } }; Listener.prototype.remove = function (subscription) { - var self = this; + assert(this.stream); - return new B(function (resolve, reject) { - assert(self.stream); + var data = remove(this.current, { id: subscription.id }); - var data = remove(self.current, { id: subscription.id }); + if (!data) { + debug('failed to remove %s: not registered', subscription.id); + } - if (!data) { - debug('failed to remove %s: not registered', subscription.id); - return reject(new Error('not registered')); + var found = false; + for (var i = 0; i < this.current.length; i++) { + var s = this.current[i]; + if (s.topic === data.topic && s.key === data.key) { + found = true; + break; } + } - self.stream.unsubscribe({ + if (!found) { + delete this.created[data.topic]; + this.stream.unsubscribe({ apiKey: data.key, topic: data.topic - - }, function (error) { - if (error) { - debug('failed to remove %s: %s', subscription.id, error); - return reject(error); - } - - debug('successfully removed %s from stream', subscription.id); - resolve(data); }); - }); + } }; @@ -186,8 +169,8 @@ Listener.prototype.start = function () { .stream(this.options) .on('topicUpdated', this.updated.bind(this)) .on('subscriptionsCreated', this.subscribed.bind(this)) - .on('connected', this.emit.bind(this, 'connected')) - .on('error', this.emit.bind(this, 'error')); + .on('connected', this.connected.bind(this)) + .on('error', this.error.bind(this)); return this; }; diff --git a/lib/plugins/index.js b/lib/plugins/index.js index bc4772c..811d14b 100644 --- a/lib/plugins/index.js +++ b/lib/plugins/index.js @@ -2,6 +2,7 @@ // --- Dependencies --- var assert = require('assert'); +var path = require('path'); var debug = require('debug')('arkivo:plugin'); var trace = require('debug')('arkivo:trace'); @@ -119,8 +120,30 @@ Plugins.prototype.update = function () { plugin = config.plugins[i]; debug('loading "%s"...', plugin); - this.add(require(plugin)); + var module; + if (path.isAbsolute(plugin)) { + // If an absolute path, just use it. + module = require(plugin); + } else if (plugin[0] === '.') { + // If a relative path, look into the current execution dir. + module = require(path.join(process.cwd(), plugin)); + } else { + // If without a path, try to firstly look into + // the bundled plugin dir (current dir). + try { + module = require(path.join(__dirname, plugin)); + } catch (error) { + if (error.code === 'MODULE_NOT_FOUND') { + // Let 'require' to find it. + module = require(plugin); + } else { + throw error; + } + } + } + + this.add(module); } catch (error) { debug('failed to load "%s": %s', plugin, error.message); trace(error.stack); diff --git a/lib/subscription.js b/lib/subscription.js index 46d947c..484f545 100644 --- a/lib/subscription.js +++ b/lib/subscription.js @@ -63,7 +63,7 @@ function Subscription(values) { * @property timestamp * @type String */ - var timestamp; + var timestamp = 0; property(this, 'timestamp', { enumerable: true, diff --git a/lib/sync.js b/lib/sync.js index f2cb3fb..4a4419a 100644 --- a/lib/sync.js +++ b/lib/sync.js @@ -645,7 +645,7 @@ Synchronizer.prototype.dispatch = co(function* (session) { .process(session, callback); - if (done.then) + if (done && done.then) done.then(resolve, reject); }).catch(function (error) { diff --git a/makefile b/makefile index 3867b21..947d695 100644 --- a/makefile +++ b/makefile @@ -3,6 +3,8 @@ BIN = ./node_modules/.bin SRC = lib/*.js lib/**/*.js TEST = test/*.js test/*.js test/http/*.js test/plugins/*.js +export ALLOW_CONFIG_MUTATIONS = 1 + doc: ${BIN}/yuidoc . From e8b2d845fd23afff94e623230db53d5d97bffb17 Mon Sep 17 00:00:00 2001 From: Martynas Bagdonas Date: Thu, 13 Jul 2017 15:12:00 +0300 Subject: [PATCH 2/2] Comments. Bug fixes. Test fixes. --- lib/controller.js | 117 ++++++++++++------ lib/http/index.js | 2 + lib/listener.js | 283 +++++++++++++++++++++++++++---------------- lib/subscription.js | 23 ++++ lib/sync.js | 2 +- test/controller.js | 5 + test/http/api.js | 9 +- test/listener.js | 70 ++++------- test/subscription.js | 2 +- 9 files changed, 324 insertions(+), 189 deletions(-) diff --git a/lib/controller.js b/lib/controller.js index 4e8703b..c9a4b9d 100644 --- a/lib/controller.js +++ b/lib/controller.js @@ -12,7 +12,6 @@ var trace = require('debug')('arkivo:trace'); var B = require('bluebird'); var config = require('./config').controller; -var zotero = require('./zotero'); var common = require('./common'); var extend = common.extend; @@ -29,6 +28,78 @@ var Listener = require('./listener'); /** @module arkivo */ +/** + * # How it works? + * ## What happens on startup? + * 1. Arkivo is started. + * 2. Continues processing the previous Kue jobs [subscribe, unusbscribe, sync]. + * 3. Creates listener, connects to stream-server, subscribes to all + * topics from all subscriptions. + * 4. Stream-server responds with subscriptionsCreated, and the listener + * emits 'update' event for each subscription that is listening + * for the specific topic and key pair. + * 5. Sync jobs are scheduled for each subscription. And this basically means + * that on each stream-server (re)connect we (re)synchronize all subscriptions. + * + * ## What happens when creating a subscription? + * 1. The URL is validated to make sure it's an items url and we have access to it. + * 2. If the URL is ok, the new subscription is created and saved. + * 3. Two concurrent operations are issued: + * subscribe to stream-server, and schedule the first sync job. + * 4. Listener receives 'subscriptionsCreated' response, and emits + * 'update' events only for subscriptions which are newly subscribed + * to stream-server. + * 5. Each 'update' event should schedule a 'sync' job for the updated + * subscription, but in practise the job will be skipped because, + * it should already exists when created in point 4. + * + * ## What happens when a topic is updated? + * 1. Listener receives 'topicUpdated' from stream-server. + * 2. Each subscription that is listening for the updated topic + * will get the 'update' event. + * 3. Each update event should schedule a 'sync' job for that subscription, + * unless there already exists a 'sync' job for the subscription. + * + * ## What happens when removing subscription? + * 1. Destroys the actual subscription. + * 2. Removes the subscription from the listener. + * 3. Listener checks if there is no more subscriptions listening + * for the same topic(+key) combination, and usubscribes from + * stream-server if so. + * + * --- + * + * # Optional stream-server + * Stream-sever can be disabled by setting listen:false in config. + * ## What this would change? + * * On startup we only continue to do the previously scheduled jobs. + * * But we don't initialize the listener. + * * Therefore we don't add subscriptions to it. + * * Therefore the listener doesn't subscribe to stream-server. + * * Therefore we don't get subscriptionsCreated response from server, + * which emits 'update' event and schedules 'sync' jobs. + * * But, 'sync' job is scheduled on subscription creation. + * * And also it can be triggered by `POST /api/sync` API request. + * + * --- + * + * # The Kue based 'sync' queue + * The Kue based sync queue is probably unnecessary and can be replaced with + * a simple array marking subscriptions that need to be updated: + * var subscriptionsToUpdate = [ + * "vrrui780gb", + * "olxbwld2t4", + * ] + * + * Especially because Kue doesn't have a simple way to deduplicate jobs. + * + * The parameter 'skip' (to skip fetching already existing items) + * can be saved together with the subscription, + * therefore we would no longer need to persist the sync queue. + * On each restart all subscriptions are resynchronized anyway. + * Unless there is something else why we need the persisting sync queue? + * + */ /** * @class Controller @@ -100,24 +171,26 @@ Controller.prototype.subscribe = function (data, job) { report(job, ['subscribing to "%s"...', subscription.url]); - // Theoretically the validated url can become invalid - // when stream-server tries to create subscription. + // Theoretically the validated url can become invalid at the time + // when stream-server tries to create a subscription. // But in practice this shouldn't be a problem. - return validateUrl(data.url, data.key) + return Subscription.validateItemsUrl(data.url, data.key) .then(function () { return subscription.save(); }) .then(function () { - if (this.options.listen) + if (this.options.listen) { this.listener.add(subscription); - // If sync job would finish faster than stream-server + } + // If sync job would finish before stream-server // would add the new subscription, this could result in missed updates, - // but each new stream-server subscription triggers another sync, - // therefore this shouldn't happen. + // but each new stream-server subscription schedules another sync job, + // therefore this problem should be impossible. this.notify('sync', { id: subscription.id, skip: data.skip }, 0, this.options.attempts); + return subscription; }.bind(this)); }; @@ -151,7 +224,7 @@ Controller.prototype.unsubscribe = function (data, job) { .call('destroy') .tap(function (s) { - if (this.options.listen) this.listener.remove(s); + if (this.options.listen) this.listener.remove(s.id); }.bind(this)); }; @@ -206,7 +279,7 @@ Controller.prototype.synchronize = function (data, job) { subscriptions = Subscription .load(data.id) - .then(listify); + .then(function (subscription) { return [subscription]; }); } return subscriptions @@ -463,29 +536,5 @@ function progressor(job, total) { // }; // } -function listify(i) { return [i]; } - -function validateUrl(url, key) { - return new B(function (resolve, reject) { - if (!(/^(\/?(users|groups)\/\d+\/?(publications\/items|items))/).test(url)) { - return reject(new Error('Invalid URL')); - } - - var headers = {}; - if (key) { - headers['Zotero-API-Key'] = key; - } - zotero.request({ - method: 'get', - path: url, - headers: headers - }, null, function (error, resp) { - // This directly returns all dataserver errors - if (error) return reject(error); - resolve(resp); - }); - }); -} - // --- Exports --- module.exports = Controller; diff --git a/lib/http/index.js b/lib/http/index.js index fdae380..dd64cf3 100644 --- a/lib/http/index.js +++ b/lib/http/index.js @@ -38,6 +38,8 @@ Server.prototype.initialize = function () { this.app = express(); + this.app.set('json spaces', 2); + this.app.use(this.options.api, q.app); this.app.use(this.options.api, routes.api); diff --git a/lib/listener.js b/lib/listener.js index 8dc74c3..86459cc 100644 --- a/lib/listener.js +++ b/lib/listener.js @@ -12,8 +12,8 @@ var zotero = require('./zotero'); var common = require('./common'); var extend = common.extend; -var index = common.findIndex; -var pick = common.pick; + +/* eslint-disable no-redeclare */ /** @module arkivo */ @@ -30,135 +30,235 @@ function Listener(options) { this.options = extend({}, config, options); - this.current = []; - this.created = {}; + /** + * Keeps all current subscriptions. + * I.e.: + * [{id:'zrfa0lo2rv', topic:'/users/1', key:'rGWO8xlHCtkEeShmrpDFlzfJ'}] + * + * @type {Array} + */ + this.subscriptions = []; + + /** + * Keeps all unique topics we are currently subscribing. + * I.e.: + * {'zrfa0lo2rv':true, 'btgpbmth56':true} + * + * This is necessary because stream-server doesn't return newly created + * subscriptions, instead it returns all subscriptions of the connection, + * so we are doing the diff between subscriptionsCreated topics and this.topics. + * + * The idea is that a topic to this.topics is added when subscriptionsCreated + * sends us a topic that doesn't exist in this.topics. And the topic is deleted, + * only when all subscriptions in this.subscriptions having that topic are removed. + * + * @type {Object} + */ + this.topics = []; } inherits(Listener, EventEmitter); +/** + * Handles stream-server 'connected' response. + * + * This function is called each time when zotero.stream (re)connects + * to stream-server. And each time we resubscribe to all subscriptions + * we currently have in the listener. + * + * zotero.stream has a resubscribe logic too, but it's currently disabled, + * because doesn't work as it should. Moreover it's better to have one + * subscriptions source here, than keep two separate subscriptions list: + * one here and another in zotero.stream. + */ Listener.prototype.connected = function () { - this.created = {}; - var data = this.current.map(toData); - if (data.length) { - this.stream.subscribe(data); + // If we just connected, it means nothing is subscribed. + this.topics = []; + + // Transform all subscriptions to stream-server suitable format. + var subscriptions = this.subscriptions.map(function (subscription) { + return { + topics: [subscription.topic], + apiKey: subscription.key + }; + }); + + // Todo: 'zotero.stream' should accept empty subscriptions array and ignore it. + if (subscriptions.length) { + this.stream.subscribe(subscriptions); } }; - +/** + * Handles stream-server 'subscriptionsCreated' response. + * + * subscriptionsCreated also triggers 'update' event, because we want to + * synchronize subscriptions only when started listening + * for the updates. Otherwise we could miss updates. + * + * @param subscriptions + */ Listener.prototype.subscribed = function (subscriptions) { // We always get all existing subscribed topic here, instead, // maybe we should only receive what was actually subscribed - // in this stream-server response. - var i, j, k, s, key, subscription; + // in this stream-server response? - for (i = 0; i < subscriptions.length; i++) { - subscription = subscriptions[i]; - key = subscription.apiKey; - - for (j = 0; j < subscription.topics.length; j++) { + // Collect all newly created topics. + var topics = []; + for (var i = 0; i < subscriptions.length; i++) { + var subscription = subscriptions[i]; + for (var j = 0; j < subscription.topics.length; j++) { var topic = subscription.topics[j]; - - if (!this.created[topic]) { - this.created[topic] = true; - debug('topic %s created...', topic); - - for (k = 0; k < this.current.length; k++) { - s = this.current[k]; - - if (s.topic === topic && s.key === key) { - this.emit('updated', s); - } - } - } else { - debug('topic %s already exists...', topic); + // Checking if topic is totally new for us. + if (this.topics.indexOf(topic) < 0 && topics.indexOf(topic) < 0) { + debug('new topic %s subscribed', topic); + topics.push(topic); } } } + + // Inform each subscription that is listening for one of the new topics. + for (var i = 0; i < this.subscriptions.length; i++) { + var subscription = this.subscriptions[i]; + if (topics.indexOf(subscription.topic) >= 0) { + this.emit('updated', subscription); + } + } }; +/** + * Handles stream-server 'topicUpdate' response. + * + * Notice: stream-server sends only one topicUpdate, + * no matter how many subscriptions stream server has + * for the same topic but different keys. + * + * @param data + */ Listener.prototype.updated = function (data) { debug('topic %s updated...', data.topic); - for (var i = 0; i < this.current.length; i++) { - var s = this.current[i]; - - if (s.topic === data.topic) { - this.emit('updated', s); + for (var i = 0; i < this.subscriptions.length; i++) { + var subscription = this.subscriptions[i]; + if (subscription.topic === data.topic) { + this.emit('updated', subscription); } } - return this; -}; - -Listener.prototype.error = function (error) { - debug('stream error: %s', error.message); }; +/** + * Adds subscriptions to this.subscriptions. + * Subscribes to stream-server if this topic and key pair doesn't exist + * in this.subscriptions. + * + * @param subscriptions + */ Listener.prototype.add = function (subscriptions) { assert(subscriptions); assert(this.stream); - if (!Array.isArray(subscriptions)) + if (!Array.isArray(subscriptions)) { subscriptions = [subscriptions]; + } assert(subscriptions.length); - debug('adding %d subscription(s)...', subscriptions.length); + debug('adding %d subscription(s)', subscriptions.length); - var data = []; + var streamSubscriptions = []; for (var i = 0; i < subscriptions.length; i++) { var subscription = subscriptions[i]; - // This, already existing subscriptions' filtering logic, - // can go to zotero-api-node. - var found = false; - for (var j = 0; j < this.current.length; j++) { - var s = this.current[j]; + // Checks if the topic and key pair exists in this.subscriptions. + var exists = false; + for (var j = 0; j < this.subscriptions.length; j++) { + var s = this.subscriptions[j]; if (s.topic === subscription.topic && s.key === subscription.key) { - found = true; + exists = true; break; } } - - if (!found) { - debug('subscribing %s to stream', subscription.topic); - data.push(toData(subscription)); - } else { - debug('skipping subscribing %s to stream', subscription.topic); + if (!exists) { + debug('subscribing to %s %s', subscription.topic, subscription.key); + streamSubscriptions.push({ + topics: [subscription.topic], + apiKey: subscription.key + }); } - this.current.push(pick(subscription, 'id', 'key', 'topic')); + // Add the subscription to the listener. + this.subscriptions.push({ + id: subscription.id, + topic: subscription.topic, + key: subscription.key + }); } - if (data.length) { - this.stream.subscribe(data); + if (streamSubscriptions.length) { + this.stream.subscribe(streamSubscriptions); } }; - -Listener.prototype.remove = function (subscription) { +/** + * Removes subscription by id. + * Unsubscribes from stream-server if no one else is using that topic and key pair. + * If the topic totally disappears from this.subscriptions, we also remove it + * from this.topics. + * + * @param id + */ +Listener.prototype.remove = function (id) { assert(this.stream); - var data = remove(this.current, { id: subscription.id }); + var data = null; + // Get the subscription by id, and remove it. + for (var i = 0; i < this.subscriptions.length; i++) { + var subscription = this.subscriptions[i]; + if (subscription.id === id) { + data = subscription; + this.subscriptions.splice(i, 1); + break; + } + } if (!data) { - debug('failed to remove %s: not registered', subscription.id); + debug('failed to remove %s: not registered', id); + return; } - var found = false; - for (var i = 0; i < this.current.length; i++) { - var s = this.current[i]; - if (s.topic === data.topic && s.key === data.key) { - found = true; + // Check if more subscriptions exist with the same topic and key. + var exists = false; + for (var i = 0; i < this.subscriptions.length; i++) { + var subscription = this.subscriptions[i]; + if (subscription.topic === data.topic && subscription.key === data.key) { + exists = true; break; } } - - if (!found) { - delete this.created[data.topic]; + if (!exists) { + // If that was the last subscription with this topic and key, + // unsubscribe it form stream-server. + // We are doing this because stream-server manages subscriptions + // by topic and key pairs. this.stream.unsubscribe({ - apiKey: data.key, topic: data.topic + topic: data.topic, + apiKey: data.key }); } -}; + // Check if more subscriptions exist with this topic (key doesn't matter). + exists = false; + for (var i = 0; i < this.subscriptions.length; i++) { + var subscription = this.subscriptions[i]; + if (subscription.topic === data.topic) { + exists = true; + break; + } + } + if (!exists) { + // If no more subscriptions exist for this topic, it means + // we are no longer subscribed to this topic. So let's remove it. + this.topics.splice(this.topics.indexOf(data.topic), 1); + } +}; Listener.prototype.start = function () { assert(!this.stream); @@ -170,12 +270,15 @@ Listener.prototype.start = function () { .on('topicUpdated', this.updated.bind(this)) .on('subscriptionsCreated', this.subscribed.bind(this)) .on('connected', this.connected.bind(this)) - .on('error', this.error.bind(this)); + .on('error', function (err) { + // Handle all zotero.stream errors. + // zotero.stream will reconnect automatically if errors are handled. + debug('stream error: %s', err.message); + }); return this; }; - /** * @method stop * @param {Number} [timeout] @@ -204,37 +307,5 @@ Listener.prototype.stop = function (timeout) { }); }; - -// --- Private Helpers --- - -function remove(list, properties) { - var idx = index(list, by(properties)); - - return (idx !== -1) ? list.splice(idx, 1)[0] : null; -} - -function by(properties) { - properties.key = properties.apiKey; - delete properties.apiKey; - - return function match(s) { - - for (var key in properties) - if (properties[key] != null && properties[key] !== s[key]) - return false; - - return true; - }; -} - -function toData(subscription) { - var data = { topics: [subscription.topic] }; - - if (subscription.key) - data.apiKey = subscription.key; - - return data; -} - // --- Exports --- module.exports = Listener; diff --git a/lib/subscription.js b/lib/subscription.js index 484f545..24981d3 100644 --- a/lib/subscription.js +++ b/lib/subscription.js @@ -162,6 +162,29 @@ properties(Subscription, { } }); +Subscription.validateItemsUrl = function (url, key) { + return new B(function (resolve, reject) { + if (!(/^(\/?(users|groups)\/\d+\/?(publications\/items|items))/) + .test(url)) { + return reject(new Error('Invalid URL')); + } + + var headers = {}; + if (key) { + headers['Zotero-API-Key'] = key; + } + zotero.request({ + method: 'get', + path: url, + headers: headers + }, null, function (error, resp) { + // This directly returns all dataserver errors + if (error) return reject(error); + resolve(resp); + }); + }); +}; + /** * Lookup a Zotero item's external id for a given plugin. diff --git a/lib/sync.js b/lib/sync.js index 4a4419a..b8023c2 100644 --- a/lib/sync.js +++ b/lib/sync.js @@ -336,7 +336,7 @@ Session.prototype.update = co(function* () { this.debug('requesting "%s"...', s.path); var message = yield this.get(s.path, { - limit: 50, + limit: 5000, format: 'versions' }); diff --git a/test/controller.js b/test/controller.js index 6fed7db..04ff64f 100644 --- a/test/controller.js +++ b/test/controller.js @@ -35,6 +35,10 @@ describe('Controller', function () { sinon.stub(controller, 'notify'); + sinon.stub(Subscription, 'validateItemsUrl', function () { + return B.fulfilled(this); + }); + controller.options.listen = false; }); @@ -43,6 +47,7 @@ describe('Controller', function () { Subscription.prototype.save.restore(); Subscription.prototype.destroy.restore(); Subscription.load.restore(); + Subscription.validateItemsUrl.restore(); controller.options.listen = true; }); diff --git a/test/http/api.js b/test/http/api.js index 95dae54..d6f0673 100644 --- a/test/http/api.js +++ b/test/http/api.js @@ -57,6 +57,10 @@ describe('API', function () { self.id = 'id'; return B.fulfilled(self); }); + + sinon.stub(Subscription, 'validateItemsUrl', function () { + return B.fulfilled(this); + }); }); afterEach(function () { @@ -66,6 +70,7 @@ describe('API', function () { Subscription.ids.restore(); Subscription.load.restore(); Subscription.prototype.save.restore(); + Subscription.validateItemsUrl.restore(); }); after(function () { @@ -118,7 +123,7 @@ describe('API', function () { expect(Subscription.load).to.have.been.calledThrice; expect(res.body).to.have.length(ids.length); - expect(res.body[0]).to.have.keys(['id', 'url', 'key', 'version']); + expect(res.body[0]).to.have.keys(['id', 'url', 'key', 'version', 'timestamp']); }); }); @@ -244,7 +249,7 @@ describe('API', function () { .and.to.be.json; expect(res.body).to.have.property('id', 'foo'); - expect(res.body).to.have.keys(['id', 'url', 'key', 'version']); + expect(res.body).to.have.keys(['id', 'url', 'key', 'version', 'timestamp']); }); }); }); diff --git a/test/listener.js b/test/listener.js index c3c973d..f0ad53f 100644 --- a/test/listener.js +++ b/test/listener.js @@ -20,8 +20,8 @@ describe('Listener', function () { sinon.stub(zotero, 'stream', function () { var stream = new EventEmitter(); - stream.subscribe = sinon.stub().yields(); - stream.unsubscribe = sinon.stub().yields(); + stream.subscribe = sinon.stub(); + stream.unsubscribe = sinon.stub(); return stream; }); @@ -46,27 +46,37 @@ describe('Listener', function () { describe('.remove', function () { beforeEach(function () { - listener.current.push({ - id: 'foo', key: 'bar', topic: '/users/42/items' + listener.subscriptions.push({ + id: 'id1', key: 'key1', topic: '/users/42' }); - listener.current.push({ - id: 'baz', topic: '/users/42/items' + listener.subscriptions.push({ + id: 'id2', topic: '/users/42' + }); + + listener.subscriptions.push({ + id: 'id3', topic: '/users/42' }); }); it('removes the subscription by id', function () { - expect(listener.current).to.have.length(2); + expect(listener.subscriptions).to.have.length(3); + + listener.remove('id1'); + expect(listener.subscriptions).to.have.length(2); + expect(listener.stream.unsubscribe).to.have.been.called; + expect(listener.stream.unsubscribe.args[0][0]) + .to.have.property('apiKey', 'key1'); + + listener.stream.unsubscribe.reset(); - return listener - .remove({ id: 'foo' }) - .then(function () { - expect(listener.current).to.have.length(1); + listener.remove('id2'); + expect(listener.subscriptions).to.have.length(1); + expect(listener.stream.unsubscribe).not.to.have.been.called; - expect(listener.stream.unsubscribe).to.have.been.called; - expect(listener.stream.unsubscribe.args[0][0]) - .to.have.property('apiKey', 'bar'); - }); + listener.remove('id3'); + expect(listener.subscriptions).to.have.length(0); + expect(listener.stream.unsubscribe).to.have.been.called; }); }); @@ -100,36 +110,6 @@ describe('Listener', function () { .to.have.property('topics') .and.to.contain(subscriptions[0].topic); }); - - it('adds the subscription to pending', function () { - expect(listener.pending).to.be.empty; - listener.add(subscriptions); - expect(listener.pending).to.have.length(1); - expect(listener.current).to.be.empty; - }); - - it('resolves the promise on resolve key/topic', function () { - process.nextTick(function () { - listener.resolve(subscriptions[0].key, subscriptions[0].topic); - expect(listener.pending).to.be.empty; - expect(listener.current).to.have.length(1); - }); - - return expect(listener.add(subscriptions)) - .eventually - .to.be.instanceof(Array) - .and.have.length(1); - }); - - it('rejects the promise on reject key/topic', function () { - process.nextTick(function () { - listener.reject(subscriptions[0].key, subscriptions[0].topic); - expect(listener.pending).to.be.empty; - }); - - return expect(listener.add(subscriptions)) - .to.eventually.be.rejected; - }); }); describe('when given multiple subscriptions', function () { diff --git a/test/subscription.js b/test/subscription.js index f4a7684..34d8437 100644 --- a/test/subscription.js +++ b/test/subscription.js @@ -51,7 +51,7 @@ describe('Subscription', function () { it('updates the timestamp', function (done) { var a, s = new Subscription(); - expect(s.timestamp).to.be.undefined; + expect(s.timestamp).to.equal(0); s.touch(); a = s.timestamp;