Skip to content

Commit

Permalink
Add queue check and reject promises if there is a nonexistent queue i…
Browse files Browse the repository at this point in the history
…n producer. Closes  #45

Add tests to verify in case rcp:true and rcp: false
  • Loading branch information
mrister committed Jul 19, 2016
1 parent bf4f973 commit ad8031f
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 82 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,6 @@ node_modules

# generated doc output
docs/

# IntellijIDEA dir
.idea/
140 changes: 85 additions & 55 deletions src/modules/producer.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,24 @@
var utils = require('./utils'),
uuid = require('node-uuid'),
parsers = require('./message-parsers');
const utils = require('./utils');
const uuid = require('node-uuid');
const parsers = require('./message-parsers');

var amqpRPCQueues = {};

/**
* Checks if the channel exists
* @param {Object} channel the channel to check
* @param {String} queueName name of the queue to check on channel
* @returns {Promise} rejects if there is no queue
*/
function checkQueue(channel, queueName) {
return channel
.checkQueue(queueName)
.catch((err) => {
// error means there is no queue
err.type = 'no_queue';
throw err;
});
}
/**
* Create a RPC-ready queue
* @param {string} queue the queue name in which we send a RPC request
Expand All @@ -14,27 +29,37 @@ function createRpcQueue(queue) {
amqpRPCQueues[queue] = {};
}

let rpcQueue = amqpRPCQueues[queue];
const rpcQueue = amqpRPCQueues[queue];
if (rpcQueue.queue) return Promise.resolve(rpcQueue.queue);

//we create the callback queue using base queue name + appending config hostname and :res for clarity
//ie. if hostname is gateway-http and queue is service-oauth, response queue will be service-oauth:gateway-http:res
//it is important to have different hostname or no hostname on each module sending message or there will be conflicts
var resQueue = queue + ':' + this.conn.config.hostname + ':res';
rpcQueue.queue = this.conn.get().then((channel) => {
return channel.assertQueue(resQueue, { durable: true, exclusive: true })
.then((_queue) => {
rpcQueue.queue = _queue.queue;

//if channel is closed, we want to make sure we cleanup the queue so future calls will recreate it
this.conn.addListener('close', () => { delete rpcQueue.queue; createRpcQueue.call(this, queue); });

return channel.consume(_queue.queue, maybeAnswer.call(this, queue), { noAck: true });
})
.then(() => rpcQueue.queue);
rpcQueue.queue = this
.conn
.get()
.then((channel) => {
return checkQueue(channel, queue)
.then(()=> {
return channel.assertQueue(resQueue, {durable: true, exclusive: true})
.then((_queue) => {
rpcQueue.queue = _queue.queue;

//if channel is closed, we want to make sure we cleanup the queue so future calls will recreate it
this.conn.addListener('close', () => {
delete rpcQueue.queue;
createRpcQueue.call(this, queue);
});

return channel.consume(_queue.queue, maybeAnswer.call(this, queue), {noAck: true});
})
.then(() => rpcQueue.queue);
});
})
.catch(() => {
.catch((err) => {
delete rpcQueue.queue;
if (err.type === 'no_queue') throw err;
return utils.timeoutPromise(this.conn.config.timeout).then(() => {
return createRpcQueue.call(this, queue);
});
Expand All @@ -60,46 +85,50 @@ function maybeAnswer(queue) {
rpcQueue[corrId].resolve(parsers.in(msg));
this.conn.config.transport.info('bmq:producer', '[' + queue + '] < answer');
delete rpcQueue[corrId];
} catch(e) {
} catch (e) {
this.conn.config.transport.error(new Error('Receiving RPC message from previous session: callback no more in memory. ' + queue));
}
};
}

function publishOrSendToQueue(queue, msg, options) {
if (!options.routingKey) {
return this.channel.sendToQueue(queue, msg, options);
} else {
return this.channel.publish(queue, options.routingKey, msg, options);
}
return checkQueue(this.channel, queue)
.then(() => {
if (!options.routingKey) {
return this.channel.sendToQueue(queue, msg, options);
} else {
return this.channel.publish(queue, options.routingKey, msg, options);
}
});
}

/**
* Send message with or without rpc protocol, and check if RPC queues are created
* @param {string} queue the queue to send `msg` on
* @param {any} msg string, object, number.. anything bufferable/serializable
* @param {*} msg string, object, number.. anything bufferable/serializable
* @param {object} options contain rpc property (if true, enable rpc for this message)
* @return {Promise} Resolves when message is correctly sent, or when response is received when rpc is enabled
*/
function checkRpc (queue, msg, options) {
function checkRpc(queue, msg, options) {
//messages are persistent
options.persistent = true;

if (options.rpc) {
return createRpcQueue.call(this, queue)
.then(() => {
//generates a correlationId (random uuid) so we know which callback to execute on received response
var corrId = uuid.v4();
options.correlationId = corrId;
//reply to us if you receive this message!
options.replyTo = amqpRPCQueues[queue].queue;

publishOrSendToQueue.call(this, queue, msg, options);

//defered promise that will resolve when response is received
amqpRPCQueues[queue][corrId] = Promise.defer();
return amqpRPCQueues[queue][corrId].promise;
});
.then(() => {
//generates a correlationId (random uuid) so we know which callback to execute on received response
var corrId = uuid.v4();
options.correlationId = corrId;
//reply to us if you receive this message!
options.replyTo = amqpRPCQueues[queue].queue;

return publishOrSendToQueue.call(this, queue, msg, options)
.then(() => {
//defered promise that will resolve when response is received
amqpRPCQueues[queue][corrId] = Promise.defer();
return amqpRPCQueues[queue][corrId].promise;
});
});
}

return publishOrSendToQueue.call(this, queue, msg, options);
Expand All @@ -108,36 +137,37 @@ function checkRpc (queue, msg, options) {
/**
* Ensure channel exists and send message using `checkRpc`
* @param {string} queue The destination queue on which we want to send a message
* @param {any} msg Anything serializable/bufferable
* @param {*} msg Anything serializable/bufferable
* @param {object} options message options (persistent, durable, rpc, etc.)
* @return {Promise} checkRpc response
*/
function produce(queue, msg, options) {
//default options are persistent and durable because we do not want to miss any outgoing message
//unless user specify it
options = Object.assign({ persistent: true, durable: true }, options);
options = Object.assign({persistent: true, durable: true}, options);

return this.conn.get()
.then((_channel) => {
this.channel = _channel;
.then((_channel) => {
this.channel = _channel;

//undefined can't be serialized/buffered :p
if (!msg) msg = null;
//undefined can't be serialized/buffered :p
if (!msg) msg = null;

this.conn.config.transport.info('bmq:producer', '[' + queue + '] > ', msg);
this.conn.config.transport.info('bmq:producer', '[' + queue + '] > ', msg);

return checkRpc.call(this, queue, parsers.out(msg, options), options);
})
.catch((err) => {
//add timeout between retries because we don't want to overflow the CPU
this.conn.config.transport.error('bmq:producer', err);
return utils.timeoutPromise(this.conn.config.timeout)
.then(() => {
return produce.call(this, queue, msg, options);
return checkRpc.call(this, queue, parsers.out(msg, options), options);
})
.catch((err) => {
if (err.type === 'no_queue') throw err;
//add timeout between retries because we don't want to overflow the CPU
this.conn.config.transport.error('bmq:producer', err);
return utils.timeoutPromise(this.conn.config.timeout)
.then(() => {
return produce.call(this, queue, msg, options);
});
});
});
}

module.exports = function(conn) {
return { conn, produce };
module.exports = function (conn) {
return {conn, produce};
};
94 changes: 67 additions & 27 deletions test/producer-consumer-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ var fixtures = {

var letters = 0;

describe('producer/consumer', function() {
before(function() {
describe('producer/consumer', function () {
before(function () {
this.timeout(20000);
return docker.start();
});
Expand All @@ -22,7 +22,7 @@ describe('producer/consumer', function() {
return docker.stop();
});

describe('msg delevering', function() {
describe('msg delevering', function () {
before(() => {
return consumer.consume(fixtures.queues[0], function () {
letters--;
Expand All @@ -35,7 +35,7 @@ describe('producer/consumer', function() {

it('should be able to consume message sended by producer to queue [test-queue-0]', function () {
letters++;
return producer.produce(fixtures.queues[0], { msg: uuid.v4() })
return producer.produce(fixtures.queues[0], {msg: uuid.v4()})
.then(() => utils.timeoutPromise(300))
.then(() => assert.equal(letters, 0));
});
Expand Down Expand Up @@ -66,7 +66,7 @@ describe('producer/consumer', function() {
var messages = [];
letters += 200;

for(let i = 0; i < count; i++) {
for (let i = 0; i < count; i++) {
messages.push(producer.produce(fixtures.queues[0], null));
messages.push(producer.produce(fixtures.queues[1], null));
}
Expand All @@ -78,38 +78,78 @@ describe('producer/consumer', function() {

});

describe('msg requeueing', function() {
describe('msg requeueing', function () {
it('should be able to consume message, but throw error so the message is requeued again on queue [test-queue-0]', function (done) {
var attempt = 3;

consumer.consume(fixtures.queues[3], function (_msg) {
assert(typeof _msg === 'object');

--attempt;
if (!attempt) {
return done();
} else {
throw new Error('Any kind of error');
}
})
.then(function () {
producer.produce(fixtures.queues[3], { msg: uuid.v4() })
.then(function (response) {
assert(response === true);
++letters;
assert(typeof _msg === 'object');

--attempt;
if (!attempt) {
return done();
} else {
throw new Error('Any kind of error');
}
})
.then(function () {
producer.produce(fixtures.queues[3], {msg: uuid.v4()})
.then(function (response) {
assert(response === true);
++letters;
});
});
});
});

describe('producer/no queue', function () {
it('should reject for rpc channel in case the queue does not exists', function (done) {
var fakeQueue = 'fake:queue:name';
producer.produce(fakeQueue, {msg: Date.now()}, {rpc: true})
.then(function () {
done(new Error('it should reject'));
})
.catch(function (err) {
try {
assert(err instanceof Error);
assert(err.message.indexOf('NOT_FOUND') > -1);
assert(err.message.indexOf('404') > -1);
assert(err.message.indexOf(fakeQueue) > -1);
done();
} catch (e) {
done(e);
}
});
});

it('should reject for non rpc channel in case the queue does not exists', function (done) {
var fakeQueue = 'fake:queue:name';
producer.produce(fakeQueue, {msg: Date.now()}, {rpc: false})
.then(function () {
done(new Error('it should reject'));
})
.catch(function (err) {
try {
assert(err instanceof Error);
assert(err.message.indexOf('NOT_FOUND') > -1);
assert(err.message.indexOf('404') > -1);
assert(err.message.indexOf(fakeQueue) > -1);
done();
} catch (e) {
done(e);
}
});
});
});
});

describe('routing keys', function () {
it('should be able to send a message to a rounting key exchange', function() {
it('should be able to send a message to a rounting key exchange', function () {
return consumer.consume(fixtures.routingKey, function (message) {
assert.equal(message.content, 'ok');
})
.then(() => {
return producer.produce(fixtures.routingKey, { content: 'ok' }, { routingKey: 'route' });
});
assert.equal(message.content, 'ok');
})
.then(() => {
return producer.produce(fixtures.routingKey, {content: 'ok'}, {routingKey: 'route'});
});
});
});
});

0 comments on commit ad8031f

Please sign in to comment.