-
Notifications
You must be signed in to change notification settings - Fork 0
/
mq.js
47 lines (39 loc) · 1.21 KB
/
mq.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
'use strict';
// exporting an instance
var config = rootRequire('config');
var AMQP = rootRequire('amqp');
// creating mq
var mq = new AMQP(Object.assign({}, {logger: logger}, config.mq));
var exchangeName = config.mq.exchangeName;
var localQueue = [];
// fixme: we should inject this dependency
var logger = rootRequire('logger').prefix('MQ');
mq.send = data => {
try {
mq.channel.publish(exchangeName, '', new Buffer(JSON.stringify(data)));
logger.log('send ' + JSON.stringify(data));
} catch (e) {
if (config.mq.displayErrors) {
logger.error('[mq-published]: send ' + e + ', stacking message');
}
setImmediate(() => {
localQueue.push(JSON.parse(JSON.stringify(data)));
// security: if the nb of standby events is too large, skip the old ones
if (localQueue.length > 10000) {
localQueue.shift();
}
});
}
};
// CHANNEL OPEN => DRAIN
mq.on('channel.opened', () => {
mq.channel.assertExchange(exchangeName, 'fanout', { durable: true });
// drain localQueue
logger.log('channel.opened: draining ' + localQueue.length + ' messages');
localQueue.forEach(message => {
mq.send(message);
});
localQueue = []; // reset.
});
mq.open();
module.exports = mq;