Skip to content

Commit

Permalink
add dc store binding polyfill and migrate http2
Browse files Browse the repository at this point in the history
  • Loading branch information
rochdev authored and tlhunter committed Jun 26, 2023
1 parent 459ca8d commit f45054c
Show file tree
Hide file tree
Showing 5 changed files with 192 additions and 68 deletions.
49 changes: 24 additions & 25 deletions packages/datadog-instrumentations/src/http2/client.js
Original file line number Diff line number Diff line change
@@ -1,32 +1,27 @@
'use strict'

const shimmer = require('../../../datadog-shimmer')
const { addHook, channel, AsyncResource } = require('../helpers/instrument')
const { addHook, channel } = require('../helpers/instrument')

const connectChannel = channel('apm:http2:client:connect:start')
const startChannel = channel('apm:http2:client:request:start')
const finishChannel = channel('apm:http2:client:request:finish')
const endChannel = channel('apm:http2:client:request:end')
const asyncStartChannel = channel('apm:http2:client:request:asyncStart')
const asyncEndChannel = channel('apm:http2:client:request:asyncEnd')
const errorChannel = channel('apm:http2:client:request:error')
const responseChannel = channel('apm:http2:client:response')

function createWrapEmit (requestResource, parentResource) {
function createWrapEmit (ctx) {
return function wrapEmit (emit) {
return function (event, arg1) {
requestResource.runInAsyncScope(() => {
switch (event) {
case 'response':
responseChannel.publish(arg1)
break
case 'error':
errorChannel.publish(arg1)
case 'close': // eslint-disable-line no-fallthrough
finishChannel.publish()
break
}
})
ctx.eventName = event
ctx.eventData = arg1

return parentResource.runInAsyncScope(() => {
return emit.apply(this, arguments)
return asyncStartChannel.runStores(ctx, () => {
try {
return emit.apply(this, arguments)
} finally {
asyncEndChannel.publish(ctx)
}
})
}
}
Expand All @@ -35,17 +30,21 @@ function createWrapEmit (requestResource, parentResource) {
function createWrapRequest (authority, options) {
return function wrapRequest (request) {
return function (headers) {
const parentResource = new AsyncResource('bound-anonymous-fn')
const requestResource = new AsyncResource('bound-anonymous-fn')

return requestResource.runInAsyncScope(() => {
startChannel.publish({ headers, authority, options })
const ctx = { headers, authority, options }

return startChannel.runStores(ctx, () => {
const req = request.apply(this, arguments)

shimmer.wrap(req, 'emit', createWrapEmit(requestResource, parentResource))
shimmer.wrap(req, 'emit', createWrapEmit(ctx))

return req
try {
return req
} catch (e) {
ctx.error = e
errorChannel.publish(ctx)
} finally {
endChannel.publish(ctx)
}
})
}
}
Expand Down
74 changes: 45 additions & 29 deletions packages/datadog-plugin-http2/src/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ const log = require('../../dd-trace/src/log')
const tags = require('../../../ext/tags')
const kinds = require('../../../ext/kinds')
const formats = require('../../../ext/formats')
const analyticsSampler = require('../../dd-trace/src/analytics_sampler')
const { COMPONENT, CLIENT_PORT_KEY } = require('../../dd-trace/src/constants')
const urlFilter = require('../../dd-trace/src/plugins/util/urlfilter')

Expand All @@ -25,32 +24,11 @@ const HTTP2_HEADER_STATUS = ':status'
const HTTP2_METHOD_GET = 'GET'

class Http2ClientPlugin extends ClientPlugin {
static get id () {
return 'http2'
}

constructor (...args) {
super(...args)

this.addSub('apm:http2:client:response', (headers) => {
const span = storage.getStore().span
const status = headers && headers[HTTP2_HEADER_STATUS]

span.setTag(HTTP_STATUS_CODE, status)

if (!this.config.validateStatus(status)) {
this.addError()
}
static get id () { return 'http2' }
static get prefix () { return `apm:http2:client:request` }

addHeaderTags(span, headers, HTTP_RESPONSE_HEADERS, this.config)
})
}

addTraceSub (eventName, handler) {
this.addSub(`apm:${this.constructor.id}:client:${this.operation}:${eventName}`, handler)
}

start ({ authority, options, headers = {} }) {
bindStart (message) {
const { authority, options, headers = {} } = message
const sessionDetails = extractSessionDetails(authority, options)
const path = headers[HTTP2_HEADER_PATH] || '/'
const pathname = path.split(/[?#]/)[0]
Expand All @@ -75,7 +53,7 @@ class Http2ClientPlugin extends ClientPlugin {
metrics: {
[CLIENT_PORT_KEY]: parseInt(sessionDetails.port)
}
})
}, false)

// TODO: Figure out a better way to do this for any span.
if (!allowed) {
Expand All @@ -88,14 +66,52 @@ class Http2ClientPlugin extends ClientPlugin {
this.tracer.inject(span, HTTP_HEADERS, headers)
}

analyticsSampler.sample(span, this.config.measured)
message.parentStore = store
message.currentStore = { ...store, span }

return message.currentStore
}

bindAsyncStart ({ eventName, eventData, currentStore, parentStore }) {
switch (eventName) {
case 'response':
this._onResponse(currentStore, eventData)
return parentStore
case 'error':
this._onError(currentStore, eventData)
return parentStore
case 'close':
this._onClose(currentStore, eventData)
return parentStore
}

this.enter(span, store)
return storage.getStore()
}

configure (config) {
return super.configure(normalizeConfig(config))
}

_onResponse (store, headers) {
const status = headers && headers[HTTP2_HEADER_STATUS]

store.span.setTag(HTTP_STATUS_CODE, status)

if (!this.config.validateStatus(status)) {
storage.run(store, () => this.addError())
}

addHeaderTags(store.span, headers, HTTP_RESPONSE_HEADERS, this.config)
}

_onError ({ span }, error) {
span.setTag('error', error)
span.finish()
}

_onClose ({ span }) {
span.finish()
}
}

function extractSessionDetails (authority, options) {
Expand Down
28 changes: 28 additions & 0 deletions packages/dd-trace/src/plugins/plugin.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,31 @@ class Subscription {
}
}

class StoreBinding {
constructor (event, transform) {
this._channel = dc.channel(event)
this._transform = data => {
const store = storage.getStore()

return !store || !store.noop
? transform(data)
: store
}
}

enable () {
this._channel.bindStore(storage, this._transform)
}

disable () {
this._channel.unbindStore(storage, this._transform)
}
}

module.exports = class Plugin {
constructor (tracer, tracerConfig) {
this._subscriptions = []
this._bindings = []
this._enabled = false
this._tracer = tracer
this.config = {} // plugin-specific configuration, unset until .configure() is called
Expand All @@ -53,6 +75,10 @@ module.exports = class Plugin {
this._subscriptions.push(new Subscription(channelName, handler))
}

addBind (channelName, transform) {
this._bindings.push(new StoreBinding(channelName, transform))
}

addError (error) {
const store = storage.getStore()

Expand All @@ -71,9 +97,11 @@ module.exports = class Plugin {
if (config.enabled && !this._enabled) {
this._enabled = true
this._subscriptions.forEach(sub => sub.enable())
this._bindings.forEach(sub => sub.enable())
} else if (!config.enabled && this._enabled) {
this._enabled = false
this._subscriptions.forEach(sub => sub.disable())
this._bindings.forEach(sub => sub.disable())
}
}
}
45 changes: 31 additions & 14 deletions packages/dd-trace/src/plugins/tracing.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,7 @@ class TracingPlugin extends Plugin {
this.component = this.constructor.component || this.constructor.id
this.operation = this.constructor.operation

this.addTraceSub('start', message => {
this.start(message)
})

this.addTraceSub('error', err => {
this.error(err)
})

this.addTraceSub('finish', message => {
this.finish(message)
})
this.addTraceSubs()
}

get activeSpan () {
Expand Down Expand Up @@ -62,8 +52,32 @@ class TracingPlugin extends Plugin {
this.addError(error)
}

addTraceSubs () {
const events = ['start', 'end', 'asyncStart', 'asyncEnd', 'error', 'finish']

for (const event of events) {
const bindName = `bind${event.charAt(0).toUpperCase()}${event.slice(1)}`

if (this[event]) {
this.addTraceSub(event, message => {
this[event](message)
})
}

if (this[bindName]) {
this.addTraceBind(event, message => this[bindName](message))
}
}
}

addTraceSub (eventName, handler) {
this.addSub(`apm:${this.component}:${this.operation}:${eventName}`, handler)
const prefix = this.constructor.prefix || `apm:${this.component}:${this.operation}`
this.addSub(`${prefix}:${eventName}`, handler)
}

addTraceBind (eventName, transform) {
const prefix = this.constructor.prefix || `apm:${this.component}:${this.operation}`
this.addBind(`${prefix}:${eventName}`, transform)
}

addError (error) {
Expand All @@ -74,7 +88,7 @@ class TracingPlugin extends Plugin {
}
}

startSpan (name, { childOf, kind, meta, metrics, service, resource, type } = {}) {
startSpan (name, { childOf, kind, meta, metrics, service, resource, type } = {}, enter = true) {
const store = storage.getStore()

if (store && childOf === undefined) {
Expand All @@ -97,7 +111,10 @@ class TracingPlugin extends Plugin {

analyticsSampler.sample(span, this.config.measured)

storage.enterWith({ ...store, span })
// TODO: Remove this after migration to TracingChannel is done.
if (enter) {
storage.enterWith({ ...store, span })
}

return span
}
Expand Down
64 changes: 64 additions & 0 deletions packages/diagnostics_channel/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,68 @@ if (major === '19' && minor === '9') {
}
}

if (!Channel.prototype.runStores) {
const ActiveChannelPrototype = getActiveChannelPrototype()

Channel.prototype.bindStore = ActiveChannelPrototype.bindStore = function (store, transform) {
if (!this._stores) {
this._stores = new Map()
}
this._stores.set(store, transform)
}

Channel.prototype.unbindStore = ActiveChannelPrototype.runStores = function (store) {
if (!this._stores) return
this._stores.delete(store)
}

Channel.prototype.runStores = ActiveChannelPrototype.runStores = function (data, fn, thisArg, ...args) {
if (!this._stores) return Reflect.apply(fn, thisArg, args)

let run = () => {
this.publish(data)
return Reflect.apply(fn, thisArg, args)
}

for (const entry of this._stores.entries()) {
const store = entry[0]
const transform = entry[1]
run = wrapStoreRun(store, data, run, transform)
}

return run()
}
}

function defaultTransform (data) {
return data
}

function wrapStoreRun (store, data, next, transform = defaultTransform) {
return () => {
let context
try {
context = transform(data)
} catch (err) {
process.nextTick(() => {
throw err
})
return next()
}

return store.run(context, next)
}
}

function getActiveChannelPrototype () {
const dummyChannel = channel('foo')
const listener = () => {}

dummyChannel.subscribe(listener)
const ActiveChannelPrototype = Object.getPrototypeOf(dummyChannel)
dummyChannel.unsubscribe(listener)

return ActiveChannelPrototype
}

module.exports = dc

0 comments on commit f45054c

Please sign in to comment.