Skip to content

Commit

Permalink
Follow-Up Fixes for gRPC (#357)
Browse files Browse the repository at this point in the history
PR-URL: #357
  • Loading branch information
kjin authored Jan 30, 2017
1 parent 62e6c66 commit f8f8145
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 114 deletions.
101 changes: 58 additions & 43 deletions src/hooks/userspace/hook-grpc.js
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,6 @@ function sendMetadataWrapper(rootContext) {
return function sendMetadataTrace(responseMetadata) {
if (rootContext) {
rootContext.addLabel('metadata', JSON.stringify(responseMetadata.getMap()));
} else {
agent.logger.info('gRPC: No root context found in sendMetadata');
}
return sendMetadata.apply(this, arguments);
};
Expand All @@ -155,29 +153,30 @@ function sendMetadataWrapper(rootContext) {
/**
* Wraps a unary function in order to record trace spans.
* @param {cls.Namespace} namespace The CLS namespace.
* @param {Object} handlerSet An object containing references to the function handle,
* as well as serialize and deserialize handles.
* @param {Object} handlerSet An object containing references to the function
* handle.
* @param {string} requestName The human-friendly name of the request.
*/
function wrapUnary(namespace, handlerSet, requestName) {
shimmer.wrap(handlerSet, 'func', function (func) {
// handlerSet.func is the gRPC method implementation itself.
// We wrap it so that a span is started immediately beforehand, and ended
// when the callback provided to it as an argument is invoked.
shimmer.wrap(handlerSet, 'func', function (serverMethod) {
return function serverMethodTrace(call, callback) {
var that = this;
var args = arguments;
if (!agent.shouldTrace(requestName)) {
return serverMethod.call(that, call, callback);
}
// Running in the namespace here propagates context to func.
return namespace.runAndReturn(function() {
var rootContext = startRootSpanForRequest(requestName, 5);
if (agent.config_.enhancedDatabaseReporting) {
shimmer.wrap(call, 'sendMetadata', sendMetadataWrapper(rootContext));
}
if (agent.config_.enhancedDatabaseReporting) {
rootContext.addLabel('argument', JSON.stringify(call.request));
}
// args[1] is the callback.
// Here, we patch the callback so that the span is ended immediately
// beforehand.
args = Array.prototype.slice.call(args);
args[1] = function (err, result, trailer, flags) {
var wrappedCb = function (err, result, trailer, flags) {
if (agent.config_.enhancedDatabaseReporting) {
if (err) {
rootContext.addLabel('error', err);
Expand All @@ -191,7 +190,7 @@ function wrapUnary(namespace, handlerSet, requestName) {
endRootSpanForRequest(rootContext);
return callback(err, result, trailer, flags);
};
return func.apply(that, args);
return serverMethod.call(that, call, wrappedCb);
});
};
});
Expand All @@ -200,26 +199,29 @@ function wrapUnary(namespace, handlerSet, requestName) {
/**
* Wraps a server streaming function in order to record trace spans.
* @param {cls.Namespace} namespace The CLS namespace.
* @param {Object} handlerSet An object containing references to the function handle,
* as well as serialize and deserialize handles.
* @param {Object} handlerSet An object containing references to the function
* handle.
* @param {string} requestName The human-friendly name of the request.
*/
function wrapServerStream(namespace, handlerSet, requestName) {
shimmer.wrap(handlerSet, 'func', function (func) {
// handlerSet.func is the gRPC method implementation itself.
// We wrap it so that a span is started immediately beforehand, and ended
// when there is no data to be sent from the server.
shimmer.wrap(handlerSet, 'func', function (serverMethod) {
return function serverMethodTrace(stream) {
var that = this;
var args = arguments;
if (!agent.shouldTrace(requestName)) {
return serverMethod.call(that, stream);
}
// Running in the namespace here propagates context to func.
return namespace.runAndReturn(function() {
var rootContext = startRootSpanForRequest(requestName, 5);
if (agent.config_.enhancedDatabaseReporting) {
shimmer.wrap(stream, 'sendMetadata', sendMetadataWrapper(rootContext));
}
if (agent.config_.enhancedDatabaseReporting) {
rootContext.addLabel('argument', JSON.stringify(stream.request));
}
var spanEnded = false;
var endSpan = function () {
var endSpan = function() {
if (!spanEnded) {
spanEnded = true;
endRootSpanForRequest(rootContext);
Expand All @@ -244,7 +246,7 @@ function wrapServerStream(namespace, handlerSet, requestName) {
}
endSpan();
});
return func.apply(that, args);
return serverMethod.call(that, stream);
});
};
});
Expand All @@ -253,15 +255,20 @@ function wrapServerStream(namespace, handlerSet, requestName) {
/**
* Wraps a client streaming function in order to record trace spans.
* @param {cls.Namespace} namespace The CLS namespace.
* @param {Object} handlerSet An object containing references to the function handle,
* as well as serialize and deserialize handles.
* @param {Object} handlerSet An object containing references to the function
* handle.
* @param {string} requestName The human-friendly name of the request.
*/
function wrapClientStream(namespace, handlerSet, requestName) {
shimmer.wrap(handlerSet, 'func', function (func) {
// handlerSet.func is the gRPC method implementation itself.
// We wrap it so that a span is started immediately beforehand, and ended
// when the callback provided to it as an argument is invoked.
shimmer.wrap(handlerSet, 'func', function (serverMethod) {
return function serverMethodTrace(stream, callback) {
var that = this;
var args = arguments;
if (!agent.shouldTrace(requestName)) {
return serverMethod.call(that, stream, callback);
}
// Running in the namespace here propagates context to func.
return namespace.runAndReturn(function() {
var rootContext = startRootSpanForRequest(requestName, 5);
Expand All @@ -275,11 +282,9 @@ function wrapClientStream(namespace, handlerSet, requestName) {
// the server to send a response, not the time until all data has been
// received from the client.
namespace.bindEmitter(stream);
// args[1] is the callback.
// Here, we patch the callback so that the span is ended immediately
// beforehand.
args = Array.prototype.slice.call(args);
args[1] = function (err, result, trailer, flags) {
var wrappedCb = function (err, result, trailer, flags) {
if (agent.config_.enhancedDatabaseReporting) {
if (err) {
rootContext.addLabel('error', err);
Expand All @@ -293,7 +298,7 @@ function wrapClientStream(namespace, handlerSet, requestName) {
endRootSpanForRequest(rootContext);
return callback(err, result, trailer, flags);
};
return func.apply(that, args);
return serverMethod.call(that, stream, wrappedCb);
});
};
});
Expand All @@ -302,15 +307,20 @@ function wrapClientStream(namespace, handlerSet, requestName) {
/**
* Wraps a bidirectional streaming function in order to record trace spans.
* @param {cls.Namespace} namespace The CLS namespace.
* @param {Object} handlerSet An object containing references to the function handle,
* as well as serialize and deserialize handles.
* @param {Object} handlerSet An object containing references to the function
* handle.
* @param {string} requestName The human-friendly name of the request.
*/
function wrapBidi(namespace, handlerSet, requestName) {
shimmer.wrap(handlerSet, 'func', function (func) {
// handlerSet.func is the gRPC method implementation itself.
// We wrap it so that a span is started immediately beforehand, and ended
// when there is no data to be sent from the server.
shimmer.wrap(handlerSet, 'func', function (serverMethod) {
return function serverMethodTrace(stream) {
var that = this;
var args = arguments;
if (!agent.shouldTrace(requestName)) {
return serverMethod.call(that, stream);
}
// Running in the namespace here propagates context to func.
return namespace.runAndReturn(function() {
var rootContext = startRootSpanForRequest(requestName, 5);
Expand Down Expand Up @@ -344,7 +354,7 @@ function wrapBidi(namespace, handlerSet, requestName) {
}
endSpan();
});
return func.apply(that, args);
return serverMethod.call(that, stream);
});
};
});
Expand Down Expand Up @@ -372,16 +382,21 @@ function serverRegisterWrap(register) {
var requestName = 'grpc:' + name;
// Proceed to wrap methods that are invoked when a gRPC service call is made.
// In every case, the function 'func' is the user-implemented handling function.
if (method_type === 'unary') {
wrapUnary(namespace, handlerSet, requestName);
} else if (method_type === 'server_stream') {
wrapServerStream(namespace, handlerSet, requestName);
} else if (method_type === 'client_stream') {
wrapClientStream(namespace, handlerSet, requestName);
} else if (method_type === 'bidi') {
wrapBidi(namespace, handlerSet, requestName);
} else {
agent.logger.warn('gRPC Server: Unrecognized method_type ' + method_type);
switch (method_type) {
case 'unary':
wrapUnary(namespace, handlerSet, requestName);
break;
case 'server_stream':
wrapServerStream(namespace, handlerSet, requestName);
break;
case 'client_stream':
wrapClientStream(namespace, handlerSet, requestName);
break;
case 'bidi':
wrapBidi(namespace, handlerSet, requestName);
break;
default:
agent.logger.warn('gRPC Server: Unrecognized method_type ' + method_type);
}
return result;
};
Expand Down
12 changes: 12 additions & 0 deletions test/hooks/common.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ var config = { enhancedDatabaseReporting: true, samplingRate: 0 };
var agent = require('../..').start(config).private_();
// We want to disable publishing to avoid conflicts with production.
agent.traceWriter.publish_ = function() {};
agent._shouldTraceArgs = [];
var shouldTrace = agent.shouldTrace;
agent.shouldTrace = function() {
agent._shouldTraceArgs.push([].slice.call(arguments, 0));
return shouldTrace.apply(this, arguments);
};

var cls = require('../../src/cls.js');

Expand All @@ -44,12 +50,17 @@ var SERVER_CERT = fs.readFileSync(path.join(__dirname, 'fixtures', 'cert.pem'));
*/
function cleanTraces() {
agent.traceWriter.buffer_ = [];
agent._shouldTraceArgs = [];
}

function getTraces() {
return agent.traceWriter.buffer_.map(JSON.parse);
}

function getShouldTraceArgs() {
return agent._shouldTraceArgs;
}

function getMatchingSpan(predicate) {
var spans = getMatchingSpans(predicate);
assert.equal(spans.length, 1,
Expand Down Expand Up @@ -146,6 +157,7 @@ module.exports = {
createChildSpan: createChildSpan,
getTraces: getTraces,
runInTransaction: runInTransaction,
getShouldTraceArgs: getShouldTraceArgs,
serverWait: SERVER_WAIT,
serverRes: SERVER_RES,
serverPort: SERVER_PORT,
Expand Down
Loading

0 comments on commit f8f8145

Please sign in to comment.