Skip to content

Commit

Permalink
Send instrumentation telemetry app-heartbeat events in uniform interv…
Browse files Browse the repository at this point in the history
…als (#3553)

* modified how telemetry heartbeat is sent

* added tests for instel app-heartbeat

* added TODO comment

* modified telemetry heartbeat and tests

* fixed amqp10 ci test failure
  • Loading branch information
ida613 authored and tlhunter committed Aug 25, 2023
1 parent 200acd2 commit 88e2124
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@ import 'dd-trace/init.js'
import amqp from 'amqp10'

const client = new amqp.Client()
await client.connect('amqp://admin:admin@localhost:5673')
await client.connect('amqp://admin:admin@127.0.0.1:5673')
const handlers = await Promise.all([client.createSender('amq.topic')])
const sender = handlers[0]

sender.send({ key: 'value' })

if (sender) {
await sender.detach()
}
if (client) {
await client.disconnect()
}
}
12 changes: 11 additions & 1 deletion packages/dd-trace/src/telemetry/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ let pluginManager
let application
let host
let interval
let heartbeatTimeout
let heartbeatInterval
const sentIntegrations = new Set()

Expand Down Expand Up @@ -110,6 +111,14 @@ function getTelemetryData () {
return { config, application, host, heartbeatInterval }
}

function heartbeat (config, application, host) {
heartbeatTimeout = setTimeout(() => {
sendData(config, application, host, 'app-heartbeat')
heartbeat(config, application, host)
}, heartbeatInterval).unref()
return heartbeatTimeout
}

function start (aConfig, thePluginManager) {
if (!aConfig.telemetry.enabled) {
return
Expand All @@ -122,9 +131,9 @@ function start (aConfig, thePluginManager) {

dependencies.start(config, application, host)
sendData(config, application, host, 'app-started', appStarted())
heartbeat(config, application, host)
interval = setInterval(() => {
metricsManager.send(config, application, host)
sendData(config, application, host, 'app-heartbeat')
}, heartbeatInterval)
interval.unref()
process.on('beforeExit', onBeforeExit)
Expand All @@ -137,6 +146,7 @@ function stop () {
return
}
clearInterval(interval)
clearTimeout(heartbeatTimeout)
process.removeListener('beforeExit', onBeforeExit)

telemetryStopChannel.publish(getTelemetryData())
Expand Down
108 changes: 98 additions & 10 deletions packages/dd-trace/test/telemetry/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const os = require('os')
let traceAgent

describe('telemetry', () => {
const HEARTBEAT_INTERVAL = 60000
let origSetInterval
let telemetry
let pluginsByName
Expand All @@ -20,7 +21,7 @@ describe('telemetry', () => {
origSetInterval = setInterval

global.setInterval = (fn, interval) => {
expect(interval).to.equal(60000)
expect(interval).to.equal(HEARTBEAT_INTERVAL)
// we only want one of these
return setTimeout(fn, 100)
}
Expand Down Expand Up @@ -64,7 +65,7 @@ describe('telemetry', () => {
circularObject.child.parent = circularObject

telemetry.start({
telemetry: { enabled: true, heartbeatInterval: 60000 },
telemetry: { enabled: true, heartbeatInterval: HEARTBEAT_INTERVAL },
hostname: 'localhost',
port: traceAgent.address().port,
service: 'test service',
Expand Down Expand Up @@ -107,17 +108,11 @@ describe('telemetry', () => {
})
})

it('should send app-heartbeat', () => {
return testSeq(2, 'app-heartbeat', payload => {
expect(payload).to.deep.equal({})
})
})

it('should send app-integrations-change', () => {
pluginsByName.baz2 = { _enabled: true }
telemetry.updateIntegrations()

return testSeq(3, 'app-integrations-change', payload => {
return testSeq(2, 'app-integrations-change', payload => {
expect(payload).to.deep.equal({
integrations: [
{ name: 'baz2', enabled: true, auto_enabled: true }
Expand All @@ -130,7 +125,7 @@ describe('telemetry', () => {
pluginsByName.boo2 = { _enabled: true }
telemetry.updateIntegrations()

return testSeq(4, 'app-integrations-change', payload => {
return testSeq(3, 'app-integrations-change', payload => {
expect(payload).to.deep.equal({
integrations: [
{ name: 'boo2', enabled: true, auto_enabled: true }
Expand Down Expand Up @@ -167,6 +162,98 @@ describe('telemetry', () => {
})
})

describe('telemetry app-heartbeat', () => {
const HEARTBEAT_INTERVAL = 60
let origSetInterval
let telemetry
let pluginsByName

before(done => {
origSetInterval = setInterval

global.setInterval = (fn, interval) => {
expect(interval).to.equal(HEARTBEAT_INTERVAL)
// we only want one of these
return setTimeout(fn, 100)
}

storage.run({ noop: true }, () => {
traceAgent = http.createServer(async (req, res) => {
const chunks = []
for await (const chunk of req) {
chunks.push(chunk)
}
req.body = JSON.parse(Buffer.concat(chunks).toString('utf8'))
traceAgent.reqs.push(req)
traceAgent.emit('handled-req')
res.end()
}).listen(0, done)
})

traceAgent.reqs = []

telemetry = proxyquire('../../src/telemetry', {
'../exporters/common/docker': {
id () {
return 'test docker id'
}
}
})

pluginsByName = {
foo2: { _enabled: true },
bar2: { _enabled: false }
}

const circularObject = {
child: { parent: null, field: 'child_value' },
field: 'parent_value'
}
circularObject.child.parent = circularObject

telemetry.start({
telemetry: { enabled: true, heartbeatInterval: HEARTBEAT_INTERVAL },
hostname: 'localhost',
port: traceAgent.address().port,
service: 'test service',
version: '1.2.3-beta4',
env: 'preprod',
tags: {
'runtime-id': '1a2b3c'
},
circularObject
}, {
_pluginsByName: pluginsByName
})
})

after(() => {
setTimeout(() => {
telemetry.stop()
traceAgent.close()
global.setInterval = origSetInterval
}, HEARTBEAT_INTERVAL * 3)
})

it('should send app-heartbeat at uniform intervals', () => {
// TODO: switch to clock.tick
setTimeout(() => {
const heartbeats = []
const reqCount = traceAgent.reqs.length
for (let i = 0; i < reqCount; i++) {
const req = traceAgent.reqs[i]
if (req.headers && req.headers['dd-telemetry-request-type'] === 'app-heartbeat') {
heartbeats.push(req.body.tracer_time)
}
}
expect(heartbeats.length).to.be.greaterThanOrEqual(2)
for (let k = 0; k++; k < heartbeats.length - 1) {
expect(heartbeats[k + 1] - heartbeats[k]).to.be.equal(1)
}
}, HEARTBEAT_INTERVAL * 3)
})
})

describe('telemetry with interval change', () => {
it('should set the interval correctly', (done) => {
const telemetry = proxyquire('../../src/telemetry', {
Expand Down Expand Up @@ -203,6 +290,7 @@ describe('telemetry with interval change', () => {

process.nextTick(() => {
expect(intervalSetCorrectly).to.be.true
telemetry.stop()
done()
})
})
Expand Down

0 comments on commit 88e2124

Please sign in to comment.