Skip to content

Commit

Permalink
Merge branch 'master' into run-stores
Browse files Browse the repository at this point in the history
  • Loading branch information
tlhunter authored Jun 26, 2023
2 parents 9e10b4e + eeda4f8 commit b293353
Show file tree
Hide file tree
Showing 95 changed files with 4,762 additions and 246 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/release-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
injection-image-publish:
runs-on: ubuntu-latest
needs: ['publish']
needs: ['dev_release']
steps:
- uses: actions/checkout@v3
- uses: actions/setup-node@v3
Expand Down
1 change: 1 addition & 0 deletions .gitlab-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ package:
stage: deploy
variables:
PRODUCT_NAME: auto_inject-node
PACKAGE_FILTER: js # product name is "node" but package name ends "js"

deploy_to_reliability_env:
stage: deploy
Expand Down
14 changes: 14 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1450,6 +1450,20 @@ declare namespace plugins {
};
}

/**
* This plugin automatically instruments the
* [openai](https://platform.openai.com/docs/api-reference?lang=node.js) module.
*
* Note that for logs to work you'll need to set the `DD_API_KEY` environment variable.
* You'll also need to adjust any firewall settings to allow the tracer to communicate
* with `http-intake.logs.datadoghq.com`.
*
* Note that for metrics to work you'll need to enable
* [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/?tab=hostagent#setup)
* in the agent.
*/
interface openai extends Instrumentation {}

/**
* This plugin automatically instruments the
* [opensearch](https://github.com/opensearch-project/opensearch-js) module.
Expand Down
50 changes: 50 additions & 0 deletions integration-tests/helpers.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class FakeAgent extends EventEmitter {
async start () {
const app = express()
app.use(bodyParser.raw({ limit: Infinity, type: 'application/msgpack' }))
app.use(bodyParser.json({ limit: Infinity, type: 'application/json' }))
app.put('/v0.4/traces', (req, res) => {
if (req.body.length === 0) return res.status(200).send()
res.status(200).send({ rate_by_service: { 'service:,env:': 1 } })
Expand All @@ -43,6 +44,13 @@ class FakeAgent extends EventEmitter {
files: req.files
})
})
app.post('/telemetry/proxy/api/v2/apmtelemetry', (req, res) => {
res.status(200).send()
this.emit('telemetry', {
headers: req.headers,
payload: req.body
})
})

return new Promise((resolve, reject) => {
const timeoutObj = setTimeout(() => {
Expand Down Expand Up @@ -103,6 +111,48 @@ class FakeAgent extends EventEmitter {

return resultPromise
}

assertTelemetryReceived (fn, timeout, requestType, expectedMessageCount = 1) {
timeout = timeout || 5000
let resultResolve
let resultReject
let msgCount = 0
const errors = []

const timeoutObj = setTimeout(() => {
resultReject([...errors, new Error('timeout')])
}, timeout)

const resultPromise = new Promise((resolve, reject) => {
resultResolve = () => {
clearTimeout(timeoutObj)
resolve()
}
resultReject = (e) => {
clearTimeout(timeoutObj)
reject(e)
}
})

const messageHandler = msg => {
if (msg.payload.request_type !== requestType) return
msgCount += 1
try {
fn(msg)
if (msgCount === expectedMessageCount) {
resultResolve()
}
} catch (e) {
errors.push(e)
}
if (msgCount === expectedMessageCount) {
this.removeListener('telemetry', messageHandler)
}
}
this.on('telemetry', messageHandler)

return resultPromise
}
}

function spawnProc (filename, options = {}) {
Expand Down
56 changes: 54 additions & 2 deletions integration-tests/opentelemetry.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@ const { join } = require('path')
const { assert } = require('chai')
const { satisfies } = require('semver')

function check (agent, proc, timeout, onMessage = () => { }) {
function check (agent, proc, timeout, onMessage = () => { }, isMetrics) {
const messageReceiver = isMetrics
? agent.assertTelemetryReceived(onMessage, timeout, 'generate-metrics')
: agent.assertMessageReceived(onMessage, timeout)

return Promise.all([
agent.assertMessageReceived(onMessage, timeout),
messageReceiver,
new Promise((resolve, reject) => {
const timer = setTimeout(() => {
reject(new Error('Process timed out'))
Expand Down Expand Up @@ -38,6 +42,11 @@ function eachEqual (spans, expected, fn) {
return spans.every((span, i) => fn(span) === expected[i])
}

function nearNow (ts, now = Date.now(), range = 1000) {
const delta = Math.abs(now - ts)
return delta < range && delta >= 0
}

describe('opentelemetry', () => {
let agent
let proc
Expand Down Expand Up @@ -84,6 +93,49 @@ describe('opentelemetry', () => {
})
})

it('should capture telemetry', () => {
proc = fork(join(cwd, 'opentelemetry/basic.js'), {
cwd,
env: {
DD_TRACE_AGENT_PORT: agent.port,
DD_TRACE_OTEL_ENABLED: 1,
DD_TELEMETRY_HEARTBEAT_INTERVAL: 1,
TIMEOUT: 1500
}
})

return check(agent, proc, timeout, ({ payload }) => {
assert.strictEqual(payload.request_type, 'generate-metrics')

const metrics = payload.payload
assert.strictEqual(metrics.namespace, 'tracers')

const spanCreated = metrics.series.find(({ metric }) => metric === 'span_created')
const spanFinished = metrics.series.find(({ metric }) => metric === 'span_finished')

// Validate common fields between start and finish
for (const series of [spanCreated, spanFinished]) {
assert.ok(series)

assert.strictEqual(series.points.length, 1)
assert.strictEqual(series.points[0].length, 2)

const [ts, value] = series.points[0]
assert.ok(nearNow(ts, Date.now() / 1e3))
assert.strictEqual(value, 1)

assert.strictEqual(series.type, 'count')
assert.strictEqual(series.common, true)
assert.deepStrictEqual(series.tags, [
'integration_name:otel',
'otel_enabled:true',
'lib_language:nodejs',
`version:${process.version}`
])
}
}, true)
})

it('should work within existing datadog-traced http request', async () => {
proc = fork(join(cwd, 'opentelemetry/server.js'), {
cwd,
Expand Down
5 changes: 5 additions & 0 deletions integration-tests/opentelemetry/basic.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
'use strict'

const TIMEOUT = Number(process.env.TIMEOUT || 0)

const tracer = require('dd-trace').init()

const { TracerProvider } = tracer
Expand All @@ -16,5 +18,8 @@ const otelTracer = ot.trace.getTracer(
otelTracer.startActiveSpan('otel-sub', otelSpan => {
setImmediate(() => {
otelSpan.end()

// Allow the process to be held open to gather telemetry metrics
setTimeout(() => {}, TIMEOUT)
})
})
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
"@datadog/native-iast-rewriter": "2.0.1",
"@datadog/native-iast-taint-tracking": "^1.5.0",
"@datadog/native-metrics": "^2.0.0",
"@datadog/pprof": "2.2.2",
"@datadog/pprof": "2.2.3",
"@datadog/sketches-js": "^2.1.0",
"@opentelemetry/api": "^1.0.0",
"@opentelemetry/core": "^1.14.0",
Expand Down
3 changes: 2 additions & 1 deletion packages/.eslintrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
"proxyquire": true,
"withNamingSchema": true,
"withVersions": true,
"withExports": true
"withExports": true,
"withPeerService": true
},
"rules": {
"no-unused-expressions": 0,
Expand Down
1 change: 1 addition & 0 deletions packages/datadog-instrumentations/src/helpers/hooks.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ module.exports = {
'net': () => require('../net'),
'next': () => require('../next'),
'oracledb': () => require('../oracledb'),
'openai': () => require('../openai'),
'paperplane': () => require('../paperplane'),
'pg': () => require('../pg'),
'pino': () => require('../pino'),
Expand Down
50 changes: 50 additions & 0 deletions packages/datadog-instrumentations/src/openai.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
'use strict'

const {
channel,
addHook
} = require('./helpers/instrument')
const shimmer = require('../../datadog-shimmer')

const startCh = channel('apm:openai:request:start')
const finishCh = channel('apm:openai:request:finish')
const errorCh = channel('apm:openai:request:error')

addHook({ name: 'openai', file: 'dist/api.js', versions: ['>=3.0.0'] }, exports => {
const methodNames = Object.getOwnPropertyNames(exports.OpenAIApi.prototype)
methodNames.shift() // remove leading 'constructor' method

for (const methodName of methodNames) {
shimmer.wrap(exports.OpenAIApi.prototype, methodName, fn => function () {
if (!startCh.hasSubscribers) {
return fn.apply(this, arguments)
}

startCh.publish({
methodName,
args: arguments,
basePath: this.basePath,
apiKey: this.configuration.apiKey
})

return fn.apply(this, arguments)
.then((response) => {
finishCh.publish({
headers: response.headers,
body: response.data,
path: response.request.path,
method: response.request.method
})

return response
})
.catch((err) => {
errorCh.publish({ err })

throw err
})
})
}

return exports
})
7 changes: 7 additions & 0 deletions packages/datadog-plugin-amqp10/test/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ describe('Plugin', () => {
})

describe('when sending messages', () => {
withPeerService(
() => tracer,
() => sender.send({ key: 'value' }),
'localhost',
'out.host'
)

it('should do automatic instrumentation', done => {
agent
.use(traces => {
Expand Down
14 changes: 14 additions & 0 deletions packages/datadog-plugin-amqplib/test/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ describe('Plugin', () => {
})

describe('when sending commands', () => {
withPeerService(
() => tracer,
() => channel.assertQueue('test', {}, () => {}),
'localhost',
'out.host'
)

it('should do automatic instrumentation for immediate commands', done => {
agent
.use(traces => {
Expand Down Expand Up @@ -124,6 +131,13 @@ describe('Plugin', () => {
})

describe('when publishing messages', () => {
withPeerService(
() => tracer,
() => channel.assertQueue('test', {}, () => {}),
'localhost',
'out.host'
)

it('should do automatic instrumentation', done => {
agent
.use(traces => {
Expand Down
4 changes: 2 additions & 2 deletions packages/datadog-plugin-cassandra-driver/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ class CassandraDriverPlugin extends DatabasePlugin {
query = combine(query)
}

this.startSpan('cassandra.query', {
service: this.config.service,
this.startSpan(this.operationName(), {
service: this.serviceName(this.config, this.system),
resource: trim(query, 5000),
type: 'cassandra',
kind: 'client',
Expand Down
19 changes: 17 additions & 2 deletions packages/datadog-plugin-cassandra-driver/test/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
const semver = require('semver')
const agent = require('../../dd-trace/test/plugins/agent')
const { ERROR_TYPE, ERROR_MESSAGE, ERROR_STACK } = require('../../dd-trace/src/constants')
const namingSchema = require('./naming')

describe('Plugin', () => {
let cassandra
Expand Down Expand Up @@ -46,7 +47,8 @@ describe('Plugin', () => {
const query = 'SELECT now() FROM local;'
agent
.use(traces => {
expect(traces[0][0]).to.have.property('service', 'test-cassandra')
expect(traces[0][0]).to.have.property('name', namingSchema.outbound.opName)
expect(traces[0][0]).to.have.property('service', namingSchema.outbound.serviceName)
expect(traces[0][0]).to.have.property('resource', query)
expect(traces[0][0]).to.have.property('type', 'cassandra')
expect(traces[0][0].meta).to.have.property('db.type', 'cassandra')
Expand Down Expand Up @@ -142,6 +144,12 @@ describe('Plugin', () => {
})
})
})

withNamingSchema(
done => client.execute('SELECT now() FROM local;', err => err && done(err)),
() => namingSchema.outbound.opName,
() => namingSchema.outbound.serviceName
)
})

describe('with configuration', () => {
Expand Down Expand Up @@ -181,6 +189,12 @@ describe('Plugin', () => {

client.execute('SELECT now() FROM local;', err => err && done(err))
})

withNamingSchema(
done => client.execute('SELECT now() FROM local;', err => err && done(err)),
() => namingSchema.outbound.opName,
() => 'custom'
)
})

// Promise support added in 3.2.0
Expand Down Expand Up @@ -219,7 +233,8 @@ describe('Plugin', () => {

agent
.use(traces => {
expect(traces[0][0]).to.have.property('service', 'test-cassandra')
expect(traces[0][0]).to.have.property('name', namingSchema.outbound.opName)
expect(traces[0][0]).to.have.property('service', namingSchema.outbound.serviceName)
expect(traces[0][0]).to.have.property('resource', query)
expect(traces[0][0]).to.have.property('type', 'cassandra')
expect(traces[0][0].meta).to.have.property('db.type', 'cassandra')
Expand Down
Loading

0 comments on commit b293353

Please sign in to comment.