Skip to content

Commit

Permalink
Support for gRPC time tracing (#267)
Browse files Browse the repository at this point in the history
  • Loading branch information
misterpoe authored and matthewloring committed Jun 28, 2016
1 parent 239890b commit 693b6ae
Show file tree
Hide file tree
Showing 5 changed files with 334 additions and 13 deletions.
3 changes: 0 additions & 3 deletions lib/hooks/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,3 @@ module.exports = {
findModulePath: findModulePath,
findModuleVersion: findModuleVersion
};



85 changes: 76 additions & 9 deletions lib/hooks/userspace/hook-grpc.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,74 @@
var cls = require('../../cls.js');
var shimmer = require('shimmer');
var semver = require('semver');
var SpanData = require('../../span-data.js');

var agent;

var SUPPORTED_VERSIONS = '0.13 - 0.15';

function startBatchWrap(startBatch) {
return function startBatchTrace(thing, callback) {
// TODO: maybe we only want to do this if a root context exists.
return startBatch.call(this, thing, cls.getNamespace().bind(callback));
function makeClientMethod(useDeprecatedArgumentOrder, method, name) {
return function clientMethodTrace() {
var root = cls.getRootContext();
if (!root) {
agent.logger.debug('Untraced gRPC call: ', name);
return method.apply(this, arguments);
} else if (root === SpanData.nullSpan) {
return method.apply(this, arguments);
}
var span = agent.startSpan('grpc-call-' + name);
// Check if the response is through a stream or a callback.
if (!method.responseStream) {
// Grab the callback which is always required.
// Depending on the version of grpc, the position of the callback
// function differs.
// We need to wrap the callback with the context, to propagate it.
var cbIndex;
if (useDeprecatedArgumentOrder) {
cbIndex = method.requestStream ? 0 : 1;
} else {
cbIndex = arguments.length - 1;
}
// If the arguments are incorrect, we want gRPC to throw the Error
// so we do not wrap the callback unnecessarily.
if (cbIndex >= 0 && cbIndex < arguments.length &&
typeof arguments[cbIndex] === 'function') {
arguments[cbIndex] = wrapCallback(span, arguments[cbIndex]);
}
}
var call = method.apply(this, arguments);
// The user might need the current context in listeners to this stream.
cls.getNamespace().bindEmitter(call);
if (method.responseStream) {
call.on('end', function() {
agent.endSpan(span);
});
}
return call;
};
}

/**
* Wraps a callback so that the current span for this trace is also ended when
* the callback is invoked.
* @param {SpanData} span - The span that should end after this callback.
* @param {function(?Error, value=)} done - The callback to be wrapped.
*/
function wrapCallback(span, done) {
var fn = function(err, res) {
agent.endSpan(span);
done(err, res);
};
return cls.getNamespace().bind(fn);
}

function makeClientConstructorWrap(useDeprecatedArgumentOrder,
makeClientConstructor) {
return function makeClientConstructorTrace(methods) {
var Client = makeClientConstructor.apply(this, arguments);
shimmer.massWrap(Client.prototype, Object.keys(methods),
makeClientMethod.bind(null, useDeprecatedArgumentOrder));
return Client;
};
}

Expand All @@ -36,13 +96,20 @@ module.exports = function(version_, agent_) {
return {};
}
return {
'src/node/src/grpc_extension.js': {
patch: function(extension) {
'src/node/src/client.js': {
patch: function(client) {
agent = agent_;
shimmer.wrap(extension.Call.prototype, 'startBatch', startBatchWrap);
// If version < 0.14, use the old argument order for client methods.
var useDeprecatedArgumentOrder = semver.satisfies(version_, '<0.14');
shimmer.wrap(client, 'makeClientConstructor',
makeClientConstructorWrap.bind(null, useDeprecatedArgumentOrder));
},
unpatch: function(extension) {
shimmer.unwrap(extension.Call.prototype, 'startBatch');
unpatch: function(client) {
// Only the client constructor is unwrapped, so that future grpc.load's
// will not wrap client methods with tracing. However, existing Client
// objects with wrapped prototype methods will continue tracing.
shimmer.unwrap(client, 'makeClientConstructor');
agent_.logger.info('gRPC makeClientConstructor: unpatched');
}
}
};
Expand Down
18 changes: 18 additions & 0 deletions test/fixtures/test-grpc.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
syntax = "proto3";

package nodetest;

service Tester {
rpc TestUnary (TestRequest) returns (TestReply) {}
rpc TestClientStream (stream TestRequest) returns (TestReply) {}
rpc TestServerStream (TestRequest) returns (stream TestReply) {}
rpc TestBidiStream (stream TestRequest) returns (stream TestReply) {}
}

message TestRequest {
int32 n = 1;
}

message TestReply {
int32 n = 1;
}
2 changes: 1 addition & 1 deletion test/fixtures/test.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ message TestRequest {

message TestReply {
string message = 1;
}
}
239 changes: 239 additions & 0 deletions test/hooks/test-trace-grpc.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
/**
* Copyright 2016 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
'use strict';

if (process.platform === 'win32') {
// Skip grpc due to https://github.com/nodejs/node/issues/4932
process.exit(0);
}

var common = require('./common.js');
var assert = require('assert');
var traceLabels = require('../../lib/trace-labels.js');

var versions = {
grpc013: require('./fixtures/grpc0.13'),
grpc014: require('./fixtures/grpc0.14')
};
var protoFile = __dirname + '/../fixtures/test-grpc.proto';
var grpcPort = 50051;
var client, server;

Object.keys(versions).forEach(function(version) {
var grpc = versions[version];

function startServer(proto) {
var _server = new grpc.Server();
_server.addProtoService(proto.Tester.service, {
testUnary: function(call, cb) {
setTimeout(function() {
cb(null, {n: call.request.n});
}, common.serverWait);
},
testClientStream: function(call, cb) {
var sum = 0;
call.on('data', function(data) {
sum += data.n;
});
call.on('end', function() {
setTimeout(function() {
cb(null, {n: sum});
}, common.serverWait);
});
},
testServerStream: function(stream) {
for (var i = 0; i < 10; ++i) {
stream.write({n: i});
}
setTimeout(function() {
stream.end();
}, common.serverWait);
},
testBidiStream: function(stream) {
stream.on('data', function(data) {
stream.write({n: data.n});
});
stream.on('end', function() {
setTimeout(function() {
stream.end();
}, common.serverWait);
});
}
});
_server.bind('localhost:' + grpcPort,
grpc.ServerCredentials.createInsecure());
_server.start();
return _server;
}

function createClient(proto) {
return new proto.Tester('localhost:' + grpcPort,
grpc.credentials.createInsecure());
}

describe(version, function() {
before(function() {
var proto = grpc.load(protoFile).nodetest;
server = startServer(proto);
client = createClient(proto);
});

after(function() {
server.forceShutdown();
});

afterEach(function() {
common.cleanTraces();
});

it('should accurately measure time for unary requests', function(done) {
common.runInTransaction(function(endTransaction) {
client.testUnary({n: 42}, function(err, result) {
endTransaction();
assert.ifError(err);
assert.strictEqual(result.n, 42);
var trace = common.getMatchingSpan(grpcPredicate);
assert(trace);
common.assertDurationCorrect();
done();
});
});
});

it('should accurately measure time for client requests', function(done) {
common.runInTransaction(function(endTransaction) {
var stream = client.testClientStream(function(err, result) {
endTransaction();
assert.ifError(err);
assert.strictEqual(result.n, 45);
var trace = common.getMatchingSpan(grpcPredicate);
assert(trace);
common.assertDurationCorrect();
done();
});
for (var i = 0; i < 10; ++i) {
stream.write({n: i});
}
stream.end();
});
});

it('should accurately measure time for server requests', function(done) {
common.runInTransaction(function(endTransaction) {
var stream = client.testServerStream();
var sum = 0;
stream.on('data', function(data) {
sum += data.n;
});
stream.on('status', function(status) {
endTransaction();
assert.strictEqual(status.code, grpc.status.OK);
assert.strictEqual(sum, 45);
var trace = common.getMatchingSpan(grpcPredicate);
assert(trace);
common.assertDurationCorrect();
done();
});
});
});

it('should accurately measure time for bidi requests', function(done) {
common.runInTransaction(function(endTransaction) {
var stream = client.testBidiStream();
var sum = 0;
stream.on('data', function(data) {
sum += data.n;
});
for (var i = 0; i < 10; ++i) {
stream.write({n: i});
}
stream.end();
stream.on('status', function(status) {
endTransaction();
assert.strictEqual(status.code, grpc.status.OK);
assert.strictEqual(sum, 45);
var trace = common.getMatchingSpan(grpcPredicate);
assert(trace);
common.assertDurationCorrect();
done();
});
});
});

it('should not break if no parent transaction', function(done) {
client.testUnary({n: 42}, function(err, result) {
assert.ifError(err);
assert.strictEqual(result.n, 42);
assert.strictEqual(common.getTraces().length, 0);
done();
});
});

it('should remove trace frames from stack', function(done) {
common.runInTransaction(function(endTransaction) {
client.testUnary({n: 42}, function(err, result) {
endTransaction();
assert.ifError(err);
assert.strictEqual(result.n, 42);
var trace = common.getMatchingSpan(grpcPredicate);
var labels = trace.labels;
var stack = JSON.parse(labels[traceLabels.STACK_TRACE_DETAILS_KEY]);
assert.notStrictEqual(-1,
stack.stack_frame[0].method_name.indexOf('clientMethodTrace'));
done();
});
});
});

if (version === 'grpc013') {
it('should work for old argument orders (unary)', function(done) {
common.runInTransaction(function(endTransaction) {
client.testUnary({n: 42}, function(err, result) {
endTransaction();
assert.ifError(err);
assert.strictEqual(result.n, 42);
var trace = common.getMatchingSpan(grpcPredicate);
assert(trace);
common.assertDurationCorrect();
done();
}, null, {});
});
});

it('should work for old argument orders (stream)', function(done) {
common.runInTransaction(function(endTransaction) {
var stream = client.testClientStream(function(err, result) {
endTransaction();
assert.ifError(err);
assert.strictEqual(result.n, 45);
var trace = common.getMatchingSpan(grpcPredicate);
assert(trace);
common.assertDurationCorrect();
done();
}, null, {});
for (var i = 0; i < 10; ++i) {
stream.write({n: i});
}
stream.end();
});
});
}
});
});

function grpcPredicate(span) {
return span.name.indexOf('grpc-') === 0;
}

0 comments on commit 693b6ae

Please sign in to comment.