Skip to content

Commit

Permalink
feat: add exponential backoff to API client
Browse files Browse the repository at this point in the history
The API client now retries retrieving the serviceKey via
the initialization request in an exponential backoff loop.
The exponential algorithm applies a random error factor to
spread out the retries, so when multiple clients are started
at the same time, their requests after the initial one are
unlikely to be issued simultaneously.
  • Loading branch information
dszakallas committed May 22, 2017
1 parent b463c22 commit 0f2e4d5
Show file tree
Hide file tree
Showing 13 changed files with 423 additions and 52 deletions.
14 changes: 14 additions & 0 deletions lib/agent/api/httpError.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
'use strict'

var http = require('http')
var inherits = require('util').inherits

/**
 * Error representing an HTTP response with a failing status code.
 *
 * @param {number} statusCode - HTTP status code of the response
 * @param {*} [response] - optional response object, exposed as `err.response`
 */
function HttpError (statusCode, response) {
  // Codes missing from http.STATUS_CODES would otherwise render as
  // "<code> - undefined".
  var reason = http.STATUS_CODES[statusCode] || 'Unknown Status'
  // Assign the message first so it is already in place when the stack
  // trace is captured.
  this.message = String(statusCode) + ' - ' + reason
  this.statusCode = statusCode
  this.response = response
  if (typeof Error.captureStackTrace === 'function') {
    Error.captureStackTrace(this, this.constructor)
  }
}
inherits(HttpError, Error)

module.exports = HttpError
51 changes: 51 additions & 0 deletions lib/agent/api/httpRetry.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
'use strict'

var HttpError = require('./httpError')
var exponentialRetry = require('../../utils/exponentialRetry')

// Defaults handed to exponentialRetry; only maxRetries and maxWait can be
// overridden through the options of httpRetry.
var DEFAULT_MAX_RETRIES = Infinity
var DEFAULT_MAX_WAIT = 10 * 60 * 1000
var DEFAULT_EXP_SCALE = 0.828
var DEFAULT_LIN_SCALE = 150
var DEFAULT_ERR_SCALE = 0.24 // +-12% error

/**
 * Issues a request through `opts.client` and retries it with
 * exponentialRetry.
 *
 * A response with a status code of 400 or above is reported as an HttpError.
 * The response is attached to that error (`err.response`) and is also passed
 * as the second callback argument.
 *
 * @param {Object} opts
 * @param {Object} opts.client - object exposing `request(reqOpts, onResponse)`
 * @param {Object} opts.reqOpts - options forwarded to `client.request`
 * @param {string|Buffer} [opts.payload] - request body, written when truthy
 * @param {Function} [opts.errorFilter] - forwarded to exponentialRetry
 * @param {number} [opts.maxRetries=Infinity]
 * @param {number} [opts.maxWait=600000]
 * @param {Function} cb - called as `cb(err, response)`
 * @returns {*} whatever exponentialRetry returns
 */
function httpRetry (opts, cb) {
  opts = opts || {}
  var client = opts.client
  var payload = opts.payload
  var reqOpts = opts.reqOpts
  var errorFilter = opts.errorFilter
  var maxRetries = opts.maxRetries != null ? opts.maxRetries : DEFAULT_MAX_RETRIES
  var maxWait = opts.maxWait != null ? opts.maxWait : DEFAULT_MAX_WAIT

  // A single attempt: resolves with the response, or fails with either an
  // HttpError (status >= 400) or the request's own 'error' event.
  function httpRequest (done) {
    var completed = false
    var req = client.request(reqOpts, function (response) {
      completed = true
      if (response.statusCode >= 400) {
        return done(new HttpError(response.statusCode, response), response)
      }
      return done(null, response)
    })
    req.on('error', function (err) {
      // Errors emitted after a response has been delivered are ignored so
      // that `done` is invoked at most once per attempt.
      if (!completed) {
        completed = true
        return done(err)
      }
    })
    if (payload) {
      req.write(payload)
    }
    req.end()
  }

  return exponentialRetry({
    maxRetries: maxRetries,
    maxWait: maxWait,
    expScale: DEFAULT_EXP_SCALE,
    linScale: DEFAULT_LIN_SCALE,
    errScale: DEFAULT_ERR_SCALE,
    errorFilter: errorFilter
  }, httpRequest, cb)
}

module.exports = httpRetry
68 changes: 68 additions & 0 deletions lib/agent/api/httpRetry.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
'use strict'

var http = require('http')
var HttpError = require('./httpError')
var httpRetry = require('./httpRetry')
var expect = require('chai').expect
var nock = require('nock')
var bl = require('bl')

// Specs for httpRetry. HTTP traffic is intercepted with nock and setTimeout
// is stubbed so that retry waits resolve on the next tick.
// NOTE(review): `this.sandbox` is not created in this file; presumably a
// shared test setup provides it — confirm.
describe('httpRetry', function () {
  it('retries', function (done) {
    // First attempt fails with a 500, the retry succeeds.
    nock('http://something.com')
      .post('/', 'data')
      .reply(500)
    nock('http://something.com')
      .post('/', 'data')
      .reply(200, 'response')

    this.sandbox.stub(global, 'setTimeout').callsFake(function (cb, int) {
      return process.nextTick(cb)
    })

    httpRetry({
      client: http,
      maxRetries: 1,
      reqOpts: {
        hostname: 'something.com',
        method: 'POST',
        path: '/'
      },
      payload: 'data'
    }, function (err, response) {
      expect(err).to.not.exist
      response.pipe(bl(function (pipeErr, body) {
        expect(pipeErr).not.to.exist
        expect(body.toString()).to.eql('response')
        done()
      }))
    })
  })
  it('returns error', function (done) {
    nock('http://something.com')
      .post('/', 'data')
      .reply(500, 'bad')

    this.sandbox.stub(global, 'setTimeout').callsFake(function (cb, int) {
      return process.nextTick(cb)
    })

    httpRetry({
      client: http,
      maxRetries: 0,
      reqOpts: {
        hostname: 'something.com',
        method: 'POST',
        path: '/'
      },
      payload: 'data'
    }, function (err, response) {
      expect(err).to.be.instanceof(HttpError)
      // The failing response is still handed over so its body can be read.
      response.pipe(bl(function (pipeErr, body) {
        expect(pipeErr).to.not.exist
        expect(body.toString()).to.eql('bad')
        done()
      }))
    })
  })
})
67 changes: 30 additions & 37 deletions lib/agent/api/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ var util = require('util')
var requestSync = require('sync-request')
var isNumber = require('lodash.isnumber')
var debug = require('../../utils/debug')('api')
var format = require('util').format
var assign = require('lodash.assign')
var HttpsProxyAgent = require('https-proxy-agent')
var stringify = require('json-stringify-safe')
var BufferStream = require('./bufferStream')
var httpRetry = require('./httpRetry')
var CompositeError = require('../../utils/compositeError')

var bl = require('bl')
var libPackage = require('../../../package')
Expand All @@ -34,6 +35,7 @@ function CollectorApi (options) {
this.serviceName = options.serviceName
this.baseRetryInterval = 1000 * 60 * 30 // 30 minutes
this.serviceKey = null
this.getServiceMaxRetries = Infinity

if (options.proxy) {
this.proxyAgent = new HttpsProxyAgent(options.proxy)
Expand Down Expand Up @@ -270,7 +272,8 @@ CollectorApi.prototype.getService = function (cb) {
cpus: self.system.cpus
}
})
var req = https.request({

var reqOpts = {
hostname: opts.hostname,
port: opts.port,
path: opts.path,
Expand All @@ -283,51 +286,41 @@ CollectorApi.prototype.getService = function (cb) {
'X-Reporter-Language': this.collectorLanguage,
'Content-Length': Buffer.byteLength(payload)
}
}, function (res) {
res.setEncoding('utf8')
res.pipe(bl(function (err, resBuffer) {
var response

var retryInterval = self.baseRetryInterval
}

if (err) {
debug.error('getService', err)
return setTimeout(function () {
debug.warn('getService', format('Retrying with %d ms', retryInterval))
self.getService()
}, retryInterval)
httpRetry({
client: https,
reqOpts: reqOpts,
payload: payload,
errorFilter: function shouldContinueRetrying (err) {
if (err.statusCode === 401) {
return false
}

var resText = resBuffer.toString('utf8')

if (res.statusCode === 401) {
debug.error('getService', 'Api key rejected')
return
return true
},
maxRetries: this.getServiceMaxRetries
},
function done (err, result) {
if (err) {
if (err.statusCode === 401) {
debug.error('getService', 'API key rejected')
}
if (res.statusCode > 399) {
debug.error('getService', 'Service responded with ' + res.statusCode)
return setTimeout(function () {
debug.warn('getService', format('Retrying with %d ms', retryInterval))
self.getService(cb)
}, retryInterval)
return cb(new CompositeError('Could not get service key', err))
}
var response
result.pipe(bl(function (err, data) {
if (err) {
cb(err)
}

try {
response = JSON.parse(resText)
} catch (ex) {
return
response = JSON.parse(data.toString('utf8'))
} catch (err) {
return cb(err)
}

self.serviceKey = response.key
cb(null, response.key)
}))
})

req.on('error', function (error) {
debug.error('getService', error)
})
req.write(payload)
req.end()
}

function logServiceKeyError (method) {
Expand Down
14 changes: 8 additions & 6 deletions lib/agent/api/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -275,16 +275,18 @@ describe('The Trace CollectorApi module', function () {
}
})
.post(defaultConfig.collectorApiServiceEndpoint, JSON.stringify(data))
.times(2)
.times(100)
.reply(409, {})

collectorApi.getService()

collectorApi.baseRetryInterval = 1
collectorApi.getServiceMaxRetries = 100

this.timeout(500)
global.setTimeout = this.sandbox.stub(global, 'setTimeout').callsFake(function (cb, int) {
return process.nextTick(cb)
})

this.sandbox.stub(collectorApi, 'getService').callsFake(function () {
collectorApi.getService(function (err) {
expect(setTimeout).to.have.callCount(100)
expect(err).to.exist
done()
})
})
Expand Down
9 changes: 2 additions & 7 deletions lib/agent/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -116,17 +116,12 @@ Agent.prototype.stop = function (callback) {
debug.info('stop', 'Stopping agents...')
var agents = this.agents
var counter = 1
var error

agents.forEach(function (agent) {
agent.stop(function (err) {
if (!error && err) {
error = err
}

agent.stop(function () {
if (counter >= agents.length) {
if (callback && typeof callback === 'function') {
callback(error)
callback()
}
} else {
counter++
Expand Down
19 changes: 19 additions & 0 deletions lib/utils/compositeError.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
'use strict'

var inherits = require('util').inherits

/**
 * Error that wraps another error as its cause.
 *
 * May be called as `new CompositeError(message, cause)` or as
 * `new CompositeError(cause)`; in the latter form the message is empty.
 * When both this error and the cause have a stack, the cause's stack is
 * appended after a "Caused by: " line.
 *
 * @param {string|Error} message - description, or the cause itself
 * @param {*} [cause] - the underlying error
 */
function CompositeError (message, cause) {
  if (message instanceof Error) {
    // Take the cause before clearing the message; the reverse order would
    // store the empty string as the cause.
    cause = message
    message = ''
  }
  this.message = message ? message.toString() : ''
  this.cause = cause
  if (typeof Error.captureStackTrace === 'function') {
    Error.captureStackTrace(this, this.constructor)
  }
  if (this.stack != null && this.cause instanceof Error && this.cause.stack != null) {
    this.stack += '\nCaused by: ' + this.cause.stack
  }
}
inherits(CompositeError, Error)

module.exports = CompositeError
56 changes: 56 additions & 0 deletions lib/utils/exponentialRetry.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
'use strict'

var inherits = require('util').inherits
var retry = require('async/retry')

// Fallbacks used by exponentialRetry when the matching option is null or
// undefined.
var DEFAULT_MAX_RETRIES = Infinity // 1 is added to this to form the `times` given to async/retry
var DEFAULT_MAX_WAIT = Infinity // cap on the computed wait, applied before the random error term
var DEFAULT_EXP_SCALE = 1 // multiplies the exponent: exp((i - 1) * expScale)
var DEFAULT_LIN_SCALE = 1 // multiplies the exponential term
var DEFAULT_TRANS = 0 // constant added to the scaled exponential term
var DEFAULT_ERR_SCALE = 0 // width of the random relative error; 0 disables it
var DEFAULT_ERR_TRANS = 0 // constant added after the random error term

/**
 * Error stating that the retry limit was reached.
 *
 * @param {number} n - number of attempts that were made
 * @param {Error} [last] - error of the final attempt; when given, its
 *   message is appended to this error's message and its stack (if any) is
 *   appended after a "Caused by: " line
 */
function MaxRetriesExceededError (n, last) {
  // Assign the message first so it is already in place when the stack
  // trace is captured.
  this.message = 'Network request max retry limit reached after ' + n + ' attempts.'
  if (last != null) {
    this.message += ' Last error message was: ' + last.message
  }
  if (typeof Error.captureStackTrace === 'function') {
    Error.captureStackTrace(this, this.constructor)
  }
  // `last` is optional: reading its properties unguarded would throw a
  // TypeError from inside the constructor.
  if (this.stack && last != null && last.stack) {
    this.stack += '\nCaused by: ' + last.stack
  }
}
inherits(MaxRetriesExceededError, Error)

/**
 * Runs `task` through async/retry with an exponentially growing wait
 * between attempts.
 *
 * The wait before attempt `i` is exp((i - 1) * expScale) * linScale + trans,
 * capped at maxWait, then perturbed by a random relative error of width
 * errScale, shifted by errTrans and rounded down.
 *
 * May also be called as `exponentialRetry(task, cb)`.
 *
 * @param {Object} [opts] - maxRetries, maxWait, expScale, linScale, trans,
 *   errScale, errTrans, errorFilter
 * @param {Function} task - forwarded to async/retry
 * @param {Function} cb - forwarded to async/retry
 * @returns {*} whatever async/retry returns
 */
function exponentialRetry (opts, task, cb) {
  var settings = opts
  var job = task
  var finish = cb
  if (typeof opts === 'function') {
    // Options were omitted: shift the arguments one position to the right.
    settings = {}
    job = opts
    finish = task
  }
  settings = settings || {}

  // Returns `candidate` unless it is null or undefined.
  function valueOr (candidate, fallback) {
    if (candidate != null) {
      return candidate
    }
    return fallback
  }

  var maxRetries = valueOr(settings.maxRetries, DEFAULT_MAX_RETRIES)
  var maxWait = valueOr(settings.maxWait, DEFAULT_MAX_WAIT)
  var expScale = valueOr(settings.expScale, DEFAULT_EXP_SCALE)
  var linScale = valueOr(settings.linScale, DEFAULT_LIN_SCALE)
  var trans = valueOr(settings.trans, DEFAULT_TRANS)
  var errScale = valueOr(settings.errScale, DEFAULT_ERR_SCALE)
  var errTrans = valueOr(settings.errTrans, DEFAULT_ERR_TRANS)

  function computeInterval (attempt) {
    var base = Math.exp((attempt - 1) * expScale) * linScale + trans
    var capped = base > maxWait ? maxWait : base
    // Uniform in (-0.5, 0.5]; scaled by errScale to get the relative error.
    var deviation = 0.5 - Math.random()
    return Math.floor(capped + (capped * deviation * errScale) + errTrans)
  }

  return retry({
    times: maxRetries + 1,
    errorFilter: settings.errorFilter,
    interval: computeInterval
  }, job, finish)
}

module.exports = exponentialRetry
module.exports.MaxRetriesExceededError = MaxRetriesExceededError
Loading

0 comments on commit 0f2e4d5

Please sign in to comment.