Skip to content

Commit

Permalink
refactor: adds support for all public apis
Browse files Browse the repository at this point in the history
  • Loading branch information
flovilmart committed Feb 1, 2022
1 parent 7f85c87 commit e66468a
Show file tree
Hide file tree
Showing 4 changed files with 225 additions and 204 deletions.
33 changes: 0 additions & 33 deletions packages/datadog-plugin-http/src/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ const log = require('../../dd-trace/src/log')
const tags = require('../../../ext/tags')
const kinds = require('../../../ext/kinds')
const formats = require('../../../ext/formats')
const urlFilter = require('../../dd-trace/src/plugins/util/urlfilter')
const analyticsSampler = require('../../dd-trace/src/analytics_sampler')
const { storage } = require('../../datadog-core')
const { addErrorToSpan, getServiceName, hasAmazonSignature, client } = require('../../dd-trace/src/plugins/util/web')
Expand Down Expand Up @@ -216,44 +215,12 @@ function unpatch (http) {
this.unwrap(http, 'get')
}

function getStatusValidator (config) {
if (typeof config.validateStatus === 'function') {
return config.validateStatus
} else if (config.hasOwnProperty('validateStatus')) {
log.error('Expected `validateStatus` to be a function.')
}
return code => code < 400 || code >= 500
}

function getFilter (config) {
config = Object.assign({}, config, {
blocklist: config.blocklist || []
})

return urlFilter.getFilter(config)
}

function normalizeConfig (config) {
config = config.client || config

return client.normalizeConfig(config)
}

function getHeaders (config) {
if (!Array.isArray(config.headers)) return []

return config.headers
.filter(key => typeof key === 'string')
.map(key => key.toLowerCase())
}

function getHooks (config) {
const noop = () => {}
const request = (config.hooks && config.hooks.request) || noop

return { request }
}

module.exports = [
{
name: 'http',
Expand Down
127 changes: 96 additions & 31 deletions packages/datadog-plugin-undici/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@ const log = require('../../dd-trace/src/log')
const tags = require('../../../ext/tags')
const kinds = require('../../../ext/kinds')
const formats = require('../../../ext/formats')
const urlFilter = require('../../dd-trace/src/plugins/util/urlfilter')
const analyticsSampler = require('../../dd-trace/src/analytics_sampler')
const { AsyncResource, AsyncLocalStorage } = require('async_hooks')
const { addErrorToSpan, getServiceName, hasAmazonSignature, client: { normalizeConfig } } = require('../../dd-trace/src/plugins/util/web')
const {
addErrorToSpan,
getServiceName,
hasAmazonSignature,
client: { normalizeConfig }
} = require('../../dd-trace/src/plugins/util/web')

const HTTP_HEADERS = formats.HTTP_HEADERS
const HTTP_STATUS_CODE = tags.HTTP_STATUS_CODE
Expand Down Expand Up @@ -58,7 +62,6 @@ function diagnostics (tracer, config) {
)
return () => {}
}
config = normalizeConfig(config)

channels.requestChannel = diagnosticsChannel.channel('undici:request:create')
channels.headersChannel = diagnosticsChannel.channel(
Expand Down Expand Up @@ -90,9 +93,17 @@ function diagnostics (tracer, config) {
requestSpansMap.set(request, span)
}

const headers = typeof request.headers == 'string' ? parseHeaders(request.headers) : request.headers;

if (!(hasAmazonSignature({ ...request, headers }) || !config.propagationFilter(uri))) {
const headers =
typeof request.headers === 'string'
? parseHeaders(request.headers)
: request.headers

if (
!(
hasAmazonSignature({ ...request, headers }) ||
!config.propagationFilter(uri)
)
) {
const injectedHeaders = {}
tracer.inject(span, HTTP_HEADERS, injectedHeaders)
Object.entries(injectedHeaders).forEach(([key, value]) => {
Expand All @@ -106,12 +117,15 @@ function diagnostics (tracer, config) {
function handleRequestError ({ request, error }) {
const span = requestSpansMap.get(request)
addErrorToSpan(span, error)
finish(request, null, span, config)
addRequestHeaders(request, span, config)
span.finish()
}

function handleRequestHeaders ({ request, response }) {
const span = requestSpansMap.get(request)
finish(request, response, span, config)
addRequestHeaders(request, span, config)
setStatusCode(response, span, config)
config.hooks.request(span, request, response)
}

return function unsubscribe () {
Expand Down Expand Up @@ -149,37 +163,38 @@ function addRequestHeaders (req, span, config) {
}
}

function addResponseHeaders (res, span, config) {
const resHeader = res.headers.map((x) => x.toString())
while (resHeader.length) {
const key = resHeader.shift()
const value = resHeader.shift()
span.setTag(`${HTTP_RESPONSE_HEADERS}.${key}`, value)
function setStatusCode (res, span, config) {
// fetch has status set on `status` rather than statusCode
const statusCode = res.status || res.statusCode
span.setTag(HTTP_STATUS_CODE, statusCode)

if (!config.validateStatus(statusCode)) {
span.setTag('error', 1)
}
}

function finish (req, res, span, config) {
if (res) {
span.setTag(HTTP_STATUS_CODE, res.statusCode)
function addResponseHeaders (res, span, config) {
config.headers.forEach((key) => {
const value = res.headers[key]

if (!config.validateStatus(res.statusCode)) {
span.setTag('error', 1)
if (value) {
span.setTag(`${HTTP_RESPONSE_HEADERS}.${key}`, value)
}
})
}

function finishSpan (res, span, error, config) {
if (res) {
setStatusCode(res, span, config)
addResponseHeaders(res, span, config)
span.finish()
} else {
span.setTag('error', 1)
}

addRequestHeaders(req, span, config)

config.hooks.request(span, req, res)

span.finish()
}

function patch (undici, methodName, tracer, config) {
this.wrap(undici, methodName, fn => makeRequestTrace(fn))
this.wrap(undici, methodName, (fn) => makeRequestTrace(fn))

function makeRequestTrace (request) {
return function requestTrace () {
Expand All @@ -195,28 +210,78 @@ function patch (undici, methodName, tracer, config) {
[SPAN_KIND]: CLIENT
}
})

return asyncLocalStorage.run(span, () => {
return request.apply(this, arguments)
const result = asyncLocalStorage.run(span, () => {
return tracer.scope().activate(span, () => {
return request.apply(this, arguments)
})
})

if (methodName === 'pipeline') {
result.on('end', () => {
span.finish()
}).on('error', () => {
span.finish()
})
return result
}

return wrapPromise(result, span, config)
}
}
}

function wrapPromise (promise, span, config) {
if (!promise) {
finishSpan(null, span, null, config)
return promise
}

return promise
.then(
(res) => finishSpan(res, span, null, config),
(e) => finishSpan(null, span, e, config)
)
.then(() => promise)
}

module.exports = [
{
name: 'undici',
versions: ['>=4.7.1'],
patch: function (undici, tracer, config) {
config = normalizeConfig(config)

patch.call(this, undici, 'request', tracer, config)
patch.call(this, undici, 'upgrade', tracer, config)
patch.call(this, undici, 'connect', tracer, config)
patch.call(this, undici, 'fetch', tracer, config)
patch.call(this, undici, 'pipeline', tracer, config)
patch.call(this, undici, 'stream', tracer, config)
patch.call(this, undici.Client.prototype, 'request', tracer, config)

this.unpatch = diagnostics.call(this, tracer, config)
patch.call(this, undici.Client.prototype, 'request', tracer, config)
patch.call(this, undici.Client.prototype, 'pipeline', tracer, config)
patch.call(this, undici.Client.prototype, 'upgrade', tracer, config)
patch.call(this, undici.Client.prototype, 'connect', tracer, config)
patch.call(this, undici.Client.prototype, 'stream', tracer, config)

const unpatchDiagnostics = diagnostics.call(this, tracer, config)

this.unpatch = () => {
this.unwrap(undici, 'request')
this.unwrap(undici, 'upgrade')
this.unwrap(undici, 'connect')
this.unwrap(undici, 'fetch')
this.unwrap(undici, 'pipeline')
this.unwrap(undici, 'stream')

this.unwrap(undici.Client.prototype, 'request')
this.unwrap(undici.Client.prototype, 'pipeline')
this.unwrap(undici.Client.prototype, 'upgrade')
this.unwrap(undici.Client.prototype, 'connect')
this.unwrap(undici.Client.prototype, 'stream')

unpatchDiagnostics()
}
}
}
]
Loading

0 comments on commit e66468a

Please sign in to comment.