Skip to content

Commit

Permalink
add dc store binding polyfill and migrate http2 (#3284)
Browse files Browse the repository at this point in the history
* add dc store binding polyfill and migrate http2

* fix error handling for synchronous exceptions

* OutboundPlugin#tagPeerService()

---------

Co-authored-by: Thomas Hunter II <tlhunter@datadog.com>
  • Loading branch information
2 people authored and Stephen Belanger committed Jul 11, 2023
1 parent e33a4ef commit 6da7832
Show file tree
Hide file tree
Showing 6 changed files with 201 additions and 70 deletions.
51 changes: 25 additions & 26 deletions packages/datadog-instrumentations/src/http2/client.js
Original file line number Diff line number Diff line change
@@ -1,32 +1,27 @@
'use strict'

const shimmer = require('../../../datadog-shimmer')
const { addHook, channel, AsyncResource } = require('../helpers/instrument')
const { addHook, channel } = require('../helpers/instrument')

const connectChannel = channel('apm:http2:client:connect:start')
const startChannel = channel('apm:http2:client:request:start')
const finishChannel = channel('apm:http2:client:request:finish')
const endChannel = channel('apm:http2:client:request:end')
const asyncStartChannel = channel('apm:http2:client:request:asyncStart')
const asyncEndChannel = channel('apm:http2:client:request:asyncEnd')
const errorChannel = channel('apm:http2:client:request:error')
const responseChannel = channel('apm:http2:client:response')

function createWrapEmit (requestResource, parentResource) {
function createWrapEmit (ctx) {
return function wrapEmit (emit) {
return function (event, arg1) {
requestResource.runInAsyncScope(() => {
switch (event) {
case 'response':
responseChannel.publish(arg1)
break
case 'error':
errorChannel.publish(arg1)
case 'close': // eslint-disable-line no-fallthrough
finishChannel.publish()
break
}
})
ctx.eventName = event
ctx.eventData = arg1

return parentResource.runInAsyncScope(() => {
return emit.apply(this, arguments)
return asyncStartChannel.runStores(ctx, () => {
try {
return emit.apply(this, arguments)
} finally {
asyncEndChannel.publish(ctx)
}
})
}
}
Expand All @@ -35,17 +30,21 @@ function createWrapEmit (requestResource, parentResource) {
function createWrapRequest (authority, options) {
return function wrapRequest (request) {
return function (headers) {
const parentResource = new AsyncResource('bound-anonymous-fn')
const requestResource = new AsyncResource('bound-anonymous-fn')
const ctx = { headers, authority, options }

return requestResource.runInAsyncScope(() => {
startChannel.publish({ headers, authority, options })
return startChannel.runStores(ctx, () => {
try {
const req = request.apply(this, arguments)

const req = request.apply(this, arguments)
shimmer.wrap(req, 'emit', createWrapEmit(ctx))

shimmer.wrap(req, 'emit', createWrapEmit(requestResource, parentResource))

return req
return req
} catch (e) {
ctx.error = e
errorChannel.publish(ctx)
} finally {
endChannel.publish(ctx)
}
})
}
}
Expand Down
75 changes: 46 additions & 29 deletions packages/datadog-plugin-http2/src/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ const log = require('../../dd-trace/src/log')
const tags = require('../../../ext/tags')
const kinds = require('../../../ext/kinds')
const formats = require('../../../ext/formats')
const analyticsSampler = require('../../dd-trace/src/analytics_sampler')
const { COMPONENT, CLIENT_PORT_KEY } = require('../../dd-trace/src/constants')
const urlFilter = require('../../dd-trace/src/plugins/util/urlfilter')

Expand All @@ -25,32 +24,11 @@ const HTTP2_HEADER_STATUS = ':status'
const HTTP2_METHOD_GET = 'GET'

class Http2ClientPlugin extends ClientPlugin {
static get id () {
return 'http2'
}

constructor (...args) {
super(...args)

this.addSub('apm:http2:client:response', (headers) => {
const span = storage.getStore().span
const status = headers && headers[HTTP2_HEADER_STATUS]

span.setTag(HTTP_STATUS_CODE, status)

if (!this.config.validateStatus(status)) {
this.addError()
}
static get id () { return 'http2' }
static get prefix () { return `apm:http2:client:request` }

addHeaderTags(span, headers, HTTP_RESPONSE_HEADERS, this.config)
})
}

addTraceSub (eventName, handler) {
this.addSub(`apm:${this.constructor.id}:client:${this.operation}:${eventName}`, handler)
}

start ({ authority, options, headers = {} }) {
bindStart (message) {
const { authority, options, headers = {} } = message
const sessionDetails = extractSessionDetails(authority, options)
const path = headers[HTTP2_HEADER_PATH] || '/'
const pathname = path.split(/[?#]/)[0]
Expand All @@ -75,7 +53,7 @@ class Http2ClientPlugin extends ClientPlugin {
metrics: {
[CLIENT_PORT_KEY]: parseInt(sessionDetails.port)
}
})
}, false)

// TODO: Figure out a better way to do this for any span.
if (!allowed) {
Expand All @@ -88,14 +66,53 @@ class Http2ClientPlugin extends ClientPlugin {
this.tracer.inject(span, HTTP_HEADERS, headers)
}

analyticsSampler.sample(span, this.config.measured)
message.parentStore = store
message.currentStore = { ...store, span }

return message.currentStore
}

bindAsyncStart ({ eventName, eventData, currentStore, parentStore }) {
switch (eventName) {
case 'response':
this._onResponse(currentStore, eventData)
return parentStore
case 'error':
this._onError(currentStore, eventData)
return parentStore
case 'close':
this._onClose(currentStore, eventData)
return parentStore
}

this.enter(span, store)
return storage.getStore()
}

configure (config) {
return super.configure(normalizeConfig(config))
}

_onResponse (store, headers) {
const status = headers && headers[HTTP2_HEADER_STATUS]

store.span.setTag(HTTP_STATUS_CODE, status)

if (!this.config.validateStatus(status)) {
storage.run(store, () => this.addError())
}

addHeaderTags(store.span, headers, HTTP_RESPONSE_HEADERS, this.config)
}

_onError ({ span }, error) {
span.setTag('error', error)
span.finish()
}

_onClose ({ span }) {
this.tagPeerService(span)
span.finish()
}
}

function extractSessionDetails (authority, options) {
Expand Down
6 changes: 5 additions & 1 deletion packages/dd-trace/src/plugins/outbound.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,17 @@ class OutboundPlugin extends TracingPlugin {

finish () {
const span = this.activeSpan
this.tagPeerService(span)
super.finish(...arguments)
}

tagPeerService (span) {
if (this.tracer._computePeerService) {
const peerData = this.getPeerService(span.context()._tags)
if (peerData) {
span.addTags(peerData)
}
}
super.finish(...arguments)
}

connect (url) {
Expand Down
28 changes: 28 additions & 0 deletions packages/dd-trace/src/plugins/plugin.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,31 @@ class Subscription {
}
}

class StoreBinding {
constructor (event, transform) {
this._channel = dc.channel(event)
this._transform = data => {
const store = storage.getStore()

return !store || !store.noop
? transform(data)
: store
}
}

enable () {
this._channel.bindStore(storage, this._transform)
}

disable () {
this._channel.unbindStore(storage, this._transform)
}
}

module.exports = class Plugin {
constructor (tracer, tracerConfig) {
this._subscriptions = []
this._bindings = []
this._enabled = false
this._tracer = tracer
this.config = {} // plugin-specific configuration, unset until .configure() is called
Expand All @@ -53,6 +75,10 @@ module.exports = class Plugin {
this._subscriptions.push(new Subscription(channelName, handler))
}

addBind (channelName, transform) {
this._bindings.push(new StoreBinding(channelName, transform))
}

addError (error) {
const store = storage.getStore()

Expand All @@ -71,9 +97,11 @@ module.exports = class Plugin {
if (config.enabled && !this._enabled) {
this._enabled = true
this._subscriptions.forEach(sub => sub.enable())
this._bindings.forEach(sub => sub.enable())
} else if (!config.enabled && this._enabled) {
this._enabled = false
this._subscriptions.forEach(sub => sub.disable())
this._bindings.forEach(sub => sub.disable())
}
}
}
47 changes: 33 additions & 14 deletions packages/dd-trace/src/plugins/tracing.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,7 @@ class TracingPlugin extends Plugin {
this.component = this.constructor.component || this.constructor.id
this.operation = this.constructor.operation

this.addTraceSub('start', message => {
this.start(message)
})

this.addTraceSub('error', err => {
this.error(err)
})

this.addTraceSub('finish', message => {
this.finish(message)
})
this.addTraceSubs()
}

get activeSpan () {
Expand Down Expand Up @@ -65,19 +55,45 @@ class TracingPlugin extends Plugin {
this.addError(error)
}

addTraceSubs () {
const events = ['start', 'end', 'asyncStart', 'asyncEnd', 'error', 'finish']

for (const event of events) {
const bindName = `bind${event.charAt(0).toUpperCase()}${event.slice(1)}`

if (this[event]) {
this.addTraceSub(event, message => {
this[event](message)
})
}

if (this[bindName]) {
this.addTraceBind(event, message => this[bindName](message))
}
}
}

addTraceSub (eventName, handler) {
this.addSub(`apm:${this.component}:${this.operation}:${eventName}`, handler)
const prefix = this.constructor.prefix || `apm:${this.component}:${this.operation}`
this.addSub(`${prefix}:${eventName}`, handler)
}

addTraceBind (eventName, transform) {
const prefix = this.constructor.prefix || `apm:${this.component}:${this.operation}`
this.addBind(`${prefix}:${eventName}`, transform)
}

addError (error) {
const span = this.activeSpan

if (!span._spanContext._tags['error']) {
// Errors may be wrapped in a context.
error = (error && error.error) || error
span.setTag('error', error || 1)
}
}

startSpan (name, { childOf, kind, meta, metrics, service, resource, type } = {}) {
startSpan (name, { childOf, kind, meta, metrics, service, resource, type } = {}, enter = true) {
const store = storage.getStore()

if (store && childOf === undefined) {
Expand All @@ -100,7 +116,10 @@ class TracingPlugin extends Plugin {

analyticsSampler.sample(span, this.config.measured)

storage.enterWith({ ...store, span })
// TODO: Remove this after migration to TracingChannel is done.
if (enter) {
storage.enterWith({ ...store, span })
}

return span
}
Expand Down
Loading

0 comments on commit 6da7832

Please sign in to comment.