Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Added kafkajs instrumentation and versioned tests skeleton #2224

Merged
merged 10 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 43 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
version: "3"
services:

elasticsearch:
container_name: nr_node_elastic
image: docker.elastic.co/elasticsearch/elasticsearch:8.8.0
Expand All @@ -21,11 +22,45 @@ services:
interval: 30s
timeout: 10s
retries: 5

# Kafka setup based on the e2e tests in node-rdkafka. Needs both the
# `zookeeper` and `kafka` services.
zookeeper:
container_name: nr_node_kafka_zookeeper
image: confluentinc/cp-zookeeper
ports:
- '2181:2181'
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
container_name: nr_node_kafka
image: confluentinc/cp-kafka
links:
- zookeeper
ports:
- '9092:9092'
healthcheck:
test: /usr/bin/kafka-cluster cluster-id --bootstrap-server localhost:9092 || exit 1
interval: 1s
timeout: 60s
retries: 60
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_DEFAULT_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1

memcached:
container_name: nr_node_memcached
image: memcached
ports:
- "11211:11211"

mongodb_3:
container_name: nr_node_mongodb
platform: linux/amd64
Expand All @@ -37,6 +72,7 @@ services:
interval: 1s
timeout: 10s
retries: 30

mongodb_5:
container_name: nr_node_mongodb_5
image: library/mongo:5
Expand All @@ -47,6 +83,7 @@ services:
interval: 1s
timeout: 10s
retries: 30

mysql:
container_name: nr_node_mysql
platform: linux/amd64
Expand All @@ -60,6 +97,7 @@ services:
interval: 1s
timeout: 10s
retries: 30

redis:
container_name: nr_node_redis
image: redis
Expand All @@ -70,6 +108,7 @@ services:
interval: 1s
timeout: 10s
retries: 30

cassandra:
container_name: nr_node_cassandra
platform: linux/amd64
Expand All @@ -80,13 +119,15 @@ services:
test: [ "CMD", "cqlsh", "-u cassandra", "-p cassandra"]
interval: 5s
timeout: 10s
retries: 6
retries: 6

# pg 9.2 has built in healthcheck
pg:
container_name: nr_node_postgres
image: postgres:9.2
ports:
- "5432:5432"

pg_prisma:
container_name: nr_node_postgres_prisma
image: postgres:15
Expand All @@ -100,6 +141,7 @@ services:
interval: 1s
timeout: 10s
retries: 30

rmq:
container_name: nr_node_rabbit
image: rabbitmq:3
Expand Down
11 changes: 11 additions & 0 deletions lib/instrumentation/kafkajs.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
* Copyright 2024 New Relic Corporation. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*/

'use strict'

// eslint-disable-next-line no-unused-vars
module.exports = function initialize(_agent, kafkajs, _moduleName, shim) {
// Put instrumentation code here for kafkajs.Kafka.producer and kafkajs.Kafka.consumer
}
1 change: 1 addition & 0 deletions lib/instrumentations.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ module.exports = function instrumentations() {
'fastify': { type: InstrumentationDescriptor.TYPE_WEB_FRAMEWORK },
'generic-pool': { type: InstrumentationDescriptor.TYPE_GENERIC },
'ioredis': { type: InstrumentationDescriptor.TYPE_DATASTORE },
'kafkajs': { type: InstrumentationDescriptor.TYPE_MESSAGE },
'koa': { module: './instrumentation/koa' },
'langchain': { module: './instrumentation/langchain' },
'memcached': { type: InstrumentationDescriptor.TYPE_DATASTORE },
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@
"public-docs": "jsdoc -c ./jsdoc-conf.jsonc && cp examples/shim/*.png out/",
"publish-docs": "./bin/publish-docs.sh",
"services": "docker compose up -d --wait",
"services:stop": "docker compose down",
"smoke": "npm run ssl && time tap test/smoke/**/**/*.tap.js --timeout=180 --no-coverage",
"ssl": "./bin/ssl.sh",
"sub-install": "node test/bin/install_sub_deps",
Expand Down
5 changes: 0 additions & 5 deletions test/lib/cache-buster.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,6 @@

'use strict'

/**
* Utility method to remove a set of modules from the require cache.
*
* @param {string[]} modules The set of module names to remove from the cache.
*/
module.exports = {
/**
* Removes explicitly named modules from the require cache.
Expand Down
3 changes: 3 additions & 0 deletions test/lib/params.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
'use strict'

module.exports = {
kafka_host: process.env.NR_NODE_TEST_KAFKA_HOST || '127.0.0.1',
kafka_port: process.env.NR_NODE_TEST_KAFKA_PORT || 9092,

memcached_host: process.env.NR_NODE_TEST_MEMCACHED_HOST || 'localhost',
memcached_port: process.env.NR_NODE_TEST_MEMCACHED_PORT || 11211,

Expand Down
66 changes: 66 additions & 0 deletions test/versioned/kafkajs/kafka.tap.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2024 New Relic Corporation. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*/

'use strict'

const tap = require('tap')
const helper = require('../../lib/agent_helper')
const params = require('../../lib/params')
const { removeModules } = require('../../lib/cache-buster')
const utils = require('./utils')

const broker = `${params.kafka_host}:${params.kafka_port}`

tap.beforeEach(async (t) => {
t.context.agent = helper.instrumentMockedAgent()

const { Kafka, logLevel } = require('kafkajs')
t.context.Kafka = Kafka
const topic = utils.randomTopic()
t.context.topic = topic

const kafka = new Kafka({
clientId: 'kafka-test',
brokers: [broker],
logLevel: logLevel.NOTHING
})
await utils.createTopic({ topic, kafka })

const producer = kafka.producer()
await producer.connect()
t.context.producer = producer
const consumer = kafka.consumer({ groupId: 'kafka' })
await consumer.connect()
t.context.consumer = consumer
})

tap.afterEach(async (t) => {
helper.unloadAgent(t.context.agent)
removeModules(['kafkajs'])
await t.context.consumer.disconnect()
await t.context.producer.disconnect()
})

tap.test('stub', async (t) => {
const { consumer, producer, topic } = t.context
const message = 'test message'

await consumer.subscribe({ topics: [topic], fromBeginning: true })
const testPromise = new Promise((resolve) => {
consumer.run({
eachMessage: async ({ message: actualMessage }) => {
t.equal(actualMessage.value.toString(), message)
resolve()
}
})
})
await utils.waitForConsumersToJoinGroup({ consumer })
await producer.send({
acks: 1,
topic,
messages: [{ key: 'key', value: message }]
})
await testPromise
})
28 changes: 28 additions & 0 deletions test/versioned/kafkajs/newrelic.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2021 New Relic Corporation. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*/

'use strict'

exports.config = {
app_name: ['My Application'],
license_key: 'license key here',
logging: {
level: 'trace',
filepath: '../../newrelic_agent.log'
},
utilization: {
detect_aws: false,
detect_pcf: false,
detect_azure: false,
detect_gcp: false,
detect_docker: false
},
distributed_tracing: {
enabled: true
},
transaction_tracer: {
enabled: true
}
}
19 changes: 19 additions & 0 deletions test/versioned/kafkajs/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"name": "kafka-tests",
"targets": [{"name":"kafkajs","minAgentVersion":"11.19.0"}],
"version": "0.0.0",
"private": true,
"tests": [
{
"engines": {
"node": ">=16"
},
"dependencies": {
"kafkajs": ">=2.0.0"
},
"files": [
"kafka.tap.js"
]
}
]
}
64 changes: 64 additions & 0 deletions test/versioned/kafkajs/utils.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2024 New Relic Corporation. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*/

'use strict'
const { makeId } = require('../../../lib/util/hashes')
const utils = module.exports

/**
* Creates a random topic to be used for testing
* @param {string} [prefix=test-topic] topic prefix
* @returns {string} topic name with random id appended
*/
utils.randomTopic = (prefix = 'test-topic') => {
return `${prefix}-${makeId()}`
}

/**
* Creates a topic with the admin class
* @param {object} params to function
* @param {object} params.kafka instance of kafka.Kafka
* @param {string} params.topic topic name
*/
utils.createTopic = async ({ kafka, topic }) => {
const admin = kafka.admin()
try {
await admin.connect()
await admin.createTopics({
waitForLeaders: true,
topics: [{ topic, numPartitions: 1, replicationFactor: 1, configEntries: [] }]
})
} finally {
await admin.disconnect()
}
}

/**
* Waits for consumer to join the group
*
* @param {object} params to function
* @param {object} params.consumer instance of kafkajs.Kafka.consumer
* @param {number} [params.maxWait=10000] how long to wait for consumer to join group
* @returns {Promise}
*
*/
utils.waitForConsumersToJoinGroup = ({ consumer, maxWait = 10000 }) =>
new Promise((resolve, reject) => {
const timeoutId = setTimeout(() => {
consumer.disconnect().then(() => {
reject()
})
}, maxWait)
consumer.on(consumer.events.GROUP_JOIN, (event) => {
clearTimeout(timeoutId)
resolve(event)
})
consumer.on(consumer.events.CRASH, (event) => {
clearTimeout(timeoutId)
consumer.disconnect().then(() => {
reject(event.payload.error)
})
})
})
Loading