diff --git a/docker-compose.yml b/docker-compose.yml index ac6fb6a20a..084467cce3 100755 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,5 +1,6 @@ version: "3" services: + elasticsearch: container_name: nr_node_elastic image: docker.elastic.co/elasticsearch/elasticsearch:8.8.0 @@ -21,11 +22,45 @@ services: interval: 30s timeout: 10s retries: 5 + + # Kafka setup based on the e2e tests in node-rdkafka. Needs both the + # `zookeeper` and `kafka` services. + zookeeper: + container_name: nr_node_kafka_zookeeper + image: confluentinc/cp-zookeeper + ports: + - '2181:2181' + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + kafka: + container_name: nr_node_kafka + image: confluentinc/cp-kafka + links: + - zookeeper + ports: + - '9092:9092' + healthcheck: + test: /usr/bin/kafka-cluster cluster-id --bootstrap-server localhost:9092 || exit 1 + interval: 1s + timeout: 60s + retries: 60 + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' + KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092' + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_DEFAULT_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + memcached: container_name: nr_node_memcached image: memcached ports: - "11211:11211" + mongodb_3: container_name: nr_node_mongodb platform: linux/amd64 @@ -37,6 +72,7 @@ services: interval: 1s timeout: 10s retries: 30 + mongodb_5: container_name: nr_node_mongodb_5 image: library/mongo:5 @@ -47,6 +83,7 @@ services: interval: 1s timeout: 10s retries: 30 + mysql: container_name: nr_node_mysql platform: linux/amd64 @@ -60,6 +97,7 @@ services: interval: 1s timeout: 10s retries: 30 + redis: container_name: nr_node_redis image: redis @@ -70,6 +108,7 @@ services: interval: 1s timeout: 10s retries: 30 + cassandra: container_name: nr_node_cassandra platform: linux/amd64 @@ -80,13 +119,15 @@ services: test: [ "CMD", "cqlsh", "-u cassandra", "-p cassandra"] interval: 5s timeout: 10s - retries: 6 + retries: 6 + # pg 9.2 has built in healthcheck pg: container_name: nr_node_postgres image: postgres:9.2 ports: - "5432:5432" + pg_prisma: container_name: nr_node_postgres_prisma image: postgres:15 @@ -100,6 +141,7 @@ services: interval: 1s timeout: 10s retries: 30 + rmq: container_name: nr_node_rabbit image: rabbitmq:3 diff --git a/lib/instrumentation/kafkajs.js b/lib/instrumentation/kafkajs.js new file mode 100644 index 0000000000..7aa3861857 --- /dev/null +++ b/lib/instrumentation/kafkajs.js @@ -0,0 +1,11 @@ +/* + * Copyright 2024 New Relic Corporation. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +'use strict' + +// eslint-disable-next-line no-unused-vars +module.exports = function initialize(_agent, kafkajs, _moduleName, shim) { + // Put instrumentation code here for kafkajs.Kafka.producer and kafkajs.Kafka.consumer +} diff --git a/lib/instrumentations.js b/lib/instrumentations.js index 127a542e02..4b0f7f8d07 100644 --- a/lib/instrumentations.js +++ b/lib/instrumentations.js @@ -29,6 +29,7 @@ module.exports = function instrumentations() { 'fastify': { type: InstrumentationDescriptor.TYPE_WEB_FRAMEWORK }, 'generic-pool': { type: InstrumentationDescriptor.TYPE_GENERIC }, 'ioredis': { type: InstrumentationDescriptor.TYPE_DATASTORE }, + 'kafkajs': { type: InstrumentationDescriptor.TYPE_MESSAGE }, 'koa': { module: './instrumentation/koa' }, 'langchain': { module: './instrumentation/langchain' }, 'memcached': { type: InstrumentationDescriptor.TYPE_DATASTORE }, diff --git a/package.json b/package.json index 0923d8e5fc..f229ac7134 100644 --- a/package.json +++ b/package.json @@ -164,6 +164,7 @@ "public-docs": "jsdoc -c ./jsdoc-conf.jsonc && cp examples/shim/*.png out/", "publish-docs": "./bin/publish-docs.sh", "services": "docker compose up -d --wait", + "services:stop": "docker compose down", "smoke": "npm run ssl && time tap test/smoke/**/**/*.tap.js --timeout=180 --no-coverage", "ssl": "./bin/ssl.sh", "sub-install": "node test/bin/install_sub_deps", diff --git a/test/lib/cache-buster.js b/test/lib/cache-buster.js index 93cbcccde3..57b10a4d83 100644 --- a/test/lib/cache-buster.js +++ b/test/lib/cache-buster.js @@ -5,11 +5,6 @@ 'use strict' -/** - * Utility method to remove a set of modules from the require cache. - * - * @param {string[]} modules The set of module names to remove from the cache. - */ module.exports = { /** * Removes explicitly named modules from the require cache. diff --git a/test/lib/params.js b/test/lib/params.js index 2d81b7b5cd..60f82472e4 100644 --- a/test/lib/params.js +++ b/test/lib/params.js @@ -6,6 +6,9 @@ 'use strict' module.exports = { + kafka_host: process.env.NR_NODE_TEST_KAFKA_HOST || '127.0.0.1', + kafka_port: process.env.NR_NODE_TEST_KAFKA_PORT || 9092, + memcached_host: process.env.NR_NODE_TEST_MEMCACHED_HOST || 'localhost', memcached_port: process.env.NR_NODE_TEST_MEMCACHED_PORT || 11211, diff --git a/test/versioned/kafkajs/kafka.tap.js b/test/versioned/kafkajs/kafka.tap.js new file mode 100644 index 0000000000..125ff57815 --- /dev/null +++ b/test/versioned/kafkajs/kafka.tap.js @@ -0,0 +1,66 @@ +/* + * Copyright 2024 New Relic Corporation. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +'use strict' + +const tap = require('tap') +const helper = require('../../lib/agent_helper') +const params = require('../../lib/params') +const { removeModules } = require('../../lib/cache-buster') +const utils = require('./utils') + +const broker = `${params.kafka_host}:${params.kafka_port}` + +tap.beforeEach(async (t) => { + t.context.agent = helper.instrumentMockedAgent() + + const { Kafka, logLevel } = require('kafkajs') + t.context.Kafka = Kafka + const topic = utils.randomTopic() + t.context.topic = topic + + const kafka = new Kafka({ + clientId: 'kafka-test', + brokers: [broker], + logLevel: logLevel.NOTHING + }) + await utils.createTopic({ topic, kafka }) + + const producer = kafka.producer() + await producer.connect() + t.context.producer = producer + const consumer = kafka.consumer({ groupId: 'kafka' }) + await consumer.connect() + t.context.consumer = consumer +}) + +tap.afterEach(async (t) => { + helper.unloadAgent(t.context.agent) + removeModules(['kafkajs']) + await t.context.consumer.disconnect() + await t.context.producer.disconnect() +}) + +tap.test('stub', async (t) => { + const { consumer, producer, topic } = t.context + const message = 'test message' + + await consumer.subscribe({ topics: [topic], fromBeginning: true }) + const testPromise = new Promise((resolve) => { + consumer.run({ + eachMessage: async ({ message: actualMessage }) => { + t.equal(actualMessage.value.toString(), message) + resolve() + } + }) + }) + await utils.waitForConsumersToJoinGroup({ consumer }) + await producer.send({ + acks: 1, + topic, + messages: [{ key: 'key', value: message }] + }) + await testPromise +}) diff --git a/test/versioned/kafkajs/newrelic.js b/test/versioned/kafkajs/newrelic.js new file mode 100644 index 0000000000..af4b696e2f --- /dev/null +++ b/test/versioned/kafkajs/newrelic.js @@ -0,0 +1,28 @@ +/* + * Copyright 2021 New Relic Corporation. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +'use strict' + +exports.config = { + app_name: ['My Application'], + license_key: 'license key here', + logging: { + level: 'trace', + filepath: '../../newrelic_agent.log' + }, + utilization: { + detect_aws: false, + detect_pcf: false, + detect_azure: false, + detect_gcp: false, + detect_docker: false + }, + distributed_tracing: { + enabled: true + }, + transaction_tracer: { + enabled: true + } +} diff --git a/test/versioned/kafkajs/package.json b/test/versioned/kafkajs/package.json new file mode 100644 index 0000000000..00d51e92c1 --- /dev/null +++ b/test/versioned/kafkajs/package.json @@ -0,0 +1,19 @@ +{ + "name": "kafka-tests", + "targets": [{"name":"kafkajs","minAgentVersion":"11.19.0"}], + "version": "0.0.0", + "private": true, + "tests": [ + { + "engines": { + "node": ">=16" + }, + "dependencies": { + "kafkajs": ">=2.0.0" + }, + "files": [ + "kafka.tap.js" + ] + } + ] +} diff --git a/test/versioned/kafkajs/utils.js b/test/versioned/kafkajs/utils.js new file mode 100644 index 0000000000..f311232154 --- /dev/null +++ b/test/versioned/kafkajs/utils.js @@ -0,0 +1,64 @@ +/* + * Copyright 2024 New Relic Corporation. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +'use strict' +const { makeId } = require('../../../lib/util/hashes') +const utils = module.exports + +/** + * Creates a random topic to be used for testing + * @param {string} [prefix=test-topic] topic prefix + * @returns {string} topic name with random id appended + */ +utils.randomTopic = (prefix = 'test-topic') => { + return `${prefix}-${makeId()}` +} + +/** + * Creates a topic with the admin class + * @param {object} params to function + * @param {object} params.kafka instance of kafka.Kafka + * @param {string} params.topic topic name + */ +utils.createTopic = async ({ kafka, topic }) => { + const admin = kafka.admin() + try { + await admin.connect() + await admin.createTopics({ + waitForLeaders: true, + topics: [{ topic, numPartitions: 1, replicationFactor: 1, configEntries: [] }] + }) + } finally { + await admin.disconnect() + } +} + +/** + * Waits for consumer to join the group + * + * @param {object} params to function + * @param {object} params.consumer instance of kafkajs.Kafka.consumer + * @param {number} [params.maxWait=10000] how long to wait for consumer to join group + * @returns {Promise} + * + */ +utils.waitForConsumersToJoinGroup = ({ consumer, maxWait = 10000 }) => + new Promise((resolve, reject) => { + const timeoutId = setTimeout(() => { + consumer.disconnect().then(() => { + reject() + }) + }, maxWait) + consumer.on(consumer.events.GROUP_JOIN, (event) => { + clearTimeout(timeoutId) + resolve(event) + }) + consumer.on(consumer.events.CRASH, (event) => { + clearTimeout(timeoutId) + consumer.disconnect().then(() => { + reject(event.payload.error) + }) + }) + })