Skip to content

Commit

Permalink
Convert grpc to use tracing channel (#3366)
Browse files Browse the repository at this point in the history
* Convert grpc to use tracing channel

* Allow addError to accept explicit span rather than delegating to activeSpan
  • Loading branch information
Stephen Belanger committed Jul 11, 2023
1 parent a5baa66 commit f154bcf
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 122 deletions.
86 changes: 44 additions & 42 deletions packages/datadog-instrumentations/src/grpc/client.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
'use strict'

const types = require('./types')
const { addHook, channel, AsyncResource } = require('../helpers/instrument')
const { addHook, channel } = require('../helpers/instrument')
const shimmer = require('../../../datadog-shimmer')

const patched = new WeakSet()
const instances = new WeakMap()

const startChannel = channel('apm:grpc:client:request:start')
const asyncStartChannel = channel('apm:grpc:client:request:asyncStart')
const errorChannel = channel('apm:grpc:client:request:error')
const finishChannel = channel('apm:grpc:client:request:finish')
const emitChannel = channel('apm:grpc:client:request:emit')

function createWrapMakeRequest (type) {
return function wrapMakeRequest (makeRequest) {
Expand Down Expand Up @@ -99,71 +101,71 @@ function wrapMethod (method, path, type) {
return wrapped
}

function wrapCallback (requestResource, parentResource, callback) {
function wrapCallback (ctx, callback = () => { }) {
return function (err) {
if (err) {
requestResource.runInAsyncScope(() => {
errorChannel.publish(err)
})
ctx.error = err
errorChannel.publish(ctx)
}

if (callback) {
return parentResource.runInAsyncScope(() => {
return callback.apply(this, arguments)
})
}
return asyncStartChannel.runStores(ctx, () => {
return callback.apply(this, arguments)
// No async end channel needed
})
}
}

function wrapStream (call, requestResource, parentResource) {
if (!call || typeof call.emit !== 'function') return

shimmer.wrap(call, 'emit', emit => {
return function (eventName, ...args) {
requestResource.runInAsyncScope(() => {
switch (eventName) {
case 'error':
errorChannel.publish(args[0])

break
case 'status':
finishChannel.publish(args[0])

break
}
})
function createWrapEmit (ctx) {
return function wrapEmit (emit) {
return function (event, arg1) {
switch (event) {
case 'error':
ctx.error = arg1
errorChannel.publish(ctx)
break
case 'status':
ctx.result = arg1
finishChannel.publish(ctx)
break
}

return parentResource.runInAsyncScope(() => {
return emitChannel.runStores(ctx, () => {
return emit.apply(this, arguments)
})
}
})
}
}

function callMethod (client, method, args, path, metadata, type) {
if (!startChannel.hasSubscribers) return method.apply(client, args)

const length = args.length
const callback = args[length - 1]
const parentResource = new AsyncResource('bound-anonymous-fn')
const requestResource = new AsyncResource('bound-anonymous-fn')

return requestResource.runInAsyncScope(() => {
startChannel.publish({ metadata, path, type })
const ctx = { metadata, path, type }

if (type === types.unary || type === types.client_stream) {
if (typeof callback === 'function') {
args[length - 1] = wrapCallback(requestResource, parentResource, callback)
} else {
args[length] = wrapCallback(requestResource, parentResource)
return startChannel.runStores(ctx, () => {
try {
if (type === types.unary || type === types.client_stream) {
if (typeof callback === 'function') {
args[length - 1] = wrapCallback(ctx, callback)
} else {
args[length] = wrapCallback(ctx)
}
}
}

const call = method.apply(client, args)
const call = method.apply(client, args)

wrapStream(call, requestResource, parentResource)
if (call && typeof call.emit === 'function') {
shimmer.wrap(call, 'emit', createWrapEmit(ctx))
}

return call
return call
} catch (e) {
ctx.error = e
errorChannel.publish(ctx)
}
// No end channel needed
})
}

Expand Down
129 changes: 69 additions & 60 deletions packages/datadog-instrumentations/src/grpc/server.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
'use strict'

const types = require('./types')
const { channel, addHook, AsyncResource } = require('../helpers/instrument')
const { channel, addHook } = require('../helpers/instrument')
const shimmer = require('../../../datadog-shimmer')

const startChannel = channel('apm:grpc:server:request:start')
const asyncStartChannel = channel('apm:grpc:server:request:asyncStart')
const errorChannel = channel('apm:grpc:server:request:error')
const updateChannel = channel('apm:grpc:server:request:update')
const finishChannel = channel('apm:grpc:server:request:finish')
const emitChannel = channel('apm:grpc:server:request:emit')

// https://github.com/grpc/grpc/blob/master/doc/statuscodes.md
const OK = 0
Expand All @@ -31,28 +33,38 @@ function wrapHandler (func, name) {
const type = types[this.type]
const isStream = type !== 'unary'

const parentResource = new AsyncResource('bound-anonymous-fn')
const requestResource = new AsyncResource('bound-anonymous-fn')

return requestResource.runInAsyncScope(() => {
startChannel.publish({ name, metadata, type })

const onCancel = requestResource.bind(() => {
finishChannel.publish({ code: CANCELLED })
})

// Finish the span if the call was cancelled.
call.once('cancelled', onCancel)
const ctx = { name, metadata, type }

return startChannel.runStores(ctx, () => {
try {
const onCancel = () => {
ctx.code = CANCELLED
finishChannel.publish(ctx)
}

// Finish the span if the call was cancelled.
call.once('cancelled', onCancel)

if (isStream) {
wrapStream(call, ctx, onCancel)
} else {
arguments[1] = wrapCallback(callback, call, ctx, onCancel)
}

shimmer.wrap(call, 'emit', emit => {
return function () {
return emitChannel.runStores(ctx, () => {
return emit.apply(this, arguments)
})
}
})

if (isStream) {
wrapStream(call, requestResource, onCancel)
} else {
arguments[1] = wrapCallback(callback, call, requestResource, parentResource, onCancel)
return func.apply(this, arguments)
} catch (e) {
ctx.error = e
errorChannel.publish(ctx)
}

shimmer.wrap(call, 'emit', emit => requestResource.bind(emit))

return func.apply(this, arguments)
// No end channel needed
})
}
}
Expand All @@ -67,69 +79,66 @@ function wrapRegister (register) {
}
}

function wrapStream (call, requestResource, onCancel) {
if (call.call && call.call.sendStatus) {
call.call.sendStatus = wrapSendStatus(call.call.sendStatus, requestResource)
}

shimmer.wrap(call, 'emit', emit => {
return function (eventName, ...args) {
switch (eventName) {
function createWrapEmit (call, ctx, onCancel) {
return function wrapEmit (emit) {
return function (event, arg1) {
switch (event) {
case 'error':
errorChannel.publish(args[0])
finishChannel.publish({ code: args[0].code })

ctx.error = arg1
errorChannel.publish(ctx)
ctx.code = arg1.code
finishChannel.publish(ctx)
call.removeListener('cancelled', onCancel)

break

// Finish the span of the response only if it was successful.
// Otherwise it'll be finished in the `error` listener.
case 'finish':
if (call.status) {
updateChannel.publish(call.status)
}

if (!call.status || call.status.code === 0) {
finishChannel.publish()
finishChannel.publish(ctx)
}

call.removeListener('cancelled', onCancel)

break
}

return emit.apply(this, arguments)
}
})
}
}

function wrapCallback (callback, call, requestResource, parentResource, onCancel) {
return function (err, value, trailer, flags) {
requestResource.runInAsyncScope(() => {
if (err) {
errorChannel.publish(err)
finishChannel.publish(err)
} else {
finishChannel.publish({ code: OK, trailer })
}
function wrapStream (call, ctx, onCancel) {
if (call.call && call.call.sendStatus) {
call.call.sendStatus = wrapSendStatus(call.call.sendStatus, ctx)
}

call.removeListener('cancelled', onCancel)
})
shimmer.wrap(call, 'emit', createWrapEmit(call, ctx, onCancel))
}

if (callback) {
return parentResource.runInAsyncScope(() => {
return callback.apply(this, arguments)
})
function wrapCallback (callback = () => {}, call, ctx, onCancel) {
return function (err, value, trailer, flags) {
if (err) {
ctx.error = err
errorChannel.publish(ctx)
} else {
ctx.code = OK
ctx.trailer = trailer
}

finishChannel.publish(ctx)

call.removeListener('cancelled', onCancel)

return asyncStartChannel.runStores(ctx, () => {
return callback.apply(this, arguments)
// No async end channel needed
})
}
}

function wrapSendStatus (sendStatus, requestResource) {
function wrapSendStatus (sendStatus, ctx) {
return function (status) {
requestResource.runInAsyncScope(() => {
updateChannel.publish(status)
})
ctx.status = status
updateChannel.publish(ctx)

return sendStatus.apply(this, arguments)
}
Expand Down
40 changes: 29 additions & 11 deletions packages/datadog-plugin-grpc/src/client.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,27 @@
'use strict'

const { storage } = require('../../datadog-core')
const ClientPlugin = require('../../dd-trace/src/plugins/client')
const { TEXT_MAP } = require('../../../ext/formats')
const { addMetadataTags, getFilter, getMethodMetadata } = require('./util')

class GrpcClientPlugin extends ClientPlugin {
static get id () { return 'grpc' }
static get operation () { return 'client:request' }
static get prefix () { return `apm:grpc:client:request` }
static get peerServicePrecursors () { return ['rpc.service'] }

start ({ metadata, path, type }) {
constructor (...args) {
super(...args)

this.addTraceBind('emit', ({ parentStore }) => {
return parentStore
})
}

bindStart (message) {
const store = storage.getStore()
const { metadata, path, type } = message
const metadataFilter = this.config.metadataFilter
const method = getMethodMetadata(path, type)
const span = this.startSpan(this.operationName(), {
Expand All @@ -28,7 +40,7 @@ class GrpcClientPlugin extends ClientPlugin {
metrics: {
'grpc.status.code': 0
}
})
}, false)

// needed as precursor for peer.service
if (method.service && method.package) {
Expand All @@ -39,22 +51,27 @@ class GrpcClientPlugin extends ClientPlugin {
addMetadataTags(span, metadata, metadataFilter, 'request')
inject(this.tracer, span, metadata)
}
}

error (error) {
const span = this.activeSpan
message.span = span
message.parentStore = store
message.currentStore = { ...store, span }

if (!span) return
return message.currentStore
}

this.addCode(span, error.code)
this.addError(error)
bindAsyncStart ({ parentStore }) {
return parentStore
}

finish ({ code, metadata }) {
const span = this.activeSpan
error ({ span, error }) {
this.addCode(span, error.code)
this.addError(error, span)
}

finish ({ span, result }) {
if (!span) return

const { code, metadata } = result || {}
const metadataFilter = this.config.metadataFilter

this.addCode(span, code)
Expand All @@ -63,7 +80,8 @@ class GrpcClientPlugin extends ClientPlugin {
addMetadataTags(span, metadata, metadataFilter, 'response')
}

super.finish()
this.tagPeerService(span)
span.finish()
}

configure (config) {
Expand Down
Loading

0 comments on commit f154bcf

Please sign in to comment.