diff --git a/packages/datadog-plugin-google-cloud-pubsub/src/consumer.js b/packages/datadog-plugin-google-cloud-pubsub/src/consumer.js index 3a330ad4c3a..84c4122ec57 100644 --- a/packages/datadog-plugin-google-cloud-pubsub/src/consumer.js +++ b/packages/datadog-plugin-google-cloud-pubsub/src/consumer.js @@ -1,5 +1,6 @@ 'use strict' +const { getMessageSize } = require('../../dd-trace/src/datastreams/processor') const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer') class GoogleCloudPubsubConsumerPlugin extends ConsumerPlugin { @@ -11,7 +12,7 @@ class GoogleCloudPubsubConsumerPlugin extends ConsumerPlugin { const topic = subscription.metadata && subscription.metadata.topic const childOf = this.tracer.extract('text_map', message.attributes) || null - this.startSpan({ + const span = this.startSpan({ childOf, resource: topic, type: 'worker', @@ -23,6 +24,12 @@ class GoogleCloudPubsubConsumerPlugin extends ConsumerPlugin { 'pubsub.ack': 0 } }) + if (this.config.dsmEnabled && message?.attributes) { + const payloadSize = getMessageSize(message) + this.tracer.decodeDataStreamsContext(message.attributes) + this.tracer + .setCheckpoint(['direction:in', `topic:${topic}`, 'type:google-pubsub'], span, payloadSize) + } } finish (message) { diff --git a/packages/datadog-plugin-google-cloud-pubsub/src/producer.js b/packages/datadog-plugin-google-cloud-pubsub/src/producer.js index a34d6bfacd8..b6261ee85b6 100644 --- a/packages/datadog-plugin-google-cloud-pubsub/src/producer.js +++ b/packages/datadog-plugin-google-cloud-pubsub/src/producer.js @@ -1,6 +1,8 @@ 'use strict' const ProducerPlugin = require('../../dd-trace/src/plugins/producer') +const { DsmPathwayCodec } = require('../../dd-trace/src/datastreams/pathway') +const { getHeadersSize } = require('../../dd-trace/src/datastreams/processor') class GoogleCloudPubsubProducerPlugin extends ProducerPlugin { static get id () { return 'google-cloud-pubsub' } @@ -25,6 +27,12 @@ class GoogleCloudPubsubProducerPlugin extends ProducerPlugin { msg.attributes = {} } this.tracer.inject(span, 'text_map', msg.attributes) + if (this.config.dsmEnabled) { + const payloadSize = getHeadersSize(msg) + const dataStreamsContext = this.tracer + .setCheckpoint(['direction:out', `topic:${topic}`, 'type:google-pubsub'], span, payloadSize) + DsmPathwayCodec.encode(dataStreamsContext, msg.attributes) + } } } } diff --git a/packages/datadog-plugin-google-cloud-pubsub/test/index.spec.js b/packages/datadog-plugin-google-cloud-pubsub/test/index.spec.js index 89a0c5f03b8..80bc5f9509d 100644 --- a/packages/datadog-plugin-google-cloud-pubsub/test/index.spec.js +++ b/packages/datadog-plugin-google-cloud-pubsub/test/index.spec.js @@ -6,9 +6,12 @@ const id = require('../../dd-trace/src/id') const { ERROR_MESSAGE, ERROR_TYPE, ERROR_STACK } = require('../../dd-trace/src/constants') const { expectedSchema, rawExpectedSchema } = require('./naming') +const { computePathwayHash } = require('../../dd-trace/src/datastreams/pathway') +const { ENTRY_PARENT_HASH, DataStreamsProcessor } = require('../../dd-trace/src/datastreams/processor') // The roundtrip to the pubsub emulator takes time. Sometimes a *long* time. const TIMEOUT = 30000 +const dsmTopicName = 'dsm-topic' describe('Plugin', () => { let tracer @@ -18,6 +21,7 @@ describe('Plugin', () => { before(() => { process.env.PUBSUB_EMULATOR_HOST = 'localhost:8081' + process.env.DD_DATA_STREAMS_ENABLED = true }) after(() => { @@ -34,10 +38,12 @@ describe('Plugin', () => { let resource let v1 let gax + let expectedProducerHash + let expectedConsumerHash describe('without configuration', () => { beforeEach(() => { - return agent.load('google-cloud-pubsub') + return agent.load('google-cloud-pubsub', { dsmEnabled: false }) }) beforeEach(() => { @@ -296,7 +302,8 @@ describe('Plugin', () => { describe('with configuration', () => { beforeEach(() => { return agent.load('google-cloud-pubsub', { - service: 'a_test_service' + service: 'a_test_service', + dsmEnabled: false }) }) @@ -322,6 +329,113 @@ describe('Plugin', () => { }) }) + describe('data stream monitoring', () => { + let dsmTopic + let sub + let consume + + beforeEach(() => { + return agent.load('google-cloud-pubsub', { + dsmEnabled: true + }) + }) + + before(async () => { + const { PubSub } = require(`../../../versions/@google-cloud/pubsub@${version}`).get() + project = getProjectId() + resource = `projects/${project}/topics/${dsmTopicName}` + pubsub = new PubSub({ projectId: project }) + tracer.use('google-cloud-pubsub', { dsmEnabled: true }) + + dsmTopic = await pubsub.createTopic(dsmTopicName) + dsmTopic = dsmTopic[0] + sub = await dsmTopic.createSubscription('DSM') + sub = sub[0] + consume = function (cb) { + sub.on('message', cb) + } + + const dsmFullTopic = `projects/${project}/topics/${dsmTopicName}` + + expectedProducerHash = computePathwayHash( + 'test', + 'tester', + ['direction:out', 'topic:' + dsmFullTopic, 'type:google-pubsub'], + ENTRY_PARENT_HASH + ) + expectedConsumerHash = computePathwayHash( + 'test', + 'tester', + ['direction:in', 'topic:' + dsmFullTopic, 'type:google-pubsub'], + expectedProducerHash + ) + }) + + describe('should set a DSM checkpoint', () => { + it('on produce', async () => { + await publish(dsmTopic, { data: Buffer.from('DSM produce checkpoint') }) + + agent.expectPipelineStats(dsmStats => { + let statsPointsReceived = 0 + // we should have 1 dsm stats points + dsmStats.forEach((timeStatsBucket) => { + if (timeStatsBucket && timeStatsBucket.Stats) { + timeStatsBucket.Stats.forEach((statsBuckets) => { + statsPointsReceived += statsBuckets.Stats.length + }) + } + }) + expect(statsPointsReceived).to.be.at.least(1) + expect(agent.dsmStatsExist(agent, expectedProducerHash.readBigUInt64BE(0).toString())).to.equal(true) + }, { timeoutMs: TIMEOUT }) + }) + + it('on consume', async () => { + await publish(dsmTopic, { data: Buffer.from('DSM consume checkpoint') }) + await consume(async () => { + agent.expectPipelineStats(dsmStats => { + let statsPointsReceived = 0 + // we should have 2 dsm stats points + dsmStats.forEach((timeStatsBucket) => { + if (timeStatsBucket && timeStatsBucket.Stats) { + timeStatsBucket.Stats.forEach((statsBuckets) => { + statsPointsReceived += statsBuckets.Stats.length + }) + } + }) + expect(statsPointsReceived).to.be.at.least(2) + expect(agent.dsmStatsExist(agent, expectedConsumerHash.readBigUInt64BE(0).toString())).to.equal(true) + }, { timeoutMs: TIMEOUT }) + }) + }) + }) + + describe('it should set a message payload size', () => { + let recordCheckpointSpy + + beforeEach(() => { + recordCheckpointSpy = sinon.spy(DataStreamsProcessor.prototype, 'recordCheckpoint') + }) + + afterEach(() => { + DataStreamsProcessor.prototype.recordCheckpoint.restore() + }) + + it('when producing a message', async () => { + await publish(dsmTopic, { data: Buffer.from('DSM produce payload size') }) + expect(recordCheckpointSpy.args[0][0].hasOwnProperty('payloadSize')) + }) + + it('when consuming a message', async () => { + await publish(dsmTopic, { data: Buffer.from('DSM consume payload size') }) + + await consume(async () => { + expect(recordCheckpointSpy.args[0][0].hasOwnProperty('payloadSize')) + }) + }) + }) + }) + function expectSpanWithDefaults (expected) { const prefixedResource = [expected.meta['pubsub.method'], resource].filter(x => x).join(' ') const service = expected.meta['pubsub.method'] ? 'test-pubsub' : 'test'