Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Combined update #10

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
language: node_js

node_js:
- "5"
- "8"
- "6"
- "4"
- "0.12"

sudo: false
cache:
Expand All @@ -13,12 +13,8 @@ cache:
matrix:
fast_finish: true

before_install:
# For 0.12 without harmony
- npm install gnode

script: "make test-travis"
after_script: "npm install coveralls@2.10.0 && cat ./coverage/lcov.info | coveralls"
after_script: "npm install coveralls@2.12.0 && cat ./coverage/lcov.info | coveralls"

notifications:
email:
Expand Down
4 changes: 2 additions & 2 deletions config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
},

"plugins": [
"./logger",
"./fs"
"logger",
"fs"
],

"redis": {
Expand Down
179 changes: 126 additions & 53 deletions lib/controller.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,78 @@ var Listener = require('./listener');

/** @module arkivo */

/**
* # How it works?
* ## What happens on startup?
* 1. Arkivo is started.
* 2. Continues processing the previous Kue jobs [subscribe, unusbscribe, sync].
* 3. Creates listener, connects to stream-server, subscribes to all
* topics from all subscriptions.
* 4. Stream-server responds with subscriptionsCreated, and the listener
* emits 'update' event for each subscription that is listening
* for the specific topic and key pair.
* 5. Sync jobs are scheduled for each subscription. And this basically means
* that on each stream-server (re)connect we (re)synchronize all subscriptions.
*
* ## What happens when creating a subscription?
* 1. The URL is validated to make sure it's an items url and we have access to it.
* 2. If the URL is ok, the new subscription is created and saved.
* 3. Two concurrent operations are issued:
* subscribe to stream-server, and schedule the first sync job.
* 4. Listener receives 'subscriptionsCreated' response, and emits
* 'update' events only for subscriptions which are newly subscribed
* to stream-server.
* 5. Each 'update' event should schedule a 'sync' job for the updated
* subscription, but in practise the job will be skipped because,
* it should already exists when created in point 4.
*
* ## What happens when a topic is updated?
* 1. Listener receives 'topicUpdated' from stream-server.
* 2. Each subscription that is listening for the updated topic
* will get the 'update' event.
* 3. Each update event should schedule a 'sync' job for that subscription,
* unless there already exists a 'sync' job for the subscription.
*
* ## What happens when removing subscription?
* 1. Destroys the actual subscription.
* 2. Removes the subscription from the listener.
* 3. Listener checks if there is no more subscriptions listening
* for the same topic(+key) combination, and usubscribes from
* stream-server if so.
*
* ---
*
* # Optional stream-server
* Stream-sever can be disabled by setting listen:false in config.
* ## What this would change?
* * On startup we only continue to do the previously scheduled jobs.
* * But we don't initialize the listener.
* * Therefore we don't add subscriptions to it.
* * Therefore the listener doesn't subscribe to stream-server.
* * Therefore we don't get subscriptionsCreated response from server,
* which emits 'update' event and schedules 'sync' jobs.
* * But, 'sync' job is scheduled on subscription creation.
* * And also it can be triggered by `POST /api/sync` API request.
*
* ---
*
* # The Kue based 'sync' queue
* The Kue based sync queue is probably unnecessary and can be replaced with
* a simple array marking subscriptions that need to be updated:
* var subscriptionsToUpdate = [
* "vrrui780gb",
* "olxbwld2t4",
* ]
*
* Especially because Kue doesn't have a simple way to deduplicate jobs.
*
* The parameter 'skip' (to skip fetching already existing items)
* can be saved together with the subscription,
* therefore we would no longer need to persist the sync queue.
* On each restart all subscriptions are resynchronized anyway.
* Unless there is something else why we need the persisting sync queue?
*
*/

/**
* @class Controller
Expand All @@ -50,10 +122,9 @@ function Controller(options) {
id: subscription.id,
title: 'Zotero Stream-issued Synchronization Request'
}, this.options.delay, this.options.attempts);
}.bind(this))

.on('error', fail('uncaught listener error'));
}.bind(this));

this.subscriptionsInSyncQueue = {};

/**
* The default job types to process.
Expand Down Expand Up @@ -100,18 +171,27 @@ Controller.prototype.subscribe = function (data, job) {

report(job, ['subscribing to "%s"...', subscription.url]);

return subscription
.save()

.tap(function (s) {
// Theoretically the validated url can become invalid at the time
// when stream-server tries to create a subscription.
// But in practice this shouldn't be a problem.
return Subscription.validateItemsUrl(data.url, data.key)
.then(function () {
return subscription.save();
})
.then(function () {
if (this.options.listen) {
this.listener.add(subscription);
}
// If sync job would finish before stream-server
// would add the new subscription, this could result in missed updates,
// but each new stream-server subscription schedules another sync job,
// therefore this problem should be impossible.
this.notify('sync', {
id: s.id,
id: subscription.id,
skip: data.skip
}, 0, this.options.attempts);

if (this.options.listen)
return this.listener.add(s);

return subscription;
}.bind(this));
};

Expand Down Expand Up @@ -144,7 +224,7 @@ Controller.prototype.unsubscribe = function (data, job) {
.call('destroy')

.tap(function (s) {
if (this.options.listen) return this.listener.remove(s);
if (this.options.listen) this.listener.remove(s.id);
}.bind(this));
};

Expand Down Expand Up @@ -177,6 +257,8 @@ Controller.prototype.synchronize = function (data, job) {

var lock = this.lock;

delete this.subscriptionsInSyncQueue[data.id];

if (data.all) {
report(job, ['loading all subscriptions...']);
subscriptions = Subscription.all();
Expand All @@ -197,7 +279,7 @@ Controller.prototype.synchronize = function (data, job) {

subscriptions = Subscription
.load(data.id)
.then(listify);
.then(function (subscription) { return [subscription]; });
}

return subscriptions
Expand Down Expand Up @@ -266,6 +348,17 @@ Controller.prototype.sync = Controller.prototype.synchronize;
* @return {Promise<Kue.Job>}
*/
Controller.prototype.notify = function (name, data, delay, attempts) {

// This is a workaround to job deduplication which is not supported
// by Kue. We just skip the job if its sync id is in queue.
// This doesn't help if Arkivo is restarted.
if (name === 'sync') {
if (this.subscriptionsInSyncQueue[data.id]) {
return B.resolve(null);
}
this.subscriptionsInSyncQueue[data.id] = true;
}

return new B(function (resolve, reject) {
var job = q.jobs.create(name, data);

Expand Down Expand Up @@ -348,39 +441,21 @@ Controller.prototype.start = function (jobs) {
var workers = this.options.workers;
var listener = this.listener;

// Initially sync all subscriptions;
// then start processing new or interrupted jobs!
var subscriptions = this
.sync({ all: true })
.catch(fail('initial synchronization failed'));
jobs.forEach(function (type) {
debug('listening for "%s" requests...', type);
q.jobs.process(type, workers, handler);
});

subscriptions
.finally(function resume() {
q.interrupted().each(function (job) {
debug('re-sheduling interrupted job %s#%d...', job.type, job.id);
job.inactive();
});

jobs.forEach(function (type) {
debug('listening for "%s" requests...', type);
q.jobs.process(type, workers, handler);
});

q.interrupted().each(function (job) {
debug('re-sheduling interrupted job %s#%d...', job.type, job.id);
job.inactive();
});
});


// Connect to Zotero Stream API and start
// listening when initial sync is done.
// Connect to Zotero Stream API and start listening
if (this.options.listen) {
listener
.start()
.once('connected', function () {
debug('listener connected successfully');

subscriptions
.then(function (s) { if (s.length) return listener.add(s); });
});

listener.start();
var subscriptions = Subscription.all();
subscriptions.tap(function (s) { if (s.length) listener.add(s); });
}

return this;
Expand Down Expand Up @@ -452,16 +527,14 @@ function progressor(job, total) {
};
}

function fail(message) {
return function failed(error) {
debug('%s: %s', message, error.message);
trace(error.stack);

throw error;
};
}

function listify(i) { return [i]; }
// function fail(message) {
// return function failed(error) {
// debug('%s: %s', message, error.message);
// trace(error.stack);
//
// throw error;
// };
// }

// --- Exports ---
module.exports = Controller;
2 changes: 2 additions & 0 deletions lib/http/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ Server.prototype.initialize = function () {

this.app = express();

this.app.set('json spaces', 2);

this.app.use(this.options.api, q.app);
this.app.use(this.options.api, routes.api);

Expand Down
Loading