diff --git a/packages/dd-trace/src/config.js b/packages/dd-trace/src/config.js index 357a53b4383..c612595d938 100644 --- a/packages/dd-trace/src/config.js +++ b/packages/dd-trace/src/config.js @@ -204,6 +204,17 @@ class Config { false ) + const DD_OPENAI_LOGS_ENABLED = coalesce( + options.openAiLogsEnabled, + process.env.DD_OPENAI_LOGS_ENABLED, + false + ) + + const DD_API_KEY = coalesce( + process.env.DATADOG_API_KEY, + process.env.DD_API_KEY + ) + const inAWSLambda = process.env.AWS_LAMBDA_FUNCTION_NAME !== undefined const isDeprecatedGCPFunction = process.env.FUNCTION_NAME !== undefined && process.env.GCP_PROJECT !== undefined @@ -471,6 +482,8 @@ ken|consumer_?(?:id|key|secret)|sign(?:ed|ature)?|auth(?:entication|orization)?) this.tracing = !isFalse(DD_TRACING_ENABLED) this.dbmPropagationMode = DD_DBM_PROPAGATION_MODE + this.openAiLogsEnabled = DD_OPENAI_LOGS_ENABLED + this.apiKey = DD_API_KEY this.logInjection = isTrue(DD_LOGS_INJECTION) this.env = DD_ENV this.url = DD_CIVISIBILITY_AGENTLESS_URL ? new URL(DD_CIVISIBILITY_AGENTLESS_URL) diff --git a/packages/dd-trace/src/external-logger/src/index.js b/packages/dd-trace/src/external-logger/src/index.js new file mode 100644 index 00000000000..7161e1b12c7 --- /dev/null +++ b/packages/dd-trace/src/external-logger/src/index.js @@ -0,0 +1,126 @@ +const tracerLogger = require('../../log')// path to require tracer logger + +const https = require('https') + +class ExternalLogger { + // Note: these attribute names match the corresponding entry in the JSON payload. + constructor ({ + ddsource, hostname, service, apiKey, site = 'datadoghq.com', interval = 10000, timeout = 2000, limit = 1000 + }) { + this.ddsource = ddsource + this.hostname = hostname + this.service = service + this.interval = interval + this.timeout = timeout + this.queue = [] + this.limit = limit + this.endpoint = '/api/v2/logs' + this.site = site + this.intake = `http-intake.logs.${this.site}` + this.headers = { + 'DD-API-KEY': apiKey, + 'Content-Type': 'application/json' + } + this.timer = setInterval(() => { + this.flush() + }, this.interval).unref() + + tracerLogger.debug(`started log writer to https://${this.intake}${this.endpoint}`) + } + + static tagString (tags) { + const tagArray = [] + for (const key in tags) { + tagArray.push(key + ':' + tags[key]) + } + return tagArray.join(',') + } + + // Parses and enqueues a log + log (log, span, tags) { + const logTags = ExternalLogger.tagString(tags) + + if (span) { + log['dd.trace_id'] = String(span.trace_id) + log['dd.span_id'] = String(span.span_id) + } + + const payload = { + ...log, + 'timestamp': Date.now(), + 'hostname': log.hostname || this.hostname, + 'ddsource': log.ddsource || this.ddsource, + 'service': log.service || this.service, + 'ddtags': logTags || undefined + } + + this.enqueue(payload) + } + + // Enqueues a raw, non-formatted log object + enqueue (log) { + if (this.queue.length >= this.limit) { + this.flush() + } + this.queue.push(log) + } + + shutdown () { + clearInterval(this.timer) + this.flush() + } + + // Flushes logs with optional callback for when the call is complete + flush (cb = () => {}) { + let logs + let numLogs + let encodedLogs + + if (!this.queue.length) { + setImmediate(() => cb()) + return + } + + try { + logs = this.queue + this.queue = [] + + numLogs = logs.length + encodedLogs = JSON.stringify(logs) + } catch (error) { + tracerLogger.error(`failed to encode ${numLogs} logs`) + setImmediate(() => cb(error)) + return + } + + const options = { + hostname: this.intake, + port: 443, + path: this.endpoint, + method: 'POST', + headers: this.headers, + timeout: this.timeout + } + + const req = https.request(options, (res) => { + tracerLogger.info(`statusCode: ${res.statusCode}`) + }) + req.once('error', (e) => { + tracerLogger.error(`failed to send ${numLogs} log(s), with error ${e.message}`) + cb(e) + }) + req.write(encodedLogs) + req.end() + req.once('response', (res) => { + if (res.statusCode >= 400) { + const error = new Error(`failed to send ${numLogs} logs, received response code ${res.statusCode}`) + tracerLogger.error(error.message) + cb(error) + return + } + cb() + }) + } +} + +module.exports = ExternalLogger diff --git a/packages/dd-trace/src/external-logger/test/index.spec.js b/packages/dd-trace/src/external-logger/test/index.spec.js new file mode 100644 index 00000000000..e856e78be3d --- /dev/null +++ b/packages/dd-trace/src/external-logger/test/index.spec.js @@ -0,0 +1,147 @@ +'use strict' + +require('../../../../dd-trace/test/setup/tap') +const proxyquire = require('proxyquire') +const { expect } = require('chai') +const nock = require('nock') + +const tracerLogger = require('../../log') + +describe('External Logger', () => { + let externalLogger + let interceptor + let errorLog + + beforeEach(() => { + errorLog = sinon.spy(tracerLogger, 'error') + + const ExternalLogger = proxyquire('../src', { + '../../log': { + error: errorLog + } + }) + + externalLogger = new ExternalLogger({ + ddsource: 'logging_from_space', + hostname: 'mac_desktop', + apiKey: 'API_KEY_PLACEHOLDER', + interval: 10000, + timeout: 5000, + limit: 10 + }) + }) + + afterEach(() => { + interceptor.done() + errorLog.restore() + }) + + it('should properly encode the log message', (done) => { + let request + const currentTime = Date.now() + + interceptor = nock('https://http-intake.logs.datadoghq.com:443') + .post('/api/v2/logs') + .reply((_uri, req, cb) => { + request = req + cb(null, [202, '{}', { 'Content-Type': 'application/json' }]) + }) + + const span = { + service: 'openAi', + trace_id: '000001000', + span_id: '9999991999' + } + const tags = { + env: 'external_logger', + version: '1.2.3', + service: 'external' + } + externalLogger.log({ + message: 'oh no, something is up', + custom: 'field', + attribute: 'funky', + service: 'outer_space', + level: 'info' + }, span, tags) + + externalLogger.flush((err) => { + try { + expect(request[0]).to.have.property('message', 'oh no, something is up') + expect(request[0]).to.have.property('custom', 'field') + expect(request[0]).to.have.property('attribute', 'funky') + expect(request[0]).to.have.property('service', 'outer_space') + expect(request[0]).to.have.property('level', 'info') + expect(request[0]).to.have.property('dd.trace_id', '000001000') + expect(request[0]).to.have.property('dd.span_id', '9999991999') + expect(request[0].timestamp).to.be.greaterThanOrEqual(currentTime) + expect(request[0]).to.have.property('ddsource', 'logging_from_space') + expect(request[0]).to.have.property('ddtags', 'env:external_logger,version:1.2.3,service:external') + } catch (e) { + done(e) + return + } + + done(err) + }) + }) + + it('should empty the log queue when calling flush', (done) => { + interceptor = nock('https://http-intake.logs.datadoghq.com:443') + .post('/api/v2/logs') + .reply(202, {}) + + externalLogger.enqueue({}) + expect(externalLogger.queue.length).to.equal(1) + + externalLogger.flush((err) => { + expect(externalLogger.queue.length).to.equal(0) + done(err) + }) + }) + + it('tracer logger should handle error response codes from Logs API', (done) => { + interceptor = nock('https://http-intake.logs.datadoghq.com:443') + .post('/api/v2/logs') + .reply(400, {}) + + externalLogger.enqueue({}) + externalLogger.flush((err) => { + expect(err).to.be.an.instanceOf(Error) + expect(errorLog.getCall(0).args[0]).to.be.equal( + 'failed to send 1 logs, received response code 400' + ) + done() + }) + }) + + it('tracer logger should handle simulated network error', (done) => { + interceptor = nock('https://http-intake.logs.datadoghq.com:443') + .post('/api/v2/logs') + .replyWithError('missing API key') + + externalLogger.enqueue({}) + externalLogger.flush((err) => { + expect(err).to.be.an.instanceOf(Error) + expect(errorLog.getCall(0).args[0]).to.be.equal( + 'failed to send 1 log(s), with error missing API key' + ) + done() + }) + }) + + it('causes a flush when exceeding log queue limit', (done) => { + const flusher = sinon.stub(externalLogger, 'flush') + + for (let i = 0; i < 10; i++) { + externalLogger.enqueue({}) + } + expect(flusher).to.not.have.been.called + + externalLogger.enqueue({}) + expect(flusher).to.have.been.called + + flusher.restore() + done() + }) +})