diff --git a/lib/instrumentation/kafkajs/consumer.js b/lib/instrumentation/kafkajs/consumer.js index 5826f1d8a5..c8a71ada44 100644 --- a/lib/instrumentation/kafkajs/consumer.js +++ b/lib/instrumentation/kafkajs/consumer.js @@ -7,6 +7,8 @@ const { kafkaCtx } = require('../../symbols') const { MessageSpec, MessageSubscribeSpec, RecorderSpec } = require('../../shim/specs') const { DESTINATIONS } = require('../../config/attribute-filter') +const recordMethodMetric = require('./record-method-metric') +const recordLinkingMetrics = require('./record-linking-metrics') const CONSUMER_METHODS = [ 'connect', 'disconnect', @@ -19,58 +21,65 @@ const CONSUMER_METHODS = [ ] const SEGMENT_PREFIX = 'kafkajs.Kafka.consumer#' -module.exports = function instrumentConsumer({ shim, kafkajs, recordMethodMetric }) { - const { agent } = shim - shim.wrap(kafkajs.Kafka.prototype, 'consumer', function wrapConsumer(shim, orig) { - return function wrappedConsumer() { - const args = shim.argsToArray.apply(shim, arguments) - const consumer = orig.apply(this, args) - consumer.on(consumer.events.REQUEST, function listener(data) { - // storing broker for when we add `host`, `port` to messaging spans - consumer[kafkaCtx] = { - clientId: data?.payload?.clientId, - broker: data?.payload.broker - } +module.exports = wrapConsumer + +function wrapConsumer(shim, orig) { + return function wrappedConsumer() { + const args = shim.argsToArray.apply(shim, arguments) + const consumer = orig.apply(this, args) + consumer[kafkaCtx] = this[kafkaCtx] + + consumer.on(consumer.events.REQUEST, function listener(data) { + consumer[kafkaCtx].clientId = data?.payload?.clientId + }) + shim.record(consumer, CONSUMER_METHODS, function wrapper(shim, fn, name) { + return new RecorderSpec({ + name: `${SEGMENT_PREFIX}${name}`, + promise: true }) - shim.record(consumer, CONSUMER_METHODS, function wrapper(shim, fn, name) { - return new RecorderSpec({ - name: `${SEGMENT_PREFIX}${name}`, - promise: true - }) + }) + shim.recordSubscribedConsume( + consumer, + 'run', + new MessageSubscribeSpec({ + name: `${SEGMENT_PREFIX}#run`, + destinationType: shim.TOPIC, + promise: true, + consumer: shim.FIRST, + functions: ['eachMessage'], + messageHandler: handler({ consumer }) }) - shim.recordSubscribedConsume( - consumer, - 'run', - new MessageSubscribeSpec({ - name: `${SEGMENT_PREFIX}#run`, - destinationType: shim.TOPIC, - promise: true, - consumer: shim.FIRST, - functions: ['eachMessage'], - messageHandler: handler({ consumer, recordMethodMetric }) - }) - ) + ) - shim.wrap(consumer, 'run', function wrapRun(shim, fn) { - return function wrappedRun() { - const runArgs = shim.argsToArray.apply(shim, arguments) - if (runArgs?.[0]?.eachBatch) { - runArgs[0].eachBatch = shim.wrap( - runArgs[0].eachBatch, - function wrapEachBatch(shim, eachBatch) { - return function wrappedEachBatch() { - recordMethodMetric({ agent, name: 'eachBatch' }) - return eachBatch.apply(this, arguments) - } - } - ) + shim.wrap(consumer, 'run', wrapRun) + return consumer + } +} + +function wrapRun(shim, fn) { + const agent = shim.agent + return function wrappedRun() { + const runArgs = shim.argsToArray.apply(shim, arguments) + const brokers = this[kafkaCtx].brokers + if (runArgs?.[0]?.eachBatch) { + runArgs[0].eachBatch = shim.wrap( + runArgs[0].eachBatch, + function wrapEachBatch(shim, eachBatch) { + return function wrappedEachBatch() { + recordMethodMetric({ agent, name: 'eachBatch' }) + recordLinkingMetrics({ + agent, + brokers, + topic: arguments[0].batch.topic, + producer: false + }) + return eachBatch.apply(this, arguments) } - return fn.apply(this, runArgs) } - }) - return consumer + ) } - }) + return fn.apply(this, runArgs) + } } /** @@ -80,10 +89,9 @@ module.exports = function instrumentConsumer({ shim, kafkajs, recordMethodMetric * * @param {object} params to function * @param {object} params.consumer consumer being instrumented - * @param {function} params.recordMethodMetric helper method for logging tracking metrics * @returns {function} message handler for setting metrics and spec for the consumer transaction */ -function handler({ consumer, recordMethodMetric }) { +function handler({ consumer }) { /** * Message handler that extracts the topic and headers from message being consumed. * @@ -96,10 +104,18 @@ function handler({ consumer, recordMethodMetric }) { */ return function messageHandler(shim, args) { recordMethodMetric({ agent: shim.agent, name: 'eachMessage' }) + const [data] = args const { topic } = data const segment = shim.getActiveSegment() + recordLinkingMetrics({ + agent: shim.agent, + brokers: consumer[kafkaCtx].brokers, + topic, + producer: false + }) + if (segment?.transaction) { const tx = segment.transaction const byteLength = data?.message.value?.byteLength diff --git a/lib/instrumentation/kafkajs/index.js b/lib/instrumentation/kafkajs/index.js index 57b2dd0263..9910318ea8 100644 --- a/lib/instrumentation/kafkajs/index.js +++ b/lib/instrumentation/kafkajs/index.js @@ -7,7 +7,8 @@ const instrumentProducer = require('./producer') const instrumentConsumer = require('./consumer') -const { KAFKA } = require('../../metrics/names') +const { ClassWrapSpec } = require('../../shim/specs') +const { kafkaCtx } = require('../../symbols') module.exports = function initialize(agent, kafkajs, _moduleName, shim) { if (agent.config.feature_flag.kafkajs_instrumentation === false) { @@ -18,17 +19,16 @@ module.exports = function initialize(agent, kafkajs, _moduleName, shim) { } shim.setLibrary(shim.KAFKA) - instrumentConsumer({ shim, kafkajs, recordMethodMetric }) - instrumentProducer({ shim, kafkajs, recordMethodMetric }) -} -/** - * Convenience method for logging the tracking metrics for producer and consumer - * - * @param {object} params to function - * @param {Agent} params.agent instance of agent - * @param {string} params.name name of function getting instrumented - */ -function recordMethodMetric({ agent, name }) { - agent.metrics.getOrCreateMetric(`${KAFKA.PREFIX}/${name}`).incrementCallCount() + shim.wrapClass( + kafkajs, + 'Kafka', + new ClassWrapSpec({ + post: function nrConstructorWrapper(shim, wrappedClass, name, args) { + this[kafkaCtx] = { brokers: args[0].brokers } + shim.wrap(this, 'producer', instrumentProducer) + shim.wrap(this, 'consumer', instrumentConsumer) + } + }) + ) } diff --git a/lib/instrumentation/kafkajs/producer.js b/lib/instrumentation/kafkajs/producer.js index 7a9404c43b..ee30b2083a 100644 --- a/lib/instrumentation/kafkajs/producer.js +++ b/lib/instrumentation/kafkajs/producer.js @@ -7,68 +7,84 @@ const { MessageSpec } = require('../../shim/specs') const getByPath = require('../../util/get') +const recordMethodMetric = require('./record-method-metric') +const recordLinkingMetrics = require('./record-linking-metrics') +const { kafkaCtx } = require('../../symbols') -module.exports = function instrumentProducer({ shim, kafkajs, recordMethodMetric }) { - const { agent } = shim - shim.wrap(kafkajs.Kafka.prototype, 'producer', function nrProducerWrapper(shim, orig) { - return function nrProducer() { - const params = shim.argsToArray.apply(shim, arguments) - const producer = orig.apply(this, params) +module.exports = nrProducerWrapper - // The `.producer()` method returns an object with `send` and `sendBatch` - // methods. The `send` method is merely a wrapper around `sendBatch`, but - // we cannot simply wrap `sendBatch` because the `send` method does not - // use the object scoped instance (i.e. `this.sendBatch`); it utilizes - // the closure scoped instance of `sendBatch`. So we must wrap each - // method. +function nrProducerWrapper(shim, orig) { + return function nrProducer() { + const params = shim.argsToArray.apply(shim, arguments) + const producer = orig.apply(this, params) + producer[kafkaCtx] = this[kafkaCtx] - shim.recordProduce(producer, 'send', function nrSend(shim, fn, name, args) { - recordMethodMetric({ agent, name }) - const data = args[0] - return new MessageSpec({ - promise: true, - destinationName: data.topic, - destinationType: shim.TOPIC, - messageHeaders: (inject) => { - return data.messages.map((msg) => { - if (msg.headers) { - return inject(msg.headers) - } - msg.headers = {} - return inject(msg.headers) - }) - } - }) - }) + // The `.producer()` method returns an object with `send` and `sendBatch` + // methods. The `send` method is merely a wrapper around `sendBatch`, but + // we cannot simply wrap `sendBatch` because the `send` method does not + // use the object scoped instance (i.e. `this.sendBatch`); it utilizes + // the closure scoped instance of `sendBatch`. So we must wrap each + // method. + shim.recordProduce(producer, 'send', nrSend) + shim.recordProduce(producer, 'sendBatch', nrSendBatch) + + return producer + } +} + +function nrSend(shim, fn, name, args) { + const agent = shim.agent + recordMethodMetric({ agent, name }) + const data = args[0] - shim.recordProduce(producer, 'sendBatch', function nrSendBatch(shim, fn, name, args) { - recordMethodMetric({ agent, name }) - const data = args[0] - const firstMessage = getByPath(data, 'topicMessages[0].messages[0]') + recordLinkingMetrics({ agent, brokers: this[kafkaCtx].brokers, topic: data.topic }) - if (firstMessage) { - firstMessage.headers = firstMessage.headers ?? {} + return new MessageSpec({ + promise: true, + destinationName: data.topic, + destinationType: shim.TOPIC, + messageHeaders: (inject) => { + return data.messages.map((msg) => { + if (msg.headers) { + return inject(msg.headers) } + msg.headers = {} + return inject(msg.headers) + }) + } + }) +} - return new MessageSpec({ - promise: true, - destinationName: data.topicMessages[0].topic, - destinationType: shim.TOPIC, - messageHeaders: (inject) => { - return data.topicMessages.map((tm) => { - return tm.messages.map((m) => { - if (m.headers) { - return inject(m.headers) - } - m.headers = {} - return inject(m.headers) - }) - }) +function nrSendBatch(shim, fn, name, args) { + const agent = shim.agent + recordMethodMetric({ agent, name }) + const data = args[0] + const firstMessage = getByPath(data, 'topicMessages[0].messages[0]') + + recordLinkingMetrics({ + agent, + brokers: this[kafkaCtx].brokers, + topic: data.topicMessages[0].topic + }) + + if (firstMessage) { + firstMessage.headers = firstMessage.headers ?? {} + } + + return new MessageSpec({ + promise: true, + destinationName: data.topicMessages[0].topic, + destinationType: shim.TOPIC, + messageHeaders: (inject) => { + return data.topicMessages.map((tm) => { + return tm.messages.map((m) => { + if (m.headers) { + return inject(m.headers) } + m.headers = {} + return inject(m.headers) }) }) - - return producer } }) } diff --git a/lib/instrumentation/kafkajs/record-linking-metrics.js b/lib/instrumentation/kafkajs/record-linking-metrics.js new file mode 100644 index 0000000000..9f51eab433 --- /dev/null +++ b/lib/instrumentation/kafkajs/record-linking-metrics.js @@ -0,0 +1,17 @@ +/* + * Copyright 2024 New Relic Corporation. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +'use strict' + +module.exports = recordLinkingMetrics + +function recordLinkingMetrics({ agent, brokers, topic, producer = true }) { + const kind = producer === true ? 'Produce' : 'Consume' + for (const broker of brokers) { + agent.metrics + .getOrCreateMetric(`MessageBroker/Kafka/Nodes/${broker}/${kind}/Named/${topic}`) + .incrementCallCount() + } +} diff --git a/lib/instrumentation/kafkajs/record-method-metric.js b/lib/instrumentation/kafkajs/record-method-metric.js new file mode 100644 index 0000000000..6beea0edbd --- /dev/null +++ b/lib/instrumentation/kafkajs/record-method-metric.js @@ -0,0 +1,21 @@ +/* + * Copyright 2024 New Relic Corporation. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +'use strict' + +const { KAFKA } = require('../../metrics/names') + +module.exports = recordMethodMetric + +/** + * Convenience method for logging the tracking metrics for producer and consumer + * + * @param {object} params to function + * @param {Agent} params.agent instance of agent + * @param {string} params.name name of function getting instrumented + */ +function recordMethodMetric({ agent, name }) { + agent.metrics.getOrCreateMetric(`${KAFKA.PREFIX}/${name}`).incrementCallCount() +} diff --git a/test/versioned/kafkajs/kafka.tap.js b/test/versioned/kafkajs/kafka.tap.js index 7026590d35..705af5a4d1 100644 --- a/test/versioned/kafkajs/kafka.tap.js +++ b/test/versioned/kafkajs/kafka.tap.js @@ -51,7 +51,7 @@ tap.afterEach(async (t) => { }) tap.test('send records correctly', (t) => { - t.plan(7) + t.plan(8) const { agent, consumer, producer, topic } = t.context const message = 'test message' @@ -73,6 +73,11 @@ tap.test('send records correctly', (t) => { 'Supportability/Features/Instrumentation/kafkajs/send' ) t.equal(sendMetric.callCount, 1) + + const produceTrackingMetric = agent.metrics.getMetric( + `MessageBroker/Kafka/Nodes/${broker}/Produce/Named/${topic}` + ) + t.equal(produceTrackingMetric.callCount, 1) } if (txCount === 2) { @@ -176,7 +181,7 @@ tap.test('send passes along DT headers', (t) => { }) tap.test('sendBatch records correctly', (t) => { - t.plan(8) + t.plan(9) const { agent, consumer, producer, topic } = t.context const message = 'test message' @@ -199,6 +204,11 @@ tap.test('sendBatch records correctly', (t) => { ) t.equal(sendMetric.callCount, 1) + const produceTrackingMetric = agent.metrics.getMetric( + `MessageBroker/Kafka/Nodes/${broker}/Produce/Named/${topic}` + ) + t.equal(produceTrackingMetric.callCount, 1) + t.end() } }) @@ -250,6 +260,12 @@ tap.test('consume outside of a transaction', async (t) => { 'Supportability/Features/Instrumentation/kafkajs/eachMessage' ) t.equal(sendMetric.callCount, 1) + + const consumeTrackingMetric = agent.metrics.getMetric( + `MessageBroker/Kafka/Nodes/${broker}/Consume/Named/${topic}` + ) + t.equal(consumeTrackingMetric.callCount, 1) + resolve() }) }) @@ -358,6 +374,12 @@ tap.test('consume batch inside of a transaction', async (t) => { 'Supportability/Features/Instrumentation/kafkajs/eachBatch' ) t.equal(sendMetric.callCount, 1) + + const consumeTrackingMetric = agent.metrics.getMetric( + `MessageBroker/Kafka/Nodes/${broker}/Consume/Named/${topic}` + ) + t.equal(consumeTrackingMetric.callCount, 1) + resolve() } })