Skip to content

Commit

Permalink
use amqp-connection-manager to keep queue connections alive
Browse files Browse the repository at this point in the history
Made changes to the queue library so that it uses the connection manager
to create connections. The setup is very similar, but the connection manager's
slightly more verbose setup allows it to manage keeping the queue alive,
which is especially important due to recommended keepalive timeouts
occasionally disconnecting it.

While there, I also made some other changes to further encapsulate the
queue connection, making it easier to shut down when the site shuts down.
  • Loading branch information
agentdave committed Dec 13, 2018
1 parent e9308e8 commit 6850e89
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 39 deletions.
6 changes: 3 additions & 3 deletions server/domain/models/dataset.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import DatasetMetadata from './dataset-metadata'

import Storage from '../../storage'
import { fullDatasetPath, csvFromStream, storeFS } from '../../lib/util'
import { sendToWorkerQueue } from '../../lib/queue'
import DefaultQueue from '../../lib/queue'
import { safeQuery } from '../../neo4j/connection'

class Dataset extends Base {
Expand Down Expand Up @@ -158,7 +158,7 @@ class Dataset extends Base {
}

async importCSV(removeExisting=false, options={}) {
sendToWorkerQueue({
DefaultQueue.sendToWorker({
task: 'import_csv',
uuid: this.uuid,
paths: this.paths,
Expand All @@ -170,7 +170,7 @@ class Dataset extends Base {

async runTransformation() {
const owner = await this.owner()
sendToWorkerQueue({
DefaultQueue.sendToWorker({
task: 'generate',
id: this.id,
uuid: this.uuid,
Expand Down
4 changes: 2 additions & 2 deletions server/domain/repositories/transformationRepository.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { safeQuery } from '../../neo4j/connection'
import { sendToWorkerQueue } from '../../lib/queue'
import DefaultQueue from '../../lib/queue'
import Transformation from '../models/transformation'
import shortid from 'shortid'

Expand Down Expand Up @@ -59,7 +59,7 @@ export const saveInputTransformation = async (context, dataset, code) => {

const owner = await dataset.owner()

sendToWorkerQueue({
DefaultQueue.sendToWorker({
task: 'register_transformation',
id: transformation.id,
ownerName: owner.name
Expand Down
76 changes: 46 additions & 30 deletions server/lib/queue.js
Original file line number Diff line number Diff line change
@@ -1,23 +1,27 @@
import AMQP from 'amqplib'
import AMQPManager from 'amqp-connection-manager'
import { pubsub } from '../graphql/pubsub'
import Dataset from '../domain/models/dataset'

const DATASET_UPDATED = 'DATASET_UPDATED'

let pendingDownloads = {}
const pendingDownloads = {}

const startChannel = async (conn, channelName, { durable, noAck }, callback) => {
const startChannel = (conn, channelName, { durable, noAck }, callback) => {
const queueName = channelName
const ch = await conn.createChannel()
await ch.assertExchange(queueName, 'fanout', { durable })
await ch.assertQueue(channelName, { durable })
await ch.bindQueue(queueName, queueName, '')
await ch.consume(channelName, callback, { noAck })
return ch
conn.createChannel({
json: true,
setup: async (ch) => {
await ch.assertExchange(queueName, 'fanout', { durable })
await ch.assertQueue(channelName, { durable })
await ch.bindQueue(queueName, queueName, '')
await ch.consume(channelName, callback, { noAck })
return ch
}
})
}

const startQueue = async () => {
const conn = await AMQP.connect('amqp://queue')
const startQueue = () => {
const conn = AMQPManager.connect(['amqp://queue'])

startChannel(conn, 'dataset-status', { durable: false, noAck: true }, async (msg) => {
const msgJSON = JSON.parse(msg.content.toString())
Expand Down Expand Up @@ -55,26 +59,38 @@ const startQueue = async () => {
return conn
}

const sendToWorkerQueue = async (msg) => {
const conn = await AMQP.connect('amqp://queue')
const ch = await conn.createChannel()
const ok = await ch.assertQueue('python-worker', { durable: false })
ch.sendToQueue('python-worker', Buffer.from(JSON.stringify(msg)))
}

class AMQP {
async start() {
this.conn = startQueue()
this.worker = await this.conn.createChannel({
json: true,
setup: ch => ch.assertQueue('python-worker', { durable: false })
})
}

async sendToWorker(msg) {
this.worker.sendToQueue('python-worker', msg)
}

async prepareDownload(dataset, callback) {
// TODO: Pass a unique download ID (have tasks send JSON as argument)
const owner = await dataset.owner()

this.sendToWorker({
task: 'prepare_download',
id: dataset.id,
ownerName: owner.name
})

const prepareDownload = async (dataset, callback) => {
// TODO: Pass a unique download ID (have tasks send JSON as argument)
const owner = await dataset.owner()

sendToWorkerQueue({
task: 'prepare_download',
id: dataset.id,
ownerName: owner.name
})
// TODO: Create a unique download ID
pendingDownloads[`${dataset.id}`] = callback
}

// TODO: Create a unique download ID
pendingDownloads[`${dataset.id}`] = callback
close() {
this.conn.close()
}
}

export { startQueue, sendToWorkerQueue, prepareDownload }
const DefaultQueue = new AMQP()

export default DefaultQueue
8 changes: 4 additions & 4 deletions server/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import resolvers from './graphql/resolvers'
import typeDefs from './graphql/typedefs'
import schemaDirectives from './graphql/directives'

import { startQueue, prepareDownload } from './lib/queue'
import DefaultQueue from './lib/queue'
import User from './domain/models/user'
import Dataset from './domain/models/dataset'
import { checkConfig } from './lib/startup-checks'
Expand Down Expand Up @@ -191,7 +191,7 @@ app.get('/dataset/:id', async (req, res) => {
const dataset = await Dataset.get(req.params.id)

if (dataset && await dataset.canAccess(req.user)) {
prepareDownload(dataset, () => {
DefaultQueue.prepareDownload(dataset, () => {
res.attachment(`${dataset.name}.csv`)
dataset.readStream().pipe(res)
})
Expand All @@ -212,11 +212,11 @@ const server = httpServer.listen(PORT, (err) => {
console.log(`Subscriptions ready at ws://server:${PORT}${apolloServer.subscriptionsPath}`)
})

const queueConnection = startQueue()
DefaultQueue.start()

// Close all connections on shutdown
onExit(() => {
console.log('Shutting down...')
server.close()
queueConnection.close()
DefaultQueue.close()
}, { alwaysLast: true })
13 changes: 13 additions & 0 deletions server/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"description": "server for adi",
"main": "index.js",
"dependencies": {
"amqp-connection-manager": "^2.3.0",
"amqplib": "^0.5.2",
"apollo-server-express": "^2.2.3",
"bcrypt": "^3.0.2",
Expand Down

0 comments on commit 6850e89

Please sign in to comment.