Skip to content

Commit

Permalink
Support for gRPC time tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
misterpoe committed Jun 23, 2016
1 parent 1aa95b2 commit 03606d5
Show file tree
Hide file tree
Showing 5 changed files with 311 additions and 13 deletions.
3 changes: 0 additions & 3 deletions lib/hooks/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,3 @@ module.exports = {
findModulePath: findModulePath,
findModuleVersion: findModuleVersion
};



77 changes: 68 additions & 9 deletions lib/hooks/userspace/hook-grpc.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,71 @@
var cls = require('../../cls.js');
var shimmer = require('shimmer');
var semver = require('semver');
var SpanData = require('../../span-data.js');

var agent;

var SUPPORTED_VERSIONS = '0.13 - 0.15';

function startBatchWrap(startBatch) {
return function startBatchTrace(thing, callback) {
// TODO: maybe we only want to do this if a root context exists.
return startBatch.call(this, thing, cls.getNamespace().bind(callback));
function wrapClientMethod(name, method, class_options) {
return function() {
var root = cls.getRootContext();
if (!root) {
agent.logger.debug('Untraced gRPC call: ', name);
return method.apply(this, arguments);
} else if (root === SpanData.nullSpan) {
return method.apply(this, arguments);
}
var span = agent.startSpan('grpc-call');
// Check if the response is through a stream or a callback.
if (!method.responseStream) {
// Grab the callback which is always required.
// Depending on whether deprecatedArgumentOrder is set, the position
// of the callback function differs.
// We need to wrap the callback with the context, to propagate it.
var cbIndex;
if (class_options && class_options.deprecatedArgumentOrder) {
cbIndex = method.requestStream ? 0 : 1;
} else {
cbIndex = arguments.length - 1;
}
if (cbIndex >= 0 && cbIndex < arguments.length) {
arguments[cbIndex] = wrapCallback(span, arguments[cbIndex]);
}
}
var call = method.apply(this, arguments);
if (agent.config_.enhancedDatabaseReporting) {
span.addLabel('call', name);
}
// The user might need the current context in listeners to this stream.
cls.getNamespace().bindEmitter(call);
if (method.responseStream) {
call.on('end', function() {
agent.endSpan(span);
});
}
return call;
};
}

function wrapCallback(span, done) {
var fn = function(err, res) {
agent.endSpan(span);
// If we reach this point, done will always be well-defined.
done(err, res);
};
return cls.getNamespace().bind(fn);
}

function makeClientConstructorWrap(makeClientConstructor) {
return function makeClientConstructorTrace(methods, serviceName,
class_options) {
var Client = makeClientConstructor.apply(this, arguments);
Object.keys(methods).forEach(function(name) {
var method = Client.prototype[name];
Client.prototype[name] = wrapClientMethod(name, method, class_options);
});
return Client;
};
}

Expand All @@ -36,13 +93,15 @@ module.exports = function(version_, agent_) {
return {};
}
return {
'src/node/src/grpc_extension.js': {
patch: function(extension) {
'src/node/src/client.js': {
patch: function(client) {
agent = agent_;
shimmer.wrap(extension.Call.prototype, 'startBatch', startBatchWrap);
shimmer.wrap(client, 'makeClientConstructor',
makeClientConstructorWrap);
},
unpatch: function(extension) {
shimmer.unwrap(extension.Call.prototype, 'startBatch');
unpatch: function(client) {
shimmer.unwrap(client, 'makeClientConstructor');
agent_.logger.info('gRPC makeClientConstructor: unpatched');
}
}
};
Expand Down
18 changes: 18 additions & 0 deletions test/fixtures/test-grpc.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
syntax = "proto3";

package nodetest;

service Tester {
rpc TestUnary (TestRequest) returns (TestReply) {}
rpc TestClientStream (stream TestRequest) returns (TestReply) {}
rpc TestServerStream (TestRequest) returns (stream TestReply) {}
rpc TestBidiStream (stream TestRequest) returns (stream TestReply) {}
}

message TestRequest {
int32 n = 1;
}

message TestReply {
int32 n = 1;
}
2 changes: 1 addition & 1 deletion test/fixtures/test.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ message TestRequest {

message TestReply {
string message = 1;
}
}
224 changes: 224 additions & 0 deletions test/hooks/test-trace-grpc.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
/**
* Copyright 2016 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
'use strict';

var common = require('./common.js');
var assert = require('assert');

var versions = {
grpc013: require('./fixtures/grpc0.13'),
grpc014: require('./fixtures/grpc0.14')
};
var protoFile = __dirname + '/../fixtures/test-grpc.proto';
var grpcPort = 50051;
var client, server;
// These are to test deprecated argument orders.
var grpcPortDeprecated = 50052;
var clientDeprecated, serverDeprecated;

Object.keys(versions).forEach(function(version) {
var grpc = versions[version];

function startServer(proto, port) {
var _server = new grpc.Server();
_server.addProtoService(proto.Tester.service, {
testUnary: function(call, cb) {
setTimeout(function() {
cb(null, {n: call.request.n});
}, common.serverWait);
},
testClientStream: function(call, cb) {
var sum = 0;
call.on('data', function(data) {
sum += data.n;
});
call.on('end', function() {
setTimeout(function() {
cb(null, {n: sum});
}, common.serverWait);
});
},
testServerStream: function(stream) {
for (var i = 0; i < 10; ++i) {
stream.write({n: i});
}
setTimeout(function() {
stream.end();
}, common.serverWait);
},
testBidiStream: function(stream) {
stream.on('data', function(data) {
stream.write({n: data.n});
});
stream.on('end', function() {
setTimeout(function() {
stream.end();
}, common.serverWait);
});
}
});
_server.bind('localhost:' + port,
grpc.ServerCredentials.createInsecure());
_server.start();
return _server;
}

function createClient(proto, port) {
return new proto.Tester('localhost:' + grpcPort,
grpc.credentials.createInsecure());
}

describe(version, function() {
before(function() {
var proto = grpc.load(protoFile).nodetest;
server = startServer(proto, grpcPort);
client = createClient(proto, grpcPort);

var protoDeprecated = grpc.load(protoFile, 'proto',
{deprecatedArgumentOrder: true}).nodetest;
serverDeprecated = startServer(protoDeprecated, grpcPortDeprecated);
clientDeprecated = createClient(protoDeprecated, grpcPortDeprecated);
});

after(function() {
server.forceShutdown();
serverDeprecated.forceShutdown();
});

afterEach(function () {
common.cleanTraces();
});

it('should accurately measure time for unary requests', function(done) {
common.runInTransaction(function(endTransaction) {
client.testUnary({n: 42}, function(err, result) {
endTransaction();
assert.ifError(err);
assert.strictEqual(result.n, 42);
var trace = common.getMatchingSpan(grpcPredicate);
assert(trace);
common.assertDurationCorrect();
done();
});
});
});

it('should accurately measure time for client requests', function(done) {
common.runInTransaction(function(endTransaction) {
var stream = client.testClientStream(function(err, result) {
endTransaction();
assert.ifError(err);
assert.strictEqual(result.n, 45);
var trace = common.getMatchingSpan(grpcPredicate);
assert(trace);
common.assertDurationCorrect();
done();
});
for (var i = 0; i < 10; ++i) {
stream.write({n: i});
}
stream.end();
});
});

it('should accurately measure time for server requests', function(done) {
common.runInTransaction(function(endTransaction) {
var stream = client.testServerStream();
var sum = 0;
stream.on('data', function(data) {
sum += data.n;
});
stream.on('status', function(status) {
endTransaction();
assert.strictEqual(status.code, grpc.status.OK);
assert.strictEqual(sum, 45);
var trace = common.getMatchingSpan(grpcPredicate);
assert(trace);
common.assertDurationCorrect();
done();
});
});
});

it('should accurately measure time for bidi requests', function(done) {
common.runInTransaction(function(endTransaction) {
var stream = client.testBidiStream();
var sum = 0;
stream.on('data', function(data) {
sum += data.n;
});
for (var i = 0; i < 10; ++i) {
stream.write({n: i});
}
stream.end();
stream.on('status', function(status) {
endTransaction();
assert.strictEqual(status.code, grpc.status.OK);
assert.strictEqual(sum, 45);
var trace = common.getMatchingSpan(grpcPredicate);
assert(trace);
common.assertDurationCorrect();
done();
});
});
});

it('should not break if no parent transaction', function(done) {
client.testUnary({n: 42}, function(err, result) {
assert.ifError(err);
assert.strictEqual(result.n, 42);
assert.strictEqual(common.getTraces().length, 0);
done();
});
});

it('should work for deprecated argument orders (unary)', function(done) {
common.runInTransaction(function(endTransaction) {
clientDeprecated.testUnary({n: 42}, function(err, result) {
endTransaction();
assert.ifError(err);
assert.strictEqual(result.n, 42);
var trace = common.getMatchingSpan(grpcPredicate);
assert(trace);
common.assertDurationCorrect();
done();
});
});
});

it('should work for deprecated argument orders (stream)', function(done) {
common.runInTransaction(function(endTransaction) {
var stream = clientDeprecated.testClientStream(function(err, result) {
endTransaction();
assert.ifError(err);
assert.strictEqual(result.n, 45);
var trace = common.getMatchingSpan(grpcPredicate);
assert(trace);
common.assertDurationCorrect();
done();
});
for (var i = 0; i < 10; ++i) {
stream.write({n: i});
}
stream.end();
});
});
});
});

function grpcPredicate(span) {
return span.name === 'grpc-call';
}

0 comments on commit 03606d5

Please sign in to comment.