Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add throttling (rate-limiting) of requests. #5

Merged
merged 7 commits into from
Nov 18, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions lib/circular-buffer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
exports.create = function(size) {
var items = [];
var current = 0;

return {
/**
* Inserts an item into the circular buffer. The new item will have index 0,
* and all other items will have their index incremented.
*/
insert: function(item) {
current = (current + 1) % size;
items[current] = item;
},
/**
* Returns the i-th item from the buffer. i=0 is the most-recently-inserted
* item. i=1 is the second-most-recently-inserted item. Returns undefined if
* i+1 items have not yet been inserted.
*/
item: function(i) {
return items[(current - i + size) % size];
}
};
};
20 changes: 17 additions & 3 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,19 @@ exports.init = (key, options) => {
var Promise = options.Promise;
var mySetTimeout = options.setTimeout || setTimeout;
var getTime = options.getTime || () => new Date().getTime();
var rate = options.rate || {};
var rateLimit = rate.limit || 10; // 10 requests per ratePeriod.
var ratePeriod = rate.period || 1000; // 1 second.

var url = require('url');
var attempt = require('./attempt')
.inject(mySetTimeout, getTime)
.attempt;

var ThrottledQueue = require('./throttled-queue')
.inject(mySetTimeout, getTime);
var requestQueue = ThrottledQueue.create(rateLimit, ratePeriod);

/**
* Makes an API request using the injected makeUrlRequest.
*
Expand Down Expand Up @@ -80,9 +87,16 @@ exports.init = (key, options) => {
});

var handle = attempt({
'do': (callback) => {
makeUrlRequest(requestUrl, callback);
},
'do': callback => (
requestQueue.add(() => {
makeUrlRequest(requestUrl, callback);
}, (err, result) => {
if (err != null) {
callback(err, null);
}
// Ignore the result of makeUrlRequest().
})
),
until: (response) => (
response
&& response.status !== 500
Expand Down
87 changes: 87 additions & 0 deletions lib/throttled-queue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
var CircularBuffer = require('./circular-buffer');

exports.inject = (setTimeout, getTime) => ({
/**
* Creates a ThrottledQueue. The queue stores actions, which will be executed
* asynchronously, at a controlled rate.
*
* @param {number} limit The maximum number of actions that can be executed
* over one period.
* @param {number} period The time period (ms) over which limit is
* enforceable.
* @return {ThrottledQueue}
*/
create: function(limit, period) {
var queue = [];
var recentTimes = CircularBuffer.create(limit);
var scheduled = false;

function schedule() {
if (scheduled) return;

var lastTime = recentTimes.item(limit - 1);
var delay = lastTime + period - getTime();
delay = (lastTime != undefined && delay > 0) ? delay : 0;

scheduled = true;
setTimeout(() => {
scheduled = false;

while (true) {
var action = queue.shift();
if (!action) return;
if (!action.done) break;
}

recentTimes.insert(getTime());
action.done = true;
action();

if (queue.length) {
schedule();
}
}, delay);
}

// Return a ThrottledQueue object.
return {
/**
* Adds an action to the work queue. The action will be executed as soon
* as the rate limit allows.
*
* @param {Function} doSomething This function will be called with no
* arguments. It is always executed asynchronously. Its return value,
* or any exception it throws, will be given to the callback.
* @param {Function} callback A function that takes (err, result) as
* arguments. This is called after doSomething is executed. EITHER:
* err is null, and result is doSomething's return value; OR result is
* null, and err is the exception thrown by doSomething.
* @return {Object} Returns a handle with a .cancel() method, which can be
* used to cancel the action. If .cancel() is called, then callback
* will be called with err = new Error('cancelled').
*/
add: function(doSomething, callback) {
function action() {
try {
var result = doSomething();
} catch (err) {
return callback(err, null);
}
return callback(null, result);
}

function cancel() {
if (!action.done) {
action.done = true;
process.nextTick(() => callback(new Error('cancelled'), null));
}
}

queue.push(action);
schedule();

return {cancel: cancel};
}
};
}
});
75 changes: 75 additions & 0 deletions spec/circular-buffer-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
var CircularBuffer = require('../lib/circular-buffer');

describe('CircularBuffer,', function() {
var SIZE = 10;
var list;
beforeEach(function() {
list = CircularBuffer.create(SIZE);
});

describe('when empty,', function() {
it('.item() returns undefined', function() {
expect(list.item(0)).toBe(undefined);
expect(list.item(1)).toBe(undefined);
expect(list.item(SIZE - 1)).toBe(undefined);
});
});

describe('when one item has been inserted,', function() {
beforeEach(function() {
list.insert('item 0');
});
it('.item(0) returns the recently-inserted item', function() {
expect(list.item(0)).toBe('item 0');
});
it('.item(i) returns undefined for i > 0', function() {
expect(list.item(1)).toBe(undefined);
expect(list.item(SIZE - 1)).toBe(undefined);
});
});

describe('when SIZE - 1 items have been inserted,', function() {
beforeEach(function() {
for (var i = 0; i < SIZE - 1; ++i) {
list.insert('item ' + i);
}
});
it('.item(i) returns items in reverse order', function() {
expect(list.item(0)).toBe('item ' + (SIZE - 2));
expect(list.item(SIZE - 2)).toBe('item 0');
});
it('.item(SIZE - 1) returns undefined', function() {
expect(list.item(SIZE - 1)).toBe(undefined);
});
});

describe('when SIZE items have been inserted,', function() {
beforeEach(function() {
for (var i = 0; i < SIZE; ++i) {
list.insert('item ' + i);
}
});
it('.item(i) returns items in reverse order', function() {
expect(list.item(0)).toBe('item ' + (SIZE - 1));
expect(list.item(SIZE - 1)).toBe('item 0');
});
});

describe('when SIZE + 1 items have been inserted,', function() {
beforeEach(function() {
for (var i = 0; i < SIZE + 1; ++i) {
list.insert('item ' + i);
}
});
it('.item(i) returns items in reverse order', function() {
expect(list.item(0)).toBe('item ' + SIZE);
expect(list.item(SIZE - 1)).toBe('item 1');
});

it('"item 0" is no longer in the buffer', function() {
for (var i = 0; i < SIZE; ++i) {
expect(list.item(i)).not.toBe('item 0');
}
});
});
});
115 changes: 107 additions & 8 deletions spec/index-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,113 @@ describe('index.js:', () => {
});
});

it('cancels when .cancel() is called immediately', done => {
init(apiKey, {makeUrlRequest: requestAndSucceed})
.geocode({address: 'Sydney Opera House'}, (err, response) => {
expect(err).toMatch(/cancelled/);
expect(requestAndSucceed).not.toHaveBeenCalled();
done();
})
.cancel();
describe('throttling', () => {
it('spaces out requests made too close', done => {
theTime = 0;
var googleMaps = init(apiKey, {
makeUrlRequest: requestAndSucceed,
rate: {limit: 3, period: 1000},
setTimeout: fakeSetTimeout,
getTime: () => theTime
});

googleMaps.geocode({address: 'Sydney Opera House'}, () => {});
googleMaps.geocode({address: 'Sydney Opera House'}, () => {});
googleMaps.geocode({address: 'Sydney Opera House'}, () => {});
googleMaps.geocode({address: 'Sydney Opera House'}, () => {
expect(requestTimes).toEqual([0, 0, 0, 1000]);
done();
});
});

it('sends requests ASAP when not bunched up', done => {
theTime = 0;
var googleMaps = init(apiKey, {
makeUrlRequest: requestAndSucceed,
rate: {period: 1000},
setTimeout: fakeSetTimeout,
getTime: () => theTime
});

googleMaps.geocode({address: 'Sydney Opera House'}, (err, response) => {
expect(err).toBe(null);

theTime = 1000;
googleMaps.geocode({address: 'Sydney Opera House'}, (err, response) => {
expect(err).toBe(null);
expect(requestTimes).toEqual([0, 1000]);
done();
});
});
});
});

describe('.cancel()', () => {
it('cancels when called immediately', done => {
init(apiKey, {makeUrlRequest: requestAndSucceed})
.geocode({address: 'Sydney Opera House'}, (err, response) => {
expect(err).toMatch(/cancelled/);
expect(requestAndSucceed).not.toHaveBeenCalled();
done();
})
.cancel();
});

it('cancels throttled requests', done => {
var googleMaps = init(apiKey, {
makeUrlRequest: requestAndSucceed,
rate: {limit: 1}
});

googleMaps.geocode({address: 'Sydney Opera House'}, (err, response) => {
expect(err).toBe(null);
expect(requestAndSucceed).toHaveBeenCalled();
// At this point, the second request should already have been enqueued,
// due to throttling.
handle.cancel();
});

var handle = googleMaps.geocode(
{address: 'Sydney Opera House'},
(err, response) => {
expect(err).toMatch(/cancelled/);
expect(requestAndSucceed.calls.count()).toBe(1);
done();
}
);
});

it('cancels requests waiting to be retried', done => {
var handle = init(apiKey, {makeUrlRequest: requestAndFail})
.geocode({address: 'Sydney Opera House'}, (err, response) => {
expect(err).toMatch(/cancelled/);
expect(requestAndFail).toHaveBeenCalled();
done();
});

requestAndFail.and.callFake((url, callback) => {
callback(null, {status: 500});
// After the first failure, schedule a cancel.
setImmediate(() => {
handle.cancel();
});
});
});

it('doesn\'t cancel in-flight requests', done => {
var handle =
init(apiKey, {makeUrlRequest: (url, callback) => {
setTimeout(() => {
requestAndSucceed(url, callback);
}, 10);
// By this stage, the request is in-flight, and cannot be cancelled.
handle.cancel();
}})
.geocode({address: 'Sydney Opera House'}, (err, response) => {
expect(err).toBe(null);
done();
});
});
});

describe('using .asPromise()', () => {
Expand Down
Loading