Skip to content

Conversation

@jsumners-nr
Copy link
Contributor

This PR resolves #2245.

@jsumners-nr jsumners-nr marked this pull request as ready for review June 5, 2024 18:53
@jsumners-nr jsumners-nr requested a review from bizob2828 June 5, 2024 18:53
@bizob2828 bizob2828 self-assigned this Jun 5, 2024
Copy link
Member

@bizob2828 bizob2828 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

overall looks great. just some suggestions on test assertion additions

eachMessage: async ({ message: actualMessage }) => {
t.equal(actualMessage.value.toString(), message)
t.match(actualMessage.headers['x-foo'].toString(), 'foo')
t.equal(actualMessage.headers.newrelic.toString(), '')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these are fine but it's only asserting 1 message. you could add this same assertion to consume inside of a transaction which is sending multiple messages

await t.context.producer.disconnect()
})

tap.test('send records correctly', (t) => {
Copy link
Member

@bizob2828 bizob2828 Jun 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would also enhance the send passes along DT headers to send multiple messages and then update the util.verifyDistributedTrace to assert multiple consumer transactions to make sure it's doing the right thing

diff --git a/test/versioned/kafkajs/kafka.tap.js b/test/versioned/kafkajs/kafka.tap.js
index ee0160c2d..e0af7c1eb 100644
--- a/test/versioned/kafkajs/kafka.tap.js
+++ b/test/versioned/kafkajs/kafka.tap.js
@@ -121,7 +121,7 @@ tap.test('send passes along DT headers', (t) => {
   agent.config.primary_application_id = 'app_1'
   agent.config.trusted_account_key = 42
   let produceTx = null
-  let consumeTx = null
+  let consumeTxs = [] 
   let txCount = 0
 
   agent.on('transactionFinished', (tx) => {
@@ -130,11 +130,11 @@ tap.test('send passes along DT headers', (t) => {
     if (tx.name === expectedName) {
       produceTx = tx
     } else {
-      consumeTx = tx
+      consumeTxs.push(tx)
     }
 
-    if (txCount === 2) {
-      utils.verifyDistributedTrace({ t, consumeTx, produceTx })
+    if (txCount === 3) {
+      utils.verifyDistributedTrace({ t, consumeTxs, produceTx })
       t.end()
     }
   })
@@ -146,7 +146,7 @@ tap.test('send passes along DT headers', (t) => {
     const promise = new Promise((resolve) => {
       consumer.run({
         eachMessage: async ({ message: actualMessage }) => {
-          t.equal(actualMessage.value.toString(), 'one')
+          //t.equal(actualMessage.value.toString(), 'one')
           resolve()
         }
       })
@@ -156,7 +156,7 @@ tap.test('send passes along DT headers', (t) => {
     await producer.send({
       acks: 1,
       topic,
-      messages: [{ key: 'key', value: 'one' }]
+      messages: [{ key: 'key', value: 'one' }, { key: 'key2', value: 'two'}]
     })
 
     await promise
diff --git a/test/versioned/kafkajs/utils.js b/test/versioned/kafkajs/utils.js
index b97ee32b0..577898be4 100644
--- a/test/versioned/kafkajs/utils.js
+++ b/test/versioned/kafkajs/utils.js
@@ -107,15 +107,17 @@ utils.verifyConsumeTransaction = ({ t, tx, topic, clientId }) => {
  * @param {object} params.consumeTx consumer transaction
  * @param {object} params.produceTx produce transaction
  */
-utils.verifyDistributedTrace = ({ t, consumeTx, produceTx }) => {
+utils.verifyDistributedTrace = ({ t, consumeTxs, produceTx }) => {
   t.ok(produceTx.isDistributedTrace, 'should mark producer as distributed')
-  t.ok(consumeTx.isDistributedTrace, 'should mark consumer as distributed')
+  const produceSegment = produceTx.trace.root.children[3]
+  consumeTxs.forEach((consumeTx) => {
+    t.ok(consumeTx.isDistributedTrace, 'should mark consumer as distributed')
 
-  t.equal(consumeTx.incomingCatId, null, 'should not set old CAT properties')
+    t.equal(consumeTx.incomingCatId, null, 'should not set old CAT properties')
 
-  t.equal(produceTx.id, consumeTx.parentId, 'should have proper parent id')
-  t.equal(produceTx.traceId, consumeTx.traceId, 'should have proper trace id')
-  const produceSegment = produceTx.trace.root.children[3]
-  t.equal(produceSegment.id, consumeTx.parentSpanId, 'should have proper parentSpanId')
-  t.equal(consumeTx.parentTransportType, 'Kafka', 'should have correct transport type')
+    t.equal(produceTx.id, consumeTx.parentId, 'should have proper parent id')
+    t.equal(produceTx.traceId, consumeTx.traceId, 'should have proper trace id')
+    t.equal(produceSegment.id, consumeTx.parentSpanId, 'should have proper parentSpanId')
+    t.equal(consumeTx.parentTransportType, 'Kafka', 'should have correct transport type')
+  })
 }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have verification that the message shim injects headers to multiple messages in the message shim's unit tests. I'm not clear what the suggested change does other than duplicate that existing test.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i hear ya. My only concern is if for some reason someone changes the messageHeaders logic in kafkajs and there isn't direct assertions it could cause regression

await t.context.producer.disconnect()
})

tap.test('send records correctly', (t) => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i hear ya. My only concern is if for some reason someone changes the messageHeaders logic in kafkajs and there isn't direct assertions it could cause regression

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

Archived in project

Development

Successfully merging this pull request may close these issues.

Update message shim to handle inserting headers on n messages in a producer

2 participants