From f50f36616ba25c63c42ae68e002c75c9bdce554b Mon Sep 17 00:00:00 2001 From: Florent Vilmart Date: Tue, 19 Oct 2021 11:40:09 -0400 Subject: [PATCH] ensure we keep refs to the channels --- packages/datadog-plugin-undici/src/index.js | 37 +- .../datadog-plugin-undici/test/index.spec.js | 378 ++++++++++-------- 2 files changed, 243 insertions(+), 172 deletions(-) diff --git a/packages/datadog-plugin-undici/src/index.js b/packages/datadog-plugin-undici/src/index.js index e219f500cc1..0d73aca1c01 100644 --- a/packages/datadog-plugin-undici/src/index.js +++ b/packages/datadog-plugin-undici/src/index.js @@ -53,30 +53,37 @@ function parseHeaders (headers) { }) return object } +const channels = { + requestChannel: undefined, + headersChannel: undefined, + errorChannel: undefined +} function diagnostics (tracer, config) { let diagnosticsChannel try { diagnosticsChannel = require('diagnostics_channel') } catch (e) { - log.error("Unable to configure undici, cannot require 'diagnostics_channel'") + log.error( + "Unable to configure undici, cannot require 'diagnostics_channel'" + ) return () => {} } config = normalizeConfig(tracer, config) - const requestChannel = diagnosticsChannel.channel('undici:request:create') - const headersChannel = diagnosticsChannel.channel('undici:request:headers') - const requestErrorChannel = diagnosticsChannel.channel( - 'undici:request:error' + channels.requestChannel = diagnosticsChannel.channel('undici:request:create') + channels.headersChannel = diagnosticsChannel.channel( + 'undici:request:headers' ) + channels.errorChannel = diagnosticsChannel.channel('undici:request:error') + + channels.requestChannel.subscribe(handleRequestCreate) + channels.errorChannel.subscribe(handleRequestError) + channels.headersChannel.subscribe(handleRequestHeaders) // We use a weakmap here to store the request / spans const requestSpansMap = new WeakMap() - requestChannel.subscribe(handleRequestCreate) - requestErrorChannel.subscribe(handleRequestError) - headersChannel.subscribe(handleRequestHeaders) - function handleRequestCreate ({ request }) { const method = (request.method || 'GET').toUpperCase() @@ -123,14 +130,14 @@ function diagnostics (tracer, config) { } return function unsubscribe () { - if (requestChannel.hasSubscribers) { - requestChannel.unsubscribe(handleRequestCreate) + if (channels.requestChannel.hasSubscribers) { + channels.requestChannel.unsubscribe(handleRequestCreate) } - if (headersChannel.hasSubscribers) { - headersChannel.unsubscribe(handleRequestHeaders) + if (channels.headersChannel.hasSubscribers) { + channels.headersChannel.unsubscribe(handleRequestHeaders) } - if (requestErrorChannel.hasSubscribers) { - requestErrorChannel.unsubscribe(handleRequestError) + if (channels.errorChannel.hasSubscribers) { + channels.errorChannel.unsubscribe(handleRequestError) } } } diff --git a/packages/datadog-plugin-undici/test/index.spec.js b/packages/datadog-plugin-undici/test/index.spec.js index 6c1c09f6023..80e76fd7f5e 100644 --- a/packages/datadog-plugin-undici/test/index.spec.js +++ b/packages/datadog-plugin-undici/test/index.spec.js @@ -10,15 +10,13 @@ const HTTP_REQUEST_HEADERS = tags.HTTP_REQUEST_HEADERS const HTTP_RESPONSE_HEADERS = tags.HTTP_RESPONSE_HEADERS const plugin = require('../src') -wrapIt() - describe('undici', () => { let express let undici let appListener let tracer - withVersions(plugin, 'undici', version => { + withVersions(plugin, 'undici', (version) => { function server (app, port, listener) { const server = require('http').createServer(app) server.listen(port, 'localhost', listener) @@ -39,30 +37,38 @@ describe('undici', () => { describe('without configuration', () => { beforeEach(() => { - return agent.load('undici') - .then(() => { - undici = require(`../../../versions/undici@${version}`).get() - express = require('express') - }) + return agent.load('undici').then(() => { + undici = require(`../../../versions/undici@${version}`).get() + express = require('express') + }) }) - it('should do automatic instrumentation', done => { + it('should do automatic instrumentation', (done) => { const app = express() app.get('/user', (req, res) => { res.status(200).send() }) - getPort().then(port => { + getPort().then((port) => { agent - .use(traces => { - expect(traces[0][0]).to.have.property('service', 'test-http-client') + .use((traces) => { + expect(traces[0][0]).to.have.property( + 'service', + 'test-http-client' + ) expect(traces[0][0]).to.have.property('type', 'http') expect(traces[0][0]).to.have.property('resource', 'GET') expect(traces[0][0].meta).to.have.property('span.kind', 'client') - expect(traces[0][0].meta).to.have.property('http.url', `http://localhost:${port}/user`) + expect(traces[0][0].meta).to.have.property( + 'http.url', + `http://localhost:${port}/user` + ) expect(traces[0][0].meta).to.have.property('http.method', 'GET') - expect(traces[0][0].meta).to.have.property('http.status_code', '200') + expect(traces[0][0].meta).to.have.property( + 'http.status_code', + '200' + ) }) .then(done) .catch(done) @@ -73,17 +79,20 @@ describe('undici', () => { }) }) - it('should support configuration as an URL object', done => { + it('should support configuration as an URL object', (done) => { const app = express() app.get('/user', (req, res) => { res.status(200).send() }) - getPort().then(port => { + getPort().then((port) => { agent - .use(traces => { - expect(traces[0][0].meta).to.have.property('http.url', `http://localhost:${port}/user`) + .use((traces) => { + expect(traces[0][0].meta).to.have.property( + 'http.url', + `http://localhost:${port}/user` + ) }) .then(done) .catch(done) @@ -101,18 +110,24 @@ describe('undici', () => { }) }) - it('should remove the query string from the URL', done => { + it('should remove the query string from the URL', (done) => { const app = express() app.get('/user', (req, res) => { res.status(200).send() }) - getPort().then(port => { + getPort().then((port) => { agent - .use(traces => { - expect(traces[0][0].meta).to.have.property('http.status_code', '200') - expect(traces[0][0].meta).to.have.property('http.url', `http://localhost:${port}/user`) + .use((traces) => { + expect(traces[0][0].meta).to.have.property( + 'http.status_code', + '200' + ) + expect(traces[0][0].meta).to.have.property( + 'http.url', + `http://localhost:${port}/user` + ) }) .then(done) .catch(done) @@ -134,22 +149,25 @@ describe('undici', () => { undici.request(url) }) - await agent.use(traces => { + await agent.use((traces) => { expect(traces[0][0].meta).to.have.property('http.status_code', '200') - expect(traces[0][0].meta).to.have.property('http.url', `http://localhost:${port}/user`) + expect(traces[0][0].meta).to.have.property( + 'http.url', + `http://localhost:${port}/user` + ) }) }) - it('should not require consuming the data', done => { + it('should not require consuming the data', (done) => { const app = express() app.get('/user', (req, res) => { res.status(200).send() }) - getPort().then(port => { + getPort().then((port) => { agent - .use(traces => { + .use((traces) => { expect(traces[0][0]).to.not.be.undefined }) .then(done) @@ -161,7 +179,7 @@ describe('undici', () => { }) }) - it('should inject its parent span in the headers', done => { + it('should inject its parent span in the headers', (done) => { const app = express() app.get('/user', (req, res) => { @@ -171,10 +189,13 @@ describe('undici', () => { res.status(200).send() }) - getPort().then(port => { + getPort().then((port) => { agent - .use(traces => { - expect(traces[0][0].meta).to.have.property('http.status_code', '200') + .use((traces) => { + expect(traces[0][0].meta).to.have.property( + 'http.status_code', + '200' + ) }) .then(done) .catch(done) @@ -185,7 +206,7 @@ describe('undici', () => { }) }) - it('should skip injecting if the Authorization header contains an AWS signature', done => { + it('should skip injecting if the Authorization header contains an AWS signature', (done) => { const app = express() app.get('/', (req, res) => { @@ -201,7 +222,7 @@ describe('undici', () => { } }) - getPort().then(port => { + getPort().then((port) => { appListener = server(app, port, () => { undici.request(`http://localhost:${port}/`, { headers: { @@ -212,7 +233,7 @@ describe('undici', () => { }) }) - it('should skip injecting if one of the Authorization headers contains an AWS signature', done => { + it('should skip injecting if one of the Authorization headers contains an AWS signature', (done) => { const app = express() app.get('/', (req, res) => { @@ -228,7 +249,7 @@ describe('undici', () => { } }) - getPort().then(port => { + getPort().then((port) => { appListener = server(app, port, () => { undici.request(`http://localhost:${port}/`, { headers: { @@ -239,7 +260,7 @@ describe('undici', () => { }) }) - it('should skip injecting if the X-Amz-Signature header is set', done => { + it('should skip injecting if the X-Amz-Signature header is set', (done) => { const app = express() app.get('/', (req, res) => { @@ -255,7 +276,7 @@ describe('undici', () => { } }) - getPort().then(port => { + getPort().then((port) => { appListener = server(app, port, () => { undici.request(`http://localhost:${port}/`, { headers: { @@ -266,7 +287,7 @@ describe('undici', () => { }) }) - it('should skip injecting if the X-Amz-Signature query param is set', done => { + it('should skip injecting if the X-Amz-Signature query param is set', (done) => { const app = express() app.get('/', (req, res) => { @@ -282,7 +303,7 @@ describe('undici', () => { } }) - getPort().then(port => { + getPort().then((port) => { appListener = server(app, port, () => { undici.request(`http://localhost:${port}/?X-Amz-Signature=abc123`, { headers: { @@ -294,7 +315,7 @@ describe('undici', () => { }) // TODO: the callbacks is run after the scope ends - it.skip('should run the callback in the parent context', done => { + it.skip('should run the callback in the parent context', (done) => { if (process.env.DD_CONTEXT_PROPAGATION === 'false') return done() const app = express() @@ -303,9 +324,9 @@ describe('undici', () => { res.status(200).send('OK') }) - getPort().then(port => { + getPort().then((port) => { appListener = server(app, port, () => { - undici.request(`http://localhost:${port}/user`, res => { + undici.request(`http://localhost:${port}/user`, (res) => { expect(tracer.scope().active()).to.be.null done() }) @@ -314,7 +335,7 @@ describe('undici', () => { }) // TODO: There is no event listener yet - it.skip('should run the event listeners in the parent context', done => { + it.skip('should run the event listeners in the parent context', (done) => { if (process.env.DD_CONTEXT_PROPAGATION === 'false') return done() const app = express() @@ -323,24 +344,33 @@ describe('undici', () => { res.status(200).send('OK') }) - getPort().then(port => { + getPort().then((port) => { appListener = server(app, port, () => { - undici.request(`http://localhost:${port}/user`, res => { + undici.request(`http://localhost:${port}/user`, (res) => { done() }) }) }) }) - it('should handle connection errors', done => { - getPort().then(port => { + it('should handle connection errors', (done) => { + getPort().then((port) => { let error agent - .use(traces => { - expect(traces[0][0].meta).to.have.property('error.type', error.name) - expect(traces[0][0].meta).to.have.property('error.msg', error.message) - expect(traces[0][0].meta).to.have.property('error.stack', error.stack) + .use((traces) => { + expect(traces[0][0].meta).to.have.property( + 'error.type', + error.name + ) + expect(traces[0][0].meta).to.have.property( + 'error.msg', + error.message + ) + expect(traces[0][0].meta).to.have.property( + 'error.stack', + error.stack + ) }) .then(done) .catch(done) @@ -351,16 +381,16 @@ describe('undici', () => { }) }) - it('should not record HTTP 5XX responses as errors by default', done => { + it('should not record HTTP 5XX responses as errors by default', (done) => { const app = express() app.get('/user', (req, res) => { res.status(500).send() }) - getPort().then(port => { + getPort().then((port) => { agent - .use(traces => { + .use((traces) => { expect(traces[0][0]).to.have.property('error', 0) }) .then(done) @@ -372,16 +402,16 @@ describe('undici', () => { }) }) - it('should record HTTP 4XX responses as errors by default', done => { + it('should record HTTP 4XX responses as errors by default', (done) => { const app = express() app.get('/user', (req, res) => { res.status(400).send() }) - getPort().then(port => { + getPort().then((port) => { agent - .use(traces => { + .use((traces) => { expect(traces[0][0]).to.have.property('error', 1) }) .then(done) @@ -393,48 +423,64 @@ describe('undici', () => { }) }) - it('should record destroyed requests as errors', done => { + it('should record destroyed requests as errors', (done) => { const app = express() app.get('/user', (req, res) => {}) - getPort().then(port => { + getPort().then((port) => { let error agent - .use(traces => { + .use((traces) => { expect(traces[0][0]).to.have.property('error', 1) - expect(traces[0][0].meta).to.have.property('error.msg', error.message) - expect(traces[0][0].meta).to.have.property('error.type', error.name) - expect(traces[0][0].meta).to.have.property('error.stack', error.stack) - expect(traces[0][0].meta).to.not.have.property('http.status_code') + expect(traces[0][0].meta).to.have.property( + 'error.msg', + error.message + ) + expect(traces[0][0].meta).to.have.property( + 'error.type', + error.name + ) + expect(traces[0][0].meta).to.have.property( + 'error.stack', + error.stack + ) + expect(traces[0][0].meta).to.not.have.property( + 'http.status_code' + ) }) .then(done) .catch(done) appListener = server(app, port, () => { const client = new undici.Client(`http://localhost:${port}`) - client.request({ - path: '/user', - method: 'GET' - }, (err, data) => { - error = err - }) + client.request( + { + path: '/user', + method: 'GET' + }, + (err, data) => { + error = err + } + ) client.destroy() }) }) }) - it('should record aborted requests as errors', done => { + it('should record aborted requests as errors', (done) => { const app = express() app.get('/user', (req, res) => {}) - getPort().then(port => { + getPort().then((port) => { agent - .use(traces => { + .use((traces) => { expect(traces[0][0]).to.have.property('error', 1) - expect(traces[0][0].meta).to.not.have.property('http.status_code') + expect(traces[0][0].meta).to.not.have.property( + 'http.status_code' + ) }) .then(done) .catch(done) @@ -450,16 +496,18 @@ describe('undici', () => { }) // TODO: Get timeout working - it.skip('should record timeouts as errors', done => { + it.skip('should record timeouts as errors', (done) => { const app = express() app.get('/user', (req, res) => {}) - getPort().then(port => { + getPort().then((port) => { agent - .use(traces => { + .use((traces) => { expect(traces[0][0]).to.have.property('error', 1) - expect(traces[0][0].meta).to.not.have.property('http.status_code') + expect(traces[0][0].meta).to.not.have.property( + 'http.status_code' + ) }) .then(done) .catch(done) @@ -473,45 +521,49 @@ describe('undici', () => { }) }) - it('should record when the request was aborted', done => { + it('should record when the request was aborted', (done) => { const app = express() app.get('/abort', (req, res) => { res.status(200).send() }) - getPort().then(port => { + getPort().then((port) => { agent - .use(traces => { - expect(traces[0][0]).to.have.property('service', 'test-http-client') + .use((traces) => { + expect(traces[0][0]).to.have.property( + 'service', + 'test-http-client' + ) }) .then(done) .catch(done) appListener = server(app, port, () => { const ac = new AbortController() - undici.request(`http://localhost:${port}/abort`, { signal: ac.signal }) + undici.request(`http://localhost:${port}/abort`, { + signal: ac.signal + }) ac.abort() }) }) }) - it('should skip requests to the agent', done => { + it('should skip requests to the agent', (done) => { const app = express() app.get('/user', (req, res) => { res.status(200).send() }) - getPort().then(port => { + getPort().then((port) => { const timer = setTimeout(done, 100) - agent - .use(() => { - done(new Error('Request to the agent was traced.')) - clearTimeout(timer) - }) + agent.use(() => { + done(new Error('Request to the agent was traced.')) + clearTimeout(timer) + }) appListener = server(app, port, () => { undici.request(tracer._tracer._url.href) @@ -528,23 +580,22 @@ describe('undici', () => { service: 'custom' } - return agent.load('undici', config) - .then(() => { - undici = require(`../../../versions/undici@${version}`).get() - express = require('express') - }) + return agent.load('undici', config).then(() => { + undici = require(`../../../versions/undici@${version}`).get() + express = require('express') + }) }) - it('should be configured with the correct values', done => { + it('should be configured with the correct values', (done) => { const app = express() app.get('/user', (req, res) => { res.status(200).send() }) - getPort().then(port => { + getPort().then((port) => { agent - .use(traces => { + .use((traces) => { expect(traces[0][0]).to.have.property('service', 'custom') }) .then(done) @@ -562,26 +613,25 @@ describe('undici', () => { beforeEach(() => { config = { - validateStatus: status => status < 500 + validateStatus: (status) => status < 500 } - return agent.load('undici', config) - .then(() => { - undici = require(`../../../versions/undici@${version}`).get() - express = require('express') - }) + return agent.load('undici', config).then(() => { + undici = require(`../../../versions/undici@${version}`).get() + express = require('express') + }) }) - it('should use the supplied function to decide if a response is an error', done => { + it('should use the supplied function to decide if a response is an error', (done) => { const app = express() app.get('/user', (req, res) => { res.status(500).send() }) - getPort().then(port => { + getPort().then((port) => { agent - .use(traces => { + .use((traces) => { expect(traces[0][0]).to.have.property('error', 1) }) .then(done) @@ -601,24 +651,26 @@ describe('undici', () => { config = { splitByDomain: true } - return agent.load('undici', config) - .then(() => { - undici = require(`../../../versions/undici@${version}`).get() - express = require('express') - }) + return agent.load('undici', config).then(() => { + undici = require(`../../../versions/undici@${version}`).get() + express = require('express') + }) }) - it('should use the remote endpoint as the service name', done => { + it('should use the remote endpoint as the service name', (done) => { const app = express() app.get('/user', (req, res) => { res.status(200).send() }) - getPort().then(port => { + getPort().then((port) => { agent - .use(traces => { - expect(traces[0][0]).to.have.property('service', `localhost:${port}`) + .use((traces) => { + expect(traces[0][0]).to.have.property( + 'service', + `localhost:${port}` + ) }) .then(done) .catch(done) @@ -637,17 +689,16 @@ describe('undici', () => { config = { headers: ['host', 'x-foo'] } - return agent.load('undici', config) - .then(() => { - undici = require(`../../../versions/undici@${version}`).get() - express = require('express') - }) + return agent.load('undici', config).then(() => { + undici = require(`../../../versions/undici@${version}`).get() + express = require('express') + }) }) describe('host header', () => { // TODO: Injected headers are not available yet // for request - it('should add tags for the host header', done => { + it('should add tags for the host header', (done) => { const app = express() app.get('/user', (req, res) => { @@ -655,11 +706,14 @@ describe('undici', () => { res.status(200).send() }) - getPort().then(port => { + getPort().then((port) => { agent - .use(traces => { + .use((traces) => { const meta = traces[0][0].meta - expect(meta).to.have.property(`${HTTP_REQUEST_HEADERS}.host`, `localhost:${port}`) + expect(meta).to.have.property( + `${HTTP_REQUEST_HEADERS}.host`, + `localhost:${port}` + ) }) .then(done) .catch(done) @@ -670,7 +724,7 @@ describe('undici', () => { }) }) - it('should add tags for the host header through Client', done => { + it('should add tags for the host header through Client', (done) => { const app = express() app.get('/user', (req, res) => { @@ -678,11 +732,14 @@ describe('undici', () => { res.status(200).send() }) - getPort().then(port => { + getPort().then((port) => { agent - .use(traces => { + .use((traces) => { const meta = traces[0][0].meta - expect(meta).to.have.property(`${HTTP_REQUEST_HEADERS}.host`, `localhost:${port}`) + expect(meta).to.have.property( + `${HTTP_REQUEST_HEADERS}.host`, + `localhost:${port}` + ) }) .then(done) .catch(done) @@ -698,7 +755,7 @@ describe('undici', () => { }) }) - it('should pass overwritten host header', done => { + it('should pass overwritten host header', (done) => { const app = express() app.get('/user', (req, res) => { @@ -706,11 +763,14 @@ describe('undici', () => { res.status(200).send() }) - getPort().then(port => { + getPort().then((port) => { agent - .use(traces => { + .use((traces) => { const meta = traces[0][0].meta - expect(meta).to.have.property(`${HTTP_REQUEST_HEADERS}.host`, `my-service`) + expect(meta).to.have.property( + `${HTTP_REQUEST_HEADERS}.host`, + `my-service` + ) }) .then(done) .catch(done) @@ -726,7 +786,7 @@ describe('undici', () => { }) }) - it('should add tags for the configured headers', done => { + it('should add tags for the configured headers', (done) => { const app = express() app.get('/user', (req, res) => { @@ -734,11 +794,14 @@ describe('undici', () => { res.status(200).send() }) - getPort().then(port => { + getPort().then((port) => { agent - .use(traces => { + .use((traces) => { const meta = traces[0][0].meta - expect(meta).to.have.property(`${HTTP_RESPONSE_HEADERS}.x-foo`, 'bar') + expect(meta).to.have.property( + `${HTTP_RESPONSE_HEADERS}.x-foo`, + 'bar' + ) }) .then(done) .catch(done) @@ -749,18 +812,21 @@ describe('undici', () => { }) }) - it('should support adding request headers', done => { + it('should support adding request headers', (done) => { const app = express() app.get('/user', (req, res) => { res.status(200).send() }) - getPort().then(port => { + getPort().then((port) => { agent - .use(traces => { + .use((traces) => { const meta = traces[0][0].meta - expect(meta).to.have.property(`${HTTP_REQUEST_HEADERS}.x-foo`, `bar`) + expect(meta).to.have.property( + `${HTTP_REQUEST_HEADERS}.x-foo`, + `bar` + ) }) .then(done) .catch(done) @@ -785,23 +851,22 @@ describe('undici', () => { } } } - return agent.load('undici', config) - .then(() => { - undici = require(`../../../versions/undici@${version}`).get() - express = require('express') - }) + return agent.load('undici', config).then(() => { + undici = require(`../../../versions/undici@${version}`).get() + express = require('express') + }) }) - it('should run the request hook before the span is finished', done => { + it('should run the request hook before the span is finished', (done) => { const app = express() app.get('/user', (req, res) => { res.status(200).send() }) - getPort().then(port => { + getPort().then((port) => { agent - .use(traces => { + .use((traces) => { expect(traces[0][0]).to.have.property('resource', 'GET -- /user') }) .then(done) @@ -821,14 +886,13 @@ describe('undici', () => { config = { propagationBlocklist: [/\/users/] } - return agent.load('undici', config) - .then(() => { - undici = require(`../../../versions/undici@${version}`).get() - express = require('express') - }) + return agent.load('undici', config).then(() => { + undici = require(`../../../versions/undici@${version}`).get() + express = require('express') + }) }) - it('should skip injecting if the url matches an item in the propagationBlacklist', done => { + it('should skip injecting if the url matches an item in the propagationBlacklist', (done) => { const app = express() app.get('/users', (req, res) => { @@ -844,7 +908,7 @@ describe('undici', () => { } }) - getPort().then(port => { + getPort().then((port) => { appListener = server(app, port, () => { undici.request(`http://localhost:${port}/users`) })