Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add dc store binding polyfill and migrate http2 #3284

Merged
merged 3 commits into from
Jul 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 25 additions & 26 deletions packages/datadog-instrumentations/src/http2/client.js
Original file line number Diff line number Diff line change
@@ -1,32 +1,27 @@
'use strict'

const shimmer = require('../../../datadog-shimmer')
const { addHook, channel, AsyncResource } = require('../helpers/instrument')
const { addHook, channel } = require('../helpers/instrument')

const connectChannel = channel('apm:http2:client:connect:start')
const startChannel = channel('apm:http2:client:request:start')
const finishChannel = channel('apm:http2:client:request:finish')
const endChannel = channel('apm:http2:client:request:end')
const asyncStartChannel = channel('apm:http2:client:request:asyncStart')
const asyncEndChannel = channel('apm:http2:client:request:asyncEnd')
const errorChannel = channel('apm:http2:client:request:error')
const responseChannel = channel('apm:http2:client:response')

function createWrapEmit (requestResource, parentResource) {
function createWrapEmit (ctx) {
return function wrapEmit (emit) {
return function (event, arg1) {
requestResource.runInAsyncScope(() => {
switch (event) {
case 'response':
responseChannel.publish(arg1)
break
case 'error':
errorChannel.publish(arg1)
case 'close': // eslint-disable-line no-fallthrough
finishChannel.publish()
break
}
})
ctx.eventName = event
ctx.eventData = arg1

return parentResource.runInAsyncScope(() => {
return emit.apply(this, arguments)
return asyncStartChannel.runStores(ctx, () => {
try {
return emit.apply(this, arguments)
} finally {
asyncEndChannel.publish(ctx)
}
})
}
}
Expand All @@ -35,17 +30,21 @@ function createWrapEmit (requestResource, parentResource) {
function createWrapRequest (authority, options) {
return function wrapRequest (request) {
return function (headers) {
const parentResource = new AsyncResource('bound-anonymous-fn')
const requestResource = new AsyncResource('bound-anonymous-fn')
const ctx = { headers, authority, options }

return requestResource.runInAsyncScope(() => {
startChannel.publish({ headers, authority, options })
return startChannel.runStores(ctx, () => {
try {
const req = request.apply(this, arguments)

const req = request.apply(this, arguments)
shimmer.wrap(req, 'emit', createWrapEmit(ctx))

shimmer.wrap(req, 'emit', createWrapEmit(requestResource, parentResource))

return req
return req
} catch (e) {
ctx.error = e
errorChannel.publish(ctx)
} finally {
endChannel.publish(ctx)
}
})
}
}
Expand Down
75 changes: 46 additions & 29 deletions packages/datadog-plugin-http2/src/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ const log = require('../../dd-trace/src/log')
const tags = require('../../../ext/tags')
const kinds = require('../../../ext/kinds')
const formats = require('../../../ext/formats')
const analyticsSampler = require('../../dd-trace/src/analytics_sampler')
const { COMPONENT, CLIENT_PORT_KEY } = require('../../dd-trace/src/constants')
const urlFilter = require('../../dd-trace/src/plugins/util/urlfilter')

Expand All @@ -25,32 +24,11 @@ const HTTP2_HEADER_STATUS = ':status'
const HTTP2_METHOD_GET = 'GET'

class Http2ClientPlugin extends ClientPlugin {
static get id () {
return 'http2'
}

constructor (...args) {
super(...args)

this.addSub('apm:http2:client:response', (headers) => {
const span = storage.getStore().span
const status = headers && headers[HTTP2_HEADER_STATUS]

span.setTag(HTTP_STATUS_CODE, status)

if (!this.config.validateStatus(status)) {
this.addError()
}
static get id () { return 'http2' }
static get prefix () { return `apm:http2:client:request` }

addHeaderTags(span, headers, HTTP_RESPONSE_HEADERS, this.config)
})
}

addTraceSub (eventName, handler) {
this.addSub(`apm:${this.constructor.id}:client:${this.operation}:${eventName}`, handler)
}

start ({ authority, options, headers = {} }) {
bindStart (message) {
const { authority, options, headers = {} } = message
const sessionDetails = extractSessionDetails(authority, options)
const path = headers[HTTP2_HEADER_PATH] || '/'
const pathname = path.split(/[?#]/)[0]
Expand All @@ -75,7 +53,7 @@ class Http2ClientPlugin extends ClientPlugin {
metrics: {
[CLIENT_PORT_KEY]: parseInt(sessionDetails.port)
}
})
}, false)

// TODO: Figure out a better way to do this for any span.
if (!allowed) {
Expand All @@ -88,14 +66,53 @@ class Http2ClientPlugin extends ClientPlugin {
this.tracer.inject(span, HTTP_HEADERS, headers)
}

analyticsSampler.sample(span, this.config.measured)
message.parentStore = store
message.currentStore = { ...store, span }

return message.currentStore
}

bindAsyncStart ({ eventName, eventData, currentStore, parentStore }) {
switch (eventName) {
case 'response':
this._onResponse(currentStore, eventData)
return parentStore
case 'error':
this._onError(currentStore, eventData)
return parentStore
case 'close':
this._onClose(currentStore, eventData)
return parentStore
}

this.enter(span, store)
return storage.getStore()
}

configure (config) {
return super.configure(normalizeConfig(config))
}

_onResponse (store, headers) {
const status = headers && headers[HTTP2_HEADER_STATUS]

store.span.setTag(HTTP_STATUS_CODE, status)

if (!this.config.validateStatus(status)) {
storage.run(store, () => this.addError())
}

addHeaderTags(store.span, headers, HTTP_RESPONSE_HEADERS, this.config)
}

_onError ({ span }, error) {
span.setTag('error', error)
span.finish()
}

_onClose ({ span }) {
this.tagPeerService(span)
span.finish()
}
}

function extractSessionDetails (authority, options) {
Expand Down
6 changes: 5 additions & 1 deletion packages/dd-trace/src/plugins/outbound.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,17 @@ class OutboundPlugin extends TracingPlugin {

finish () {
const span = this.activeSpan
this.tagPeerService(span)
super.finish(...arguments)
}

tagPeerService (span) {
if (this.tracer._computePeerService) {
const peerData = this.getPeerService(span.context()._tags)
if (peerData) {
span.addTags(peerData)
}
}
super.finish(...arguments)
}

connect (url) {
Expand Down
28 changes: 28 additions & 0 deletions packages/dd-trace/src/plugins/plugin.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,31 @@ class Subscription {
}
}

class StoreBinding {
constructor (event, transform) {
this._channel = dc.channel(event)
this._transform = data => {
const store = storage.getStore()

return !store || !store.noop
? transform(data)
: store
Qard marked this conversation as resolved.
Show resolved Hide resolved
}
}

enable () {
this._channel.bindStore(storage, this._transform)
}

disable () {
this._channel.unbindStore(storage, this._transform)
}
}

module.exports = class Plugin {
constructor (tracer, tracerConfig) {
this._subscriptions = []
this._bindings = []
this._enabled = false
this._tracer = tracer
this.config = {} // plugin-specific configuration, unset until .configure() is called
Expand All @@ -53,6 +75,10 @@ module.exports = class Plugin {
this._subscriptions.push(new Subscription(channelName, handler))
}

addBind (channelName, transform) {
this._bindings.push(new StoreBinding(channelName, transform))
}

addError (error) {
const store = storage.getStore()

Expand All @@ -71,9 +97,11 @@ module.exports = class Plugin {
if (config.enabled && !this._enabled) {
this._enabled = true
this._subscriptions.forEach(sub => sub.enable())
this._bindings.forEach(sub => sub.enable())
} else if (!config.enabled && this._enabled) {
this._enabled = false
this._subscriptions.forEach(sub => sub.disable())
this._bindings.forEach(sub => sub.disable())
}
}
}
47 changes: 33 additions & 14 deletions packages/dd-trace/src/plugins/tracing.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,7 @@ class TracingPlugin extends Plugin {
this.component = this.constructor.component || this.constructor.id
this.operation = this.constructor.operation

this.addTraceSub('start', message => {
this.start(message)
})

this.addTraceSub('error', err => {
this.error(err)
})

this.addTraceSub('finish', message => {
this.finish(message)
})
this.addTraceSubs()
}

get activeSpan () {
Expand Down Expand Up @@ -65,19 +55,45 @@ class TracingPlugin extends Plugin {
this.addError(error)
}

addTraceSubs () {
const events = ['start', 'end', 'asyncStart', 'asyncEnd', 'error', 'finish']

for (const event of events) {
const bindName = `bind${event.charAt(0).toUpperCase()}${event.slice(1)}`

if (this[event]) {
this.addTraceSub(event, message => {
this[event](message)
})
}

if (this[bindName]) {
this.addTraceBind(event, message => this[bindName](message))
}
}
}

addTraceSub (eventName, handler) {
this.addSub(`apm:${this.component}:${this.operation}:${eventName}`, handler)
const prefix = this.constructor.prefix || `apm:${this.component}:${this.operation}`
this.addSub(`${prefix}:${eventName}`, handler)
}

addTraceBind (eventName, transform) {
const prefix = this.constructor.prefix || `apm:${this.component}:${this.operation}`
this.addBind(`${prefix}:${eventName}`, transform)
}

addError (error) {
const span = this.activeSpan

if (!span._spanContext._tags['error']) {
// Errors may be wrapped in a context.
error = (error && error.error) || error
span.setTag('error', error || 1)
}
}

startSpan (name, { childOf, kind, meta, metrics, service, resource, type } = {}) {
startSpan (name, { childOf, kind, meta, metrics, service, resource, type } = {}, enter = true) {
const store = storage.getStore()

if (store && childOf === undefined) {
Expand All @@ -100,7 +116,10 @@ class TracingPlugin extends Plugin {

analyticsSampler.sample(span, this.config.measured)

storage.enterWith({ ...store, span })
// TODO: Remove this after migration to TracingChannel is done.
if (enter) {
storage.enterWith({ ...store, span })
}

return span
}
Expand Down
Loading