-
Notifications
You must be signed in to change notification settings - Fork 306
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
--------- Co-authored-by: Thomas Hunter II <tlhunter@datadog.com>
- Loading branch information
Showing
3 changed files
with
286 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
const tracerLogger = require('../../log')// path to require tracer logger | ||
|
||
const https = require('https') | ||
|
||
class ExternalLogger { | ||
// Note: these attribute names match the corresponding entry in the JSON payload. | ||
constructor ({ | ||
ddsource, hostname, service, apiKey, site = 'datadoghq.com', interval = 10000, timeout = 2000, limit = 1000 | ||
}) { | ||
this.ddsource = ddsource | ||
this.hostname = hostname | ||
this.service = service | ||
this.interval = interval | ||
this.timeout = timeout | ||
this.queue = [] | ||
this.limit = limit | ||
this.endpoint = '/api/v2/logs' | ||
this.site = site | ||
this.intake = `http-intake.logs.${this.site}` | ||
this.headers = { | ||
'DD-API-KEY': apiKey, | ||
'Content-Type': 'application/json' | ||
} | ||
this.timer = setInterval(() => { | ||
this.flush() | ||
}, this.interval).unref() | ||
|
||
tracerLogger.debug(`started log writer to https://${this.intake}${this.endpoint}`) | ||
} | ||
|
||
static tagString (tags) { | ||
const tagArray = [] | ||
for (const key in tags) { | ||
tagArray.push(key + ':' + tags[key]) | ||
} | ||
return tagArray.join(',') | ||
} | ||
|
||
// Parses and enqueues a log | ||
log (log, span, tags) { | ||
const logTags = ExternalLogger.tagString(tags) | ||
|
||
if (span) { | ||
log['dd.trace_id'] = String(span.trace_id) | ||
log['dd.span_id'] = String(span.span_id) | ||
} | ||
|
||
const payload = { | ||
...log, | ||
'timestamp': Date.now(), | ||
'hostname': log.hostname || this.hostname, | ||
'ddsource': log.ddsource || this.ddsource, | ||
'service': log.service || this.service, | ||
'ddtags': logTags || undefined | ||
} | ||
|
||
this.enqueue(payload) | ||
} | ||
|
||
// Enqueues a raw, non-formatted log object | ||
enqueue (log) { | ||
if (this.queue.length >= this.limit) { | ||
this.flush() | ||
} | ||
this.queue.push(log) | ||
} | ||
|
||
shutdown () { | ||
clearInterval(this.timer) | ||
this.flush() | ||
} | ||
|
||
// Flushes logs with optional callback for when the call is complete | ||
flush (cb = () => {}) { | ||
let logs | ||
let numLogs | ||
let encodedLogs | ||
|
||
if (!this.queue.length) { | ||
setImmediate(() => cb()) | ||
return | ||
} | ||
|
||
try { | ||
logs = this.queue | ||
this.queue = [] | ||
|
||
numLogs = logs.length | ||
encodedLogs = JSON.stringify(logs) | ||
} catch (error) { | ||
tracerLogger.error(`failed to encode ${numLogs} logs`) | ||
setImmediate(() => cb(error)) | ||
return | ||
} | ||
|
||
const options = { | ||
hostname: this.intake, | ||
port: 443, | ||
path: this.endpoint, | ||
method: 'POST', | ||
headers: this.headers, | ||
timeout: this.timeout | ||
} | ||
|
||
const req = https.request(options, (res) => { | ||
tracerLogger.info(`statusCode: ${res.statusCode}`) | ||
}) | ||
req.once('error', (e) => { | ||
tracerLogger.error(`failed to send ${numLogs} log(s), with error ${e.message}`) | ||
cb(e) | ||
}) | ||
req.write(encodedLogs) | ||
req.end() | ||
req.once('response', (res) => { | ||
if (res.statusCode >= 400) { | ||
const error = new Error(`failed to send ${numLogs} logs, received response code ${res.statusCode}`) | ||
tracerLogger.error(error.message) | ||
cb(error) | ||
return | ||
} | ||
cb() | ||
}) | ||
} | ||
} | ||
|
||
module.exports = ExternalLogger |
147 changes: 147 additions & 0 deletions
147
packages/dd-trace/src/external-logger/test/index.spec.js
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,147 @@ | ||
'use strict' | ||
|
||
require('../../../../dd-trace/test/setup/tap') | ||
const proxyquire = require('proxyquire') | ||
const { expect } = require('chai') | ||
const nock = require('nock') | ||
|
||
const tracerLogger = require('../../log') | ||
|
||
describe('External Logger', () => { | ||
let externalLogger | ||
let interceptor | ||
let errorLog | ||
|
||
beforeEach(() => { | ||
errorLog = sinon.spy(tracerLogger, 'error') | ||
|
||
const ExternalLogger = proxyquire('../src', { | ||
'../../log': { | ||
error: errorLog | ||
} | ||
}) | ||
|
||
externalLogger = new ExternalLogger({ | ||
ddsource: 'logging_from_space', | ||
hostname: 'mac_desktop', | ||
apiKey: 'API_KEY_PLACEHOLDER', | ||
interval: 10000, | ||
timeout: 5000, | ||
limit: 10 | ||
}) | ||
}) | ||
|
||
afterEach(() => { | ||
interceptor.done() | ||
errorLog.restore() | ||
}) | ||
|
||
it('should properly encode the log message', (done) => { | ||
let request | ||
const currentTime = Date.now() | ||
|
||
interceptor = nock('https://http-intake.logs.datadoghq.com:443') | ||
.post('/api/v2/logs') | ||
.reply((_uri, req, cb) => { | ||
request = req | ||
cb(null, [202, '{}', { 'Content-Type': 'application/json' }]) | ||
}) | ||
|
||
const span = { | ||
service: 'openAi', | ||
trace_id: '000001000', | ||
span_id: '9999991999' | ||
} | ||
const tags = { | ||
env: 'external_logger', | ||
version: '1.2.3', | ||
service: 'external' | ||
} | ||
externalLogger.log({ | ||
message: 'oh no, something is up', | ||
custom: 'field', | ||
attribute: 'funky', | ||
service: 'outer_space', | ||
level: 'info' | ||
}, span, tags) | ||
|
||
externalLogger.flush((err) => { | ||
try { | ||
expect(request[0]).to.have.property('message', 'oh no, something is up') | ||
expect(request[0]).to.have.property('custom', 'field') | ||
expect(request[0]).to.have.property('attribute', 'funky') | ||
expect(request[0]).to.have.property('service', 'outer_space') | ||
expect(request[0]).to.have.property('level', 'info') | ||
expect(request[0]).to.have.property('dd.trace_id', '000001000') | ||
expect(request[0]).to.have.property('dd.span_id', '9999991999') | ||
expect(request[0].timestamp).to.be.greaterThanOrEqual(currentTime) | ||
expect(request[0]).to.have.property('ddsource', 'logging_from_space') | ||
expect(request[0]).to.have.property('ddtags', 'env:external_logger,version:1.2.3,service:external') | ||
} catch (e) { | ||
done(e) | ||
return | ||
} | ||
|
||
done(err) | ||
}) | ||
}) | ||
|
||
it('should empty the log queue when calling flush', (done) => { | ||
interceptor = nock('https://http-intake.logs.datadoghq.com:443') | ||
.post('/api/v2/logs') | ||
.reply(202, {}) | ||
|
||
externalLogger.enqueue({}) | ||
expect(externalLogger.queue.length).to.equal(1) | ||
|
||
externalLogger.flush((err) => { | ||
expect(externalLogger.queue.length).to.equal(0) | ||
done(err) | ||
}) | ||
}) | ||
|
||
it('tracer logger should handle error response codes from Logs API', (done) => { | ||
interceptor = nock('https://http-intake.logs.datadoghq.com:443') | ||
.post('/api/v2/logs') | ||
.reply(400, {}) | ||
|
||
externalLogger.enqueue({}) | ||
externalLogger.flush((err) => { | ||
expect(err).to.be.an.instanceOf(Error) | ||
expect(errorLog.getCall(0).args[0]).to.be.equal( | ||
'failed to send 1 logs, received response code 400' | ||
) | ||
done() | ||
}) | ||
}) | ||
|
||
it('tracer logger should handle simulated network error', (done) => { | ||
interceptor = nock('https://http-intake.logs.datadoghq.com:443') | ||
.post('/api/v2/logs') | ||
.replyWithError('missing API key') | ||
|
||
externalLogger.enqueue({}) | ||
externalLogger.flush((err) => { | ||
expect(err).to.be.an.instanceOf(Error) | ||
expect(errorLog.getCall(0).args[0]).to.be.equal( | ||
'failed to send 1 log(s), with error missing API key' | ||
) | ||
done() | ||
}) | ||
}) | ||
|
||
it('causes a flush when exceeding log queue limit', (done) => { | ||
const flusher = sinon.stub(externalLogger, 'flush') | ||
|
||
for (let i = 0; i < 10; i++) { | ||
externalLogger.enqueue({}) | ||
} | ||
expect(flusher).to.not.have.been.called | ||
|
||
externalLogger.enqueue({}) | ||
expect(flusher).to.have.been.called | ||
|
||
flusher.restore() | ||
done() | ||
}) | ||
}) |