From 0f2e4d5309bc0612e4a0266f3c2b02a348952440 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Szak=C3=A1llas?= Date: Tue, 16 May 2017 16:35:03 +0200 Subject: [PATCH] feat: add exponential backoff to API client The API client now retries retrieving the serviceKey via the initialization request in an exponential backoff loop. The exponential algorithm utilizes an error factor to spread out the retrials, so when multiple clients are started at the same time they are unlikely to issue any request after the initial one at the same time. --- lib/agent/api/httpError.js | 14 +++ lib/agent/api/httpRetry.js | 51 +++++++++++ lib/agent/api/httpRetry.spec.js | 68 +++++++++++++++ lib/agent/api/index.js | 67 +++++++-------- lib/agent/api/index.spec.js | 14 +-- lib/agent/index.js | 9 +- lib/utils/compositeError.js | 19 ++++ lib/utils/exponentialRetry.js | 56 ++++++++++++ lib/utils/exponentialRetry.spec.js | 134 +++++++++++++++++++++++++++++ package.json | 3 +- test/e2e/apiCalls.spec.js | 3 +- test/e2e/initialization.spec.js | 36 ++++++++ test/e2e/utils/serviceMocks.js | 1 + 13 files changed, 423 insertions(+), 52 deletions(-) create mode 100644 lib/agent/api/httpError.js create mode 100644 lib/agent/api/httpRetry.js create mode 100644 lib/agent/api/httpRetry.spec.js create mode 100644 lib/utils/compositeError.js create mode 100644 lib/utils/exponentialRetry.js create mode 100644 lib/utils/exponentialRetry.spec.js diff --git a/lib/agent/api/httpError.js b/lib/agent/api/httpError.js new file mode 100644 index 0000000..46c25c0 --- /dev/null +++ b/lib/agent/api/httpError.js @@ -0,0 +1,14 @@ +'use strict' + +var http = require('http') +var inherits = require('util').inherits + +function HttpError (statusCode, response) { + Error.captureStackTrace && Error.captureStackTrace(this, this.constructor) + this.message = String(statusCode) + ' - ' + http.STATUS_CODES[statusCode] + this.statusCode = statusCode + this.response = response +} +inherits(HttpError, Error) + +module.exports = 
HttpError diff --git a/lib/agent/api/httpRetry.js b/lib/agent/api/httpRetry.js new file mode 100644 index 0000000..77c4078 --- /dev/null +++ b/lib/agent/api/httpRetry.js @@ -0,0 +1,51 @@ +'use strict' + +var HttpError = require('./httpError') +var exponentialRetry = require('../../utils/exponentialRetry') + +var DEFAULT_MAX_RETRIES = Infinity +var DEFAULT_MAX_WAIT = 10 * 60 * 1000 +var DEFAULT_EXP_SCALE = 0.828 +var DEFAULT_LIN_SCALE = 150 +var DEFAULT_ERR_SCALE = 0.24 // +-12% error + +function httpRetry (opts, cb) { + opts = opts || {} + var client = opts.client + var payload = opts.payload + var reqOpts = opts.reqOpts + var errorFilter = opts.errorFilter + var maxRetries = opts.maxRetries != null ? opts.maxRetries : DEFAULT_MAX_RETRIES + var maxWait = opts.maxWait != null ? opts.maxWait : DEFAULT_MAX_WAIT + + function httpRequest (cb) { + var completed = false + var req = client.request(reqOpts, function (response) { + completed = true + if (response.statusCode >= 400) { // failed attempt: hand the response to HttpError so err.response is populated + return cb(new HttpError(response.statusCode, response), response) + } + return cb(null, response) + }) + req.on('error', function (err) { + if (!completed) { + completed = true + return cb(err) + } + }) + if (payload) { + req.write(payload) + } + req.end() + } + return exponentialRetry({ + maxRetries: maxRetries, + maxWait: maxWait, + expScale: DEFAULT_EXP_SCALE, + linScale: DEFAULT_LIN_SCALE, + errScale: DEFAULT_ERR_SCALE, + errorFilter: errorFilter + }, httpRequest, cb) +} + +module.exports = httpRetry diff --git a/lib/agent/api/httpRetry.spec.js b/lib/agent/api/httpRetry.spec.js new file mode 100644 index 0000000..cf3f3ac --- /dev/null +++ b/lib/agent/api/httpRetry.spec.js @@ -0,0 +1,68 @@ +'use strict' + +var http = require('http') +var HttpError = require('./httpError') +var httpRetry = require('./httpRetry') +var expect = require('chai').expect +var nock = require('nock') +var bl = require('bl') + +describe('httpRetry', function () { + it('retries', function (done) { + 
nock('http://something.com') + .post('/', 'data') + .reply(500) + nock('http://something.com') + .post('/', 'data') + .reply(200, 'response') + + this.sandbox.stub(global, 'setTimeout').callsFake(function (cb, int) { + return process.nextTick(cb) + }) + + httpRetry({ + client: http, + maxRetries: 1, + reqOpts: { + hostname: 'something.com', + method: 'POST', + path: '/' + }, + payload: 'data' + }, function (err, data) { + expect(err).to.not.exist + data.pipe(bl(function (err, data) { + expect(err).not.to.exist + expect(data.toString()).to.eql('response') + done() + })) + }) + }) + it('returns error', function (done) { + nock('http://something.com') + .post('/', 'data') + .reply(500, 'bad') + + this.sandbox.stub(global, 'setTimeout').callsFake(function (cb, int) { + return process.nextTick(cb) + }) + + httpRetry({ + client: http, + maxRetries: 0, + reqOpts: { + hostname: 'something.com', + method: 'POST', + path: '/' + }, + payload: 'data' + }, function (err, data) { + expect(err).to.be.instanceof(HttpError) + data.pipe(bl(function (err, data) { + expect(err).to.not.exist + expect(data.toString()).to.eql('bad') + done() + })) + }) + }) +}) diff --git a/lib/agent/api/index.js b/lib/agent/api/index.js index fa05e7d..4941408 100644 --- a/lib/agent/api/index.js +++ b/lib/agent/api/index.js @@ -5,11 +5,12 @@ var util = require('util') var requestSync = require('sync-request') var isNumber = require('lodash.isnumber') var debug = require('../../utils/debug')('api') -var format = require('util').format var assign = require('lodash.assign') var HttpsProxyAgent = require('https-proxy-agent') var stringify = require('json-stringify-safe') var BufferStream = require('./bufferStream') +var httpRetry = require('./httpRetry') +var CompositeError = require('../../utils/compositeError') var bl = require('bl') var libPackage = require('../../../package') @@ -34,6 +35,7 @@ function CollectorApi (options) { this.serviceName = options.serviceName this.baseRetryInterval = 1000 * 60 * 30 
// 30 minutes this.serviceKey = null + this.getServiceMaxRetries = Infinity if (options.proxy) { this.proxyAgent = new HttpsProxyAgent(options.proxy) @@ -270,7 +272,8 @@ CollectorApi.prototype.getService = function (cb) { cpus: self.system.cpus } }) - var req = https.request({ + + var reqOpts = { hostname: opts.hostname, port: opts.port, path: opts.path, @@ -283,51 +286,41 @@ CollectorApi.prototype.getService = function (cb) { 'X-Reporter-Language': this.collectorLanguage, 'Content-Length': Buffer.byteLength(payload) } - }, function (res) { - res.setEncoding('utf8') - res.pipe(bl(function (err, resBuffer) { - var response - - var retryInterval = self.baseRetryInterval + } - if (err) { - debug.error('getService', err) - return setTimeout(function () { - debug.warn('getService', format('Retrying with %d ms', retryInterval)) - self.getService() - }, retryInterval) + httpRetry({ + client: https, + reqOpts: reqOpts, + payload: payload, + errorFilter: function shouldContinueRetrying (err) { + if (err.statusCode === 401) { + return false } - - var resText = resBuffer.toString('utf8') - - if (res.statusCode === 401) { - debug.error('getService', 'Api key rejected') - return + return true + }, + maxRetries: this.getServiceMaxRetries + }, + function done (err, result) { + if (err) { + if (err.statusCode === 401) { + debug.error('getService', 'API key rejected') } - if (res.statusCode > 399) { - debug.error('getService', 'Service responded with ' + res.statusCode) - return setTimeout(function () { - debug.warn('getService', format('Retrying with %d ms', retryInterval)) - self.getService(cb) - }, retryInterval) + return cb(new CompositeError('Could not get service key', err)) + } + var response + result.pipe(bl(function (err, data) { + if (err) { // body stream failed: report once and stop, there is no data to parse + return cb(err) } - try { - response = JSON.parse(resText) - } catch (ex) { - return + response = JSON.parse(data.toString('utf8')) + } catch (err) { + return cb(err) } - self.serviceKey = response.key cb(null, response.key) })) }) - - 
req.on('error', function (error) { - debug.error('getService', error) - }) - req.write(payload) - req.end() } function logServiceKeyError (method) { diff --git a/lib/agent/api/index.spec.js b/lib/agent/api/index.spec.js index 5f469f9..7672567 100644 --- a/lib/agent/api/index.spec.js +++ b/lib/agent/api/index.spec.js @@ -275,16 +275,18 @@ describe('The Trace CollectorApi module', function () { } }) .post(defaultConfig.collectorApiServiceEndpoint, JSON.stringify(data)) - .times(2) + .times(100) .reply(409, {}) - collectorApi.getService() - - collectorApi.baseRetryInterval = 1 + collectorApi.getServiceMaxRetries = 100 - this.timeout(500) + global.setTimeout = this.sandbox.stub(global, 'setTimeout').callsFake(function (cb, int) { + return process.nextTick(cb) + }) - this.sandbox.stub(collectorApi, 'getService').callsFake(function () { + collectorApi.getService(function (err) { + expect(setTimeout).to.have.callCount(100) + expect(err).to.exist done() }) }) diff --git a/lib/agent/index.js b/lib/agent/index.js index a5b5120..a7923e3 100644 --- a/lib/agent/index.js +++ b/lib/agent/index.js @@ -116,17 +116,12 @@ Agent.prototype.stop = function (callback) { debug.info('stop', 'Stopping agents...') var agents = this.agents var counter = 1 - var error agents.forEach(function (agent) { - agent.stop(function (err) { - if (!error && err) { - error = err - } - + agent.stop(function () { if (counter >= agents.length) { if (callback && typeof callback === 'function') { - callback(error) + callback() } } else { counter++ diff --git a/lib/utils/compositeError.js b/lib/utils/compositeError.js new file mode 100644 index 0000000..827d139 --- /dev/null +++ b/lib/utils/compositeError.js @@ -0,0 +1,19 @@ +'use strict' + +var inherits = require('util').inherits + +function CompositeError (message, cause) { + if (message instanceof Error) { // called as CompositeError(cause): save the error before blanking message + cause = message + message = '' + } + this.message = message ? 
message.toString() : '' + this.cause = cause + Error.captureStackTrace && Error.captureStackTrace(this, this.constructor) + if (this.stack != null && this.cause instanceof Error && this.cause.stack != null) { + this.stack += '\nCaused by: ' + this.cause.stack + } +} +inherits(CompositeError, Error) + +module.exports = CompositeError diff --git a/lib/utils/exponentialRetry.js b/lib/utils/exponentialRetry.js new file mode 100644 index 0000000..6fe79d0 --- /dev/null +++ b/lib/utils/exponentialRetry.js @@ -0,0 +1,56 @@ +'use strict' + +var inherits = require('util').inherits +var retry = require('async/retry') + +var DEFAULT_MAX_RETRIES = Infinity +var DEFAULT_MAX_WAIT = Infinity +var DEFAULT_EXP_SCALE = 1 +var DEFAULT_LIN_SCALE = 1 +var DEFAULT_TRANS = 0 +var DEFAULT_ERR_SCALE = 0 +var DEFAULT_ERR_TRANS = 0 + +function MaxRetriesExceededError (n, last) { + Error.captureStackTrace && Error.captureStackTrace(this, this.constructor) + this.message = 'Network request max retry limit reached after ' + n + ' attempts. Last error message was: ' + last.message + if (this.stack && last.stack) { + this.stack += '\nCaused by: ' + last.stack + } +} +inherits(MaxRetriesExceededError, Error) + +function exponentialRetry (opts, task, cb) { + if (typeof opts === 'function') { + cb = task + task = opts + opts = {} + } + opts = opts || {} + var maxRetries = opts.maxRetries != null ? opts.maxRetries : DEFAULT_MAX_RETRIES + var maxWait = opts.maxWait != null ? opts.maxWait : DEFAULT_MAX_WAIT + var expScale = opts.expScale != null ? opts.expScale : DEFAULT_EXP_SCALE + var linScale = opts.linScale != null ? opts.linScale : DEFAULT_LIN_SCALE + var trans = opts.trans != null ? opts.trans : DEFAULT_TRANS + var errScale = opts.errScale != null ? opts.errScale : DEFAULT_ERR_SCALE + var errTrans = opts.errTrans != null ? 
opts.errTrans : DEFAULT_ERR_TRANS + var errorFilter = opts.errorFilter + + return retry({ + times: maxRetries + 1, + errorFilter: errorFilter, + interval: function (i) { + var wait = Math.min(Math.exp((i - 1) * expScale) * linScale + trans, 2147483647) // keep finite: an Infinity base would turn the jitter term into NaN + if (wait > maxWait) { + wait = maxWait + } + var rnd = 0.5 - Math.random() + wait = wait + (wait * rnd * errScale) + errTrans + var res = Math.min(Math.floor(wait), 2147483647) // Node runs timers with a delay above 2^31-1 ms after 1 ms, which would defeat the backoff + return res + } + }, task, cb) +} + +module.exports = exponentialRetry +module.exports.MaxRetriesExceededError = MaxRetriesExceededError diff --git a/lib/utils/exponentialRetry.spec.js b/lib/utils/exponentialRetry.spec.js new file mode 100644 index 0000000..301e7d0 --- /dev/null +++ b/lib/utils/exponentialRetry.spec.js @@ -0,0 +1,134 @@ +'use strict' + +var expect = require('chai').expect +var exponentialRetry = require('./exponentialRetry') + +describe('exponentialRetry', function () { + it('shouldn\'t retry successful task', function (done) { + function task (cb) { + return cb(null, 'ok') + } + var spy = this.sandbox.spy(task) + exponentialRetry(spy, function (err, res) { + if (err) { + return done(err) + } + expect(res).to.eql('ok') + expect(spy).to.have.been.calledOnce + done() + }) + }) + it('should retry once', function (done) { + function task (cb) { + return cb(new Error()) + } + var spy = this.sandbox.spy(task) + exponentialRetry( + { maxRetries: 1 }, + spy, + function (err, res) { + expect(err).to.exist + expect(spy).to.have.been.calledTwice + done() + }) + }) + it('should back off exponentially', function (done) { + var count = 0 + this.sandbox.stub(global, 'setTimeout').callsFake(function (cb, int) { + switch (count) { + case 0: expect(int).to.eql(2); break + case 1: expect(int).to.eql(7); break + case 2: expect(int).to.eql(20); break + case 3: expect(int).to.eql(54); break + case 4: expect(int).to.eql(148); break + } + count++ + return process.nextTick(cb) + }) + function task (cb) { + return cb(new Error()) + } + exponentialRetry( + { maxRetries: 5 }, + task, + function 
(err, res) { + expect(setTimeout).to.have.callCount(5) + expect(err).to.exist + done() + }) + }) + it('should accept errorFilter', function (done) { + var spy = this.sandbox.spy(function (cb) { + return cb(new Error()) + }) + exponentialRetry( + { + maxRetries: 5, + errorFilter: function (err) { // eslint-disable-line handle-callback-err + return false // shortcut retrial + } + }, + spy, + function (err, res) { + expect(spy).to.be.calledOnce + expect(err).to.exist + done() + }) + }) + it('should accept custom maximum interval', function (done) { + var count = 0 + this.sandbox.stub(global, 'setTimeout').callsFake(function (cb, int) { + switch (count) { + case 0: expect(int).to.eql(2); break + case 1: expect(int).to.eql(7); break + case 2: expect(int).to.eql(10); break + case 3: expect(int).to.eql(10); break + case 4: expect(int).to.eql(10); break + } + count++ + return process.nextTick(cb) + }) + function task (cb) { + return cb(new Error()) + } + exponentialRetry( + { + maxRetries: 5, + maxWait: 10 + }, + task, + function (err, res) { + expect(setTimeout).to.have.callCount(5) + expect(err).to.exist + done() + }) + }) + it('should add error', function (done) { + var count = 0 + this.sandbox.stub(global, 'setTimeout').callsFake(function (cb, int) { + switch (count) { + case 0: expect(int).to.eql(2); break + case 1: expect(int).to.eql(7); break + case 2: expect(int).to.eql(10); break + case 3: expect(int).to.eql(10); break + case 4: expect(int).to.eql(10); break + } + count++ + return process.nextTick(cb) + }) + function task (cb) { + return cb(new Error()) + } + exponentialRetry( + { + maxRetries: 5, + maxWait: 10 + }, + task, + function (err, res) { + expect(setTimeout).to.have.callCount(5) + expect(err).to.exist + done() + }) + }) +}) diff --git a/package.json b/package.json index 54342e5..1f27007 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@risingstack/trace", - "version": "3.6.1", + "version": "3.12.0-exponential-backoff.0", "author": 
"RisingStack, Inc.", "license": "SEE LICENSE IN LICENSE", "contributors": "RisingStack", @@ -44,6 +44,7 @@ "node": ">=0.12" }, "dependencies": { + "async": "^2.4.0", "bl": "1.2.0", "continuation-local-storage": "3.2.0", "debug": "2.6.4", diff --git a/test/e2e/apiCalls.spec.js b/test/e2e/apiCalls.spec.js index 818906a..b603f93 100644 --- a/test/e2e/apiCalls.spec.js +++ b/test/e2e/apiCalls.spec.js @@ -46,6 +46,7 @@ apiCalls.forEach(function (name) { url: TRACE_COLLECTOR_API_URL, apiKey: TRACE_API_KEY, callback: function (uri, requestBody) { + t.pass('Resolving service key') return [200, { key: TEST_TRACE_SERVICE_KEY }] } }) @@ -92,7 +93,7 @@ apiCalls.forEach(function (name) { .get('127.0.0.1:' + TEST_WEB_SERVER_PORT + '/test2') .set('ignore-me', '1') // set in IGNORE_HEADERS, looks external .end(function (err) { - t.error(err, 'client sends request to /test2 with that should look external') + t.error(err, 'client sends request to /test2 that should look external') }) res.send('test') }) diff --git a/test/e2e/initialization.spec.js b/test/e2e/initialization.spec.js index 250bd3a..7f0d063 100644 --- a/test/e2e/initialization.spec.js +++ b/test/e2e/initialization.spec.js @@ -50,6 +50,42 @@ test('should get service key', require('../..') }) +test('should retry', + { + isolate: 'child-process', + timeout: TEST_TIMEOUT * 2, + + childProcessOpts: { + env: { + TRACE_API_KEY: TRACE_API_KEY, + TRACE_SERVICE_NAME: TRACE_SERVICE_NAME, + TRACE_COLLECT_INTERVAL: 100 + } + } + }, function (t) { + var requests = 0 + var time + serviceMocks.mockServiceKeyRequest({ + url: TRACE_COLLECTOR_API_URL, + apiKey: TRACE_API_KEY, + maxTimes: 5, + callback: function (uri, requestBody) { + var old = time + time = Date.now() + t.equal(requestBody.name, TRACE_SERVICE_NAME, 'request ' + requests + ': +' + String(time - old) + ' ms') + if (requests >= 4) { + t.end() + } else { + ++requests + return [500, {}] + } + } + }) + t.plan(5) + time = Date.now() + require('../..') + }) + test('should stop', 
testSetup, function (t) { serviceMocks.mockServiceKeyRequest({ url: TRACE_COLLECTOR_API_URL, diff --git a/test/e2e/utils/serviceMocks.js b/test/e2e/utils/serviceMocks.js index ef7f58b..c0c8525 100644 --- a/test/e2e/utils/serviceMocks.js +++ b/test/e2e/utils/serviceMocks.js @@ -8,6 +8,7 @@ function mockServiceKeyRequest (opts) { } }) .post('/v2/service') + .times(opts.maxTimes || 1) .reply(opts.callback) }