diff --git a/server/domain/models/dataset.js b/server/domain/models/dataset.js index 2d2d8616..75bddc15 100644 --- a/server/domain/models/dataset.js +++ b/server/domain/models/dataset.js @@ -6,7 +6,7 @@ import DatasetMetadata from './dataset-metadata' import Storage from '../../storage' import { fullDatasetPath, csvFromStream, storeFS } from '../../lib/util' -import { sendToWorkerQueue } from '../../lib/queue' +import DefaultQueue from '../../lib/queue' import { safeQuery } from '../../neo4j/connection' class Dataset extends Base { @@ -158,7 +158,7 @@ class Dataset extends Base { } async importCSV(removeExisting=false, options={}) { - sendToWorkerQueue({ + DefaultQueue.sendToWorker({ task: 'import_csv', uuid: this.uuid, paths: this.paths, @@ -170,7 +170,7 @@ class Dataset extends Base { async runTransformation() { const owner = await this.owner() - sendToWorkerQueue({ + DefaultQueue.sendToWorker({ task: 'generate', id: this.id, uuid: this.uuid, diff --git a/server/domain/repositories/transformationRepository.js b/server/domain/repositories/transformationRepository.js index c291d886..2f0d7908 100644 --- a/server/domain/repositories/transformationRepository.js +++ b/server/domain/repositories/transformationRepository.js @@ -1,5 +1,5 @@ import { safeQuery } from '../../neo4j/connection' -import { sendToWorkerQueue } from '../../lib/queue' +import DefaultQueue from '../../lib/queue' import Transformation from '../models/transformation' import shortid from 'shortid' @@ -59,7 +59,7 @@ export const saveInputTransformation = async (context, dataset, code) => { const owner = await dataset.owner() - sendToWorkerQueue({ + DefaultQueue.sendToWorker({ task: 'register_transformation', id: transformation.id, ownerName: owner.name diff --git a/server/lib/queue.js b/server/lib/queue.js index 70ddadc5..fb71a527 100644 --- a/server/lib/queue.js +++ b/server/lib/queue.js @@ -1,23 +1,27 @@ -import AMQP from 'amqplib' +import AMQPManager from 'amqp-connection-manager' import { pubsub } from '../graphql/pubsub' import Dataset from '../domain/models/dataset' const DATASET_UPDATED = 'DATASET_UPDATED' -let pendingDownloads = {} +const pendingDownloads = {} -const startChannel = async (conn, channelName, { durable, noAck }, callback) => { +const startChannel = (conn, channelName, { durable, noAck }, callback) => { const queueName = channelName - const ch = await conn.createChannel() - await ch.assertExchange(queueName, 'fanout', { durable }) - await ch.assertQueue(channelName, { durable }) - await ch.bindQueue(queueName, queueName, '') - await ch.consume(channelName, callback, { noAck }) - return ch + conn.createChannel({ + json: true, + setup: async (ch) => { + await ch.assertExchange(queueName, 'fanout', { durable }) + await ch.assertQueue(channelName, { durable }) + await ch.bindQueue(queueName, queueName, '') + await ch.consume(channelName, callback, { noAck }) + return ch + } + }) } -const startQueue = async () => { - const conn = await AMQP.connect('amqp://queue') +const startQueue = () => { + const conn = AMQPManager.connect(['amqp://queue']) startChannel(conn, 'dataset-status', { durable: false, noAck: true }, async (msg) => { const msgJSON = JSON.parse(msg.content.toString()) @@ -55,26 +59,38 @@ const startQueue = async () => { return conn } -const sendToWorkerQueue = async (msg) => { - const conn = await AMQP.connect('amqp://queue') - const ch = await conn.createChannel() - const ok = await ch.assertQueue('python-worker', { durable: false }) - ch.sendToQueue('python-worker', Buffer.from(JSON.stringify(msg))) -} - +class AMQP { + async start() { + this.conn = startQueue() + this.worker = await this.conn.createChannel({ + json: true, + setup: ch => ch.assertQueue('python-worker', { durable: false }) + }) + } + + async sendToWorker(msg) { + this.worker.sendToQueue('python-worker', msg) + } + + async prepareDownload(dataset, callback) { + // TODO: Pass a unique download ID (have tasks send JSON as argument) + const owner = await dataset.owner() + + this.sendToWorker({ + task: 'prepare_download', + id: dataset.id, + ownerName: owner.name + }) -const prepareDownload = async (dataset, callback) => { - // TODO: Pass a unique download ID (have tasks send JSON as argument) - const owner = await dataset.owner() - - sendToWorkerQueue({ - task: 'prepare_download', - id: dataset.id, - ownerName: owner.name - }) + // TODO: Create a unique download ID + pendingDownloads[`${dataset.id}`] = callback + } - // TODO: Create a unique download ID - pendingDownloads[`${dataset.id}`] = callback + close() { + this.conn.close() + } } -export { startQueue, sendToWorkerQueue, prepareDownload } +const DefaultQueue = new AMQP() + +export default DefaultQueue diff --git a/server/main.js b/server/main.js index fb496c13..b9ec6e42 100644 --- a/server/main.js +++ b/server/main.js @@ -24,7 +24,7 @@ import resolvers from './graphql/resolvers' import typeDefs from './graphql/typedefs' import schemaDirectives from './graphql/directives' -import { startQueue, prepareDownload } from './lib/queue' +import DefaultQueue from './lib/queue' import User from './domain/models/user' import Dataset from './domain/models/dataset' import { checkConfig } from './lib/startup-checks' @@ -191,7 +191,7 @@ app.get('/dataset/:id', async (req, res) => { const dataset = await Dataset.get(req.params.id) if (dataset && await dataset.canAccess(req.user)) { - prepareDownload(dataset, () => { + DefaultQueue.prepareDownload(dataset, () => { res.attachment(`${dataset.name}.csv`) dataset.readStream().pipe(res) }) @@ -212,11 +212,11 @@ const server = httpServer.listen(PORT, (err) => { console.log(`Subscriptions ready at ws://server:${PORT}${apolloServer.subscriptionsPath}`) }) -const queueConnection = startQueue() +DefaultQueue.start() // Close all connections on shutdown onExit(() => { console.log('Shutting down...') server.close() - queueConnection.close() + DefaultQueue.close() }, { alwaysLast: true }) diff --git a/server/package-lock.json b/server/package-lock.json index e5eb6c1e..062961a6 100644 --- a/server/package-lock.json +++ b/server/package-lock.json @@ -1316,6 +1316,14 @@ "json-schema-traverse": "^0.3.0" } }, + "amqp-connection-manager": { + "version": "2.3.0", + "resolved": "https://registry.npmjs.org/amqp-connection-manager/-/amqp-connection-manager-2.3.0.tgz", + "integrity": "sha512-DvebklFknBkareuf3wxE9X1Eo7l0UK1MgeO9m4B2T/h0OvzLRYsXTtQ8OrkXfgkg98FgKRRR9Nyz9+86aJFEaQ==", + "requires": { + "promise-breaker": "^4.1.2" + } + }, "amqplib": { "version": "0.5.2", "resolved": "https://registry.npmjs.org/amqplib/-/amqplib-0.5.2.tgz", @@ -8945,6 +8953,11 @@ "integrity": "sha512-OE+a6vzqazc+K6LxJrX5UPyKFvGnL5CYmq2jFGNIBWHpc4QyE49/YOumcrpQFJpfejmvRtbJzgO1zPmMCqlbBg==", "dev": true }, + "promise-breaker": { + "version": "4.1.13", + "resolved": "https://registry.npmjs.org/promise-breaker/-/promise-breaker-4.1.13.tgz", + "integrity": "sha512-+lGBqmBEgyvKweIrK4smdN1YxdYp5YjSL1us2XhTMBbZf98jdeGys/Edt5S1b1NXMVRQrvh4DrMgGpYPbXZf3g==" + }, "prompts": { "version": "0.1.14", "resolved": "https://registry.npmjs.org/prompts/-/prompts-0.1.14.tgz", diff --git a/server/package.json b/server/package.json index 709eb17f..e7a76e08 100644 --- a/server/package.json +++ b/server/package.json @@ -4,6 +4,7 @@ "description": "server for adi", "main": "index.js", "dependencies": { + "amqp-connection-manager": "^2.3.0", "amqplib": "^0.5.2", "apollo-server-express": "^2.2.3", "bcrypt": "^3.0.2",