Skip to content

Commit

Permalink
add external log writer (#3201)
Browse files Browse the repository at this point in the history

---------

Co-authored-by: Thomas Hunter II <tlhunter@datadog.com>
  • Loading branch information
crysmags and tlhunter committed Jun 23, 2023
1 parent 825f16d commit 1ec168f
Show file tree
Hide file tree
Showing 3 changed files with 286 additions and 0 deletions.
13 changes: 13 additions & 0 deletions packages/dd-trace/src/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,17 @@ class Config {
false
)

const DD_OPENAI_LOGS_ENABLED = coalesce(
options.openAiLogsEnabled,
process.env.DD_OPENAI_LOGS_ENABLED,
false
)

const DD_API_KEY = coalesce(
process.env.DATADOG_API_KEY,
process.env.DD_API_KEY
)

const inAWSLambda = process.env.AWS_LAMBDA_FUNCTION_NAME !== undefined

const isDeprecatedGCPFunction = process.env.FUNCTION_NAME !== undefined && process.env.GCP_PROJECT !== undefined
Expand Down Expand Up @@ -468,6 +479,8 @@ ken|consumer_?(?:id|key|secret)|sign(?:ed|ature)?|auth(?:entication|orization)?)

this.tracing = !isFalse(DD_TRACING_ENABLED)
this.dbmPropagationMode = DD_DBM_PROPAGATION_MODE
this.openAiLogsEnabled = DD_OPENAI_LOGS_ENABLED
this.apiKey = DD_API_KEY
this.logInjection = isTrue(DD_LOGS_INJECTION)
this.env = DD_ENV
this.url = DD_CIVISIBILITY_AGENTLESS_URL ? new URL(DD_CIVISIBILITY_AGENTLESS_URL)
Expand Down
126 changes: 126 additions & 0 deletions packages/dd-trace/src/external-logger/src/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
const tracerLogger = require('../../log')// path to require tracer logger

const https = require('https')

class ExternalLogger {
// Note: these attribute names match the corresponding entry in the JSON payload.
constructor ({
ddsource, hostname, service, apiKey, site = 'datadoghq.com', interval = 10000, timeout = 2000, limit = 1000
}) {
this.ddsource = ddsource
this.hostname = hostname
this.service = service
this.interval = interval
this.timeout = timeout
this.queue = []
this.limit = limit
this.endpoint = '/api/v2/logs'
this.site = site
this.intake = `http-intake.logs.${this.site}`
this.headers = {
'DD-API-KEY': apiKey,
'Content-Type': 'application/json'
}
this.timer = setInterval(() => {
this.flush()
}, this.interval).unref()

tracerLogger.debug(`started log writer to https://${this.intake}${this.endpoint}`)
}

static tagString (tags) {
const tagArray = []
for (const key in tags) {
tagArray.push(key + ':' + tags[key])
}
return tagArray.join(',')
}

// Parses and enqueues a log
log (log, span, tags) {
const logTags = ExternalLogger.tagString(tags)

if (span) {
log['dd.trace_id'] = String(span.trace_id)
log['dd.span_id'] = String(span.span_id)
}

const payload = {
...log,
'timestamp': Date.now(),
'hostname': log.hostname || this.hostname,
'ddsource': log.ddsource || this.ddsource,
'service': log.service || this.service,
'ddtags': logTags || undefined
}

this.enqueue(payload)
}

// Enqueues a raw, non-formatted log object
enqueue (log) {
if (this.queue.length >= this.limit) {
this.flush()
}
this.queue.push(log)
}

shutdown () {
clearInterval(this.timer)
this.flush()
}

// Flushes logs with optional callback for when the call is complete
flush (cb = () => {}) {
let logs
let numLogs
let encodedLogs

if (!this.queue.length) {
setImmediate(() => cb())
return
}

try {
logs = this.queue
this.queue = []

numLogs = logs.length
encodedLogs = JSON.stringify(logs)
} catch (error) {
tracerLogger.error(`failed to encode ${numLogs} logs`)
setImmediate(() => cb(error))
return
}

const options = {
hostname: this.intake,
port: 443,
path: this.endpoint,
method: 'POST',
headers: this.headers,
timeout: this.timeout
}

const req = https.request(options, (res) => {
tracerLogger.info(`statusCode: ${res.statusCode}`)
})
req.once('error', (e) => {
tracerLogger.error(`failed to send ${numLogs} log(s), with error ${e.message}`)
cb(e)
})
req.write(encodedLogs)
req.end()
req.once('response', (res) => {
if (res.statusCode >= 400) {
const error = new Error(`failed to send ${numLogs} logs, received response code ${res.statusCode}`)
tracerLogger.error(error.message)
cb(error)
return
}
cb()
})
}
}

module.exports = ExternalLogger
147 changes: 147 additions & 0 deletions packages/dd-trace/src/external-logger/test/index.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
'use strict'

require('../../../../dd-trace/test/setup/tap')
const proxyquire = require('proxyquire')
const { expect } = require('chai')
const nock = require('nock')

const tracerLogger = require('../../log')

describe('External Logger', () => {
let externalLogger
let interceptor
let errorLog

beforeEach(() => {
errorLog = sinon.spy(tracerLogger, 'error')

const ExternalLogger = proxyquire('../src', {
'../../log': {
error: errorLog
}
})

externalLogger = new ExternalLogger({
ddsource: 'logging_from_space',
hostname: 'mac_desktop',
apiKey: 'API_KEY_PLACEHOLDER',
interval: 10000,
timeout: 5000,
limit: 10
})
})

afterEach(() => {
interceptor.done()
errorLog.restore()
})

it('should properly encode the log message', (done) => {
let request
const currentTime = Date.now()

interceptor = nock('https://http-intake.logs.datadoghq.com:443')
.post('/api/v2/logs')
.reply((_uri, req, cb) => {
request = req
cb(null, [202, '{}', { 'Content-Type': 'application/json' }])
})

const span = {
service: 'openAi',
trace_id: '000001000',
span_id: '9999991999'
}
const tags = {
env: 'external_logger',
version: '1.2.3',
service: 'external'
}
externalLogger.log({
message: 'oh no, something is up',
custom: 'field',
attribute: 'funky',
service: 'outer_space',
level: 'info'
}, span, tags)

externalLogger.flush((err) => {
try {
expect(request[0]).to.have.property('message', 'oh no, something is up')
expect(request[0]).to.have.property('custom', 'field')
expect(request[0]).to.have.property('attribute', 'funky')
expect(request[0]).to.have.property('service', 'outer_space')
expect(request[0]).to.have.property('level', 'info')
expect(request[0]).to.have.property('dd.trace_id', '000001000')
expect(request[0]).to.have.property('dd.span_id', '9999991999')
expect(request[0].timestamp).to.be.greaterThanOrEqual(currentTime)
expect(request[0]).to.have.property('ddsource', 'logging_from_space')
expect(request[0]).to.have.property('ddtags', 'env:external_logger,version:1.2.3,service:external')
} catch (e) {
done(e)
return
}

done(err)
})
})

it('should empty the log queue when calling flush', (done) => {
interceptor = nock('https://http-intake.logs.datadoghq.com:443')
.post('/api/v2/logs')
.reply(202, {})

externalLogger.enqueue({})
expect(externalLogger.queue.length).to.equal(1)

externalLogger.flush((err) => {
expect(externalLogger.queue.length).to.equal(0)
done(err)
})
})

it('tracer logger should handle error response codes from Logs API', (done) => {
interceptor = nock('https://http-intake.logs.datadoghq.com:443')
.post('/api/v2/logs')
.reply(400, {})

externalLogger.enqueue({})
externalLogger.flush((err) => {
expect(err).to.be.an.instanceOf(Error)
expect(errorLog.getCall(0).args[0]).to.be.equal(
'failed to send 1 logs, received response code 400'
)
done()
})
})

it('tracer logger should handle simulated network error', (done) => {
interceptor = nock('https://http-intake.logs.datadoghq.com:443')
.post('/api/v2/logs')
.replyWithError('missing API key')

externalLogger.enqueue({})
externalLogger.flush((err) => {
expect(err).to.be.an.instanceOf(Error)
expect(errorLog.getCall(0).args[0]).to.be.equal(
'failed to send 1 log(s), with error missing API key'
)
done()
})
})

it('causes a flush when exceeding log queue limit', (done) => {
const flusher = sinon.stub(externalLogger, 'flush')

for (let i = 0; i < 10; i++) {
externalLogger.enqueue({})
}
expect(flusher).to.not.have.been.called

externalLogger.enqueue({})
expect(flusher).to.have.been.called

flusher.restore()
done()
})
})

0 comments on commit 1ec168f

Please sign in to comment.