Skip to content

Commit

Permalink
Merge pull request #361 from jgilbert01/instrument-xray-telemetry
Browse files Browse the repository at this point in the history
metrics-support-2-xray-instrumentation
  • Loading branch information
petermyers authored Jul 3, 2024
2 parents 7b0170c + c04425b commit 7f764ed
Show file tree
Hide file tree
Showing 23 changed files with 615 additions and 85 deletions.
2 changes: 1 addition & 1 deletion .eslintrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ module.exports = {
"prefer-template": 1,
"import/no-unresolved": 1,
"import/no-extraneous-dependencies": [2, { "devDependencies": ["**/test/**/*.js"] }],
"global-require": 1,
"global-require": 0,
"no-underscore-dangle": 0,
"new-cap": 0,
"no-use-before-define": 0,
Expand Down
106 changes: 104 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 6 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "aws-lambda-stream",
"version": "1.0.11",
"version": "1.0.12",
"description": "Create stream processors with AWS Lambda functions.",
"keywords": [
"aws",
Expand Down Expand Up @@ -41,7 +41,9 @@
"env": {
"AWS_REGION": "us-west-2",
"KMS_REGIONS": "us-west-2",
"NODE_ENV": "test"
"NODE_ENV": "test",
"AWS_ACCESS_KEY_ID": "notarealaccesskeyid",
"AWS_SECRET_ACCESS_KEY": "notarealsecretaccesskey"
}
}
},
Expand Down Expand Up @@ -78,6 +80,7 @@
"@smithy/util-stream": "^2.1.1",
"aws-kms-ee": "^1.0.0",
"aws-sdk-client-mock": "^3.0.0",
"aws-xray-sdk-core": "^3.9.0",
"babel-eslint": "^10.0.0",
"babel-plugin-istanbul": "^5.2.0",
"better-npm-run": "^0.1.1",
Expand Down Expand Up @@ -108,6 +111,7 @@
"@smithy/util-retry": "2.x",
"@smithy/util-stream": "2.x",
"aws-kms-ee": "1.x",
"aws-xray-sdk-core": "3.x",
"bluebird": "3.x",
"debug": "4.x",
"highland": "2.x",
Expand Down
14 changes: 7 additions & 7 deletions src/connectors/dynamodb.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class Connector {
...inputParams,
};

return this._executeCommand(new UpdateCommand(params), ctx)
return this._sendCommand(new UpdateCommand(params), ctx)
.catch((err) => {
/* istanbul ignore else */
if (err.name === 'ConditionalCheckFailedException') {
Expand All @@ -80,7 +80,7 @@ class Connector {
...inputParams,
};

return this._executeCommand(new PutCommand(params), ctx);
return this._sendCommand(new PutCommand(params), ctx);
}

batchGet(inputParams, ctx) {
Expand All @@ -106,7 +106,7 @@ class Connector {

return _((push, next) => {
params.ExclusiveStartKey = cursor;
return this._executeCommand(new QueryCommand(params), ctx)
return this._sendCommand(new QueryCommand(params), ctx)
.then((data) => {
itemsCount += data.Items.length;

Expand Down Expand Up @@ -141,7 +141,7 @@ class Connector {
...inputParams,
};

return this._executeCommand(new QueryCommand(params), ctx);
return this._sendCommand(new QueryCommand(params), ctx);
}

scan(inputParams, ctx) {
Expand All @@ -150,14 +150,14 @@ class Connector {
...inputParams,
};

return this._executeCommand(new ScanCommand(params), ctx);
return this._sendCommand(new ScanCommand(params), ctx);
}

_batchGet(params, attempts, ctx) {
assertMaxRetries(attempts, this.retryConfig.maxRetries);

return wait(getDelay(this.retryConfig.retryWait, attempts.length))
.then(() => this._executeCommand(new BatchGetCommand(params), ctx)
.then(() => this._sendCommand(new BatchGetCommand(params), ctx)
.then((resp) => {
const response = {
Responses: {},
Expand All @@ -171,7 +171,7 @@ class Connector {
}));
}

_executeCommand(command, ctx) {
_sendCommand(command, ctx) {
this.opt.metrics?.capture(this.client, command, 'dynamodb', this.opt, ctx);
return Promise.resolve(this.client.send(command))
.tap(this.debug)
Expand Down
2 changes: 2 additions & 0 deletions src/metrics/calculate.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export const calculateMetrics = (collected) => { // eslint-disable-line import/p
functionMetrics()['stream.uow.count'] = collected.length;

const checkpoints = collected
.filter((uow) => uow.metrics?.timer)
.reduce((a, { metrics: { timer } }) => [
...a,
...Object.entries(timer.checkpoints)
Expand All @@ -39,6 +40,7 @@ export const calculateMetrics = (collected) => { // eslint-disable-line import/p
// console.log('checkpoints: ', checkpoints);

const gauges = collected
.filter((uow) => uow.metrics?.gauges)
.reduce((a, { metrics: { gauges } }) => [ // eslint-disable-line no-shadow
...a,
...Object.entries(gauges)
Expand Down
11 changes: 8 additions & 3 deletions src/metrics/capture.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
export const capture = (client, command, connector, opt, ctx) => // eslint-disable-line import/prefer-default-export
/* eslint-disable import/prefer-default-export */
export const capture = (client, command, connector, opt, ctx) => {
// console.log('capture: ', connector, opt, ctx);
// TODO addMiddleware(client, command, connector, opt, ctx);
// TODO conditional xray
client;
if (opt.xrayEnabled) {
// Wraps client with xray middlware.
require('./xray').captureSdkClientTraces(client);
}
return client;
};
5 changes: 5 additions & 0 deletions src/metrics/monitor.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ export const monitor = (handle, opt) => { // eslint-disable-line import/prefer-d
};
}

/* istanbul ignore else */
if (process.env.ENABLE_XRAY === 'true' || process.env.AWS_XRAY_DAEMON_ADDRESS) {
opt.xrayEnabled = true;
}

// could collect metrics here
return (event, context) => Promise.resolve().then(() => handle(event, context));
// could collect metrics here in .tap() and .tapCatch()
Expand Down
26 changes: 15 additions & 11 deletions src/metrics/pipelines.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ export const functionMetrics = (m) => {

export const clear = (opt) => {
funcMetrics = {};
// TODO conditional xray
if (opt.xrayEnabled) {
require('./xray').clearPipelineSegments();
}
};

const startUow = (publishTime, batchSize) => {
export const startUow = (publishTime, batchSize) => {
funcMetrics['stream.batch.size'] = batchSize;
funcMetrics['stream.batch.utilization'] = batchSize / Number(process.env.BATCH_SIZE || /* istanbul ignore next */ 100);

Expand Down Expand Up @@ -72,7 +74,11 @@ class PipelineMetrics {
funcMetrics['stream.pipeline.count'] = pipelineCount;
}

// TODO conditional xray
// Initialize an xray segment if enabled
if (opt.xrayEnabled) {
clone.xraySegment = require('./xray').startPipelineSegment(pipeline);
}

return clone;
}

Expand All @@ -95,13 +101,9 @@ class PipelineMetrics {
// wrap promise
w(p, step) {
const self = this;
return new Promise((resolve, reject) => {
// console.log('uow: ', uow);
self.startStep(step);
return Promise.resolve(p).then(resolve, reject)
// TODO record capacity utilization ???
.tap(self.endStep(step));
});
self.startStep(step);
return Promise.resolve(p()) // now start the promise
.tap(() => self.endStep(step));
}
}

Expand All @@ -112,7 +114,9 @@ export const endPipeline = (pipelineId, opt, s) =>
push(err);
next();
} else if (x === _.nil) {
// TODO conditional xray
if (opt.xrayEnabled) {
require('./xray').terminateSegment(pipelineId);
}
push(null, x);
} else {
// per uow
Expand Down
Loading

0 comments on commit 7f764ed

Please sign in to comment.