Skip to content

Commit

Permalink
feat: (#93) add support for bulkPublish operation
Browse files Browse the repository at this point in the history
  • Loading branch information
arobson committed Feb 18, 2018
1 parent cccdf09 commit d2df5ea
Show file tree
Hide file tree
Showing 4 changed files with 285 additions and 18 deletions.
48 changes: 47 additions & 1 deletion docs/publishing.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ rabbit.publish( "exchange.name",
);
```

## `request( exchangeName, options, [connectionName] )`
## `rabbot.request( exchangeName, options, [connectionName] )`

This works just like a publish except that the promise returned provides the response (or responses) from the other side. A `replyTimeout` is available in the options that controls how long rabbot will wait for a reply before removing the subscription for the request to prevent memory leaks.

Expand All @@ -74,3 +74,49 @@ rabbit.request( "request.exchange", {
// the last message in a series OR the only reply will be sent to this callback
} );
```

## `rabbot.bulkPublish( set, [connectionName] )`

This creates a promise for a set of publishes to one or more exchanges on the same connection.

It is a little more efficient than calling `publish` repeatedly as it performs the precondition checks up-front, a single time before it beings the publishing.

It supports two separate formats for specifying a set of messages: hash and array.

### Hash Format

Each key is the name of the exchange to publish to and the value is an array of messages to send. Each element in the array follows the same format as the `publish` options.

The exchanges are processed serially, so this option will not work if you want finer control over sending messages to multiple exchanges in interleaved order.

```js
rabbot.publish({

This comment has been minimized.

Copy link
@djMax

djMax Mar 2, 2018

rabbot.bulkPublish no?

'exchange-1': [
{ type: 'one', body: '1' },
{ type: 'one', body: '2' }
],
'exchange-2': [
{ type: 'two', body: '1' },
{ type: 'two', body: '2' }
]
}).then(
() => // a list of the messages of that succeeded,
failed => // a list of failed messages and the errors `{ err, message }`
)
```

### Array Format

Each element in the array follows the format of `publish`'s option but requires the `exchange` property to control which exchange to publish each message to.

```js
rabbot.publish([

This comment has been minimized.

Copy link
@djMax

djMax Mar 2, 2018

bulkPublish?

{ type: 'one', body: '1', exchange: 'exchange-1' },
{ type: 'one', body: '2', exchange: 'exchange-1' },
{ type: 'two', body: '1', exchange: 'exchange-2' },
{ type: 'two', body: '2', exchange: 'exchange-2' }
}).then(
() => // a list of the messages of that succeeded,
failed => // a list of failed messages and the errors `{ err, message }`
)
```
127 changes: 127 additions & 0 deletions spec/integration/bulkPublish.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
require('../setup');
const rabbit = require('../../src/index.js');
const config = require('./configuration');

describe('Bulk Publish', function () {
var harness;

before(function (done) {
this.timeout(10000);
rabbit.configure({
connection: config.connection,
exchanges: [
{
name: 'rabbot-ex.direct-1',
type: 'direct',
autoDelete: true
},
{
name: 'rabbot-ex.direct-2',
type: 'direct',
autoDelete: true
},
{
name: 'rabbot-ex.direct-3',
type: 'direct',
autoDelete: true
}
],
queues: [
{
name: 'rabbot-q.1',
autoDelete: true,
subscribe: true
},
{
name: 'rabbot-q.2',
autoDelete: true,
subscribe: true
},
{
name: 'rabbot-q.3',
autoDelete: true,
subscribe: true
}
],
bindings: [
{
exchange: 'rabbot-ex.direct-1',
target: 'rabbot-q.1',
keys: ''
},
{
exchange: 'rabbot-ex.direct-2',
target: 'rabbot-q.2',
keys: ''
},
{
exchange: 'rabbot-ex.direct-3',
target: 'rabbot-q.3',
keys: ''
}
]
});
harness = harnessFactory(rabbit, done, 18);
harness.handle('bulk');
rabbit.bulkPublish({
'rabbot-ex.direct-1': [
{ type: 'bulk', routingKey: '', body: 1 },
{ type: 'bulk', routingKey: '', body: 2 },
{ type: 'bulk', routingKey: '', body: 3 }
],
'rabbot-ex.direct-2': [
{ type: 'bulk', routingKey: '', body: 4 },
{ type: 'bulk', routingKey: '', body: 5 },
{ type: 'bulk', routingKey: '', body: 6 }
],
'rabbot-ex.direct-3': [
{ type: 'bulk', routingKey: '', body: 7 },
{ type: 'bulk', routingKey: '', body: 8 },
{ type: 'bulk', routingKey: '', body: 9 }
]
});

rabbit.bulkPublish([
{ type: 'bulk', routingKey: '', body: 10, exchange: 'rabbot-ex.direct-1' },
{ type: 'bulk', routingKey: '', body: 11, exchange: 'rabbot-ex.direct-1' },
{ type: 'bulk', routingKey: '', body: 12, exchange: 'rabbot-ex.direct-2' },
{ type: 'bulk', routingKey: '', body: 13, exchange: 'rabbot-ex.direct-2' },
{ type: 'bulk', routingKey: '', body: 14, exchange: 'rabbot-ex.direct-2' },
{ type: 'bulk', routingKey: '', body: 15, exchange: 'rabbot-ex.direct-2' },
{ type: 'bulk', routingKey: '', body: 16, exchange: 'rabbot-ex.direct-3' },
{ type: 'bulk', routingKey: '', body: 17, exchange: 'rabbot-ex.direct-3' },
{ type: 'bulk', routingKey: '', body: 18, exchange: 'rabbot-ex.direct-3' }
]);
});

it('should bulk publish all messages successfully', function () {
const results = harness.received.map((m) => (
parseInt(m.body)
));
results.sort((a, b) => a - b).should.eql(
[
1,
2,
3,
4,
5,
6,
7,
8,
9,
10,
11,
12,
13,
14,
15,
16,
17,
18
]);
});

after(function () {
return harness.clean('default');
});
});
18 changes: 9 additions & 9 deletions spec/integration/directReplyQueue.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ describe(`Direct Reply Queue (replyQueue: 'rabbit')`, function () {
}).then(() => {
messagesToSend = 3;
harness.handle('no.replyQueue', (req) => {
req.reply({ reply: req.body.message })
req.reply({ reply: req.body.message });
});
for (var i = 0; i < messagesToSend; i++) {
rabbit.request('noreply-ex.direct', {
Expand All @@ -44,15 +44,15 @@ describe(`Direct Reply Queue (replyQueue: 'rabbit')`, function () {
body: { message: i },
routingKey: ''
})
.then(
r => {
replies.push(r.body.reply);
r.ack();
if (replies.length >= messagesToSend) {
done();
.then(
r => {
replies.push(r.body.reply);
r.ack();
if (replies.length >= messagesToSend) {
done();
}
}
}
);
);
}
});
});
Expand Down
110 changes: 102 additions & 8 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
const _ = require('lodash');
const Monologue = require('monologue.js');
const connectionFn = require('./connectionFsm.js');
const topologyFn = require('./topology.js');
Expand Down Expand Up @@ -69,7 +68,6 @@ var Broker = function () {
this.configurations = {};
this.configuring = {};
this.log = log;
_.bindAll(this);
};

Broker.prototype.addConnection = function (opts) {
Expand Down Expand Up @@ -132,7 +130,7 @@ Broker.prototype.addConnection = function (opts) {
};

Broker.prototype.addExchange = function (name, type, options = {}, connectionName = DEFAULT) {
if (_.isObject(name)) {
if (typeof name === 'object') {
options = name;
options.connectionName = options.connectionName || type || connectionName;
} else {
Expand Down Expand Up @@ -170,6 +168,66 @@ Broker.prototype.bindQueue = function (source, target, keys, connectionName = DE
);
};

Broker.prototype.bulkPublish = function (set, connectionName = DEFAULT) {
if (set.connectionName) {
connectionName = set.connectionName;
}
if (!this.connections[ connectionName ]) {
return Promise.reject(new Error(`BulkPublish failed - no connection ${connectionName} has been configured`));
}

const publish = (exchange, options) => {
options.appId = options.appId || this.appId;
options.timestamp = options.timestamp || Date.now();
if (this.connections[ connectionName ] && this.connections[ connectionName ].options.publishTimeout) {
options.connectionPublishTimeout = this.connections[ connectionName ].options.publishTimeout;
}
if (typeof options.body === 'number') {
options.body = options.body.toString();
}
return exchange.publish(options)
.then(
() => options,
err => { return { err, message: options }; }
);
};

let exchangeNames = Array.isArray(set)
? set.reduce((acc, m) => {
if (acc.indexOf(m.exchange) < 0) {
acc.push(m.exchange);
}
return acc;
}, [])
: Object.keys(set);

return this.onExchanges(exchangeNames, connectionName)
.then(exchanges => {
if (!Array.isArray(set)) {
const keys = Object.keys(set);
return Promise.all(keys.map(exchangeName => {
return Promise.all(set[exchangeName].map(message => {
const exchange = exchanges[exchangeName];
if (exchange) {
return publish(exchange, message);
} else {
return Promise.reject(new Error(`Publish failed - no exchange ${exchangeName} on connection ${connectionName} is defined`));
}
}));
}));
} else {
return Promise.all(set.map(message => {
const exchange = exchanges[message.exchange];
if (exchange) {
return publish(exchange, message);
} else {
return Promise.reject(new Error(`Publish failed - no exchange ${message.exchange} on connection ${connectionName} is defined`));
}
}));
}
});
};

Broker.prototype.clearAckInterval = function () {
clearInterval(this.ackIntervalId);
};
Expand Down Expand Up @@ -215,7 +273,7 @@ Broker.prototype.getQueue = function (name, connectionName = DEFAULT) {
Broker.prototype.handle = function (messageType, handler, queueName, context) {
this.hasHandles = true;
var options;
if (_.isString(messageType)) {
if (typeof messageType === 'string') {
options = {
type: messageType,
queue: queueName || '*',
Expand Down Expand Up @@ -285,16 +343,52 @@ Broker.prototype.onExchange = function (exchangeName, connectionName = DEFAULT)
);
};

Broker.prototype.onExchanges = function (exchanges, connectionName = DEFAULT) {
const connectionPromises = [this.connections[ connectionName ].promise];
if (this.configuring[ connectionName ]) {
connectionPromises.push(this.configuring[ connectionName ]);
}
const set = {};
return Promise.all(connectionPromises)
.then(
() => {
const exchangePromises = exchanges.map(exchangeName =>
this.connections[ connectionName ].promises[`exchange:${exchangeName}`]
.then(() => {
return { name: exchangeName, exchange: true };
})
);
return Promise.all(exchangePromises);
}
).then(
list => {
list.map(item => {
if (item && item.exchange) {
const exchange = this.getExchange(item.name, connectionName);
set[item.name] = exchange;
}
});
return set;
}
);
};

Broker.prototype.onReturned = function (handler) {
returnedStrategies.onReturned = returnedStrategies.customOnReturned = handler;
};

Broker.prototype.publish = function (exchangeName, type, message, routingKey, correlationId, connectionName, sequenceNo) {
const timestamp = Date.now();
let options;
if (_.isObject(type)) {
if (typeof type === 'object') {
options = type;
connectionName = message || options.connectionName || DEFAULT;
connectionName = message || DEFAULT;
options = Object.assign({
appId: this.appId,
timestamp: timestamp,
connectionName: connectionName
}, options);
connectionName = options.connectionName;
} else {
connectionName = connectionName || message.connectionName || DEFAULT;
options = {
Expand All @@ -315,7 +409,7 @@ Broker.prototype.publish = function (exchangeName, type, message, routingKey, co
if (this.connections[ connectionName ] && this.connections[ connectionName ].options.publishTimeout) {
options.connectionPublishTimeout = this.connections[ connectionName ].options.publishTimeout;
}
if (_.isNumber(options.body)) {
if (typeof options.body === 'number') {
options.body = options.body.toString();
}

Expand Down Expand Up @@ -407,7 +501,7 @@ Broker.prototype.startSubscription = function (queueName, exclusive = false, con
if (!this.hasHandles) {
console.warn("Subscription to '" + queueName + "' was started without any handlers. This will result in lost messages!");
}
if (_.isString(exclusive)) {
if (typeof exclusive === 'string') {
connectionName = exclusive;
exclusive = false;
}
Expand Down

0 comments on commit d2df5ea

Please sign in to comment.