diff --git a/.circleci/config.yml b/.circleci/config.yml index 5add181eef9..fbe72b9195f 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -627,6 +627,11 @@ jobs: environment: - PLUGINS=sharedb + node-undici: + <<: *node-plugin-base + docker: + - image: node:<< parameters.node-version >> + node-when: *node-plugin-base node-winston: *node-plugin-base @@ -818,6 +823,10 @@ workflows: - node-router: *matrix-supported-node-versions - node-sharedb: *matrix-supported-node-versions - node-tedious: *matrix-supported-node-versions + - node-undici: + matrix: + parameters: + node-version: ["14.9", "16"] - node-when: *matrix-supported-node-versions - node-winston: *matrix-supported-node-versions - codecov: @@ -873,6 +882,7 @@ workflows: - node-router - node-sharedb - node-tedious + - node-undici - node-when - node-winston bench: &bench-jobs diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index aeeeab13a1f..8179a34ed94 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -24,6 +24,7 @@ require,retry,MIT,Copyright 2011 Tim Koschützki Felix Geisendörfer require,semver,ISC,Copyright Isaac Z. Schlueter and Contributors require,source-map,BSD-3-Clause,Copyright 2009-2011 Mozilla Foundation and contributors require,source-map-resolve,MIT,Copyright 2014-2020 Simon Lydell 2019 Jinxiang +dev,abort-controller,MIT,Copyright (c) 2017 Toru Nagashima dev,autocannon,MIT,Copyright 2016 Matteo Collina dev,axios,MIT,Copyright 2014-present Matt Zabriskie dev,benchmark,MIT,Copyright 2010-2016 Mathias Bynens Robert Kieffer John-David Dalton diff --git a/docs/API.md b/docs/API.md index cc3683d34d2..59146dcae9a 100644 --- a/docs/API.md +++ b/docs/API.md @@ -90,6 +90,7 @@ tracer.use('pg', {
+

Available Plugins

@@ -138,6 +139,7 @@ tracer.use('pg', { * [restify](./interfaces/plugins.restify.html) * [router](./interfaces/plugins.router.html) * [tedious](./interfaces/plugins.tedious.html) +* [undici](./interfaces/plugins.undici.html) * [when](./interfaces/plugins.when.html) * [winston](./interfaces/plugins.winston.html) diff --git a/docs/test.ts b/docs/test.ts index 53aa209ede0..6bc21fa0a1d 100644 --- a/docs/test.ts +++ b/docs/test.ts @@ -250,6 +250,7 @@ tracer.use('rhea'); tracer.use('router'); tracer.use('sharedb', sharedbOptions); tracer.use('tedious'); +tracer.use('undici', httpClientOptions); tracer.use('winston'); tracer.use('express', false) diff --git a/index.d.ts b/index.d.ts index 38d2bab0334..efb50dcfc4f 100644 --- a/index.d.ts +++ b/index.d.ts @@ -513,6 +513,7 @@ interface Plugins { "router": plugins.router; "sharedb": plugins.sharedb; "tedious": plugins.tedious; + "undici": plugins.undici; "winston": plugins.winston; } @@ -1282,6 +1283,12 @@ declare namespace plugins { */ interface tedious extends Instrumentation {} + /** + * This plugin automatically instruments + * [undici](https://github.com/nodejs/undici/) + */ + interface undici extends HttpClient {} + /** * This plugin patches the [winston](https://github.com/winstonjs/winston) * to automatically inject trace identifiers in log records when the diff --git a/package.json b/package.json index 80e34a671be..86583ed56a1 100644 --- a/package.json +++ b/package.json @@ -88,6 +88,7 @@ "source-map-resolve": "^0.6.0" }, "devDependencies": { + "abort-controller": "^3.0.0", "autocannon": "^4.5.2", "axios": "^0.21.2", "benchmark": "^2.1.4", diff --git a/packages/datadog-plugin-http/src/client.js b/packages/datadog-plugin-http/src/client.js index cc14d26bd68..71207fac4f1 100644 --- a/packages/datadog-plugin-http/src/client.js +++ b/packages/datadog-plugin-http/src/client.js @@ -5,9 +5,9 @@ const log = require('../../dd-trace/src/log') const tags = require('../../../ext/tags') const kinds = require('../../../ext/kinds') const formats = require('../../../ext/formats') -const urlFilter = require('../../dd-trace/src/plugins/util/urlfilter') const analyticsSampler = require('../../dd-trace/src/analytics_sampler') const { storage } = require('../../datadog-core') +const { addErrorToSpan, getServiceName, hasAmazonSignature, client } = require('../../dd-trace/src/plugins/util/web') const HTTP_HEADERS = formats.HTTP_HEADERS const HTTP_STATUS_CODE = tags.HTTP_STATUS_CODE @@ -17,7 +17,7 @@ const SPAN_KIND = tags.SPAN_KIND const CLIENT = kinds.CLIENT function patch (http, methodName, tracer, config) { - config = normalizeConfig(tracer, config) + config = normalizeConfig(config) this.wrap(http, methodName, fn => makeRequestTrace(fn)) function makeRequestTrace (request) { @@ -84,7 +84,7 @@ function patch (http, methodName, tracer, config) { break } case 'error': - addError(span, arg) + addErrorToSpan(span, arg) case 'abort': // eslint-disable-line no-fallthrough case 'timeout': // eslint-disable-line no-fallthrough finish(req, null, span, config) @@ -119,16 +119,6 @@ function patch (http, methodName, tracer, config) { span.finish() } - function addError (span, error) { - span.addTags({ - 'error.type': error.name, - 'error.msg': error.message, - 'error.stack': error.stack - }) - - return error - } - function addRequestHeaders (req, span, config) { config.headers.forEach(key => { const value = req.getHeader(key) @@ -220,105 +210,15 @@ function patch (http, methodName, tracer, config) { } } -function getHost (options) { - if (typeof options === 'string') { - return url.parse(options).host - } - - const hostname = options.hostname || options.host || 'localhost' - const port = options.port - - return [hostname, port].filter(val => val).join(':') -} - -function getServiceName (tracer, config, options) { - if (config.splitByDomain) { - return getHost(options) - } else if (config.service) { - return config.service - } - - return `${tracer._service}-http-client` -} - -function hasAmazonSignature (options) { - if (!options) { - return false - } - - if (options.headers) { - const headers = Object.keys(options.headers) - .reduce((prev, next) => Object.assign(prev, { - [next.toLowerCase()]: options.headers[next] - }), {}) - - if (headers['x-amz-signature']) { - return true - } - - if ([].concat(headers['authorization']).some(startsWith('AWS4-HMAC-SHA256'))) { - return true - } - } - - return options.path && options.path.toLowerCase().indexOf('x-amz-signature=') !== -1 -} - -function startsWith (searchString) { - return value => String(value).startsWith(searchString) -} - function unpatch (http) { this.unwrap(http, 'request') this.unwrap(http, 'get') } -function getStatusValidator (config) { - if (typeof config.validateStatus === 'function') { - return config.validateStatus - } else if (config.hasOwnProperty('validateStatus')) { - log.error('Expected `validateStatus` to be a function.') - } - return code => code < 400 || code >= 500 -} - -function getFilter (config) { - config = Object.assign({}, config, { - blocklist: config.blocklist || [] - }) - - return urlFilter.getFilter(config) -} - -function normalizeConfig (tracer, config) { +function normalizeConfig (config) { config = config.client || config - const validateStatus = getStatusValidator(config) - const propagationFilter = getFilter({ blocklist: config.propagationBlocklist }) - const headers = getHeaders(config) - const hooks = getHooks(config) - - return Object.assign({}, config, { - validateStatus, - propagationFilter, - headers, - hooks - }) -} - -function getHeaders (config) { - if (!Array.isArray(config.headers)) return [] - - return config.headers - .filter(key => typeof key === 'string') - .map(key => key.toLowerCase()) -} - -function getHooks (config) { - const noop = () => {} - const request = (config.hooks && config.hooks.request) || noop - - return { request } + return client.normalizeConfig(config) } module.exports = [ diff --git a/packages/datadog-plugin-undici/src/index.js b/packages/datadog-plugin-undici/src/index.js new file mode 100644 index 00000000000..cab7709e11e --- /dev/null +++ b/packages/datadog-plugin-undici/src/index.js @@ -0,0 +1,291 @@ +'use strict' + +const url = require('url') +const log = require('../../dd-trace/src/log') +const tags = require('../../../ext/tags') +const kinds = require('../../../ext/kinds') +const formats = require('../../../ext/formats') +const analyticsSampler = require('../../dd-trace/src/analytics_sampler') +const { AsyncResource, AsyncLocalStorage } = require('async_hooks') +const { + addErrorToSpan, + getServiceName, + hasAmazonSignature, + client: { normalizeConfig } +} = require('../../dd-trace/src/plugins/util/web') + +const HTTP_HEADERS = formats.HTTP_HEADERS +const HTTP_STATUS_CODE = tags.HTTP_STATUS_CODE +const HTTP_REQUEST_HEADERS = tags.HTTP_REQUEST_HEADERS +const HTTP_RESPONSE_HEADERS = tags.HTTP_RESPONSE_HEADERS +const SPAN_KIND = tags.SPAN_KIND +const CLIENT = kinds.CLIENT + +const asyncLocalStorage = new AsyncLocalStorage() + +function parseHeaders (headers) { + const pairs = headers + .split('\r\n') + .map((r) => r.split(':').map((str) => str.trim())) + + const object = {} + for (const pair of pairs) { + const value = pair[1] + if (!value) { + continue + } + const key = pair[0].toLowerCase() + if (object[key]) { + if (!Array.isArray(object[key])) { + object[key] = [object[key], value] + } else { + object[key].push(value) + } + } else { + object[key] = value + } + } + return object +} +const channels = { + requestChannel: undefined, + headersChannel: undefined, + errorChannel: undefined +} + +function diagnostics (tracer, config) { + let diagnosticsChannel + try { + diagnosticsChannel = require('diagnostics_channel') + } catch (e) { + log.error( + "Unable to configure undici, cannot require 'diagnostics_channel'" + ) + return () => {} + } + + channels.requestChannel = diagnosticsChannel.channel('undici:request:create') + channels.headersChannel = diagnosticsChannel.channel( + 'undici:request:headers' + ) + channels.errorChannel = diagnosticsChannel.channel('undici:request:error') + + channels.requestChannel.subscribe(handleRequestCreate) + channels.errorChannel.subscribe(handleRequestError) + channels.headersChannel.subscribe(handleRequestHeaders) + + const requestSpansMap = new WeakMap() + + function handleRequestCreate ({ request }) { + const method = (request.method || 'GET').toUpperCase() + + const path = request.path ? request.path.split(/[?#]/)[0] : '/' + const uri = `${request.origin}${path}` + + const span = asyncLocalStorage.getStore() + if (span) { + span.addTags({ + 'resource.name': method, + 'span.type': 'http', + 'http.method': method, + 'http.url': uri, + 'service.name': getServiceName(tracer, config, request.origin) + }) + requestSpansMap.set(request, span) + } + + const headers = + typeof request.headers === 'string' + ? parseHeaders(request.headers) + : request.headers + + if ( + !( + hasAmazonSignature({ ...request, headers }) || + !config.propagationFilter(uri) + ) + ) { + const injectedHeaders = {} + tracer.inject(span, HTTP_HEADERS, injectedHeaders) + for (const [key, value] of Object.entries(injectedHeaders)) { + request.addHeader(key, value) + } + } + + analyticsSampler.sample(span, config.measured) + } + + function handleRequestError ({ request, error }) { + const span = requestSpansMap.get(request) + addErrorToSpan(span, error) + addRequestHeaders(request, span, config) + span.finish() + } + + function handleRequestHeaders ({ request, response }) { + const span = requestSpansMap.get(request) + addRequestHeaders(request, span, config) + setStatusCode(response, span, config) + config.hooks.request(span, request, response) + } + + return function unsubscribe () { + if (channels.requestChannel.hasSubscribers) { + channels.requestChannel.unsubscribe(handleRequestCreate) + } + if (channels.headersChannel.hasSubscribers) { + channels.headersChannel.unsubscribe(handleRequestHeaders) + } + if (channels.errorChannel.hasSubscribers) { + channels.errorChannel.unsubscribe(handleRequestError) + } + } +} + +function addRequestHeaders (req, span, config) { + const headers = parseHeaders(req.headers) + for (const [key, value] of Object.entries(headers)) { + span.setTag(`${HTTP_REQUEST_HEADERS}.${key}`, value) + } + + if (!headers.host) { + // req.servername holds the value of the host header + if (req.servername) { + span.setTag(`${HTTP_REQUEST_HEADERS}.host`, req.servername) + } else { + // Undici's host header are written directly + // to the stream, and not passed to the `Request` object + // This workaround ensure we set the host if + // it was not explicitely provided + const { hostname, port } = url.parse(req.origin) + const host = `${hostname}${port ? `:${port}` : ''}` + span.setTag(`${HTTP_REQUEST_HEADERS}.host`, host) + } + } +} + +function setStatusCode (res, span, config) { + // fetch has status set on `status` rather than statusCode + const statusCode = res.status || res.statusCode + span.setTag(HTTP_STATUS_CODE, statusCode) + + if (!config.validateStatus(statusCode)) { + span.setTag('error', 1) + } +} + +function addResponseHeaders (res, span, config) { + for (const key of config.headers) { + const value = res.headers[key] + if (value) { + span.setTag(`${HTTP_RESPONSE_HEADERS}.${key}`, value) + } + } +} + +function finishSpan (res, span, error, config) { + if (res) { + setStatusCode(res, span, config) + addResponseHeaders(res, span, config) + span.finish() + } else { + span.setTag('error', 1) + } +} + +function patch (undici, methodName, tracer, config) { + this.wrap(undici, methodName, (fn) => makeRequestTrace(fn)) + + function makeRequestTrace (request) { + return function requestTrace () { + // Bind the callback for async resources + if (arguments.length === 3) { + arguments[2] = AsyncResource.bind(arguments[2]) + } + const scope = tracer.scope() + const childOf = scope.active() + const span = tracer.startSpan(`undici.${methodName}`, { + childOf, + tags: { + [SPAN_KIND]: CLIENT + } + }) + const result = asyncLocalStorage.run(span, () => { + return tracer.scope().activate(span, () => { + return request.apply(this, arguments) + }) + }) + + if (methodName === 'pipeline') { + result.on('end', () => { + span.finish() + }).on('error', () => { + span.finish() + }) + return result + } + + return wrapPromise(result, span, config) + } + } +} + +function wrapPromise (promise, span, config) { + if (!promise) { + finishSpan(null, span, null, config) + return promise + } + + return promise + .then( + (res) => finishSpan(res, span, null, config), + (e) => finishSpan(null, span, e, config) + ) + .then(() => promise) +} + +module.exports = [ + { + name: 'undici', + versions: ['>=4.7.1'], + patch: function (undici, tracer, config) { + config = normalizeConfig(config) + + patch.call(this, undici, 'request', tracer, config) + patch.call(this, undici, 'upgrade', tracer, config) + patch.call(this, undici, 'connect', tracer, config) + if (undici.fetch) { + patch.call(this, undici, 'fetch', tracer, config) + } + patch.call(this, undici, 'pipeline', tracer, config) + patch.call(this, undici, 'stream', tracer, config) + + patch.call(this, undici.Client.prototype, 'request', tracer, config) + patch.call(this, undici.Client.prototype, 'pipeline', tracer, config) + patch.call(this, undici.Client.prototype, 'upgrade', tracer, config) + patch.call(this, undici.Client.prototype, 'connect', tracer, config) + patch.call(this, undici.Client.prototype, 'stream', tracer, config) + + const unpatchDiagnostics = diagnostics.call(this, tracer, config) + + this.unpatch = () => { + this.unwrap(undici, 'request') + this.unwrap(undici, 'upgrade') + this.unwrap(undici, 'connect') + if (undici.fetch) { + this.unwrap(undici, 'fetch') + } + this.unwrap(undici, 'pipeline') + this.unwrap(undici, 'stream') + + this.unwrap(undici.Client.prototype, 'request') + this.unwrap(undici.Client.prototype, 'pipeline') + this.unwrap(undici.Client.prototype, 'upgrade') + this.unwrap(undici.Client.prototype, 'connect') + this.unwrap(undici.Client.prototype, 'stream') + + unpatchDiagnostics() + } + } + } +] diff --git a/packages/datadog-plugin-undici/test/index.spec.js b/packages/datadog-plugin-undici/test/index.spec.js new file mode 100644 index 00000000000..b1e4f1cfed2 --- /dev/null +++ b/packages/datadog-plugin-undici/test/index.spec.js @@ -0,0 +1,1096 @@ +'use strict' + +const AbortController = require('abort-controller') +const getPort = require('get-port') +const agent = require('../../dd-trace/test/plugins/agent') +const tags = require('../../../ext/tags') +const { expect } = require('chai') + +const HTTP_REQUEST_HEADERS = tags.HTTP_REQUEST_HEADERS +const HTTP_RESPONSE_HEADERS = tags.HTTP_RESPONSE_HEADERS +const plugin = require('../src') +const { PassThrough, pipeline, Readable, Writable } = require('stream') + +describe('undici', () => { + let express + let undici + let appListener + let tracer + + withVersions(plugin, 'undici', (version) => { + function server (app, port, listener) { + const server = require('http').createServer(app) + server.listen(port, 'localhost', listener) + return server + } + + beforeEach(() => { + tracer = require('../../dd-trace') + appListener = null + }) + + afterEach(() => { + if (appListener) { + appListener.close() + } + return agent.close() + }) + + describe('automatic instrumentation', () => { + beforeEach(() => { + return agent.load('undici').then(() => { + undici = require(`../../../versions/undici@${version}`).get() + express = require('express') + }) + }) + + function withUndici (port) { + const path = `http://localhost:${port}/user` + + return { + request: () => undici.request(path), + pipeline: () => + pipeline( + new Readable({ + read () { + this.push(Buffer.from('undici')) + this.push(null) + } + }), + undici.pipeline(path, ({ body }) => { + return pipeline(body, new PassThrough(), () => {}) + }), + new PassThrough(), + _ => { + } + ), + upgrade: () => undici.upgrade(path), + connect: () => undici.connect(path), + fetch: () => undici.fetch(path), + stream: () => undici.stream(path, ({ opaque: { bufs } }) => { + return new Writable({ + write (chunk, _, callback) { + bufs.push(chunk) + callback() + } + }) + }) + } + } + + function withClient (port) { + const client = new undici.Client(`http://localhost:${port}`) + + return { + request: () => client.request({ path: '/user', method: 'GET' }), + pipeline: () => + pipeline( + new Readable({ + read () { + this.push(Buffer.from('undici')) + this.push(null) + } + }), + client.pipeline({ path: `/user`, method: 'GET' }, ({ body }) => { + return pipeline(body, new PassThrough(), () => {}) + }), + new PassThrough(), + _ => { + client.close() + } + ), + upgrade: () => client.upgrade({ path: '/user', method: 'GET' }), + connect: () => client.connect({ path: '/user', method: 'CONNECT' }), + stream: () => client.stream({ path: '/user', method: 'GET' }, ({ opaque: { bufs } }) => { + return new Writable({ + write (chunk, _, callback) { + bufs.push(chunk) + callback() + } + }) + }) + } + } + + function verifyTraces (port) { + return function (traces, options = { method: 'GET', expectStatusCode: '200' }) { + expect(traces[0][0]).to.have.property( + 'service', + 'test-http-client' + ) + expect(traces[0][0]).to.have.property('type', 'http') + expect(traces[0][0]).to.have.property('resource', options.method) + expect(traces[0][0].meta).to.have.property('span.kind', 'client') + expect(traces[0][0].meta).to.have.property( + 'http.url', + `http://localhost:${port}/user` + ) + expect(traces[0][0].meta).to.have.property('http.method', options.method) + if (options.expectStatusCode) { + expect(traces[0][0].meta).to.have.property( + 'http.status_code', + '200' + ) + } + } + } + + function setup (handler, done, options) { + const app = express() + + app.get('/user', (_, res) => { + res.status(200).send() + }) + + app.connect('/user', (_, res) => { + res.status(200).send() + }) + + getPort().then((port) => { + agent + .use((traces) => { + verifyTraces(port)(traces, options) + }) + .then(done) + .catch(done) + + appListener = server(app, port, () => { + handler(port)() + }) + }) + } + + describe('when using client', () => { + it('should do automatic instrumentation for request', (done) => { + setup((port) => withClient(port).request, done) + }) + + it('should do automatic instrumentation for upgrade', (done) => { + setup((port) => withClient(port).upgrade, done) + }) + + it('should do automatic instrumentation for pipeline', (done) => { + setup((port) => withClient(port).pipeline, done, { method: 'GET', expectStatusCode: '200' }) + }) + it('should do automatic instrumentation for connect', (done) => { + setup((port) => withClient(port).connect, done, { method: 'CONNECT' }) + }) + it('should do automatic instrumentation for stream', (done) => { + setup((port) => withClient(port).stream, done) + }) + }) + + describe('when using undici', () => { + it('should do automatic instrumentation for request', (done) => { + setup((port) => withUndici(port).request, done) + }) + + it('should do automatic instrumentation for upgrade', (done) => { + setup((port) => withUndici(port).upgrade, done) + }) + + it('should do automatic instrumentation for pipeline', (done) => { + setup((port) => withUndici(port).pipeline, done, { method: 'GET', expectStatusCode: '200' }) + }) + it('should do automatic instrumentation for connect', (done) => { + setup((port) => withUndici(port).connect, done, { method: 'CONNECT' }) + }) + it('should do automatic instrumentation for stream', (done) => { + setup((port) => withUndici(port).stream, done) + }) + + it('should do automatic instrumentation for fetch', (done) => { + // fetch is not available on node 14 + if (undici.fetch) { + setup((port) => withUndici(port).fetch, done) + } else { + done() + } + }) + }) + }) + + describe('without configuration', () => { + beforeEach(() => { + return agent.load('undici').then(() => { + undici = require(`../../../versions/undici@${version}`).get() + express = require('express') + }) + }) + + it('should do automatic instrumentation', (done) => { + const app = express() + + app.get('/user', (req, res) => { + res.status(200).send() + }) + + getPort().then((port) => { + agent + .use((traces) => { + expect(traces[0][0]).to.have.property( + 'service', + 'test-http-client' + ) + expect(traces[0][0]).to.have.property('type', 'http') + expect(traces[0][0]).to.have.property('resource', 'GET') + expect(traces[0][0].meta).to.have.property('span.kind', 'client') + expect(traces[0][0].meta).to.have.property( + 'http.url', + `http://localhost:${port}/user` + ) + expect(traces[0][0].meta).to.have.property('http.method', 'GET') + expect(traces[0][0].meta).to.have.property( + 'http.status_code', + '200' + ) + }) + .then(done) + .catch(done) + + appListener = server(app, port, () => { + undici.request(`http://localhost:${port}/user`) + }) + }) + }) + + it('should support configuration as an URL object', (done) => { + const app = express() + + app.get('/user', (req, res) => { + res.status(200).send() + }) + + getPort().then((port) => { + agent + .use((traces) => { + expect(traces[0][0].meta).to.have.property( + 'http.url', + `http://localhost:${port}/user` + ) + }) + .then(done) + .catch(done) + + const uri = { + protocol: `http:`, + hostname: 'localhost', + port, + path: '/user' + } + + appListener = server(app, port, () => { + undici.request(uri) + }) + }) + }) + + it('should remove the query string from the URL', (done) => { + const app = express() + + app.get('/user', (req, res) => { + res.status(200).send() + }) + + getPort().then((port) => { + agent + .use((traces) => { + expect(traces[0][0].meta).to.have.property( + 'http.status_code', + '200' + ) + expect(traces[0][0].meta).to.have.property( + 'http.url', + `http://localhost:${port}/user` + ) + }) + .then(done) + .catch(done) + + appListener = server(app, port, () => { + undici.request(`http://localhost:${port}/user?foo=bar`) + }) + }) + }) + + it('should support configuration as an WHATWG URL object', async () => { + const app = express() + const port = await getPort() + const url = new URL(`http://localhost:${port}/user`) + + app.get('/user', (req, res) => res.status(200).send()) + + appListener = server(app, port, () => { + undici.request(url) + }) + + await agent.use((traces) => { + expect(traces[0][0].meta).to.have.property('http.status_code', '200') + expect(traces[0][0].meta).to.have.property( + 'http.url', + `http://localhost:${port}/user` + ) + }) + }) + + it('should not require consuming the data', (done) => { + const app = express() + + app.get('/user', (req, res) => { + res.status(200).send() + }) + + getPort().then((port) => { + agent + .use((traces) => { + expect(traces[0][0]).to.not.be.undefined + }) + .then(done) + .catch(done) + + appListener = server(app, port, () => { + undici.request(`http://localhost:${port}/user`) + }) + }) + }) + + it('should inject its parent span in the headers', (done) => { + const app = express() + + app.get('/user', (req, res) => { + expect(req.get('x-datadog-trace-id')).to.be.a('string') + expect(req.get('x-datadog-parent-id')).to.be.a('string') + + res.status(200).send() + }) + + getPort().then((port) => { + agent + .use((traces) => { + expect(traces[0][0].meta).to.have.property( + 'http.status_code', + '200' + ) + }) + .then(done) + .catch(done) + + appListener = server(app, port, () => { + undici.request(`http://localhost:${port}/user`) + }) + }) + }) + + it('should skip injecting if the Authorization header contains an AWS signature', (done) => { + const app = express() + + app.get('/', (req, res) => { + try { + expect(req.get('x-datadog-trace-id')).to.be.undefined + expect(req.get('x-datadog-parent-id')).to.be.undefined + + res.status(200).send() + + done() + } catch (e) { + done(e) + } + }) + + getPort().then((port) => { + appListener = server(app, port, () => { + undici.request(`http://localhost:${port}/`, { + headers: { + Authorization: 'AWS4-HMAC-SHA256 ...' + } + }) + }) + }) + }) + + it('should skip injecting if one of the Authorization headers contains an AWS signature', (done) => { + const app = express() + + app.get('/', (req, res) => { + try { + expect(req.get('x-datadog-trace-id')).to.be.undefined + expect(req.get('x-datadog-parent-id')).to.be.undefined + + res.status(200).send() + + done() + } catch (e) { + done(e) + } + }) + + getPort().then((port) => { + appListener = server(app, port, () => { + undici.request(`http://localhost:${port}/`, { + headers: { + Authorization: 'AWS4-HMAC-SHA256 ...' + } + }) + }) + }) + }) + + it('should skip injecting if the X-Amz-Signature header is set', (done) => { + const app = express() + + app.get('/', (req, res) => { + try { + expect(req.get('x-datadog-trace-id')).to.be.undefined + expect(req.get('x-datadog-parent-id')).to.be.undefined + + res.status(200).send() + + done() + } catch (e) { + done(e) + } + }) + + getPort().then((port) => { + appListener = server(app, port, () => { + undici.request(`http://localhost:${port}/`, { + headers: { + Authorization: 'AWS4-HMAC-SHA256 ...' + } + }) + }) + }) + }) + + it('should skip injecting if the X-Amz-Signature query param is set', (done) => { + const app = express() + + app.get('/', (req, res) => { + try { + expect(req.get('x-datadog-trace-id')).to.be.undefined + expect(req.get('x-datadog-parent-id')).to.be.undefined + + res.status(200).send() + + done() + } catch (e) { + done(e) + } + }) + + getPort().then((port) => { + appListener = server(app, port, () => { + undici.request(`http://localhost:${port}/?X-Amz-Signature=abc123`, { + headers: { + Authorization: 'AWS4-HMAC-SHA256 ...' + } + }) + }) + }) + }) + + // TODO: the callbacks is run after the scope ends + it.skip('should run the callback in the parent context', (done) => { + if (process.env.DD_CONTEXT_PROPAGATION === 'false') return done() + + const app = express() + + app.get('/user', (req, res) => { + res.status(200).send('OK') + }) + + getPort().then((port) => { + appListener = server(app, port, () => { + undici.request(`http://localhost:${port}/user`, (res) => { + expect(tracer.scope().active()).to.be.null + done() + }) + }) + }) + }) + + // TODO: There is no event listener yet + it.skip('should run the event listeners in the parent context', (done) => { + if (process.env.DD_CONTEXT_PROPAGATION === 'false') return done() + + const app = express() + + app.get('/user', (req, res) => { + res.status(200).send('OK') + }) + + getPort().then((port) => { + appListener = server(app, port, () => { + undici.request(`http://localhost:${port}/user`, (res) => { + done() + }) + }) + }) + }) + + it('should handle connection errors', (done) => { + getPort().then((port) => { + let error + + agent + .use((traces) => { + expect(traces[0][0].meta).to.have.property( + 'error.type', + error.name + ) + expect(traces[0][0].meta).to.have.property( + 'error.msg', + error.message + ) + expect(traces[0][0].meta).to.have.property( + 'error.stack', + error.stack + ) + }) + .then(done) + .catch(done) + + undici.request(`http://localhost:${port}/user`, (err) => { + error = err + }) + }) + }) + + it('should not record HTTP 5XX responses as errors by default', (done) => { + const app = express() + + app.get('/user', (req, res) => { + res.status(500).send() + }) + + getPort().then((port) => { + agent + .use((traces) => { + expect(traces[0][0]).to.have.property('error', 0) + }) + .then(done) + .catch(done) + + appListener = server(app, port, () => { + undici.request(`http://localhost:${port}/user`) + }) + }) + }) + + it('should record HTTP 4XX responses as errors by default', (done) => { + const app = express() + + app.get('/user', (req, res) => { + res.status(400).send() + }) + + getPort().then((port) => { + agent + .use((traces) => { + expect(traces[0][0]).to.have.property('error', 1) + }) + .then(done) + .catch(done) + + appListener = server(app, port, () => { + undici.request(`http://localhost:${port}/user`) + }) + }) + }) + + it('should record destroyed requests as errors', (done) => { + const app = express() + + app.get('/user', (req, res) => {}) + + getPort().then((port) => { + let error + + agent + .use((traces) => { + expect(traces[0][0]).to.have.property('error', 1) + expect(traces[0][0].meta).to.have.property( + 'error.msg', + error.message + ) + expect(traces[0][0].meta).to.have.property( + 'error.type', + error.name + ) + expect(traces[0][0].meta).to.have.property( + 'error.stack', + error.stack + ) + expect(traces[0][0].meta).to.not.have.property( + 'http.status_code' + ) + }) + .then(done) + .catch(done) + + appListener = server(app, port, () => { + const client = new undici.Client(`http://localhost:${port}`) + client.request( + { + path: '/user', + method: 'GET' + }, + (err, data) => { + error = err + } + ) + client.destroy() + }) + }) + }) + + it('should record aborted requests as errors', (done) => { + const app = express() + + app.get('/user', (req, res) => {}) + + getPort().then((port) => { + agent + .use((traces) => { + expect(traces[0][0]).to.have.property('error', 1) + expect(traces[0][0].meta).to.not.have.property( + 'http.status_code' + ) + }) + .then(done) + .catch(done) + + appListener = server(app, port, () => { + const abort = new AbortController() + undici.request(`http://localhost:${port}/user`, { + signal: abort.signal + }) + abort.abort() + }) + }) + }) + + // TODO: Get timeout working + it.skip('should record timeouts as errors', (done) => { + const app = express() + + app.get('/user', (req, res) => {}) + + getPort().then((port) => { + agent + .use((traces) => { + expect(traces[0][0]).to.have.property('error', 1) + expect(traces[0][0].meta).to.not.have.property( + 'http.status_code' + ) + }) + .then(done) + .catch(done) + + appListener = server(app, port, () => { + undici.request(`http://localhost:${port}/user`, { + bodyTimeout: 0, + headersTimeout: 0 + }) + }) + }) + }) + + it('should record when the request was aborted', (done) => { + const app = express() + + app.get('/abort', (req, res) => { + res.status(200).send() + }) + + getPort().then((port) => { + agent + .use((traces) => { + expect(traces[0][0]).to.have.property( + 'service', + 'test-http-client' + ) + }) + .then(done) + .catch(done) + + appListener = server(app, port, () => { + const ac = new AbortController() + undici.request(`http://localhost:${port}/abort`, { + signal: ac.signal + }) + + ac.abort() + }) + }) + }) + + /// Undici is not the client making requests to the agent + // this seems irrelevant at that time + xit('should skip requests to the agent', (done) => { + const app = express() + + app.get('/user', (req, res) => { + res.status(200).send() + }) + + getPort().then((port) => { + const timer = setTimeout(done, 100) + + agent.use((traces) => { + done(new Error('Request to the agent was traced.')) + clearTimeout(timer) + }) + + appListener = server(app, port, () => { + undici.request(tracer._tracer._url.href) + }) + }) + }) + }) + + describe('with service configuration', () => { + let config + + beforeEach(() => { + config = { + service: 'custom' + } + + return agent.load('undici', config).then(() => { + undici = require(`../../../versions/undici@${version}`).get() + express = require('express') + }) + }) + + it('should be configured with the correct values', (done) => { + const app = express() + + app.get('/user', (req, res) => { + res.status(200).send() + }) + + getPort().then((port) => { + agent + .use((traces) => { + expect(traces[0][0]).to.have.property('service', 'custom') + }) + .then(done) + .catch(done) + + appListener = server(app, port, () => { + undici.request(`http://localhost:${port}/user`) + }) + }) + }) + }) + + describe('with validateStatus configuration', () => { + let config + + beforeEach(() => { + config = { + validateStatus: (status) => status < 500 + } + + return agent.load('undici', config).then(() => { + undici = require(`../../../versions/undici@${version}`).get() + express = require('express') + }) + }) + + it('should use the supplied function to decide if a response is an error', (done) => { + const app = express() + + app.get('/user', (req, res) => { + res.status(500).send() + }) + + getPort().then((port) => { + agent + .use((traces) => { + expect(traces[0][0]).to.have.property('error', 1) + }) + .then(done) + .catch(done) + + appListener = server(app, port, () => { + undici.request(`http://localhost:${port}/user`) + }) + }) + }) + }) + + describe('with splitByDomain configuration', () => { + let config + + beforeEach(() => { + config = { + splitByDomain: true + } + return agent.load('undici', config).then(() => { + undici = require(`../../../versions/undici@${version}`).get() + express = require('express') + }) + }) + + it('should use the remote endpoint as the service name', (done) => { + const app = express() + + app.get('/user', (req, res) => { + res.status(200).send() + }) + + getPort().then((port) => { + agent + .use((traces) => { + expect(traces[0][0]).to.have.property( + 'service', + `localhost:${port}` + ) + }) + .then(done) + .catch(done) + + appListener = server(app, port, () => { + undici.request(`http://localhost:${port}/user`) + }) + }) + }) + }) + + describe('with headers configuration', () => { + let config + + beforeEach(() => { + config = { + headers: ['host', 'x-foo'] + } + return agent.load('undici', config).then(() => { + undici = require(`../../../versions/undici@${version}`).get() + express = require('express') + }) + }) + + describe('host header', () => { + // TODO: Injected headers are not available yet + // for request + it('should add tags for the host header', (done) => { + const app = express() + + app.get('/user', (req, res) => { + res.setHeader('x-foo', 'bar') + res.status(200).send() + }) + + getPort().then((port) => { + agent + .use((traces) => { + const meta = traces[0][0].meta + expect(meta).to.have.property( + `${HTTP_REQUEST_HEADERS}.host`, + `localhost:${port}` + ) + }) + .then(done) + .catch(done) + + appListener = server(app, port, () => { + undici.request(`http://localhost:${port}/user`) + }) + }) + }) + + it('should add tags for the host header through Client', (done) => { + const app = express() + + app.get('/user', (req, res) => { + res.setHeader('x-foo', 'bar') + res.status(200).send() + }) + + getPort().then((port) => { + agent + .use((traces) => { + const meta = traces[0][0].meta + expect(meta).to.have.property( + `${HTTP_REQUEST_HEADERS}.host`, + `localhost:${port}` + ) + }) + .then(done) + .catch(done) + + appListener = server(app, port, () => { + const client = new undici.Client(`http://localhost:${port}`) + + client.request({ + path: '/user', + method: 'GET' + }) + }) + }) + }) + + it('should pass overwritten host header', (done) => { + const app = express() + + app.get('/user', (req, res) => { + res.setHeader('x-foo', 'bar') + res.status(200).send() + }) + + getPort().then((port) => { + agent + .use((traces) => { + const meta = traces[0][0].meta + expect(meta).to.have.property( + `${HTTP_REQUEST_HEADERS}.host`, + `my-service` + ) + }) + .then(done) + .catch(done) + + appListener = server(app, port, () => { + undici.request(`http://localhost:${port}/user`, { + headers: { + host: 'my-service' + } + }) + }) + }) + }) + }) + + it('should add tags for the configured headers', (done) => { + const app = express() + + app.get('/user', (req, res) => { + res.setHeader('x-foo', 'bar') + res.status(200).send() + }) + + getPort().then((port) => { + agent + .use((traces) => { + const meta = traces[0][0].meta + expect(meta).to.have.property( + `${HTTP_RESPONSE_HEADERS}.x-foo`, + 'bar' + ) + }) + .then(done) + .catch(done) + + appListener = server(app, port, () => { + undici.request(`http://localhost:${port}/user`) + }) + }) + }) + + it('should support adding request headers', (done) => { + const app = express() + + app.get('/user', (req, res) => { + res.status(200).send() + }) + + getPort().then((port) => { + agent + .use((traces) => { + const meta = traces[0][0].meta + expect(meta).to.have.property( + `${HTTP_REQUEST_HEADERS}.x-foo`, + `bar` + ) + }) + .then(done) + .catch(done) + + appListener = server(app, port, () => { + undici.request(`http://localhost:${port}/user`, { + headers: { 'x-foo': 'bar' } + }) + }) + }) + }) + }) + + describe('with hooks configuration', () => { + let config + + beforeEach(() => { + config = { + hooks: { + request: (span, req, res) => { + span.setTag('resource.name', `${req.method} -- ${req.path}`) + } + } + } + return agent.load('undici', config).then(() => { + undici = require(`../../../versions/undici@${version}`).get() + express = require('express') + }) + }) + + it('should run the request hook before the span is finished', (done) => { + const app = express() + + app.get('/user', (req, res) => { + res.status(200).send() + }) + + getPort().then((port) => { + agent + .use((traces) => { + expect(traces[0][0]).to.have.property('resource', 'GET -- /user') + }) + .then(done) + .catch(done) + + appListener = server(app, port, () => { + undici.request(`http://localhost:${port}/user`) + }) + }) + }) + }) + + describe('with propagationBlocklist configuration', () => { + let config + + beforeEach(() => { + config = { + propagationBlocklist: [/\/users/] + } + return agent.load('undici', config).then(() => { + undici = require(`../../../versions/undici@${version}`).get() + express = require('express') + }) + }) + + it('should skip injecting if the url matches an item in the propagationBlacklist', (done) => { + const app = express() + + app.get('/users', (req, res) => { + try { + expect(req.get('x-datadog-trace-id')).to.be.undefined + expect(req.get('x-datadog-parent-id')).to.be.undefined + + res.status(200).send() + + done() + } catch (e) { + done(e) + } + }) + + getPort().then((port) => { + appListener = server(app, port, () => { + undici.request(`http://localhost:${port}/users`) + }) + }) + }) + }) + }) +}) diff --git a/packages/dd-trace/src/plugins/index.js b/packages/dd-trace/src/plugins/index.js index 3871bb9032b..e4c40d99b7c 100644 --- a/packages/dd-trace/src/plugins/index.js +++ b/packages/dd-trace/src/plugins/index.js @@ -48,5 +48,6 @@ module.exports = { 'router': require('../../../datadog-plugin-router/src'), 'sharedb': require('../../../datadog-plugin-sharedb/src'), 'tedious': require('../../../datadog-plugin-tedious/src'), + 'undici': require('../../../datadog-plugin-undici/src'), 'winston': require('../../../datadog-plugin-winston/src') } diff --git a/packages/dd-trace/src/plugins/util/web.js b/packages/dd-trace/src/plugins/util/web.js index 1a76bf039f8..ee32774ab92 100644 --- a/packages/dd-trace/src/plugins/util/web.js +++ b/packages/dd-trace/src/plugins/util/web.js @@ -1,5 +1,6 @@ 'use strict' +const url = require('url') const uniq = require('lodash.uniq') const analyticsSampler = require('../../analytics_sampler') const FORMAT_HTTP_HEADERS = require('opentracing').FORMAT_HTTP_HEADERS @@ -29,6 +30,21 @@ const HTTP2_HEADER_AUTHORITY = ':authority' const HTTP2_HEADER_SCHEME = ':scheme' const HTTP2_HEADER_PATH = ':path' +function getHost (options) { + if (typeof options === 'string') { + return url.parse(options).host + } + + const hostname = options.hostname || options.host || 'localhost' + const port = options.port + + return [hostname, port].filter(val => val).join(':') +} + +function startsWith (searchString) { + return (value) => String(value).startsWith(searchString) +} + const web = { // Ensure the configuration has the correct structure and defaults. normalizeConfig (config) { @@ -49,6 +65,22 @@ const web = { }) }, + client: { + normalizeConfig (config) { + const validateStatus = getClientStatusValidator(config) + const propagationFilter = getFilter({ blocklist: config.propagationBlocklist }) + const headers = getHeaders(config) + const hooks = getHooks(config) + + return Object.assign({}, config, { + validateStatus, + propagationFilter, + headers, + hooks + }) + } + }, + // Start a span and activate a scope for a request. instrument (tracer, config, req, res, name, callback) { this.patch(req) @@ -144,6 +176,48 @@ const web = { } }, + getServiceName (tracer, config, options) { + if (config.splitByDomain) { + return getHost(options) + } else if (config.service) { + return config.service + } + + return `${tracer._service}-http-client` + }, + + addErrorToSpan (span, error) { + span.addTags({ + 'error.type': error.name, + 'error.msg': error.message, + 'error.stack': error.stack + }) + return error + }, + + hasAmazonSignature (options) { + if (!options) { + return false + } + + if (options.headers) { + const headers = Object.keys(options.headers) + .reduce((prev, next) => Object.assign(prev, { + [next.toLowerCase()]: options.headers[next] + }), {}) + + if (headers['x-amz-signature']) { + return true + } + + if ([].concat(headers['authorization']).some(startsWith('AWS4-HMAC-SHA256'))) { + return true + } + } + + return options.path && options.path.toLowerCase().indexOf('x-amz-signature=') !== -1 + }, + // Register a callback to run before res.end() is called. beforeEnd (req, callback) { req._datadog.beforeEnd.push(callback) @@ -449,6 +523,15 @@ function getStatusValidator (config) { return code => code < 500 } +function getClientStatusValidator (config) { + if (typeof config.validateStatus === 'function') { + return config.validateStatus + } else if (config.hasOwnProperty('validateStatus')) { + log.error('Expected `validateStatus` to be a function.') + } + return code => code < 400 || code >= 500 +} + function getHooks (config) { const noop = () => {} const request = (config.hooks && config.hooks.request) || noop @@ -466,4 +549,20 @@ function getMiddlewareSetting (config) { return true } +function getFilter (config) { + config = Object.assign({}, config, { + blocklist: config.blocklist || [] + }) + + return urlFilter.getFilter(config) +} + +function getHeaders (config) { + if (!Array.isArray(config.headers)) return [] + + return config.headers + .filter(key => typeof key === 'string') + .map(key => key.toLowerCase()) +} + module.exports = web diff --git a/packages/dd-trace/test/plugins/externals.json b/packages/dd-trace/test/plugins/externals.json index a1f8b4aef5c..6a68f193bf8 100644 --- a/packages/dd-trace/test/plugins/externals.json +++ b/packages/dd-trace/test/plugins/externals.json @@ -140,5 +140,11 @@ "name": "q", "versions": ["2"] } + ], + "undici": [ + { + "name": "undici", + "versions": [">=4.7.1"] + } ] }