Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for gRPC time tracing #267

Merged
merged 1 commit into from
Jun 28, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions lib/hooks/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,3 @@ module.exports = {
findModulePath: findModulePath,
findModuleVersion: findModuleVersion
};



85 changes: 76 additions & 9 deletions lib/hooks/userspace/hook-grpc.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,74 @@
var cls = require('../../cls.js');
var shimmer = require('shimmer');
var semver = require('semver');
var SpanData = require('../../span-data.js');

var agent;

var SUPPORTED_VERSIONS = '0.13 - 0.15';

function startBatchWrap(startBatch) {
return function startBatchTrace(thing, callback) {
// TODO: maybe we only want to do this if a root context exists.
return startBatch.call(this, thing, cls.getNamespace().bind(callback));
function makeClientMethod(useDeprecatedArgumentOrder, method, name) {
return function clientMethodTrace() {
var root = cls.getRootContext();
if (!root) {
agent.logger.debug('Untraced gRPC call: ', name);
return method.apply(this, arguments);
} else if (root === SpanData.nullSpan) {
return method.apply(this, arguments);
}
var span = agent.startSpan('grpc-call-' + name);
// Check if the response is through a stream or a callback.
if (!method.responseStream) {
// Grab the callback which is always required.
// Depending on the version of grpc, the position of the callback
// function differs.
// We need to wrap the callback with the context, to propagate it.
var cbIndex;
if (useDeprecatedArgumentOrder) {
cbIndex = method.requestStream ? 0 : 1;
} else {
cbIndex = arguments.length - 1;
}
// If the arguments are incorrect, we want gRPC to throw the Error
// so we do not wrap the callback unnecessarily.
if (cbIndex >= 0 && cbIndex < arguments.length &&
typeof arguments[cbIndex] === 'function') {
arguments[cbIndex] = wrapCallback(span, arguments[cbIndex]);
}
}
var call = method.apply(this, arguments);
// The user might need the current context in listeners to this stream.
cls.getNamespace().bindEmitter(call);
if (method.responseStream) {
call.on('end', function() {

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

agent.endSpan(span);
});
}
return call;
};
}

/**
* Wraps a callback so that the current span for this trace is also ended when
* the callback is invoked.
* @param {SpanData} span - The span that should end after this callback.
* @param {function(?Error, value=)} done - The callback to be wrapped.
*/
function wrapCallback(span, done) {
var fn = function(err, res) {
agent.endSpan(span);
done(err, res);
};
return cls.getNamespace().bind(fn);
}

function makeClientConstructorWrap(useDeprecatedArgumentOrder,
makeClientConstructor) {
return function makeClientConstructorTrace(methods) {
var Client = makeClientConstructor.apply(this, arguments);
shimmer.massWrap(Client.prototype, Object.keys(methods),
makeClientMethod.bind(null, useDeprecatedArgumentOrder));
return Client;
};
}

Expand All @@ -36,13 +96,20 @@ module.exports = function(version_, agent_) {
return {};
}
return {
'src/node/src/grpc_extension.js': {
patch: function(extension) {
'src/node/src/client.js': {
patch: function(client) {
agent = agent_;
shimmer.wrap(extension.Call.prototype, 'startBatch', startBatchWrap);
// If version < 0.14, use the old argument order for client methods.
var useDeprecatedArgumentOrder = semver.satisfies(version_, '<0.14');
shimmer.wrap(client, 'makeClientConstructor',
makeClientConstructorWrap.bind(null, useDeprecatedArgumentOrder));
},
unpatch: function(extension) {
shimmer.unwrap(extension.Call.prototype, 'startBatch');
unpatch: function(client) {
// Only the client constructor is unwrapped, so that future grpc.load's
// will not wrap client methods with tracing. However, existing Client
// objects with wrapped prototype methods will continue tracing.
shimmer.unwrap(client, 'makeClientConstructor');

This comment was marked as spam.

This comment was marked as spam.

agent_.logger.info('gRPC makeClientConstructor: unpatched');

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

}
}
};
Expand Down
18 changes: 18 additions & 0 deletions test/fixtures/test-grpc.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
syntax = "proto3";

package nodetest;

service Tester {
rpc TestUnary (TestRequest) returns (TestReply) {}
rpc TestClientStream (stream TestRequest) returns (TestReply) {}
rpc TestServerStream (TestRequest) returns (stream TestReply) {}
rpc TestBidiStream (stream TestRequest) returns (stream TestReply) {}
}

message TestRequest {
int32 n = 1;
}

message TestReply {
int32 n = 1;
}
2 changes: 1 addition & 1 deletion test/fixtures/test.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ message TestRequest {

message TestReply {
string message = 1;
}
}
239 changes: 239 additions & 0 deletions test/hooks/test-trace-grpc.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
/**
* Copyright 2016 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
'use strict';

if (process.platform === 'win32') {
// Skip grpc due to https://github.com/nodejs/node/issues/4932
process.exit(0);
}

var common = require('./common.js');
var assert = require('assert');
var traceLabels = require('../../lib/trace-labels.js');

var versions = {
grpc013: require('./fixtures/grpc0.13'),
grpc014: require('./fixtures/grpc0.14')
};
var protoFile = __dirname + '/../fixtures/test-grpc.proto';
var grpcPort = 50051;
var client, server;

Object.keys(versions).forEach(function(version) {
var grpc = versions[version];

function startServer(proto) {
var _server = new grpc.Server();
_server.addProtoService(proto.Tester.service, {
testUnary: function(call, cb) {
setTimeout(function() {
cb(null, {n: call.request.n});
}, common.serverWait);
},
testClientStream: function(call, cb) {
var sum = 0;
call.on('data', function(data) {
sum += data.n;
});
call.on('end', function() {
setTimeout(function() {
cb(null, {n: sum});
}, common.serverWait);
});
},
testServerStream: function(stream) {
for (var i = 0; i < 10; ++i) {
stream.write({n: i});
}
setTimeout(function() {
stream.end();
}, common.serverWait);
},
testBidiStream: function(stream) {
stream.on('data', function(data) {
stream.write({n: data.n});
});
stream.on('end', function() {
setTimeout(function() {
stream.end();
}, common.serverWait);
});
}
});
_server.bind('localhost:' + grpcPort,
grpc.ServerCredentials.createInsecure());
_server.start();
return _server;
}

function createClient(proto) {
return new proto.Tester('localhost:' + grpcPort,
grpc.credentials.createInsecure());
}

describe(version, function() {
before(function() {
var proto = grpc.load(protoFile).nodetest;
server = startServer(proto);
client = createClient(proto);
});

after(function() {
server.forceShutdown();
});

afterEach(function() {
common.cleanTraces();
});

it('should accurately measure time for unary requests', function(done) {
common.runInTransaction(function(endTransaction) {
client.testUnary({n: 42}, function(err, result) {
endTransaction();
assert.ifError(err);
assert.strictEqual(result.n, 42);
var trace = common.getMatchingSpan(grpcPredicate);
assert(trace);
common.assertDurationCorrect();
done();
});
});
});

it('should accurately measure time for client requests', function(done) {
common.runInTransaction(function(endTransaction) {
var stream = client.testClientStream(function(err, result) {
endTransaction();
assert.ifError(err);
assert.strictEqual(result.n, 45);
var trace = common.getMatchingSpan(grpcPredicate);
assert(trace);
common.assertDurationCorrect();
done();
});
for (var i = 0; i < 10; ++i) {
stream.write({n: i});
}
stream.end();
});
});

it('should accurately measure time for server requests', function(done) {
common.runInTransaction(function(endTransaction) {
var stream = client.testServerStream();
var sum = 0;
stream.on('data', function(data) {
sum += data.n;
});
stream.on('status', function(status) {
endTransaction();
assert.strictEqual(status.code, grpc.status.OK);
assert.strictEqual(sum, 45);
var trace = common.getMatchingSpan(grpcPredicate);
assert(trace);
common.assertDurationCorrect();
done();
});
});
});

it('should accurately measure time for bidi requests', function(done) {
common.runInTransaction(function(endTransaction) {
var stream = client.testBidiStream();
var sum = 0;
stream.on('data', function(data) {
sum += data.n;
});
for (var i = 0; i < 10; ++i) {
stream.write({n: i});
}
stream.end();
stream.on('status', function(status) {
endTransaction();
assert.strictEqual(status.code, grpc.status.OK);
assert.strictEqual(sum, 45);
var trace = common.getMatchingSpan(grpcPredicate);
assert(trace);
common.assertDurationCorrect();
done();
});
});
});

it('should not break if no parent transaction', function(done) {
client.testUnary({n: 42}, function(err, result) {
assert.ifError(err);
assert.strictEqual(result.n, 42);
assert.strictEqual(common.getTraces().length, 0);
done();
});
});

it('should remove trace frames from stack', function(done) {
common.runInTransaction(function(endTransaction) {
client.testUnary({n: 42}, function(err, result) {
endTransaction();
assert.ifError(err);
assert.strictEqual(result.n, 42);
var trace = common.getMatchingSpan(grpcPredicate);
var labels = trace.labels;
var stack = JSON.parse(labels[traceLabels.STACK_TRACE_DETAILS_KEY]);
assert.notStrictEqual(-1,
stack.stack_frame[0].method_name.indexOf('clientMethodTrace'));
done();
});
});
});

if (version === 'grpc013') {
it('should work for old argument orders (unary)', function(done) {
common.runInTransaction(function(endTransaction) {
client.testUnary({n: 42}, function(err, result) {
endTransaction();
assert.ifError(err);
assert.strictEqual(result.n, 42);
var trace = common.getMatchingSpan(grpcPredicate);
assert(trace);
common.assertDurationCorrect();
done();
}, null, {});
});
});

it('should work for old argument orders (stream)', function(done) {
common.runInTransaction(function(endTransaction) {
var stream = client.testClientStream(function(err, result) {
endTransaction();
assert.ifError(err);
assert.strictEqual(result.n, 45);
var trace = common.getMatchingSpan(grpcPredicate);
assert(trace);
common.assertDurationCorrect();
done();
}, null, {});
for (var i = 0; i < 10; ++i) {
stream.write({n: i});
}
stream.end();
});
});
}
});
});

function grpcPredicate(span) {
return span.name.indexOf('grpc-') === 0;
}