Skip to content

Commit

Permalink
Add support for http2 servers in compatibility mode
Browse files Browse the repository at this point in the history
  • Loading branch information
Stephen Belanger committed Sep 29, 2022
1 parent f767046 commit f82ad3c
Show file tree
Hide file tree
Showing 3 changed files with 342 additions and 1 deletion.
57 changes: 56 additions & 1 deletion packages/datadog-instrumentations/src/http2/server.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,58 @@
'use strict'

// Instrumentation temporarily disabled. See https://github.com/DataDog/dd-trace-js/issues/312
// Old instrumentation temporarily replaced with compatibility mode only instrumentation.
// See https://github.com/DataDog/dd-trace-js/issues/312

const {
channel,
addHook,
AsyncResource
} = require('../helpers/instrument')
const shimmer = require('../../../datadog-shimmer')

const startServerCh = channel('apm:http2:server:request:start')
const errorServerCh = channel('apm:http2:server:request:error')
const finishServerCh = channel('apm:http2:server:request:finish')

addHook({ name: 'http2' }, http2 => {
shimmer.wrap(http2, 'createSecureServer', wrapCreateServer)
shimmer.wrap(http2, 'createServer', wrapCreateServer)
return http2
})

function wrapCreateServer (createServer) {
return function (...args) {
const handler = args.pop()
return createServer.call(this, ...args, function (req, res) {
if (!startServerCh.hasSubscribers) {
return handler.apply(this, arguments)
}

shimmer.wrap(res, 'emit', wrapResponseEmit)
res.req = req

const asyncResource = new AsyncResource('bound-anonymous-fn')
return asyncResource.runInAsyncScope(() => {
startServerCh.publish({ req, res })

try {
return handler.apply(this, arguments)
} catch (err) {
errorServerCh.publish(err)

throw err
}
})
})
}
}

function wrapResponseEmit (emit) {
return function (eventName, event) {
if (eventName === 'close' && finishServerCh.hasSubscribers) {
finishServerCh.publish({ req: this.req })
}

return emit.apply(this, arguments)
}
}
41 changes: 41 additions & 0 deletions packages/datadog-plugin-http2/src/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,52 @@
// Plugin temporarily disabled. See https://github.com/DataDog/dd-trace-js/issues/312

const Plugin = require('../../dd-trace/src/plugins/plugin')
const { storage } = require('../../datadog-core')
const web = require('../../dd-trace/src/plugins/util/web')
const { incomingHttpRequestStart } = require('../../dd-trace/src/appsec/gateway/channels')

class Http2ServerPlugin extends Plugin {
static get name () {
return 'http2'
}

constructor (...args) {
super(...args)

this.addSub('apm:http2:server:request:start', ({ req, res }) => {
const store = storage.getStore()
const span = web.startSpan(this.tracer, this.config, req, res, 'web.request')

this.enter(span, { ...store, req })

const context = web.getContext(req)

if (!context.instrumented) {
context.res.writeHead = web.wrapWriteHead(context)
context.instrumented = true
}

if (incomingHttpRequestStart.hasSubscribers) {
incomingHttpRequestStart.publish({ req, res })
}
})

this.addSub('apm:http2:server:request:error', (error) => {
web.addError(error)
})

this.addSub('apm:http2:server:request:finish', ({ req }) => {
const context = web.getContext(req)

if (!context || !context.res) return // Not created by a http.Server instance.

web.finishAll(context)
})
}

configure (config) {
return super.configure(web.normalizeConfig(config))
}
}

module.exports = Http2ServerPlugin
245 changes: 245 additions & 0 deletions packages/datadog-plugin-http2/test/server.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
'use strict'

const getPort = require('get-port')
const agent = require('../../dd-trace/test/plugins/agent')
const { incomingHttpRequestStart } = require('../../dd-trace/src/appsec/gateway/channels')

function request (http2, url, options) {
url = new URL(url)
return new Promise((resolve, reject) => {
const client = http2
.connect(url.origin)
.on('error', reject)

const req = client.request({
':path': url.pathname,
':method': 'GET'
}, options)
req.on('error', reject)

const chunks = []
req.on('data', (chunk) => {
chunks.push(chunk)
})
req.on('end', () => {
resolve(Buffer.concat(chunks))
})

req.end()
})
}

describe('Plugin', () => {
let http2
let listener
let appListener
let tracer
let port
let app

describe('http2/server', () => {
beforeEach(() => {
tracer = require('../../dd-trace')
listener = (req, res) => {
app && app(req, res)
res.writeHead(200)
res.end()
}
})

beforeEach(() => {
return getPort().then(newPort => {
port = newPort
})
})

afterEach(() => {
appListener && appListener.close()
app = null
return agent.close({ ritmReset: false })
})

describe('canceled request', () => {
beforeEach(() => {
listener = (req, res) => {
setTimeout(() => {
app && app(req, res)
res.writeHead(200)
res.end()
}, 500)
}
})

beforeEach(() => {
return agent.load('http2')
.then(() => {
http2 = require('http2')
})
})

beforeEach(done => {
const server = http2.createServer(listener)
appListener = server
.listen(port, 'localhost', () => done())
})

it('should send traces to agent', (done) => {
app = sinon.stub()
agent
.use(traces => {
expect(app).not.to.have.been.called // request should be cancelled before call to app
expect(traces[0][0]).to.have.property('name', 'web.request')
expect(traces[0][0]).to.have.property('service', 'test')
expect(traces[0][0]).to.have.property('type', 'web')
expect(traces[0][0]).to.have.property('resource', 'GET')
expect(traces[0][0].meta).to.have.property('span.kind', 'server')
expect(traces[0][0].meta).to.have.property('http.url', `http://localhost:${port}/user`)
expect(traces[0][0].meta).to.have.property('http.method', 'GET')
expect(traces[0][0].meta).to.have.property('http.status_code', '200')
})
.then(done)
.catch(done)

const ac = new AbortController()
request(http2, `http://localhost:${port}/user`, {
signal: ac.signal
})
setTimeout(() => { ac.abort() }, 100)
})
})

describe('without configuration', () => {
beforeEach(() => {
return agent.load('http2')
.then(() => {
http2 = require('http2')
})
})

beforeEach(done => {
const server = http2.createServer(listener)
appListener = server
.listen(port, 'localhost', () => done())
})

it('should do automatic instrumentation', done => {
agent
.use(traces => {
expect(traces[0][0]).to.have.property('name', 'web.request')
expect(traces[0][0]).to.have.property('service', 'test')
expect(traces[0][0]).to.have.property('type', 'web')
expect(traces[0][0]).to.have.property('resource', 'GET')
expect(traces[0][0].meta).to.have.property('span.kind', 'server')
expect(traces[0][0].meta).to.have.property('http.url', `http://localhost:${port}/user`)
expect(traces[0][0].meta).to.have.property('http.method', 'GET')
expect(traces[0][0].meta).to.have.property('http.status_code', '200')
})
.then(done)
.catch(done)

request(http2, `http://localhost:${port}/user`).catch(done)
})

it('should run the request listener in the request scope', done => {
const spy = sinon.spy(() => {
expect(tracer.scope().active()).to.not.be.null
})

incomingHttpRequestStart.subscribe(spy)

app = (req, res) => {
expect(tracer.scope().active()).to.not.be.null

expect(spy).to.have.been.calledOnceWithExactly({ req, res }, incomingHttpRequestStart.name)

done()
}

request(http2, `http://localhost:${port}/user`).catch(done)
})

it(`should run the request's close event in the correct context`, done => {
app = (req, res) => {
req.on('close', () => {
expect(tracer.scope().active()).to.equal(null)
done()
})
res.end()
}

request(http2, `http://localhost:${port}/user`).catch(done)
})

it(`should run the response's close event in the correct context`, done => {
app = (req, res) => {
const span = tracer.scope().active()

res.on('close', () => {
expect(tracer.scope().active()).to.equal(span)
done()
})
}

request(http2, `http://localhost:${port}/user`).catch(done)
})

it(`should run the finish event in the correct context`, done => {
app = (req, res) => {
const span = tracer.scope().active()

res.on('finish', () => {
expect(tracer.scope().active()).to.equal(span)
done()
})
}

request(http2, `http://localhost:${port}/user`).catch(done)
})

it('should not cause `end` to be called multiple times', done => {
app = (req, res) => {
res.end = sinon.spy(res.end)

res.on('finish', () => {
expect(res.end).to.have.been.calledOnce
done()
})
}

request(http2, `http://localhost:${port}/user`).catch(done)
})
})

describe('with a blocklist configuration', () => {
beforeEach(() => {
return agent.load('http2', { client: false, blocklist: '/health' })
.then(() => {
http2 = require('http2')
})
})

beforeEach(done => {
const server = http2.createServer(listener)
appListener = server
.listen(port, 'localhost', () => done())
})

it('should drop traces for blocklist route', done => {
const spy = sinon.spy(() => {})

agent
.use((traces) => {
spy()
})
.catch(done)

setTimeout(() => {
expect(spy).to.not.have.been.called
done()
}, 100)

request(http2, `http://localhost:${port}/health`).catch(done)
})
})
})
})

0 comments on commit f82ad3c

Please sign in to comment.