From a5f35d42e77d8d5b29286ca62f8c3291e96dafa2 Mon Sep 17 00:00:00 2001 From: Bob Evans Date: Fri, 31 May 2024 15:45:52 -0400 Subject: [PATCH 1/2] feat: Added instrumentation for `kafkajs.Kafka.consumer` --- lib/instrumentation/kafkajs.js | 1 - lib/instrumentation/kafkajs/consumer.js | 99 ++++++++++ lib/instrumentation/kafkajs/index.js | 4 +- lib/instrumentation/kafkajs/producer.js | 2 + lib/shim/message-shim/subscribe-consume.js | 11 +- lib/shim/specs/message-subscribe.js | 8 + lib/symbols.js | 1 + lib/transaction/tracecontext.js | 9 +- .../distributed_tracing/tracecontext.test.js | 31 +++ test/unit/shim/message-shim.test.js | 28 +++ test/versioned/kafkajs/kafka.tap.js | 178 +++++++++++++----- test/versioned/kafkajs/utils.js | 65 ++++++- 12 files changed, 382 insertions(+), 55 deletions(-) create mode 100644 lib/instrumentation/kafkajs/consumer.js diff --git a/lib/instrumentation/kafkajs.js b/lib/instrumentation/kafkajs.js index b1dd62ba39..770f1aedfa 100644 --- a/lib/instrumentation/kafkajs.js +++ b/lib/instrumentation/kafkajs.js @@ -4,5 +4,4 @@ */ 'use strict' - module.exports = require('./kafkajs/index') diff --git a/lib/instrumentation/kafkajs/consumer.js b/lib/instrumentation/kafkajs/consumer.js new file mode 100644 index 0000000000..516c491b5a --- /dev/null +++ b/lib/instrumentation/kafkajs/consumer.js @@ -0,0 +1,99 @@ +/* + * Copyright 2024 New Relic Corporation. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +'use strict' +const { kafkaCtx } = require('../../symbols') +const { MessageSpec, MessageSubscribeSpec, RecorderSpec } = require('../../shim/specs') +const { DESTINATIONS } = require('../../config/attribute-filter') +const CONSUMER_METHODS = [ + 'connect', + 'disconnect', + 'subscribe', + 'stop', + 'commitOffsets', + 'seek', + 'pause', + 'resume' +] +const SEGMENT_PREFIX = 'kafkajs.Kafka.consumer#' + +module.exports = function instrumentConsumer({ shim, kafkajs }) { + shim.wrap(kafkajs.Kafka.prototype, 'consumer', function wrapConsumer(shim, orig) { + return function wrappedConsumer() { + const args = shim.argsToArray.apply(shim, arguments) + const consumer = orig.apply(this, args) + consumer.on(consumer.events.REQUEST, function listener(data) { + // storing broker for when we add `host`, `port` to messaging spans + consumer[kafkaCtx] = { + clientId: data?.payload?.clientId, + broker: data?.payload.broker + } + }) + shim.record(consumer, CONSUMER_METHODS, function wrapper(shim, fn, name) { + return new RecorderSpec({ + name: `${SEGMENT_PREFIX}${name}`, + promise: true + }) + }) + shim.recordSubscribedConsume( + consumer, + 'run', + new MessageSubscribeSpec({ + name: `${SEGMENT_PREFIX}#run`, + destinationType: shim.TOPIC, + promise: true, + consumer: shim.FIRST, + functions: ['eachMessage'], + messageHandler: handler.bind(null, consumer) + }) + ) + return consumer + } + }) +} + +/** + * Message handler that extracts the topic and headers from message being consumed. + * + * This also sets some metrics for byte length of message, and number of messages. + * Lastly, adds tx attributes for byteCount and clientId + * + * @param {object} consumer the instance of kafka consumer + * @param {MessageShim} shim intance of shim + * @param {Array} args arguments passed to the `eachMessage` function of the `consumer.run` + * @returns {MessageSpec} spec for message handling + */ +function handler(consumer, shim, args) { + const [data] = args + const { topic } = data + const segment = shim.getActiveSegment() + + if (segment?.transaction) { + const tx = segment.transaction + const byteLength = data?.message.value?.byteLength + const metricPrefix = `Message/Kafka/Topic/Named/${topic}/Received` + tx.metrics.measureBytes(`${metricPrefix}/Bytes`, byteLength) + // This will always be 1 + tx.metrics.getOrCreateMetric(`${metricPrefix}/Messages`).recordValue(1) + tx.trace.attributes.addAttribute( + DESTINATIONS.TRANS_SCOPE, + 'kafka.consume.byteCount', + byteLength + ) + if (consumer?.[kafkaCtx]) { + tx.trace.attributes.addAttribute( + DESTINATIONS.TRANS_EVENT, + 'kafka.consume.client_id', + consumer[kafkaCtx].clientId + ) + } + } + + return new MessageSpec({ + destinationType: `Topic/Consume`, + destinationName: data?.topic, + headers: data?.message?.headers + }) +} diff --git a/lib/instrumentation/kafkajs/index.js b/lib/instrumentation/kafkajs/index.js index c446cb37cc..a738e1f603 100644 --- a/lib/instrumentation/kafkajs/index.js +++ b/lib/instrumentation/kafkajs/index.js @@ -6,8 +6,8 @@ 'use strict' const instrumentProducer = require('./producer') +const instrumentConsumer = require('./consumer') -// eslint-disable-next-line no-unused-vars module.exports = function initialize(agent, kafkajs, _moduleName, shim) { if (agent.config.feature_flag.kafkajs_instrumentation === false) { shim.logger.debug( @@ -17,6 +17,6 @@ module.exports = function initialize(agent, kafkajs, _moduleName, shim) { } shim.setLibrary(shim.KAFKA) - + instrumentConsumer({ shim, kafkajs }) instrumentProducer({ shim, kafkajs }) } diff --git a/lib/instrumentation/kafkajs/producer.js b/lib/instrumentation/kafkajs/producer.js index 32a75b2b0e..7b648bd0c6 100644 --- a/lib/instrumentation/kafkajs/producer.js +++ b/lib/instrumentation/kafkajs/producer.js @@ -30,6 +30,7 @@ module.exports = function instrumentProducer({ shim, kafkajs }) { } return new MessageSpec({ + promise: true, destinationName: data.topic, destinationType: shim.TOPIC, headers: firstMessage.headers @@ -45,6 +46,7 @@ module.exports = function instrumentProducer({ shim, kafkajs }) { } return new MessageSpec({ + promise: true, destinationName: data.topicMessages[0].topic, destinationType: shim.TOPIC, headers: firstMessage.headers diff --git a/lib/shim/message-shim/subscribe-consume.js b/lib/shim/message-shim/subscribe-consume.js index 228c601f02..1014072568 100644 --- a/lib/shim/message-shim/subscribe-consume.js +++ b/lib/shim/message-shim/subscribe-consume.js @@ -56,13 +56,22 @@ function createSubscriberWrapper({ shim, fn, spec, destNameIsArg }) { } } - if (consumerIdx !== null) { + if (consumerIdx !== null && !spec.functions) { args[consumerIdx] = shim.wrap( args[consumerIdx], makeWrapConsumer({ spec, queue, destinationName, destNameIsArg }) ) } + if (consumerIdx !== null && spec.functions) { + spec.functions.forEach((fn) => { + args[consumerIdx][fn] = shim.wrap( + args[consumerIdx][fn], + makeWrapConsumer({ spec, queue, destinationName, destNameIsArg }) + ) + }) + } + return fn.apply(this, args) } } diff --git a/lib/shim/specs/message-subscribe.js b/lib/shim/specs/message-subscribe.js index f3da187d85..466e3e2d4a 100644 --- a/lib/shim/specs/message-subscribe.js +++ b/lib/shim/specs/message-subscribe.js @@ -27,6 +27,13 @@ class MessageSubscribeSpec extends MessageSpec { */ consumer + /** + * Indicates names of functions to be wrapped for message consumption. + * This must be used in tandem with consumer. + * @type {Array|null} + */ + functions + /* eslint-disable jsdoc/require-param-description */ /** * @param {MessageSubscribeSpecParams} params @@ -35,6 +42,7 @@ class MessageSubscribeSpec extends MessageSpec { super(params) this.consumer = params.consumer ?? null + this.functions = params.functions ?? null } } diff --git a/lib/symbols.js b/lib/symbols.js index fdac740724..f5cd6fb734 100644 --- a/lib/symbols.js +++ b/lib/symbols.js @@ -11,6 +11,7 @@ module.exports = { databaseName: Symbol('databaseName'), disableDT: Symbol('Disable distributed tracing'), // description for backwards compatibility executorContext: Symbol('executorContext'), + kafkaCtx: Symbol('kafkaCtx'), koaBody: Symbol('body'), koaBodySet: Symbol('bodySet'), koaRouter: Symbol('koaRouter'), diff --git a/lib/transaction/tracecontext.js b/lib/transaction/tracecontext.js index afe0750a87..033f7b66c1 100644 --- a/lib/transaction/tracecontext.js +++ b/lib/transaction/tracecontext.js @@ -313,6 +313,9 @@ class TraceContext { return traceParentInfo } + if (Buffer.isBuffer(traceparent)) { + traceparent = traceparent.toString() + } const trimmed = traceparent.trim() const parts = trimmed.split('-') @@ -445,10 +448,14 @@ class TraceContext { } _parseTraceState(params) { - const { tracestate, hasTrustKey, expectedNrKey } = params + const { hasTrustKey, expectedNrKey } = params + let { tracestate } = params let nrTraceStateValue = null const finalListMembers = [] const vendors = [] + if (Buffer.isBuffer(tracestate)) { + tracestate = tracestate.toString() + } const incomingListMembers = tracestate.split(',') for (let i = 0; i < incomingListMembers.length; i++) { const listMember = incomingListMembers[i].trim() diff --git a/test/unit/distributed_tracing/tracecontext.test.js b/test/unit/distributed_tracing/tracecontext.test.js index 48c10768dd..e229d1b29c 100644 --- a/test/unit/distributed_tracing/tracecontext.test.js +++ b/test/unit/distributed_tracing/tracecontext.test.js @@ -255,6 +255,14 @@ tap.test('TraceContext', function (t) { t.equal(traceContext._validateAndParseTraceParentHeader(shorterStr).entryValid, false) t.end() }) + + t.test('should handle if traceparent is a buffer', (t) => { + const { traceContext } = t.context + const traceparent = '00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-00' + const bufferTraceParent = Buffer.from(traceparent, 'utf8') + t.ok(traceContext._validateAndParseTraceParentHeader(bufferTraceParent).entryValid) + t.end() + }) }) t.test('_validateAndParseTraceStateHeader', (t) => { @@ -283,6 +291,29 @@ tap.test('TraceContext', function (t) { t.end() }) + t.test('should pass a valid tracestate header if a buffer', (t) => { + const { agent, traceContext } = t.context + agent.config.trusted_account_key = '190' + const goodTraceStateHeader = + /* eslint-disable-next-line max-len */ + '190@nr=0-0-709288-8599547-f85f42fd82a4cf1d-164d3b4b0d09cb05-1-0.789-1563574856827,234234@foo=bar' + const bufferTraceState = Buffer.from(goodTraceStateHeader, 'utf8') + const valid = traceContext._validateAndParseTraceStateHeader(bufferTraceState) + t.ok(valid) + t.equal(valid.entryFound, true) + t.equal(valid.entryValid, true) + t.equal(valid.intrinsics.version, 0) + t.equal(valid.intrinsics.parentType, 'App') + t.equal(valid.intrinsics.accountId, '709288') + t.equal(valid.intrinsics.appId, '8599547') + t.equal(valid.intrinsics.spanId, 'f85f42fd82a4cf1d') + t.equal(valid.intrinsics.transactionId, '164d3b4b0d09cb05') + t.equal(valid.intrinsics.sampled, true) + t.equal(valid.intrinsics.priority, 0.789) + t.equal(valid.intrinsics.timestamp, 1563574856827) + t.end() + }) + t.test('should fail mismatched trusted account ID in tracestate header', (t) => { const { agent, traceContext } = t.context agent.config.trusted_account_key = '666' diff --git a/test/unit/shim/message-shim.test.js b/test/unit/shim/message-shim.test.js index 780c6608da..39c5499b4a 100644 --- a/test/unit/shim/message-shim.test.js +++ b/test/unit/shim/message-shim.test.js @@ -1170,5 +1170,33 @@ tap.test('MessageShim', function (t) { t.ok(parent) }) }) + + t.test('should wrap object key of consumer', function (t) { + t.plan(3) + const message = { foo: 'bar' } + const subscriber = function subscriber(consumer) { + consumer.eachMessage(message) + } + const wrapped = shim.recordSubscribedConsume(subscriber, { + name: 'Channel#subscribe', + consumer: shim.FIRST, + functions: ['eachMessage'], + messageHandler: function (shim, args) { + t.same(args[0], message) + return { + destinationName: 'exchange.foo', + destinationType: shim.EXCHANGE + } + } + }) + wrapped({ + eachMessage: function consumer(msg) { + const segment = shim.getSegment() + t.equal(segment.name, 'OtherTransaction/Message/RabbitMQ/Exchange/Named/exchange.foo') + t.equal(msg, message) + t.end() + } + }) + }) }) }) diff --git a/test/versioned/kafkajs/kafka.tap.js b/test/versioned/kafkajs/kafka.tap.js index 36284b4acf..76c631fd5a 100644 --- a/test/versioned/kafkajs/kafka.tap.js +++ b/test/versioned/kafkajs/kafka.tap.js @@ -10,6 +10,7 @@ const helper = require('../../lib/agent_helper') const params = require('../../lib/params') const { removeModules } = require('../../lib/cache-buster') const utils = require('./utils') +const SEGMENT_PREFIX = 'kafkajs.Kafka.consumer#' const broker = `${params.kafka_host}:${params.kafka_port}` @@ -22,11 +23,13 @@ tap.beforeEach(async (t) => { const { Kafka, logLevel } = require('kafkajs') t.context.Kafka = Kafka - const topic = utils.randomTopic() + const topic = utils.randomString() t.context.topic = topic + const clientId = utils.randomString('kafka-test') + t.context.clientId = clientId const kafka = new Kafka({ - clientId: 'kafka-test', + clientId, brokers: [broker], logLevel: logLevel.NOTHING }) @@ -52,21 +55,29 @@ tap.test('send records correctly', (t) => { const { agent, consumer, producer, topic } = t.context const message = 'test message' + const expectedName = 'produce-tx' + let txCount = 0 agent.on('transactionFinished', (tx) => { - const name = `MessageBroker/Kafka/Topic/Produce/Named/${topic}` - const segment = tx.agent.tracer.getSegment() + txCount++ + if (tx.name === expectedName) { + const name = `MessageBroker/Kafka/Topic/Produce/Named/${topic}` + const segment = tx.agent.tracer.getSegment() - const foundSegment = segment.children.find((s) => s.name.endsWith(topic)) - t.equal(foundSegment.name, name) + const foundSegment = segment.children.find((s) => s.name.endsWith(topic)) + t.equal(foundSegment.name, name) - const metric = tx.metrics.getMetric(name) - t.equal(metric.callCount, 1) + const metric = tx.metrics.getMetric(name) + t.equal(metric.callCount, 1) + } - t.end() + if (txCount === 2) { + t.end() + } }) helper.runInTransaction(agent, async (tx) => { + tx.name = expectedName await consumer.subscribe({ topic, fromBeginning: true }) const promise = new Promise((resolve) => { consumer.run({ @@ -98,52 +109,43 @@ tap.test('send records correctly', (t) => { }) tap.test('send passes along DT headers', (t) => { - // The intent of this test is to verify the scenario: - // - // 1. A service receives a request - // 2. The service builds a payload for Kafka - // 3. The produced Kafka data includes the distributed trace data that was - // provided to the service handling the request. - - t.plan(5) - - const now = Date.now - Date.now = () => 1717426365982 - t.teardown(() => { - Date.now = now - }) + const expectedName = 'produce-tx' const { agent, consumer, producer, topic } = t.context - const messages = ['one', 'two', 'three'] // These agent.config lines are utilized to simulate the inbound // distributed trace that we are trying to validate. agent.config.account_id = 'account_1' agent.config.primary_application_id = 'app_1' agent.config.trusted_account_key = 42 + let produceTx = null + let consumeTx = null + let txCount = 0 agent.on('transactionFinished', (tx) => { - t.equal(tx.isDistributedTrace, true) + txCount++ - const headers = {} - tx.traceContext.addTraceContextHeaders(headers) - t.equal(headers.tracestate.startsWith('42@nr=0-0-account_1-app_1-'), true) + if (tx.name === expectedName) { + produceTx = tx + } else { + consumeTx = tx + } - t.end() + if (txCount === 2) { + utils.verifyDistributedTrace({ t, consumeTx, produceTx }) + t.end() + } }) helper.runInTransaction(agent, async (tx) => { + tx.name = expectedName await consumer.subscribe({ topic, fromBeginning: true }) - let msgCount = 0 const promise = new Promise((resolve) => { consumer.run({ eachMessage: async ({ message: actualMessage }) => { - t.equal(messages.includes(actualMessage.value.toString()), true) - msgCount += 1 - if (msgCount === 3) { - resolve() - } + t.equal(actualMessage.value.toString(), 'one') + resolve() } }) }) @@ -152,9 +154,7 @@ tap.test('send passes along DT headers', (t) => { await producer.send({ acks: 1, topic, - messages: messages.map((m) => { - return { key: 'key', value: m } - }) + messages: [{ key: 'key', value: 'one' }] }) await promise @@ -168,23 +168,27 @@ tap.test('sendBatch records correctly', (t) => { const { agent, consumer, producer, topic } = t.context const message = 'test message' + const expectedName = 'produce-tx' agent.on('transactionFinished', (tx) => { - const name = `MessageBroker/Kafka/Topic/Produce/Named/${topic}` - const segment = tx.agent.tracer.getSegment() + if (tx.name === expectedName) { + const name = `MessageBroker/Kafka/Topic/Produce/Named/${topic}` + const segment = tx.agent.tracer.getSegment() - const foundSegment = segment.children.find((s) => s.name.endsWith(topic)) - t.equal(foundSegment.name, name) + const foundSegment = segment.children.find((s) => s.name.endsWith(topic)) + t.equal(foundSegment.name, name) - const metric = tx.metrics.getMetric(name) - t.equal(metric.callCount, 1) + const metric = tx.metrics.getMetric(name) + t.equal(metric.callCount, 1) - t.equal(tx.isDistributedTrace, true) + t.equal(tx.isDistributedTrace, true) - t.end() + t.end() + } }) helper.runInTransaction(agent, async (tx) => { + tx.name = expectedName await consumer.subscribe({ topic, fromBeginning: true }) const promise = new Promise((resolve) => { consumer.run({ @@ -216,3 +220,85 @@ tap.test('sendBatch records correctly', (t) => { tx.end() }) }) + +tap.test('consume outside of a transaction', async (t) => { + const { agent, consumer, producer, topic, clientId } = t.context + const message = 'test message' + + const txPromise = new Promise((resolve) => { + agent.on('transactionFinished', (tx) => { + utils.verifyConsumeTransaction({ t, tx, topic, clientId }) + resolve() + }) + }) + + await consumer.subscribe({ topics: [topic], fromBeginning: true }) + const testPromise = new Promise((resolve) => { + consumer.run({ + eachMessage: async ({ message: actualMessage }) => { + t.equal(actualMessage.value.toString(), message) + resolve() + } + }) + }) + await utils.waitForConsumersToJoinGroup({ consumer }) + await producer.send({ + acks: 1, + topic, + messages: [{ key: 'key', value: message }] + }) + + return Promise.all([txPromise, testPromise]) +}) + +tap.test('consume inside of a transaction', async (t) => { + const { agent, consumer, producer, topic, clientId } = t.context + const expectedName = 'testing-tx-consume' + + const messages = ['one', 'two', 'three'] + let txCount = 0 + let msgCount = 0 + + const txPromise = new Promise((resolve) => { + agent.on('transactionFinished', (tx) => { + txCount++ + if (tx.name === expectedName) { + t.assertSegments(tx.trace.root, [`${SEGMENT_PREFIX}subscribe`, `${SEGMENT_PREFIX}run`], { + exact: false + }) + } else { + utils.verifyConsumeTransaction({ t, tx, topic, clientId }) + } + + if (txCount === messages.length + 1) { + resolve() + } + }) + }) + + await helper.runInTransaction(agent, async (tx) => { + tx.name = expectedName + await consumer.subscribe({ topics: [topic], fromBeginning: true }) + const testPromise = new Promise((resolve) => { + consumer.run({ + eachMessage: async ({ message: actualMessage }) => { + msgCount++ + t.ok(messages.includes(actualMessage.value.toString())) + if (msgCount === messages.length) { + resolve() + } + } + }) + }) + await utils.waitForConsumersToJoinGroup({ consumer }) + const messagePayload = messages.map((m, i) => ({ key: `key-${i}`, value: m })) + await producer.send({ + acks: 1, + topic, + messages: messagePayload + }) + + tx.end() + return Promise.all([txPromise, testPromise]) + }) +}) diff --git a/test/versioned/kafkajs/utils.js b/test/versioned/kafkajs/utils.js index f311232154..b97ee32b0e 100644 --- a/test/versioned/kafkajs/utils.js +++ b/test/versioned/kafkajs/utils.js @@ -6,13 +6,15 @@ 'use strict' const { makeId } = require('../../../lib/util/hashes') const utils = module.exports +const metrics = require('../../lib/metrics_helper') +const { DESTINATIONS } = require('../../../lib/config/attribute-filter') /** - * Creates a random topic to be used for testing - * @param {string} [prefix=test-topic] topic prefix - * @returns {string} topic name with random id appended + * Creates a random string with prefix to be used for testing + * @param {string} [prefix=test-topic] prefix for random string + * @returns {string} prefix with random id appended */ -utils.randomTopic = (prefix = 'test-topic') => { +utils.randomString = (prefix = 'test-topic') => { return `${prefix}-${makeId()}` } @@ -62,3 +64,58 @@ utils.waitForConsumersToJoinGroup = ({ consumer, maxWait = 10000 }) => }) }) }) + +/** + * Verifies the metrics of the consume transaction. Also verifies the tx name of consme transaction + * and the relevant tx attributes + * + * @param {object} params function params + * @param {object} params.t test instance + * @param {object} params.tx consumer transaction + * @param {string} params.topic topic name + * @params {string} params.clientId client id + */ +utils.verifyConsumeTransaction = ({ t, tx, topic, clientId }) => { + const expectedName = `OtherTransaction/Message/Kafka/Topic/Consume/Named/${topic}` + t.assertMetrics( + tx.metrics, + [ + [{ name: expectedName }], + [{ name: `Message/Kafka/Topic/Named/${topic}/Received/Bytes` }], + [{ name: `Message/Kafka/Topic/Named/${topic}/Received/Messages` }], + [{ name: 'OtherTransaction/Message/all' }], + [{ name: 'OtherTransaction/all' }], + [{ name: 'OtherTransactionTotalTime' }] + ], + false, + false + ) + + t.equal(tx.getFullName(), expectedName) + const consume = metrics.findSegment(tx.trace.root, expectedName) + t.equal(consume, tx.baseSegment) + + const attributes = tx.trace.attributes.get(DESTINATIONS.TRANS_SCOPE) + t.ok(attributes['kafka.consume.byteCount'], 'should have byteCount') + t.equal(attributes['kafka.consume.client_id'], clientId, 'should have client_id') +} + +/** + * Asserts the properties on both the produce and consume transactions + * @param {object} params function params + * @param {object} params.t test instance + * @param {object} params.consumeTx consumer transaction + * @param {object} params.produceTx produce transaction + */ +utils.verifyDistributedTrace = ({ t, consumeTx, produceTx }) => { + t.ok(produceTx.isDistributedTrace, 'should mark producer as distributed') + t.ok(consumeTx.isDistributedTrace, 'should mark consumer as distributed') + + t.equal(consumeTx.incomingCatId, null, 'should not set old CAT properties') + + t.equal(produceTx.id, consumeTx.parentId, 'should have proper parent id') + t.equal(produceTx.traceId, consumeTx.traceId, 'should have proper trace id') + const produceSegment = produceTx.trace.root.children[3] + t.equal(produceSegment.id, consumeTx.parentSpanId, 'should have proper parentSpanId') + t.equal(consumeTx.parentTransportType, 'Kafka', 'should have correct transport type') +} From 6f3c90217e0324ed4165e28a0722f1c5653701b4 Mon Sep 17 00:00:00 2001 From: Bob Evans Date: Tue, 4 Jun 2024 15:27:24 -0400 Subject: [PATCH 2/2] chore: feedback from PR review --- lib/instrumentation/kafkajs/consumer.js | 16 +++++++++------- lib/shim/specs/message-subscribe.js | 19 ++++++++++++++++++- package.json | 2 +- test/unit/shim/shim.test.js | 14 ++++++++++++++ 4 files changed, 42 insertions(+), 9 deletions(-) diff --git a/lib/instrumentation/kafkajs/consumer.js b/lib/instrumentation/kafkajs/consumer.js index 516c491b5a..0ff9a293fa 100644 --- a/lib/instrumentation/kafkajs/consumer.js +++ b/lib/instrumentation/kafkajs/consumer.js @@ -61,7 +61,7 @@ module.exports = function instrumentConsumer({ shim, kafkajs }) { * Lastly, adds tx attributes for byteCount and clientId * * @param {object} consumer the instance of kafka consumer - * @param {MessageShim} shim intance of shim + * @param {MessageShim} shim instance of shim * @param {Array} args arguments passed to the `eachMessage` function of the `consumer.run` * @returns {MessageSpec} spec for message handling */ @@ -74,14 +74,16 @@ function handler(consumer, shim, args) { const tx = segment.transaction const byteLength = data?.message.value?.byteLength const metricPrefix = `Message/Kafka/Topic/Named/${topic}/Received` - tx.metrics.measureBytes(`${metricPrefix}/Bytes`, byteLength) // This will always be 1 tx.metrics.getOrCreateMetric(`${metricPrefix}/Messages`).recordValue(1) - tx.trace.attributes.addAttribute( - DESTINATIONS.TRANS_SCOPE, - 'kafka.consume.byteCount', - byteLength - ) + if (byteLength) { + tx.metrics.measureBytes(`${metricPrefix}/Bytes`, byteLength) + tx.trace.attributes.addAttribute( + DESTINATIONS.TRANS_SCOPE, + 'kafka.consume.byteCount', + byteLength + ) + } if (consumer?.[kafkaCtx]) { tx.trace.attributes.addAttribute( DESTINATIONS.TRANS_EVENT, diff --git a/lib/shim/specs/message-subscribe.js b/lib/shim/specs/message-subscribe.js index 466e3e2d4a..433ce377be 100644 --- a/lib/shim/specs/message-subscribe.js +++ b/lib/shim/specs/message-subscribe.js @@ -31,6 +31,23 @@ class MessageSubscribeSpec extends MessageSpec { * Indicates names of functions to be wrapped for message consumption. * This must be used in tandem with consumer. * @type {Array|null} + * @example + * // Wrap the eachMessage method on a consumer + * class Consumer() { + * constructor() {} + * async run(consumer) { + * consumer.eachMessage({ message }) + * } + * } + * + * const spec = new MessageSubscribeSpec({ + * name: 'Consumer#run' + * promise: true + * consumer: shim.FIRST, + * functions: ['eachMessage'] + * }) + * + * shim.recordSubscribedConsume(Consumer.prototype, 'run', spec) */ functions @@ -42,7 +59,7 @@ class MessageSubscribeSpec extends MessageSpec { super(params) this.consumer = params.consumer ?? null - this.functions = params.functions ?? null + this.functions = Array.isArray(params.functions) ? params.functions : null } } diff --git a/package.json b/package.json index cf7ef1ee01..eecc4f9862 100644 --- a/package.json +++ b/package.json @@ -155,7 +155,7 @@ "scripts": { "bench": "node ./bin/run-bench.js", "docker-env": "./bin/docker-env-vars.sh", - "docs": "npm ci && jsdoc -c ./jsdoc-conf.json --private -r .", + "docs": "rm -rf ./out && jsdoc -c ./jsdoc-conf.jsonc --private -r .", "integration": "npm run prepare-test && npm run sub-install && time c8 -o ./coverage/integration tap --test-regex='(\\/|^test\\/integration\\/.*\\.tap\\.js)$' --timeout=600 --no-coverage --reporter classic", "integration:esm": "time c8 -o ./coverage/integration-esm tap --node-arg='--loader=./esm-loader.mjs' --test-regex='(test\\/integration\\/.*\\.tap\\.mjs)$' --timeout=600 --no-coverage --reporter classic", "prepare-test": "npm run ssl && npm run docker-env", diff --git a/test/unit/shim/shim.test.js b/test/unit/shim/shim.test.js index d5e215d564..7a773bcb6c 100644 --- a/test/unit/shim/shim.test.js +++ b/test/unit/shim/shim.test.js @@ -3149,4 +3149,18 @@ tap.test('Shim', function (t) { t.ok(shim.specs.params.QueueMessageParameters) t.end() }) + + t.test('should not use functions in MessageSubscribeSpec if it is not an array', (t) => { + const agent = helper.loadMockedAgent() + t.teardown(() => { + helper.unloadAgent(agent) + }) + + const shim = new Shim(agent, 'test-mod') + const spec = new shim.specs.MessageSubscribeSpec({ + functions: 'foo-bar' + }) + t.notOk(spec.functions) + t.end() + }) })