diff --git a/index.js b/index.js index 447313671..1b49a0dc9 100644 --- a/index.js +++ b/index.js @@ -1,2 +1,3 @@ exports = module.exports = require('./lib/sendgrid'); exports.mail = require('./lib/helpers/mail/mail.js'); +exports.importer = require('./lib/helpers/contact-importer/contact-importer.js'); diff --git a/lib/helpers/contact-importer/contact-importer.js b/lib/helpers/contact-importer/contact-importer.js new file mode 100644 index 000000000..eb5778b78 --- /dev/null +++ b/lib/helpers/contact-importer/contact-importer.js @@ -0,0 +1,146 @@ +/* eslint dot-notation: 'off' */ +'use strict'; + +var Bottleneck = require('bottleneck'); +var EventEmitter = require('events').EventEmitter; +var chunk = require('lodash.chunk'); +var debug = require('debug')('sendgrid'); +var util = require('util'); +var queue = require('async.queue'); +var ensureAsync = require('async.ensureasync'); + +var ContactImporter = module.exports = function(sg, options) { + options = options || {}; + var self = this; + this.sg = sg; + this.pendingItems = []; + + // Number of items to send per batch. + this.batchSize = options.batchSize || 1500; + + // Max number of requests per rate limit period. + this.rateLimitLimit = options.rateLimitLimit || 3; + + // Length of rate limit period (miliseconds). + this.rateLimitPeriod = options.rateLimitPeriod || 2000; + + // Create a throttler that will process no more than `rateLimitLimit` requests every `rateLimitPeriod` ms. + this.throttle = new Bottleneck(0, 0); + this.throttle.changeReservoir(this.rateLimitLimit); + + // Create a queue that wil be used to send batches to the throttler. + this.queue = queue(ensureAsync(this._worker)); + + // When the last batch is removed from the queue, add any incomplete batches. + this.queue.empty = function() { + if (self.pendingItems.length) { + debug('adding %s items from deferrd queue for processing', self.pendingItems.length); + var batch = self.pendingItems.splice(0); + self.queue.push({ + data: batch, + owner: self, + }, function(error, result) { + if (error) { + return self._notify(error, JSON.parse(error.response.body), batch); + } + return self._notify(null, JSON.parse(result.body), batch); + }); + } + }; + + // Emit an event when the queue is drained. + this.queue.drain = function() { + self.emit('drain'); + }; +}; +util.inherits(ContactImporter, EventEmitter); + +/** + * Add a new contact, or an array of contact, to the end of the queue. + * + * @param {Array|Object} data A contact or array of contacts. + */ +ContactImporter.prototype.push = function(data) { + var self = this; + data = Array.isArray(data) ? data : [data]; + + // Add the new items onto the pending items. + var itemsToProcess = this.pendingItems.concat(data); + + // Chunk the pending items into batches and add onto the queue + var batches = chunk(itemsToProcess, this.batchSize); + debug('generated batches %s from %s items', batches.length, data.length); + + batches.forEach(function(batch) { + // If this batch is full or the queue is empty queue it for processing. + if (batch.length === self.batchSize || !self.queue.length()) { + self.queue.push({ + data: batch, + owner: self, + }, function(error, result) { + if (error) { + return self._notify(error, JSON.parse(error.response.body), batch); + } + return self._notify(null, JSON.parse(result.body), batch); + }); + } + // Otherwise, it store it for later. + else { + debug('the last batch with only %s item is deferred (partial batch)', batch.length); + self.pendingItems = batch; + } + }); + + debug('batches in queue: %s', this.queue.length()); + debug('items in deferred queue: %s', this.pendingItems.length); +}; + +/** + * Send a batch of contacts to Sendgrid. + * + * @param {Object} task Task to be processed (data in 'data' property) + * @param {Function} callback Callback function. + */ +ContactImporter.prototype._worker = function(task, callback) { + var context = task.owner; + debug('processing batch (%s items)', task.data.length); + context.throttle.submit(context._sendBatch, context, task.data, callback); +}; + +ContactImporter.prototype._sendBatch = function(context, data, callback) { + debug('sending batch (%s items)', data.length); + + var request = context.sg.emptyRequest(); + request.method = 'POST'; + request.path = '/v3/contactdb/recipients'; + request.body = data; + + context.sg.API(request) + .then(function(response) { + debug('got response: %o', response); + setTimeout(function() { + context.throttle.incrementReservoir(1); + }, context.rateLimitPeriod); + return callback(null, response); + }) + .catch(function(error) { + debug('got error, %o', error); + setTimeout(function() { + context.throttle.incrementReservoir(1); + }, context.rateLimitPeriod); + return callback(error); + }); +}; + +/** + * Emit the result of processing a batch. + * + * @param {Object} error + * @param {Object} result + */ +ContactImporter.prototype._notify = function(error, result, batch) { + if (error) { + return this.emit('error', error, batch); + } + return this.emit('success', result, batch); +}; diff --git a/lib/sendgrid.js b/lib/sendgrid.js index eb58f84df..6681d284f 100644 --- a/lib/sendgrid.js +++ b/lib/sendgrid.js @@ -64,55 +64,60 @@ function makeHeaders(apiKey, globalHeaders) { * SendGrid allows for quick and easy access to the v3 Web API */ function SendGrid(apiKey, host, globalHeaders) { + return new SendGridInstance(apiKey, host, globalHeaders); +} +/** + * SendGrid allows for quick and easy access to the v3 Web API + */ +function SendGridInstance(apiKey, host, globalHeaders) { //Create global request - var globalRequest = getEmptyRequest({ + this.globalRequest = getEmptyRequest({ host: host || 'api.sendgrid.com', headers: makeHeaders(apiKey, globalHeaders), }); //Initialize new client - var client = new Client(globalRequest); + this.client = new Client(this.globalRequest); +} - //Interact with the API with this function - SendGrid.API = function(request, callback) { +//Interact with the API with this function +SendGridInstance.prototype.API = function(request, callback) { + var self = this; - //If no callback provided, we will return a promise - if (!callback) { - if (!SendGrid.Promise) { - throw new SendGridError('Promise API not supported'); - } - return new SendGrid.Promise(function(resolve, reject) { - client.API(request, function(response) { - if (isValidResponse(response)) { - resolve(response); - } - else { - var error = new SendGridError('Response error'); - error.response = response; - reject(error); - } - }); - }); + //If no callback provided, we will return a promise + if (!callback) { + if (!SendGrid.Promise) { + throw new SendGridError('Promise API not supported'); } - - //Use callback - client.API(request, function(response) { - if (isValidResponse(response)) { - callback(null, response); - } - else { - var error = new SendGridError('Response error'); - callback(error, response); - } + return new SendGrid.Promise(function(resolve, reject) { + self.client.API(request, function(response) { + if (isValidResponse(response)) { + resolve(response); + } + else { + var error = new SendGridError('Response error'); + error.response = response; + reject(error); + } + }); }); - }; + } - //Set requests - SendGrid.emptyRequest = getEmptyRequest; - SendGrid.globalRequest = globalRequest; - return SendGrid; -} + //Use callback + self.client.API(request, function(response) { + if (isValidResponse(response)) { + callback(null, response); + } + else { + var error = new SendGridError('Response error'); + callback(error, response); + } + }); +}; + +//Set requests +SendGridInstance.prototype.emptyRequest = getEmptyRequest; //Try to use native promises by default if (typeof Promise !== 'undefined') { diff --git a/package.json b/package.json index f51a88e65..b8a864e44 100644 --- a/package.json +++ b/package.json @@ -22,12 +22,20 @@ "node": ">= 0.4.7" }, "dependencies": { + "async.ensureasync": "^0.5.2", + "async.queue": "^0.5.2", + "bottleneck": "^1.12.0", + "lodash.chunk": "^4.2.0", "sendgrid-rest": "^2.2.1" }, "devDependencies": { "chai": "^3.5.0", + "debug": "^2.2.0", "eslint": "^3.1.0", - "mocha": "^2.4.5" + "mocha": "^2.4.5", + "mocha-sinon": "^1.1.5", + "sinon": "^1.17.5", + "sinon-chai": "^2.8.0" }, "scripts": { "lint": "eslint . --fix", diff --git a/test/helpers/contact-importer/contact-importer.test.js b/test/helpers/contact-importer/contact-importer.test.js new file mode 100644 index 000000000..eecbb157a --- /dev/null +++ b/test/helpers/contact-importer/contact-importer.test.js @@ -0,0 +1,59 @@ +var ContactImporter = require('../../../lib/helpers/contact-importer/contact-importer.js') +var sendgrid = require('../../../') + +var chai = require('chai') +var sinon = require('sinon') + +chai.should() +var expect = chai.expect +chai.use(require('sinon-chai')) + +require('mocha-sinon') + +describe.only('test_contact_importer', function() { + beforeEach(function() { + // Create a new SendGrid instance. + var API_KEY = process.env.API_KEY + var sg = sendgrid(API_KEY) + + // Create a new importer with a batch size of 2. + this.contactImporter = new ContactImporter(sg, { + batchSize: 2, + }) + // this.spy = sinon.spy(ContactImporter.prototype, '_sendBatch') + this.sinon.spy(ContactImporter.prototype, '_sendBatch') + + // Generate some test data. + var data = [] + for (i = 0; i < 5; i++) { + var item = { + email: 'example' + i + '@example.com', + first_name: 'Test', + last_name: 'User' + } + // Lets make the first user produce an error. + if (i === 1) { + item.invalid_field= 'some value' + } + data.push(item) + } + this.contactImporter.push(data) + }) + + it('test_contact_importer sends items in batches', function(done) { + var self = this + this.timeout(30000) + this.contactImporter.on('success', function(result, batch) { + console.log('SUCCESS result', result) + console.log('SUCCESS batch', batch) + }) + this.contactImporter.on('error', function(error, batch) { + console.log('SUCCESS error', error) + console.log('SUCCESS batch', batch) + }) + this.contactImporter.on('drain', function() { + expect(self.contactImporter._sendBatch).to.have.callCount(3) + done() + }) + }) +})