diff --git a/lib/hooks/index.js b/lib/hooks/index.js index 85e3bf0ed..54ff60ce2 100644 --- a/lib/hooks/index.js +++ b/lib/hooks/index.js @@ -208,6 +208,3 @@ module.exports = { findModulePath: findModulePath, findModuleVersion: findModuleVersion }; - - - diff --git a/lib/hooks/userspace/hook-grpc.js b/lib/hooks/userspace/hook-grpc.js index 327e67383..37cfb50a7 100644 --- a/lib/hooks/userspace/hook-grpc.js +++ b/lib/hooks/userspace/hook-grpc.js @@ -19,14 +19,74 @@ var cls = require('../../cls.js'); var shimmer = require('shimmer'); var semver = require('semver'); +var SpanData = require('../../span-data.js'); + var agent; var SUPPORTED_VERSIONS = '0.13 - 0.15'; -function startBatchWrap(startBatch) { - return function startBatchTrace(thing, callback) { - // TODO: maybe we only want to do this if a root context exists. - return startBatch.call(this, thing, cls.getNamespace().bind(callback)); +function makeClientMethod(useDeprecatedArgumentOrder, method, name) { + return function clientMethodTrace() { + var root = cls.getRootContext(); + if (!root) { + agent.logger.debug('Untraced gRPC call: ', name); + return method.apply(this, arguments); + } else if (root === SpanData.nullSpan) { + return method.apply(this, arguments); + } + var span = agent.startSpan('grpc-call-' + name); + // Check if the response is through a stream or a callback. + if (!method.responseStream) { + // Grab the callback which is always required. + // Depending on the version of grpc, the position of the callback + // function differs. + // We need to wrap the callback with the context, to propagate it. + var cbIndex; + if (useDeprecatedArgumentOrder) { + cbIndex = method.requestStream ? 0 : 1; + } else { + cbIndex = arguments.length - 1; + } + // If the arguments are incorrect, we want gRPC to throw the Error + // so we do not wrap the callback unnecessarily. + if (cbIndex >= 0 && cbIndex < arguments.length && + typeof arguments[cbIndex] === 'function') { + arguments[cbIndex] = wrapCallback(span, arguments[cbIndex]); + } + } + var call = method.apply(this, arguments); + // The user might need the current context in listeners to this stream. + cls.getNamespace().bindEmitter(call); + if (method.responseStream) { + call.on('end', function() { + agent.endSpan(span); + }); + } + return call; + }; +} + +/** + * Wraps a callback so that the current span for this trace is also ended when + * the callback is invoked. + * @param {SpanData} span - The span that should end after this callback. + * @param {function(?Error, value=)} done - The callback to be wrapped. + */ +function wrapCallback(span, done) { + var fn = function(err, res) { + agent.endSpan(span); + done(err, res); + }; + return cls.getNamespace().bind(fn); +} + +function makeClientConstructorWrap(useDeprecatedArgumentOrder, + makeClientConstructor) { + return function makeClientConstructorTrace(methods) { + var Client = makeClientConstructor.apply(this, arguments); + shimmer.massWrap(Client.prototype, Object.keys(methods), + makeClientMethod.bind(null, useDeprecatedArgumentOrder)); + return Client; }; } @@ -36,13 +96,20 @@ module.exports = function(version_, agent_) { return {}; } return { - 'src/node/src/grpc_extension.js': { - patch: function(extension) { + 'src/node/src/client.js': { + patch: function(client) { agent = agent_; - shimmer.wrap(extension.Call.prototype, 'startBatch', startBatchWrap); + // If version < 0.14, use the old argument order for client methods. + var useDeprecatedArgumentOrder = semver.satisfies(version_, '<0.14'); + shimmer.wrap(client, 'makeClientConstructor', + makeClientConstructorWrap.bind(null, useDeprecatedArgumentOrder)); }, - unpatch: function(extension) { - shimmer.unwrap(extension.Call.prototype, 'startBatch'); + unpatch: function(client) { + // Only the client constructor is unwrapped, so that future grpc.load's + // will not wrap client methods with tracing. However, existing Client + // objects with wrapped prototype methods will continue tracing. + shimmer.unwrap(client, 'makeClientConstructor'); + agent_.logger.info('gRPC makeClientConstructor: unpatched'); } } }; diff --git a/test/fixtures/test-grpc.proto b/test/fixtures/test-grpc.proto new file mode 100644 index 000000000..f5349d4aa --- /dev/null +++ b/test/fixtures/test-grpc.proto @@ -0,0 +1,18 @@ +syntax = "proto3"; + +package nodetest; + +service Tester { + rpc TestUnary (TestRequest) returns (TestReply) {} + rpc TestClientStream (stream TestRequest) returns (TestReply) {} + rpc TestServerStream (TestRequest) returns (stream TestReply) {} + rpc TestBidiStream (stream TestRequest) returns (stream TestReply) {} +} + +message TestRequest { + int32 n = 1; +} + +message TestReply { + int32 n = 1; +} diff --git a/test/fixtures/test.proto b/test/fixtures/test.proto index 0a2343abf..84d1d9bd2 100644 --- a/test/fixtures/test.proto +++ b/test/fixtures/test.proto @@ -12,4 +12,4 @@ message TestRequest { message TestReply { string message = 1; -} \ No newline at end of file +} diff --git a/test/hooks/test-trace-grpc.js b/test/hooks/test-trace-grpc.js new file mode 100644 index 000000000..0f2fc97a4 --- /dev/null +++ b/test/hooks/test-trace-grpc.js @@ -0,0 +1,239 @@ +/** + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +'use strict'; + +if (process.platform === 'win32') { + // Skip grpc due to https://github.com/nodejs/node/issues/4932 + process.exit(0); +} + +var common = require('./common.js'); +var assert = require('assert'); +var traceLabels = require('../../lib/trace-labels.js'); + +var versions = { + grpc013: require('./fixtures/grpc0.13'), + grpc014: require('./fixtures/grpc0.14') +}; +var protoFile = __dirname + '/../fixtures/test-grpc.proto'; +var grpcPort = 50051; +var client, server; + +Object.keys(versions).forEach(function(version) { + var grpc = versions[version]; + + function startServer(proto) { + var _server = new grpc.Server(); + _server.addProtoService(proto.Tester.service, { + testUnary: function(call, cb) { + setTimeout(function() { + cb(null, {n: call.request.n}); + }, common.serverWait); + }, + testClientStream: function(call, cb) { + var sum = 0; + call.on('data', function(data) { + sum += data.n; + }); + call.on('end', function() { + setTimeout(function() { + cb(null, {n: sum}); + }, common.serverWait); + }); + }, + testServerStream: function(stream) { + for (var i = 0; i < 10; ++i) { + stream.write({n: i}); + } + setTimeout(function() { + stream.end(); + }, common.serverWait); + }, + testBidiStream: function(stream) { + stream.on('data', function(data) { + stream.write({n: data.n}); + }); + stream.on('end', function() { + setTimeout(function() { + stream.end(); + }, common.serverWait); + }); + } + }); + _server.bind('localhost:' + grpcPort, + grpc.ServerCredentials.createInsecure()); + _server.start(); + return _server; + } + + function createClient(proto) { + return new proto.Tester('localhost:' + grpcPort, + grpc.credentials.createInsecure()); + } + + describe(version, function() { + before(function() { + var proto = grpc.load(protoFile).nodetest; + server = startServer(proto); + client = createClient(proto); + }); + + after(function() { + server.forceShutdown(); + }); + + afterEach(function() { + common.cleanTraces(); + }); + + it('should accurately measure time for unary requests', function(done) { + common.runInTransaction(function(endTransaction) { + client.testUnary({n: 42}, function(err, result) { + endTransaction(); + assert.ifError(err); + assert.strictEqual(result.n, 42); + var trace = common.getMatchingSpan(grpcPredicate); + assert(trace); + common.assertDurationCorrect(); + done(); + }); + }); + }); + + it('should accurately measure time for client requests', function(done) { + common.runInTransaction(function(endTransaction) { + var stream = client.testClientStream(function(err, result) { + endTransaction(); + assert.ifError(err); + assert.strictEqual(result.n, 45); + var trace = common.getMatchingSpan(grpcPredicate); + assert(trace); + common.assertDurationCorrect(); + done(); + }); + for (var i = 0; i < 10; ++i) { + stream.write({n: i}); + } + stream.end(); + }); + }); + + it('should accurately measure time for server requests', function(done) { + common.runInTransaction(function(endTransaction) { + var stream = client.testServerStream(); + var sum = 0; + stream.on('data', function(data) { + sum += data.n; + }); + stream.on('status', function(status) { + endTransaction(); + assert.strictEqual(status.code, grpc.status.OK); + assert.strictEqual(sum, 45); + var trace = common.getMatchingSpan(grpcPredicate); + assert(trace); + common.assertDurationCorrect(); + done(); + }); + }); + }); + + it('should accurately measure time for bidi requests', function(done) { + common.runInTransaction(function(endTransaction) { + var stream = client.testBidiStream(); + var sum = 0; + stream.on('data', function(data) { + sum += data.n; + }); + for (var i = 0; i < 10; ++i) { + stream.write({n: i}); + } + stream.end(); + stream.on('status', function(status) { + endTransaction(); + assert.strictEqual(status.code, grpc.status.OK); + assert.strictEqual(sum, 45); + var trace = common.getMatchingSpan(grpcPredicate); + assert(trace); + common.assertDurationCorrect(); + done(); + }); + }); + }); + + it('should not break if no parent transaction', function(done) { + client.testUnary({n: 42}, function(err, result) { + assert.ifError(err); + assert.strictEqual(result.n, 42); + assert.strictEqual(common.getTraces().length, 0); + done(); + }); + }); + + it('should remove trace frames from stack', function(done) { + common.runInTransaction(function(endTransaction) { + client.testUnary({n: 42}, function(err, result) { + endTransaction(); + assert.ifError(err); + assert.strictEqual(result.n, 42); + var trace = common.getMatchingSpan(grpcPredicate); + var labels = trace.labels; + var stack = JSON.parse(labels[traceLabels.STACK_TRACE_DETAILS_KEY]); + assert.notStrictEqual(-1, + stack.stack_frame[0].method_name.indexOf('clientMethodTrace')); + done(); + }); + }); + }); + + if (version === 'grpc013') { + it('should work for old argument orders (unary)', function(done) { + common.runInTransaction(function(endTransaction) { + client.testUnary({n: 42}, function(err, result) { + endTransaction(); + assert.ifError(err); + assert.strictEqual(result.n, 42); + var trace = common.getMatchingSpan(grpcPredicate); + assert(trace); + common.assertDurationCorrect(); + done(); + }, null, {}); + }); + }); + + it('should work for old argument orders (stream)', function(done) { + common.runInTransaction(function(endTransaction) { + var stream = client.testClientStream(function(err, result) { + endTransaction(); + assert.ifError(err); + assert.strictEqual(result.n, 45); + var trace = common.getMatchingSpan(grpcPredicate); + assert(trace); + common.assertDurationCorrect(); + done(); + }, null, {}); + for (var i = 0; i < 10; ++i) { + stream.write({n: i}); + } + stream.end(); + }); + }); + } + }); +}); + +function grpcPredicate(span) { + return span.name.indexOf('grpc-') === 0; +}