From 35776af47b17098cfed400a6fd4b6c1ea4bb0853 Mon Sep 17 00:00:00 2001 From: James Sumners Date: Wed, 29 May 2024 09:57:35 -0400 Subject: [PATCH 01/10] chore: Added node-rdkafka versioned tests skeleton --- docker-compose.yml | 39 +++++++++++++- package.json | 1 + test/lib/cache-buster.js | 5 -- test/versioned/node-rdkafka/kafka.tap.js | 67 ++++++++++++++++++++++++ test/versioned/node-rdkafka/newrelic.js | 28 ++++++++++ test/versioned/node-rdkafka/package.json | 19 +++++++ 6 files changed, 153 insertions(+), 6 deletions(-) create mode 100644 test/versioned/node-rdkafka/kafka.tap.js create mode 100644 test/versioned/node-rdkafka/newrelic.js create mode 100644 test/versioned/node-rdkafka/package.json diff --git a/docker-compose.yml b/docker-compose.yml index ac6fb6a20a..a988cd1d6a 100755 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,5 +1,6 @@ version: "3" services: + elasticsearch: container_name: nr_node_elastic image: docker.elastic.co/elasticsearch/elasticsearch:8.8.0 @@ -21,11 +22,40 @@ services: interval: 30s timeout: 10s retries: 5 + + kafka: + # https://github.com/apache/kafka/tree/8d11d95/docker/examples#single-node + container_name: nr_node_kafka + image: apache/kafka + ports: + - '9092:9092' + memcached: container_name: nr_node_memcached image: memcached ports: - "11211:11211" + healthcheck: + test: /opt/kafka/bin/kafka-cluster.sh cluster-id --bootstrap-server 127.0.0.1:9092 || exit 1 + interval: 1s + timeout: 60s + retries: 60 + environment: + KAFKA_NODE_ID: 1 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT' + KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT_HOST://localhost:9092,PLAINTEXT://broker:19092' + KAFKA_PROCESS_ROLES: 'broker,controller' + KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093' + KAFKA_LISTENERS: 'CONTROLLER://:29093,PLAINTEXT_HOST://:9092,PLAINTEXT://:19092' + KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' + KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' + CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw' + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' + mongodb_3: container_name: nr_node_mongodb platform: linux/amd64 @@ -37,6 +67,7 @@ services: interval: 1s timeout: 10s retries: 30 + mongodb_5: container_name: nr_node_mongodb_5 image: library/mongo:5 @@ -47,6 +78,7 @@ services: interval: 1s timeout: 10s retries: 30 + mysql: container_name: nr_node_mysql platform: linux/amd64 @@ -60,6 +92,7 @@ services: interval: 1s timeout: 10s retries: 30 + redis: container_name: nr_node_redis image: redis @@ -70,6 +103,7 @@ services: interval: 1s timeout: 10s retries: 30 + cassandra: container_name: nr_node_cassandra platform: linux/amd64 @@ -80,13 +114,15 @@ services: test: [ "CMD", "cqlsh", "-u cassandra", "-p cassandra"] interval: 5s timeout: 10s - retries: 6 + retries: 6 + # pg 9.2 has built in healthcheck pg: container_name: nr_node_postgres image: postgres:9.2 ports: - "5432:5432" + pg_prisma: container_name: nr_node_postgres_prisma image: postgres:15 @@ -100,6 +136,7 @@ services: interval: 1s timeout: 10s retries: 30 + rmq: container_name: nr_node_rabbit image: rabbitmq:3 diff --git a/package.json b/package.json index 0923d8e5fc..f229ac7134 100644 --- a/package.json +++ b/package.json @@ -164,6 +164,7 @@ "public-docs": "jsdoc -c ./jsdoc-conf.jsonc && cp examples/shim/*.png out/", "publish-docs": "./bin/publish-docs.sh", "services": "docker compose up -d --wait", + "services:stop": "docker compose down", "smoke": "npm run ssl && time tap test/smoke/**/**/*.tap.js --timeout=180 --no-coverage", "ssl": "./bin/ssl.sh", "sub-install": "node test/bin/install_sub_deps", diff --git a/test/lib/cache-buster.js b/test/lib/cache-buster.js index 93cbcccde3..57b10a4d83 100644 --- a/test/lib/cache-buster.js +++ b/test/lib/cache-buster.js @@ -5,11 +5,6 @@ 'use strict' -/** - * Utility method to remove a set of modules from the require cache. - * - * @param {string[]} modules The set of module names to remove from the cache. - */ module.exports = { /** * Removes explicitly named modules from the require cache. diff --git a/test/versioned/node-rdkafka/kafka.tap.js b/test/versioned/node-rdkafka/kafka.tap.js new file mode 100644 index 0000000000..faa48d9697 --- /dev/null +++ b/test/versioned/node-rdkafka/kafka.tap.js @@ -0,0 +1,67 @@ +/* + * Copyright 2024 New Relic Corporation. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +'use strict' + +const tap = require('tap') +const helper = require('../../lib/agent_helper') +const { removeModules } = require('../../lib/cache-buster') + +tap.beforeEach(async (t) => { + const Kafka = require('node-rdkafka') + t.context.Kafka = Kafka + t.context.agent = helper.instrumentMockedAgent() + + await new Promise((resolve) => { + const producer = new Kafka.Producer({ + 'metadata.broker.list': '127.0.0.1:9092' + }) + producer.connect() + producer.setPollInterval(10) + producer.on('ready', () => { + t.context.producer = producer + resolve() + }) + }) + + await new Promise((resolve) => { + const consumer = new Kafka.KafkaConsumer({ + 'metadata.broker.list': '127.0.0.1:9092', + 'group.id': 'kafka' + }) + consumer.connect() + consumer.on('ready', () => { + t.context.consumer = consumer + resolve() + }) + }) +}) + +tap.afterEach(async (t) => { + helper.unloadAgent(t.context.agent) + removeModules(['node-rdkafka']) + + await new Promise((resolve) => { + t.context.producer.disconnect(resolve) + }) + await new Promise((resolve) => { + t.context.consumer.disconnect(resolve) + }) +}) + +tap.test('stub', (t) => { + const { consumer, producer } = t.context + + consumer.subscribe(['test-topic']) + consumer.consume() + consumer.on('data', (data) => { + t.equal(data.value.toString(), 'test message') + t.end() + }) + + setTimeout(() => { + producer.produce('test-topic', null, Buffer.from('test message'), null, 0) + }, 2000) +}) diff --git a/test/versioned/node-rdkafka/newrelic.js b/test/versioned/node-rdkafka/newrelic.js new file mode 100644 index 0000000000..af4b696e2f --- /dev/null +++ b/test/versioned/node-rdkafka/newrelic.js @@ -0,0 +1,28 @@ +/* + * Copyright 2021 New Relic Corporation. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +'use strict' + +exports.config = { + app_name: ['My Application'], + license_key: 'license key here', + logging: { + level: 'trace', + filepath: '../../newrelic_agent.log' + }, + utilization: { + detect_aws: false, + detect_pcf: false, + detect_azure: false, + detect_gcp: false, + detect_docker: false + }, + distributed_tracing: { + enabled: true + }, + transaction_tracer: { + enabled: true + } +} diff --git a/test/versioned/node-rdkafka/package.json b/test/versioned/node-rdkafka/package.json new file mode 100644 index 0000000000..8c34eece82 --- /dev/null +++ b/test/versioned/node-rdkafka/package.json @@ -0,0 +1,19 @@ +{ + "name": "kafka-tests", + "targets": [{"name":"node-rdkafka","minAgentVersion":"11.18.0"}], + "version": "0.0.0", + "private": true, + "tests": [ + { + "engines": { + "node": ">=16" + }, + "dependencies": { + "node-rdkafka": ">=3.0.0" + }, + "files": [ + "kafka.tap.js" + ] + } + ] +} From dfb35e0663d18534b6019701fd50b462fb58e36e Mon Sep 17 00:00:00 2001 From: Bob Evans Date: Wed, 29 May 2024 12:32:24 -0400 Subject: [PATCH 02/10] chore: added instrumentation skeleton --- lib/instrumentation/node-rdkafka.js | 11 +++++++++++ lib/instrumentations.js | 1 + test/versioned/node-rdkafka/kafka.tap.js | 2 +- 3 files changed, 13 insertions(+), 1 deletion(-) create mode 100644 lib/instrumentation/node-rdkafka.js diff --git a/lib/instrumentation/node-rdkafka.js b/lib/instrumentation/node-rdkafka.js new file mode 100644 index 0000000000..be96dc91ea --- /dev/null +++ b/lib/instrumentation/node-rdkafka.js @@ -0,0 +1,11 @@ +/* + * Copyright 2024 New Relic Corporation. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +'use strict' + +// eslint-disable-next-line no-unused-vars +module.exports = function initialize(_agent, Kafka, _moduleName, shim) { + // Put instrumentation code here for Kafka.Producer and Kafka.Consumer +} diff --git a/lib/instrumentations.js b/lib/instrumentations.js index 127a542e02..b5ed23bf65 100644 --- a/lib/instrumentations.js +++ b/lib/instrumentations.js @@ -34,6 +34,7 @@ module.exports = function instrumentations() { 'memcached': { type: InstrumentationDescriptor.TYPE_DATASTORE }, 'mongodb': { type: InstrumentationDescriptor.TYPE_DATASTORE }, 'mysql': { module: './instrumentation/mysql' }, + 'node-rdkafka': { type: InstrumentationDescriptor.TYPE_MESSAGE }, 'openai': { type: InstrumentationDescriptor.TYPE_GENERIC }, 'pg': { type: InstrumentationDescriptor.TYPE_DATASTORE }, 'pino': { module: './instrumentation/pino' }, diff --git a/test/versioned/node-rdkafka/kafka.tap.js b/test/versioned/node-rdkafka/kafka.tap.js index faa48d9697..ebf9aea562 100644 --- a/test/versioned/node-rdkafka/kafka.tap.js +++ b/test/versioned/node-rdkafka/kafka.tap.js @@ -10,9 +10,9 @@ const helper = require('../../lib/agent_helper') const { removeModules } = require('../../lib/cache-buster') tap.beforeEach(async (t) => { + t.context.agent = helper.instrumentMockedAgent() const Kafka = require('node-rdkafka') t.context.Kafka = Kafka - t.context.agent = helper.instrumentMockedAgent() await new Promise((resolve) => { const producer = new Kafka.Producer({ From 79e6c77827543bd79a7940418c55add6bde2e9e9 Mon Sep 17 00:00:00 2001 From: James Sumners Date: Wed, 29 May 2024 12:47:56 -0400 Subject: [PATCH 03/10] tweaks --- docker-compose.yml | 45 +++++++++++++----------- test/versioned/node-rdkafka/kafka.tap.js | 12 ++++--- 2 files changed, 33 insertions(+), 24 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index a988cd1d6a..084467cce3 100755 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -23,38 +23,43 @@ services: timeout: 10s retries: 5 + # Kafka setup based on the e2e tests in node-rdkafka. Needs both the + # `zookeeper` and `kafka` services. + zookeeper: + container_name: nr_node_kafka_zookeeper + image: confluentinc/cp-zookeeper + ports: + - '2181:2181' + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 kafka: - # https://github.com/apache/kafka/tree/8d11d95/docker/examples#single-node container_name: nr_node_kafka - image: apache/kafka + image: confluentinc/cp-kafka + links: + - zookeeper ports: - '9092:9092' - - memcached: - container_name: nr_node_memcached - image: memcached - ports: - - "11211:11211" healthcheck: - test: /opt/kafka/bin/kafka-cluster.sh cluster-id --bootstrap-server 127.0.0.1:9092 || exit 1 + test: /usr/bin/kafka-cluster cluster-id --bootstrap-server localhost:9092 || exit 1 interval: 1s timeout: 60s retries: 60 environment: - KAFKA_NODE_ID: 1 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT' - KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT_HOST://localhost:9092,PLAINTEXT://broker:19092' - KAFKA_PROCESS_ROLES: 'broker,controller' - KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093' - KAFKA_LISTENERS: 'CONTROLLER://:29093,PLAINTEXT_HOST://:9092,PLAINTEXT://:19092' - KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' - KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' - CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw' + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' + KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092' KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 - KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 - KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' + KAFKA_DEFAULT_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + + memcached: + container_name: nr_node_memcached + image: memcached + ports: + - "11211:11211" mongodb_3: container_name: nr_node_mongodb diff --git a/test/versioned/node-rdkafka/kafka.tap.js b/test/versioned/node-rdkafka/kafka.tap.js index ebf9aea562..722fbe2b9c 100644 --- a/test/versioned/node-rdkafka/kafka.tap.js +++ b/test/versioned/node-rdkafka/kafka.tap.js @@ -11,6 +11,7 @@ const { removeModules } = require('../../lib/cache-buster') tap.beforeEach(async (t) => { t.context.agent = helper.instrumentMockedAgent() + const Kafka = require('node-rdkafka') t.context.Kafka = Kafka @@ -51,17 +52,20 @@ tap.afterEach(async (t) => { }) }) -tap.test('stub', (t) => { +tap.test('stub', { timeout: 10_000 }, (t) => { const { consumer, producer } = t.context + const topic = 'test-topic' - consumer.subscribe(['test-topic']) - consumer.consume() consumer.on('data', (data) => { + console.log('consumed') t.equal(data.value.toString(), 'test message') t.end() }) + consumer.subscribe([topic]) + consumer.consume() setTimeout(() => { - producer.produce('test-topic', null, Buffer.from('test message'), null, 0) + console.log('producing') + producer.produce(topic, null, Buffer.from('test message'), 'key') }, 2000) }) From 390227c8ecabe03676c317646838da96edb1e5f1 Mon Sep 17 00:00:00 2001 From: James Sumners Date: Wed, 29 May 2024 12:56:07 -0400 Subject: [PATCH 04/10] working stub --- test/versioned/node-rdkafka/kafka.tap.js | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/test/versioned/node-rdkafka/kafka.tap.js b/test/versioned/node-rdkafka/kafka.tap.js index 722fbe2b9c..948bda023d 100644 --- a/test/versioned/node-rdkafka/kafka.tap.js +++ b/test/versioned/node-rdkafka/kafka.tap.js @@ -57,7 +57,6 @@ tap.test('stub', { timeout: 10_000 }, (t) => { const topic = 'test-topic' consumer.on('data', (data) => { - console.log('consumed') t.equal(data.value.toString(), 'test message') t.end() }) @@ -65,7 +64,6 @@ tap.test('stub', { timeout: 10_000 }, (t) => { consumer.consume() setTimeout(() => { - console.log('producing') producer.produce(topic, null, Buffer.from('test message'), 'key') - }, 2000) + }, 500) }) From 617276907b299c683d55c3194a328e9a5dd12169 Mon Sep 17 00:00:00 2001 From: James Sumners Date: Wed, 29 May 2024 14:09:39 -0400 Subject: [PATCH 05/10] tweak broker config --- test/lib/params.js | 3 +++ test/versioned/node-rdkafka/kafka.tap.js | 7 +++++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/test/lib/params.js b/test/lib/params.js index 2d81b7b5cd..60f82472e4 100644 --- a/test/lib/params.js +++ b/test/lib/params.js @@ -6,6 +6,9 @@ 'use strict' module.exports = { + kafka_host: process.env.NR_NODE_TEST_KAFKA_HOST || '127.0.0.1', + kafka_port: process.env.NR_NODE_TEST_KAFKA_PORT || 9092, + memcached_host: process.env.NR_NODE_TEST_MEMCACHED_HOST || 'localhost', memcached_port: process.env.NR_NODE_TEST_MEMCACHED_PORT || 11211, diff --git a/test/versioned/node-rdkafka/kafka.tap.js b/test/versioned/node-rdkafka/kafka.tap.js index 948bda023d..91cd73acf5 100644 --- a/test/versioned/node-rdkafka/kafka.tap.js +++ b/test/versioned/node-rdkafka/kafka.tap.js @@ -7,8 +7,11 @@ const tap = require('tap') const helper = require('../../lib/agent_helper') +const params = require('../../lib/params') const { removeModules } = require('../../lib/cache-buster') +const broker = `${params.kafka_host}:${params.kafka_port}` + tap.beforeEach(async (t) => { t.context.agent = helper.instrumentMockedAgent() @@ -17,7 +20,7 @@ tap.beforeEach(async (t) => { await new Promise((resolve) => { const producer = new Kafka.Producer({ - 'metadata.broker.list': '127.0.0.1:9092' + 'metadata.broker.list': broker }) producer.connect() producer.setPollInterval(10) @@ -29,7 +32,7 @@ tap.beforeEach(async (t) => { await new Promise((resolve) => { const consumer = new Kafka.KafkaConsumer({ - 'metadata.broker.list': '127.0.0.1:9092', + 'metadata.broker.list': broker, 'group.id': 'kafka' }) consumer.connect() From caa9584114c185b849bfcc9fda4612d6a9cb2611 Mon Sep 17 00:00:00 2001 From: Bob Evans Date: Wed, 29 May 2024 14:38:16 -0400 Subject: [PATCH 06/10] chore: updated instrumentation to be kafkajs, fixed tests --- .../{node-rdkafka.js => kafkajs.js} | 4 +- lib/instrumentations.js | 2 +- test/versioned/kafkajs/kafka.tap.js | 55 ++++++++++++++ .../{node-rdkafka => kafkajs}/newrelic.js | 0 .../{node-rdkafka => kafkajs}/package.json | 4 +- test/versioned/node-rdkafka/kafka.tap.js | 72 ------------------- 6 files changed, 60 insertions(+), 77 deletions(-) rename lib/instrumentation/{node-rdkafka.js => kafkajs.js} (50%) create mode 100644 test/versioned/kafkajs/kafka.tap.js rename test/versioned/{node-rdkafka => kafkajs}/newrelic.js (100%) rename test/versioned/{node-rdkafka => kafkajs}/package.json (69%) delete mode 100644 test/versioned/node-rdkafka/kafka.tap.js diff --git a/lib/instrumentation/node-rdkafka.js b/lib/instrumentation/kafkajs.js similarity index 50% rename from lib/instrumentation/node-rdkafka.js rename to lib/instrumentation/kafkajs.js index be96dc91ea..7aa3861857 100644 --- a/lib/instrumentation/node-rdkafka.js +++ b/lib/instrumentation/kafkajs.js @@ -6,6 +6,6 @@ 'use strict' // eslint-disable-next-line no-unused-vars -module.exports = function initialize(_agent, Kafka, _moduleName, shim) { - // Put instrumentation code here for Kafka.Producer and Kafka.Consumer +module.exports = function initialize(_agent, kafkajs, _moduleName, shim) { + // Put instrumentation code here for kafkajs.Kafka.producer and kafkajs.Kafka.consumer } diff --git a/lib/instrumentations.js b/lib/instrumentations.js index b5ed23bf65..4b0f7f8d07 100644 --- a/lib/instrumentations.js +++ b/lib/instrumentations.js @@ -29,12 +29,12 @@ module.exports = function instrumentations() { 'fastify': { type: InstrumentationDescriptor.TYPE_WEB_FRAMEWORK }, 'generic-pool': { type: InstrumentationDescriptor.TYPE_GENERIC }, 'ioredis': { type: InstrumentationDescriptor.TYPE_DATASTORE }, + 'kafkajs': { type: InstrumentationDescriptor.TYPE_MESSAGE }, 'koa': { module: './instrumentation/koa' }, 'langchain': { module: './instrumentation/langchain' }, 'memcached': { type: InstrumentationDescriptor.TYPE_DATASTORE }, 'mongodb': { type: InstrumentationDescriptor.TYPE_DATASTORE }, 'mysql': { module: './instrumentation/mysql' }, - 'node-rdkafka': { type: InstrumentationDescriptor.TYPE_MESSAGE }, 'openai': { type: InstrumentationDescriptor.TYPE_GENERIC }, 'pg': { type: InstrumentationDescriptor.TYPE_DATASTORE }, 'pino': { module: './instrumentation/pino' }, diff --git a/test/versioned/kafkajs/kafka.tap.js b/test/versioned/kafkajs/kafka.tap.js new file mode 100644 index 0000000000..05cd88f7d6 --- /dev/null +++ b/test/versioned/kafkajs/kafka.tap.js @@ -0,0 +1,55 @@ +/* + * Copyright 2024 New Relic Corporation. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +'use strict' + +const tap = require('tap') +const helper = require('../../lib/agent_helper') +const { removeModules } = require('../../lib/cache-buster') + +tap.beforeEach(async (t) => { + t.context.agent = helper.instrumentMockedAgent() + + const { Kafka } = require('kafkajs') + t.context.Kafka = Kafka + const kafka = new Kafka({ + clientId: 'kafka-test', + brokers: ['127.0.0.1:9092'] + }) + + const producer = kafka.producer() + await producer.connect() + t.context.producer = producer + const consumer = kafka.consumer({ groupId: 'kafka' }) + await consumer.connect() + t.context.consumer = consumer +}) + +tap.afterEach(async (t) => { + helper.unloadAgent(t.context.agent) + removeModules(['kafkajs']) + await t.context.consumer.disconnect() + await t.context.producer.disconnect() +}) + +tap.test('stub', async (t) => { + const { consumer, producer } = t.context + const topic = 'test-topic' + + await consumer.subscribe({ topics: [topic] }) + const testPromise = new Promise((resolve) => { + consumer.run({ + eachMessage: async ({ message }) => { + t.equal(message.value.toString(), 'test message') + resolve() + } + }) + }) + await producer.send({ + topic, + messages: [{ key: 'key', value: 'test message' }] + }) + await testPromise +}) diff --git a/test/versioned/node-rdkafka/newrelic.js b/test/versioned/kafkajs/newrelic.js similarity index 100% rename from test/versioned/node-rdkafka/newrelic.js rename to test/versioned/kafkajs/newrelic.js diff --git a/test/versioned/node-rdkafka/package.json b/test/versioned/kafkajs/package.json similarity index 69% rename from test/versioned/node-rdkafka/package.json rename to test/versioned/kafkajs/package.json index 8c34eece82..00d51e92c1 100644 --- a/test/versioned/node-rdkafka/package.json +++ b/test/versioned/kafkajs/package.json @@ -1,6 +1,6 @@ { "name": "kafka-tests", - "targets": [{"name":"node-rdkafka","minAgentVersion":"11.18.0"}], + "targets": [{"name":"kafkajs","minAgentVersion":"11.19.0"}], "version": "0.0.0", "private": true, "tests": [ @@ -9,7 +9,7 @@ "node": ">=16" }, "dependencies": { - "node-rdkafka": ">=3.0.0" + "kafkajs": ">=2.0.0" }, "files": [ "kafka.tap.js" diff --git a/test/versioned/node-rdkafka/kafka.tap.js b/test/versioned/node-rdkafka/kafka.tap.js deleted file mode 100644 index 91cd73acf5..0000000000 --- a/test/versioned/node-rdkafka/kafka.tap.js +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright 2024 New Relic Corporation. All rights reserved. - * SPDX-License-Identifier: Apache-2.0 - */ - -'use strict' - -const tap = require('tap') -const helper = require('../../lib/agent_helper') -const params = require('../../lib/params') -const { removeModules } = require('../../lib/cache-buster') - -const broker = `${params.kafka_host}:${params.kafka_port}` - -tap.beforeEach(async (t) => { - t.context.agent = helper.instrumentMockedAgent() - - const Kafka = require('node-rdkafka') - t.context.Kafka = Kafka - - await new Promise((resolve) => { - const producer = new Kafka.Producer({ - 'metadata.broker.list': broker - }) - producer.connect() - producer.setPollInterval(10) - producer.on('ready', () => { - t.context.producer = producer - resolve() - }) - }) - - await new Promise((resolve) => { - const consumer = new Kafka.KafkaConsumer({ - 'metadata.broker.list': broker, - 'group.id': 'kafka' - }) - consumer.connect() - consumer.on('ready', () => { - t.context.consumer = consumer - resolve() - }) - }) -}) - -tap.afterEach(async (t) => { - helper.unloadAgent(t.context.agent) - removeModules(['node-rdkafka']) - - await new Promise((resolve) => { - t.context.producer.disconnect(resolve) - }) - await new Promise((resolve) => { - t.context.consumer.disconnect(resolve) - }) -}) - -tap.test('stub', { timeout: 10_000 }, (t) => { - const { consumer, producer } = t.context - const topic = 'test-topic' - - consumer.on('data', (data) => { - t.equal(data.value.toString(), 'test message') - t.end() - }) - consumer.subscribe([topic]) - consumer.consume() - - setTimeout(() => { - producer.produce(topic, null, Buffer.from('test message'), 'key') - }, 500) -}) From 3ad694a5741a06fba8196aa9bae7e41610eea169 Mon Sep 17 00:00:00 2001 From: James Sumners Date: Wed, 29 May 2024 14:41:13 -0400 Subject: [PATCH 07/10] tweak broker config --- test/versioned/kafkajs/kafka.tap.js | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/test/versioned/kafkajs/kafka.tap.js b/test/versioned/kafkajs/kafka.tap.js index 05cd88f7d6..38093debdd 100644 --- a/test/versioned/kafkajs/kafka.tap.js +++ b/test/versioned/kafkajs/kafka.tap.js @@ -7,8 +7,11 @@ const tap = require('tap') const helper = require('../../lib/agent_helper') +const params = require('../../lib/params') const { removeModules } = require('../../lib/cache-buster') +const broker = `${params.kafka_host}:${params.kafka_port}` + tap.beforeEach(async (t) => { t.context.agent = helper.instrumentMockedAgent() @@ -16,7 +19,7 @@ tap.beforeEach(async (t) => { t.context.Kafka = Kafka const kafka = new Kafka({ clientId: 'kafka-test', - brokers: ['127.0.0.1:9092'] + brokers: [broker] }) const producer = kafka.producer() From b2b5130c03a0aa09787ef894241f0812f777c052 Mon Sep 17 00:00:00 2001 From: James Sumners Date: Wed, 29 May 2024 15:06:01 -0400 Subject: [PATCH 08/10] silence --- test/versioned/kafkajs/kafka.tap.js | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/test/versioned/kafkajs/kafka.tap.js b/test/versioned/kafkajs/kafka.tap.js index 38093debdd..35d285f1d2 100644 --- a/test/versioned/kafkajs/kafka.tap.js +++ b/test/versioned/kafkajs/kafka.tap.js @@ -15,11 +15,12 @@ const broker = `${params.kafka_host}:${params.kafka_port}` tap.beforeEach(async (t) => { t.context.agent = helper.instrumentMockedAgent() - const { Kafka } = require('kafkajs') + const { Kafka, logLevel } = require('kafkajs') t.context.Kafka = Kafka const kafka = new Kafka({ clientId: 'kafka-test', - brokers: [broker] + brokers: [broker], + logLevel: logLevel.NOTHING }) const producer = kafka.producer() From cd56037feef60948a06c1a72abbdc6eff568eb44 Mon Sep 17 00:00:00 2001 From: Bob Evans Date: Wed, 29 May 2024 16:03:58 -0400 Subject: [PATCH 09/10] wip: seeing if this fixes the CI issues --- test/versioned/kafkajs/kafka.tap.js | 19 +++++++++---- test/versioned/kafkajs/utils.js | 44 +++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 6 deletions(-) create mode 100644 test/versioned/kafkajs/utils.js diff --git a/test/versioned/kafkajs/kafka.tap.js b/test/versioned/kafkajs/kafka.tap.js index 35d285f1d2..0acabf3c2b 100644 --- a/test/versioned/kafkajs/kafka.tap.js +++ b/test/versioned/kafkajs/kafka.tap.js @@ -9,6 +9,7 @@ const tap = require('tap') const helper = require('../../lib/agent_helper') const params = require('../../lib/params') const { removeModules } = require('../../lib/cache-buster') +const utils = require('./utils') const broker = `${params.kafka_host}:${params.kafka_port}` @@ -17,11 +18,15 @@ tap.beforeEach(async (t) => { const { Kafka, logLevel } = require('kafkajs') t.context.Kafka = Kafka + const topic = utils.randomTopic() + t.context.topic = topic + const kafka = new Kafka({ clientId: 'kafka-test', brokers: [broker], logLevel: logLevel.NOTHING }) + utils.createTopic({ topic, kafka }) const producer = kafka.producer() await producer.connect() @@ -39,21 +44,23 @@ tap.afterEach(async (t) => { }) tap.test('stub', async (t) => { - const { consumer, producer } = t.context - const topic = 'test-topic' + const { consumer, producer, topic } = t.context + const message = 'test message' - await consumer.subscribe({ topics: [topic] }) + await consumer.subscribe({ topics: [topic], fromBeginning: true }) const testPromise = new Promise((resolve) => { consumer.run({ - eachMessage: async ({ message }) => { - t.equal(message.value.toString(), 'test message') + eachMessage: async ({ message: actualMessage }) => { + t.equal(actualMessage.value.toString(), message) resolve() } }) }) + utils.waitForConsumersToJoinGroup(consumer) await producer.send({ + acks: 1, topic, - messages: [{ key: 'key', value: 'test message' }] + messages: [{ key: 'key', value: message }] }) await testPromise }) diff --git a/test/versioned/kafkajs/utils.js b/test/versioned/kafkajs/utils.js new file mode 100644 index 0000000000..90e0ee6f15 --- /dev/null +++ b/test/versioned/kafkajs/utils.js @@ -0,0 +1,44 @@ +/* + * Copyright 2024 New Relic Corporation. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +'use strict' +const { makeId } = require('../../../lib/util/hashes') +const utils = module.exports + +utils.randomTopic = (prefix = 'test-topic') => { + return `${prefix}-${makeId()}` +} + +utils.createTopic = async ({ kafka, topic }) => { + const admin = kafka.admin() + try { + await admin.connect() + await admin.createTopics({ + waitForLeaders: true, + topics: [{ topic, numPartitions: 1, replicationFactor: 1, configEntries: [] }] + }) + } finally { + await admin.disconnect() + } +} + +utils.waitForConsumersToJoinGroup = (consumer, { maxWait = 10000, label = '' } = {}) => + new Promise((resolve, reject) => { + const timeoutId = setTimeout(() => { + consumer.disconnect().then(() => { + reject(new Error(`Timeout ${label}`.trim())) + }) + }, maxWait) + consumer.on(consumer.events.GROUP_JOIN, (event) => { + clearTimeout(timeoutId) + resolve(event) + }) + consumer.on(consumer.events.CRASH, (event) => { + clearTimeout(timeoutId) + consumer.disconnect().then(() => { + reject(event.payload.error) + }) + }) + }) From a622b5824564f3dfb9b1523ab00b320dd10db62d Mon Sep 17 00:00:00 2001 From: Bob Evans Date: Wed, 29 May 2024 16:23:36 -0400 Subject: [PATCH 10/10] chore: cleanup utils code and add jsdoc --- test/versioned/kafkajs/kafka.tap.js | 4 ++-- test/versioned/kafkajs/utils.js | 24 ++++++++++++++++++++++-- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/test/versioned/kafkajs/kafka.tap.js b/test/versioned/kafkajs/kafka.tap.js index 0acabf3c2b..125ff57815 100644 --- a/test/versioned/kafkajs/kafka.tap.js +++ b/test/versioned/kafkajs/kafka.tap.js @@ -26,7 +26,7 @@ tap.beforeEach(async (t) => { brokers: [broker], logLevel: logLevel.NOTHING }) - utils.createTopic({ topic, kafka }) + await utils.createTopic({ topic, kafka }) const producer = kafka.producer() await producer.connect() @@ -56,7 +56,7 @@ tap.test('stub', async (t) => { } }) }) - utils.waitForConsumersToJoinGroup(consumer) + await utils.waitForConsumersToJoinGroup({ consumer }) await producer.send({ acks: 1, topic, diff --git a/test/versioned/kafkajs/utils.js b/test/versioned/kafkajs/utils.js index 90e0ee6f15..f311232154 100644 --- a/test/versioned/kafkajs/utils.js +++ b/test/versioned/kafkajs/utils.js @@ -7,10 +7,21 @@ const { makeId } = require('../../../lib/util/hashes') const utils = module.exports +/** + * Creates a random topic to be used for testing + * @param {string} [prefix=test-topic] topic prefix + * @returns {string} topic name with random id appended + */ utils.randomTopic = (prefix = 'test-topic') => { return `${prefix}-${makeId()}` } +/** + * Creates a topic with the admin class + * @param {object} params to function + * @param {object} params.kafka instance of kafka.Kafka + * @param {string} params.topic topic name + */ utils.createTopic = async ({ kafka, topic }) => { const admin = kafka.admin() try { @@ -24,11 +35,20 @@ utils.createTopic = async ({ kafka, topic }) => { } } -utils.waitForConsumersToJoinGroup = (consumer, { maxWait = 10000, label = '' } = {}) => +/** + * Waits for consumer to join the group + * + * @param {object} params to function + * @param {object} params.consumer instance of kafkajs.Kafka.consumer + * @param {number} [params.maxWait=10000] how long to wait for consumer to join group + * @returns {Promise} + * + */ +utils.waitForConsumersToJoinGroup = ({ consumer, maxWait = 10000 }) => new Promise((resolve, reject) => { const timeoutId = setTimeout(() => { consumer.disconnect().then(() => { - reject(new Error(`Timeout ${label}`.trim())) + reject() }) }, maxWait) consumer.on(consumer.events.GROUP_JOIN, (event) => {