diff --git a/packages/datadog-instrumentations/src/http2/client.js b/packages/datadog-instrumentations/src/http2/client.js index 0a4d9ffe0ea..e5a3cd9f4ad 100644 --- a/packages/datadog-instrumentations/src/http2/client.js +++ b/packages/datadog-instrumentations/src/http2/client.js @@ -1,32 +1,27 @@ 'use strict' const shimmer = require('../../../datadog-shimmer') -const { addHook, channel, AsyncResource } = require('../helpers/instrument') +const { addHook, channel } = require('../helpers/instrument') const connectChannel = channel('apm:http2:client:connect:start') const startChannel = channel('apm:http2:client:request:start') -const finishChannel = channel('apm:http2:client:request:finish') +const endChannel = channel('apm:http2:client:request:end') +const asyncStartChannel = channel('apm:http2:client:request:asyncStart') +const asyncEndChannel = channel('apm:http2:client:request:asyncEnd') const errorChannel = channel('apm:http2:client:request:error') -const responseChannel = channel('apm:http2:client:response') -function createWrapEmit (requestResource, parentResource) { +function createWrapEmit (ctx) { return function wrapEmit (emit) { return function (event, arg1) { - requestResource.runInAsyncScope(() => { - switch (event) { - case 'response': - responseChannel.publish(arg1) - break - case 'error': - errorChannel.publish(arg1) - case 'close': // eslint-disable-line no-fallthrough - finishChannel.publish() - break - } - }) + ctx.eventName = event + ctx.eventData = arg1 - return parentResource.runInAsyncScope(() => { - return emit.apply(this, arguments) + return asyncStartChannel.runStores(ctx, () => { + try { + return emit.apply(this, arguments) + } finally { + asyncEndChannel.publish(ctx) + } }) } } @@ -35,17 +30,21 @@ function createWrapEmit (requestResource, parentResource) { function createWrapRequest (authority, options) { return function wrapRequest (request) { return function (headers) { - const parentResource = new AsyncResource('bound-anonymous-fn') - const requestResource = new AsyncResource('bound-anonymous-fn') + const ctx = { headers, authority, options } - return requestResource.runInAsyncScope(() => { - startChannel.publish({ headers, authority, options }) + return startChannel.runStores(ctx, () => { + try { + const req = request.apply(this, arguments) - const req = request.apply(this, arguments) + shimmer.wrap(req, 'emit', createWrapEmit(ctx)) - shimmer.wrap(req, 'emit', createWrapEmit(requestResource, parentResource)) - - return req + return req + } catch (e) { + ctx.error = e + errorChannel.publish(ctx) + } finally { + endChannel.publish(ctx) + } }) } } diff --git a/packages/datadog-plugin-http2/src/client.js b/packages/datadog-plugin-http2/src/client.js index e33107abf67..5de0ba163c6 100644 --- a/packages/datadog-plugin-http2/src/client.js +++ b/packages/datadog-plugin-http2/src/client.js @@ -8,7 +8,6 @@ const log = require('../../dd-trace/src/log') const tags = require('../../../ext/tags') const kinds = require('../../../ext/kinds') const formats = require('../../../ext/formats') -const analyticsSampler = require('../../dd-trace/src/analytics_sampler') const { COMPONENT, CLIENT_PORT_KEY } = require('../../dd-trace/src/constants') const urlFilter = require('../../dd-trace/src/plugins/util/urlfilter') @@ -25,32 +24,11 @@ const HTTP2_HEADER_STATUS = ':status' const HTTP2_METHOD_GET = 'GET' class Http2ClientPlugin extends ClientPlugin { - static get id () { - return 'http2' - } - - constructor (...args) { - super(...args) - - this.addSub('apm:http2:client:response', (headers) => { - const span = storage.getStore().span - const status = headers && headers[HTTP2_HEADER_STATUS] - - span.setTag(HTTP_STATUS_CODE, status) - - if (!this.config.validateStatus(status)) { - this.addError() - } + static get id () { return 'http2' } + static get prefix () { return `apm:http2:client:request` } - addHeaderTags(span, headers, HTTP_RESPONSE_HEADERS, this.config) - }) - } - - addTraceSub (eventName, handler) { - this.addSub(`apm:${this.constructor.id}:client:${this.operation}:${eventName}`, handler) - } - - start ({ authority, options, headers = {} }) { + bindStart (message) { + const { authority, options, headers = {} } = message const sessionDetails = extractSessionDetails(authority, options) const path = headers[HTTP2_HEADER_PATH] || '/' const pathname = path.split(/[?#]/)[0] @@ -75,7 +53,7 @@ class Http2ClientPlugin extends ClientPlugin { metrics: { [CLIENT_PORT_KEY]: parseInt(sessionDetails.port) } - }) + }, false) // TODO: Figure out a better way to do this for any span. if (!allowed) { @@ -88,14 +66,53 @@ class Http2ClientPlugin extends ClientPlugin { this.tracer.inject(span, HTTP_HEADERS, headers) } - analyticsSampler.sample(span, this.config.measured) + message.parentStore = store + message.currentStore = { ...store, span } + + return message.currentStore + } + + bindAsyncStart ({ eventName, eventData, currentStore, parentStore }) { + switch (eventName) { + case 'response': + this._onResponse(currentStore, eventData) + return parentStore + case 'error': + this._onError(currentStore, eventData) + return parentStore + case 'close': + this._onClose(currentStore, eventData) + return parentStore + } - this.enter(span, store) + return storage.getStore() } configure (config) { return super.configure(normalizeConfig(config)) } + + _onResponse (store, headers) { + const status = headers && headers[HTTP2_HEADER_STATUS] + + store.span.setTag(HTTP_STATUS_CODE, status) + + if (!this.config.validateStatus(status)) { + storage.run(store, () => this.addError()) + } + + addHeaderTags(store.span, headers, HTTP_RESPONSE_HEADERS, this.config) + } + + _onError ({ span }, error) { + span.setTag('error', error) + span.finish() + } + + _onClose ({ span }) { + this.tagPeerService(span) + span.finish() + } } function extractSessionDetails (authority, options) { diff --git a/packages/dd-trace/src/plugins/outbound.js b/packages/dd-trace/src/plugins/outbound.js index 43a23808df3..03863174ef9 100644 --- a/packages/dd-trace/src/plugins/outbound.js +++ b/packages/dd-trace/src/plugins/outbound.js @@ -62,13 +62,17 @@ class OutboundPlugin extends TracingPlugin { finish () { const span = this.activeSpan + this.tagPeerService(span) + super.finish(...arguments) + } + + tagPeerService (span) { if (this.tracer._computePeerService) { const peerData = this.getPeerService(span.context()._tags) if (peerData) { span.addTags(peerData) } } - super.finish(...arguments) } connect (url) { diff --git a/packages/dd-trace/src/plugins/plugin.js b/packages/dd-trace/src/plugins/plugin.js index 2b8dc9c9c5d..f56b2d0c5b6 100644 --- a/packages/dd-trace/src/plugins/plugin.js +++ b/packages/dd-trace/src/plugins/plugin.js @@ -25,9 +25,31 @@ class Subscription { } } +class StoreBinding { + constructor (event, transform) { + this._channel = dc.channel(event) + this._transform = data => { + const store = storage.getStore() + + return !store || !store.noop + ? transform(data) + : store + } + } + + enable () { + this._channel.bindStore(storage, this._transform) + } + + disable () { + this._channel.unbindStore(storage, this._transform) + } +} + module.exports = class Plugin { constructor (tracer, tracerConfig) { this._subscriptions = [] + this._bindings = [] this._enabled = false this._tracer = tracer this.config = {} // plugin-specific configuration, unset until .configure() is called @@ -53,6 +75,10 @@ module.exports = class Plugin { this._subscriptions.push(new Subscription(channelName, handler)) } + addBind (channelName, transform) { + this._bindings.push(new StoreBinding(channelName, transform)) + } + addError (error) { const store = storage.getStore() @@ -71,9 +97,11 @@ module.exports = class Plugin { if (config.enabled && !this._enabled) { this._enabled = true this._subscriptions.forEach(sub => sub.enable()) + this._bindings.forEach(sub => sub.enable()) } else if (!config.enabled && this._enabled) { this._enabled = false this._subscriptions.forEach(sub => sub.disable()) + this._bindings.forEach(sub => sub.disable()) } } } diff --git a/packages/dd-trace/src/plugins/tracing.js b/packages/dd-trace/src/plugins/tracing.js index 8260982ce9d..eb38cf945f0 100644 --- a/packages/dd-trace/src/plugins/tracing.js +++ b/packages/dd-trace/src/plugins/tracing.js @@ -13,17 +13,7 @@ class TracingPlugin extends Plugin { this.component = this.constructor.component || this.constructor.id this.operation = this.constructor.operation - this.addTraceSub('start', message => { - this.start(message) - }) - - this.addTraceSub('error', err => { - this.error(err) - }) - - this.addTraceSub('finish', message => { - this.finish(message) - }) + this.addTraceSubs() } get activeSpan () { @@ -65,19 +55,45 @@ class TracingPlugin extends Plugin { this.addError(error) } + addTraceSubs () { + const events = ['start', 'end', 'asyncStart', 'asyncEnd', 'error', 'finish'] + + for (const event of events) { + const bindName = `bind${event.charAt(0).toUpperCase()}${event.slice(1)}` + + if (this[event]) { + this.addTraceSub(event, message => { + this[event](message) + }) + } + + if (this[bindName]) { + this.addTraceBind(event, message => this[bindName](message)) + } + } + } + addTraceSub (eventName, handler) { - this.addSub(`apm:${this.component}:${this.operation}:${eventName}`, handler) + const prefix = this.constructor.prefix || `apm:${this.component}:${this.operation}` + this.addSub(`${prefix}:${eventName}`, handler) + } + + addTraceBind (eventName, transform) { + const prefix = this.constructor.prefix || `apm:${this.component}:${this.operation}` + this.addBind(`${prefix}:${eventName}`, transform) } addError (error) { const span = this.activeSpan if (!span._spanContext._tags['error']) { + // Errors may be wrapped in a context. + error = (error && error.error) || error span.setTag('error', error || 1) } } - startSpan (name, { childOf, kind, meta, metrics, service, resource, type } = {}) { + startSpan (name, { childOf, kind, meta, metrics, service, resource, type } = {}, enter = true) { const store = storage.getStore() if (store && childOf === undefined) { @@ -100,7 +116,10 @@ class TracingPlugin extends Plugin { analyticsSampler.sample(span, this.config.measured) - storage.enterWith({ ...store, span }) + // TODO: Remove this after migration to TracingChannel is done. + if (enter) { + storage.enterWith({ ...store, span }) + } return span } diff --git a/packages/diagnostics_channel/src/index.js b/packages/diagnostics_channel/src/index.js index ead90759dcc..eb6f175ea24 100644 --- a/packages/diagnostics_channel/src/index.js +++ b/packages/diagnostics_channel/src/index.js @@ -54,4 +54,68 @@ if (major === '19' && minor === '9') { } } +if (!Channel.prototype.runStores) { + const ActiveChannelPrototype = getActiveChannelPrototype() + + Channel.prototype.bindStore = ActiveChannelPrototype.bindStore = function (store, transform) { + if (!this._stores) { + this._stores = new Map() + } + this._stores.set(store, transform) + } + + Channel.prototype.unbindStore = ActiveChannelPrototype.runStores = function (store) { + if (!this._stores) return + this._stores.delete(store) + } + + Channel.prototype.runStores = ActiveChannelPrototype.runStores = function (data, fn, thisArg, ...args) { + if (!this._stores) return Reflect.apply(fn, thisArg, args) + + let run = () => { + this.publish(data) + return Reflect.apply(fn, thisArg, args) + } + + for (const entry of this._stores.entries()) { + const store = entry[0] + const transform = entry[1] + run = wrapStoreRun(store, data, run, transform) + } + + return run() + } +} + +function defaultTransform (data) { + return data +} + +function wrapStoreRun (store, data, next, transform = defaultTransform) { + return () => { + let context + try { + context = transform(data) + } catch (err) { + process.nextTick(() => { + throw err + }) + return next() + } + + return store.run(context, next) + } +} + +function getActiveChannelPrototype () { + const dummyChannel = channel('foo') + const listener = () => {} + + dummyChannel.subscribe(listener) + const ActiveChannelPrototype = Object.getPrototypeOf(dummyChannel) + dummyChannel.unsubscribe(listener) + + return ActiveChannelPrototype +} + module.exports = dc