From 71cd5c317879bf487eb67f065adfb9409cc0f0d4 Mon Sep 17 00:00:00 2001 From: Kelvin Jin Date: Thu, 9 Mar 2017 16:18:22 -0800 Subject: [PATCH] Distributed tracing support in gRPC (#436) PR-URL: #436 --- src/plugins/plugin-grpc.js | 117 ++++++++++++++++-------- src/trace-agent.js | 23 ++--- src/util.js | 28 ++++++ test/plugins/test-trace-grpc.js | 154 +++++++++++++++++++++++++------- 4 files changed, 236 insertions(+), 86 deletions(-) diff --git a/src/plugins/plugin-grpc.js b/src/plugins/plugin-grpc.js index 644727b74..592bd27b7 100644 --- a/src/plugins/plugin-grpc.js +++ b/src/plugins/plugin-grpc.js @@ -21,6 +21,17 @@ var findIndex = require('lodash.findindex'); var SKIP_FRAMES = 3; +// Required for adding distributed tracing metadata to outgoing gRPC requests. +// This value is assigned in patchMetadata, and used in patchClient. +// patchMetadata is guaranteed to be called before patchClient because Client +// depends on Metadata. +var Metadata; + +function patchMetadata(metadata, api) { + // metadata is the value of module.exports of src/node/src/metadata.js + Metadata = metadata; +} + function patchClient(client, api) { /** * Wraps a callback so that the current span for this trace is also ended when @@ -59,37 +70,57 @@ function patchClient(client, api) { // doesn't exist. return method.apply(this, arguments); } + var args = Array.prototype.slice.call(arguments); // Check if the response is through a stream or a callback. if (!method.responseStream) { // We need to wrap the callback with the context, to propagate it. // The callback is always required. It should be the only function in // the arguments, since we cannot send a function as an argument through // gRPC. - var cbIndex = findIndex(arguments, function(arg) { + var cbIndex = findIndex(args, function(arg) { return typeof arg === 'function'; }); if (cbIndex !== -1) { - arguments[cbIndex] = wrapCallback(span, arguments[cbIndex]); + args[cbIndex] = wrapCallback(span, args[cbIndex]); } } - var call = method.apply(this, arguments); + // This finds an instance of Metadata among the arguments. + // A possible issue that could occur is if the 'options' parameter from + // the user contains an '_internal_repr' as well as a 'getMap' function, + // but this is an extremely rare case. + var metaIndex = findIndex(args, function(arg) { + return arg && typeof arg === 'object' && arg._internal_repr && + typeof arg.getMap === 'function'; + }); + if (metaIndex === -1) { + var metadata = new Metadata(); + if (!method.requestStream) { + // unary or server stream + if (args.length === 0) { + // No argument (for the gRPC call) was provided, so we will have to + // provide one, since metadata cannot be the first argument. + // The internal representation of argument defaults to undefined + // in its non-presence. + // Note that we can't pass null instead of undefined because the + // serializer within gRPC doesn't accept it. + args.push(undefined); + } + metaIndex = 1; + } else { + // client stream or bidi + metaIndex = 0; + } + args.splice(metaIndex, 0, metadata); + } + args[metaIndex].set(api.constants.TRACE_CONTEXT_HEADER_NAME, + span.getTraceContext()); + var call = method.apply(this, args); // Add extra data only when call successfully goes through. At this point // we know that the arguments are correct. if (api.enhancedDatabaseReportingEnabled()) { - // This finds an instance of Metadata among the arguments. - // A possible issue that could occur is if the 'options' parameter from - // the user contains an '_internal_repr' as well as a 'getMap' function, - // but this is an extremely rare case. - var metaIndex = findIndex(arguments, function(arg) { - return arg && typeof arg === 'object' && arg._internal_repr && - typeof arg.getMap === 'function'; - }); - if (metaIndex !== -1) { - var metadata = arguments[metaIndex]; - span.addLabel('metadata', JSON.stringify(metadata.getMap())); - } + span.addLabel('metadata', JSON.stringify(args[metaIndex].getMap())); if (!method.requestStream) { - span.addLabel('argument', JSON.stringify(arguments[0])); + span.addLabel('argument', JSON.stringify(args[0])); } } // The user might need the current context in listeners to this stream. @@ -142,6 +173,8 @@ function unpatchClient(client) { } function patchServer(server, api) { + var traceContextHeaderName = api.constants.TRACE_CONTEXT_HEADER_NAME; + /** * A helper function to record metadata in a trace span. The return value of * this function can be used as the 'wrapper' argument to wrap sendMetadata. @@ -171,13 +204,14 @@ function patchServer(server, api) { // We wrap it so that a span is started immediately beforehand, and ended // when the callback provided to it as an argument is invoked. shimmer.wrap(handlerSet, 'func', function (serverMethod) { - var rootSpanOptions = { - name: requestName, - url: requestName, - skipFrames: SKIP_FRAMES - }; return function serverMethodTrace(call, callback) { var that = this; + var rootSpanOptions = { + name: requestName, + url: requestName, + traceContext: call.metadata.getMap()[traceContextHeaderName], + skipFrames: SKIP_FRAMES + }; return api.runInRootSpan(rootSpanOptions, function(rootSpan) { if (!rootSpan) { return serverMethod.call(that, call, callback); @@ -221,13 +255,14 @@ function patchServer(server, api) { // We wrap it so that a span is started immediately beforehand, and ended // when there is no data to be sent from the server. shimmer.wrap(handlerSet, 'func', function (serverMethod) { - var rootSpanOptions = { - name: requestName, - url: requestName, - skipFrames: SKIP_FRAMES - }; return function serverMethodTrace(stream) { var that = this; + var rootSpanOptions = { + name: requestName, + url: requestName, + traceContext: stream.metadata.getMap()[traceContextHeaderName], + skipFrames: SKIP_FRAMES + }; return api.runInRootSpan(rootSpanOptions, function(rootSpan) { if (!rootSpan) { return serverMethod.call(that, stream); @@ -280,13 +315,14 @@ function patchServer(server, api) { // We wrap it so that a span is started immediately beforehand, and ended // when the callback provided to it as an argument is invoked. shimmer.wrap(handlerSet, 'func', function (serverMethod) { - var rootSpanOptions = { - name: requestName, - url: requestName, - skipFrames: SKIP_FRAMES - }; return function serverMethodTrace(stream, callback) { var that = this; + var rootSpanOptions = { + name: requestName, + url: requestName, + traceContext: stream.metadata.getMap()[traceContextHeaderName], + skipFrames: SKIP_FRAMES + }; return api.runInRootSpan(rootSpanOptions, function(rootSpan) { if (!rootSpan) { return serverMethod.call(that, stream, callback); @@ -336,13 +372,14 @@ function patchServer(server, api) { // We wrap it so that a span is started immediately beforehand, and ended // when there is no data to be sent from the server. shimmer.wrap(handlerSet, 'func', function (serverMethod) { - var rootSpanOptions = { - name: requestName, - url: requestName, - skipFrames: SKIP_FRAMES - }; return function serverMethodTrace(stream) { var that = this; + var rootSpanOptions = { + name: requestName, + url: requestName, + traceContext: stream.metadata.getMap()[traceContextHeaderName], + skipFrames: SKIP_FRAMES + }; return api.runInRootSpan(rootSpanOptions, function(rootSpan) { if (!rootSpan) { return serverMethod.call(that, stream); @@ -441,6 +478,14 @@ module.exports = [ patch: patchClient, unpatch: unpatchClient }, + { + file: 'src/node/src/metadata.js', + versions: SUPPORTED_VERSIONS, + patch: patchMetadata, + // patchMetadata doesn't modify the module exports of metadata.js. + // So it's safe to have provide a no-op unpatch function. + unpatch: function unpatchMetadata() {} + }, { file: 'src/node/src/server.js', versions: SUPPORTED_VERSIONS, diff --git a/src/trace-agent.js b/src/trace-agent.js index 594d0a42b..8e33c5c1d 100644 --- a/src/trace-agent.js +++ b/src/trace-agent.js @@ -24,6 +24,7 @@ var uuid = require('uuid'); var constants = require('./constants.js'); var tracingPolicy = require('./tracing-policy.js'); var isEqual = require('lodash.isequal'); +var util = require('./util.js'); /** @type {TraceAgent} */ var traceAgent; @@ -164,11 +165,9 @@ TraceAgent.prototype.isTraceAgentRequest = function(options) { }; /** - * Parse a cookie-style header string to extract traceId, spandId and options - * ex: '123456/667;o=3' - * -> {traceId: '123456', spanId: '667', options: '3'} - * note that we ignore trailing garbage if there is more than one '=' - * Returns null if traceId or spanId could not be found. + * Parse a cookie-style header string to extract traceId, spandId and options, + * or returns null if the agent has been configured to ignore it. + * @see util.parseContextFromHeader * * @param {string} str string representation of the trace headers * @return {?{traceId: string, spanId: string, options: number}} @@ -178,19 +177,7 @@ TraceAgent.prototype.parseContextFromHeader = function(str) { if (this.config_.ignoreContextHeader) { return null; } - if (!str) { - return null; - } - var matches = str.match(/^([0-9a-fA-F]+)(?:\/([0-9a-fA-F]+))?(?:;o=(.*))?/); - if (!matches || matches.length !== 4 || matches[0] !== str || - (matches[2] && isNaN(matches[2]))) { - return null; - } - return { - traceId: matches[1], - spanId: matches[2], - options: Number(matches[3]) - }; + return util.parseContextFromHeader(str); }; /** diff --git a/src/util.js b/src/util.js index e68e05203..0abc8bc62 100644 --- a/src/util.js +++ b/src/util.js @@ -50,6 +50,33 @@ var moduleRegex = new RegExp( ']*).*' ); +/** + * Parse a cookie-style header string to extract traceId, spandId and options + * ex: '123456/667;o=3' + * -> {traceId: '123456', spanId: '667', options: '3'} + * note that we ignore trailing garbage if there is more than one '=' + * Returns null if traceId or spanId could not be found. + * + * @param {string} str string representation of the trace headers + * @return {?{traceId: string, spanId: string, options: number}} + * object with keys. null if there is a problem. + */ +function parseContextFromHeader(str) { + if (!str) { + return null; + } + var matches = str.match(/^([0-9a-fA-F]+)(?:\/([0-9a-fA-F]+))?(?:;o=(.*))?/); + if (!matches || matches.length !== 4 || matches[0] !== str || + (matches[2] && isNaN(matches[2]))) { + return null; + } + return { + traceId: matches[1], + spanId: matches[2], + options: Number(matches[3]) + }; +} + /** * Retrieves a package name from the full import path. * For example: @@ -100,6 +127,7 @@ function findModuleVersion(modulePath, load) { module.exports = { truncate: truncate, + parseContextFromHeader: parseContextFromHeader, packageNameFromPath: packageNameFromPath, findModulePath: findModulePath, findModuleVersion: findModuleVersion diff --git a/test/plugins/test-trace-grpc.js b/test/plugins/test-trace-grpc.js index 2ccfb8438..16c229d60 100644 --- a/test/plugins/test-trace-grpc.js +++ b/test/plugins/test-trace-grpc.js @@ -16,6 +16,9 @@ 'use strict'; var assert = require('assert'); +var cls = require('../../src/cls.js'); +var util = require('../../src/util.js'); +var constants = require('../../src/constants.js'); var traceLabels = require('../../src/trace-labels.js'); var versions = { @@ -30,10 +33,29 @@ var grpcPort = 50051; var SEND_METADATA = 131; var EMIT_ERROR = 13412; +// Regular expression matching client-side metadata labels +var metadataRegExp = + /^{"a":"b","x-cloud-trace-context":"[a-f0-9]{32}\/[0-9]+;o=1"}$/; + +// Whether asserts in checkServerMetadata should be run +// Turned on only for the test that checks propagated tract context +var checkMetadata; + +function checkServerMetadata(metadata) { + if (checkMetadata) { + var traceContext = metadata.getMap()[constants.TRACE_CONTEXT_HEADER_NAME]; + assert.ok(/[a-f0-9]{32}\/[0-9]+;o=1/.test(traceContext)); + var parsedContext = util.parseContextFromHeader(traceContext); + var root = cls.getNamespace().get('root'); + assert.strictEqual(root.span.parentSpanId, parsedContext.spanId); + } +} + function startServer(proto, grpc, common, agent, metadata, trailing_metadata) { var _server = new grpc.Server(); _server.addProtoService(proto.Tester.service, { testUnary: function(call, cb) { + checkServerMetadata(call.metadata); if (call.request.n === EMIT_ERROR) { common.createChildSpan(agent, function () { cb(new Error('test')); @@ -50,6 +72,7 @@ function startServer(proto, grpc, common, agent, metadata, trailing_metadata) { } }, testClientStream: function(call, cb) { + checkServerMetadata(call.metadata); var sum = 0; var triggerCb = function () { cb(null, {n: sum}); @@ -82,6 +105,7 @@ function startServer(proto, grpc, common, agent, metadata, trailing_metadata) { }); }, testServerStream: function(stream) { + checkServerMetadata(stream.metadata); if (stream.request.n === EMIT_ERROR) { common.createChildSpan(agent, function () { stream.emit('error', new Error('test')); @@ -99,6 +123,7 @@ function startServer(proto, grpc, common, agent, metadata, trailing_metadata) { } }, testBidiStream: function(stream) { + checkServerMetadata(stream.metadata); var sum = 0; var stopChildSpan; var t = setTimeout(function() { @@ -140,28 +165,55 @@ function createClient(proto, grpc) { grpc.credentials.createInsecure()); } -function callUnary(client, cb) { - client.testUnary({n: 42}, function(err, result) { - assert.ifError(err); - assert.strictEqual(result.n, 42); - cb(); - }); +function callUnary(client, grpc, metadata, cb) { + var args = [ + {n: 42}, + function(err, result) { + assert.ifError(err); + assert.strictEqual(result.n, 42); + cb(); + } + ]; + if (Object.keys(metadata).length > 0) { + var m = new grpc.Metadata(); + for (var key in metadata) { + m.add(key, metadata[key]); + } + args.splice(1, 0, m); + } + client.testUnary.apply(client, args); } -function callClientStream(client, cb) { - var stream = client.testClientStream(function(err, result) { +function callClientStream(client, grpc, metadata, cb) { + var args = [function(err, result) { assert.ifError(err); assert.strictEqual(result.n, 45); cb(); - }); + }]; + if (Object.keys(metadata).length > 0) { + var m = new grpc.Metadata(); + for (var key in metadata) { + m.set(key, metadata[key]); + } + args.unshift(m); + } + var stream = client.testClientStream.apply(client, args); for (var i = 0; i < 10; ++i) { stream.write({n: i}); } stream.end(); } -function callServerStream(client, grpc, cb) { - var stream = client.testServerStream({n: 42}); +function callServerStream(client, grpc, metadata, cb) { + var args = [ {n: 42} ]; + if (Object.keys(metadata).length > 0) { + var m = new grpc.Metadata(); + for (var key in metadata) { + m.add(key, metadata[key]); + } + args.push(m); + } + var stream = client.testServerStream.apply(client, args); var sum = 0; stream.on('data', function(data) { sum += data.n; @@ -173,8 +225,16 @@ function callServerStream(client, grpc, cb) { }); } -function callBidi(client, grpc, cb) { - var stream = client.testBidiStream(); +function callBidi(client, grpc, metadata, cb) { + var args = []; + if (Object.keys(metadata).length > 0) { + var m = new grpc.Metadata(); + for (var key in metadata) { + m.add(key, metadata[key]); + } + args.push(m); + } + var stream = client.testBidiStream.apply(client, args); var sum = 0; stream.on('data', function(data) { sum += data.n; @@ -229,12 +289,14 @@ Object.keys(versions).forEach(function(version) { afterEach(function() { common.cleanTraces(agent); + common.clearNamespace(agent); + checkMetadata = false; }); it('should accurately measure time for unary requests', function(done) { var start = Date.now(); common.runInTransaction(agent, function(endTransaction) { - callUnary(client, function() { + callUnary(client, grpc, {}, function() { endTransaction(); var assertTraceProperties = function(predicate) { var trace = common.getMatchingSpan(agent, predicate); @@ -256,7 +318,7 @@ Object.keys(versions).forEach(function(version) { it('should accurately measure time for client streaming requests', function(done) { var start = Date.now(); common.runInTransaction(agent, function(endTransaction) { - callClientStream(client, function() { + callClientStream(client, grpc, {}, function() { endTransaction(); var assertTraceProperties = function(predicate) { var trace = common.getMatchingSpan(agent, predicate); @@ -276,7 +338,7 @@ Object.keys(versions).forEach(function(version) { it('should accurately measure time for server streaming requests', function(done) { var start = Date.now(); common.runInTransaction(agent, function(endTransaction) { - callServerStream(client, grpc, function() { + callServerStream(client, grpc, {}, function() { endTransaction(); var assertTraceProperties = function(predicate) { var trace = common.getMatchingSpan(agent, predicate); @@ -299,7 +361,7 @@ Object.keys(versions).forEach(function(version) { it('should accurately measure time for bidi streaming requests', function(done) { var start = Date.now(); common.runInTransaction(agent, function(endTransaction) { - callBidi(client, grpc, function() { + callBidi(client, grpc, {}, function() { endTransaction(); var assertTraceProperties = function(predicate) { var trace = common.getMatchingSpan(agent, predicate); @@ -319,7 +381,7 @@ Object.keys(versions).forEach(function(version) { }); it('should not break if no parent transaction', function(done) { - callUnary(client, function() { + callUnary(client, grpc, {}, function() { assert.strictEqual(common.getMatchingSpans(agent, grpcClientPredicate).length, 0); done(); }); @@ -340,13 +402,41 @@ Object.keys(versions).forEach(function(version) { assert.deepEqual(args[0], [prefix + 'BidiStream', undefined]); done(); }; - next = callUnary.bind(null, client, next); - next = callClientStream.bind(null, client, next); - next = callServerStream.bind(null, client, grpc, next); - next = callBidi.bind(null, client, grpc, next); + next = callUnary.bind(null, client, grpc, {}, next); + next = callClientStream.bind(null, client, grpc, {}, next); + next = callServerStream.bind(null, client, grpc, {}, next); + next = callBidi.bind(null, client, grpc, {}, next); next(); }); + it('should support distributed trace context', function(done) { + // Enable asserting properties of the metdata on the grpc server. + checkMetadata = true; + common.runInTransaction(agent, function (endTransaction) { + var metadata = { a: 'b' }; + var next = function() { + endTransaction(); + checkMetadata = false; + done(); + }; + // Try without supplying metadata (call* will not supply metadata to + // the grpc client methods at all if no fields are present). + // The plugin should automatically create a new Metadata object and + // populate it with trace context data accordingly. + next = callUnary.bind(null, client, grpc, {}, next); + next = callClientStream.bind(null, client, grpc, {}, next); + next = callServerStream.bind(null, client, grpc, {}, next); + next = callBidi.bind(null, client, grpc, {}, next); + // Try with metadata. The plugin should simply add trace context data + // to it. + next = callUnary.bind(null, client, grpc, metadata, next); + next = callClientStream.bind(null, client, grpc, metadata, next); + next = callServerStream.bind(null, client, grpc, metadata, next); + next = callBidi.bind(null, client, grpc, metadata, next); + next(); + }); + }); + it('should not let root spans interfere with one another', function(done) { this.timeout(8000); var next = done; @@ -387,10 +477,10 @@ Object.keys(versions).forEach(function(version) { }; // Call queueCallTogether with every possible pair of gRPC calls. - var methods = [ callUnary.bind(null, client), - callClientStream.bind(null, client), - callServerStream.bind(null, client, grpc), - callBidi.bind(null, client, grpc) ]; + var methods = [ callUnary.bind(null, client, grpc, {}), + callClientStream.bind(null, client, grpc, {}), + callServerStream.bind(null, client, grpc, {}), + callBidi.bind(null, client, grpc, {}) ]; for (var m of methods) { for (var n of methods) { queueCallTogether(m, n); @@ -517,7 +607,7 @@ Object.keys(versions).forEach(function(version) { var trace = common.getMatchingSpan(agent, predicate); assert(trace); common.assertDurationCorrect(agent, Date.now() - start, predicate); - assert.strictEqual(trace.labels.metadata, '{"a":"b"}'); + assert.ok(metadataRegExp.test(trace.labels.metadata)); }; assertTraceProperties(grpcClientPredicate); assertTraceProperties(grpcServerOuterPredicate); @@ -540,7 +630,7 @@ Object.keys(versions).forEach(function(version) { var trace = common.getMatchingSpan(agent, predicate); assert(trace); common.assertDurationCorrect(agent, Date.now() - start, predicate); - assert.strictEqual(trace.labels.metadata, '{"a":"b"}'); + assert.ok(metadataRegExp.test(trace.labels.metadata)); }; assertTraceProperties(grpcClientPredicate); assertTraceProperties(grpcServerOuterPredicate); @@ -560,7 +650,7 @@ Object.keys(versions).forEach(function(version) { var trace = common.getMatchingSpan(agent, predicate); assert(trace); common.assertDurationCorrect(agent, Date.now() - start, predicate); - assert.strictEqual(trace.labels.metadata, '{"a":"b"}'); + assert.ok(metadataRegExp.test(trace.labels.metadata)); return trace; }; assertTraceProperties(grpcClientPredicate); @@ -582,7 +672,7 @@ Object.keys(versions).forEach(function(version) { var trace = common.getMatchingSpan(agent, predicate); assert(trace); common.assertDurationCorrect(agent, Date.now() - start, predicate); - assert.strictEqual(trace.labels.metadata, '{"a":"b"}'); + assert.ok(metadataRegExp.test(trace.labels.metadata)); return trace; }; assertTraceProperties(grpcClientPredicate); @@ -607,7 +697,7 @@ Object.keys(versions).forEach(function(version) { var trace = common.getMatchingSpan(agent, predicate); assert(trace); common.assertDurationCorrect(agent, Date.now() - start, predicate); - assert.strictEqual(trace.labels.metadata, '{"a":"b"}'); + assert.ok(metadataRegExp.test(trace.labels.metadata)); return trace; }; assertTraceProperties(grpcClientPredicate); @@ -630,7 +720,7 @@ Object.keys(versions).forEach(function(version) { var trace = common.getMatchingSpan(agent, predicate); assert(trace); common.assertDurationCorrect(agent, Date.now() - start, predicate); - assert.strictEqual(trace.labels.metadata, '{"a":"b"}'); + assert.ok(metadataRegExp.test(trace.labels.metadata)); return trace; }; assertTraceProperties(grpcClientPredicate);