Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
wip: seeing if this fixes the CI issues
  • Loading branch information
bizob2828 committed May 29, 2024
commit cd56037feef60948a06c1a72abbdc6eff568eb44
19 changes: 13 additions & 6 deletions test/versioned/kafkajs/kafka.tap.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const tap = require('tap')
const helper = require('../../lib/agent_helper')
const params = require('../../lib/params')
const { removeModules } = require('../../lib/cache-buster')
const utils = require('./utils')

const broker = `${params.kafka_host}:${params.kafka_port}`

Expand All @@ -17,11 +18,15 @@ tap.beforeEach(async (t) => {

const { Kafka, logLevel } = require('kafkajs')
t.context.Kafka = Kafka
const topic = utils.randomTopic()
t.context.topic = topic

const kafka = new Kafka({
clientId: 'kafka-test',
brokers: [broker],
logLevel: logLevel.NOTHING
})
utils.createTopic({ topic, kafka })

const producer = kafka.producer()
await producer.connect()
Expand All @@ -39,21 +44,23 @@ tap.afterEach(async (t) => {
})

tap.test('stub', async (t) => {
const { consumer, producer } = t.context
const topic = 'test-topic'
const { consumer, producer, topic } = t.context
const message = 'test message'

await consumer.subscribe({ topics: [topic] })
await consumer.subscribe({ topics: [topic], fromBeginning: true })
const testPromise = new Promise((resolve) => {
consumer.run({
eachMessage: async ({ message }) => {
t.equal(message.value.toString(), 'test message')
eachMessage: async ({ message: actualMessage }) => {
t.equal(actualMessage.value.toString(), message)
resolve()
}
})
})
utils.waitForConsumersToJoinGroup(consumer)
await producer.send({
acks: 1,
topic,
messages: [{ key: 'key', value: 'test message' }]
messages: [{ key: 'key', value: message }]
})
await testPromise
})
44 changes: 44 additions & 0 deletions test/versioned/kafkajs/utils.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2024 New Relic Corporation. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*/

'use strict'
const { makeId } = require('../../../lib/util/hashes')
const utils = module.exports

utils.randomTopic = (prefix = 'test-topic') => {
return `${prefix}-${makeId()}`
}

utils.createTopic = async ({ kafka, topic }) => {
const admin = kafka.admin()
try {
await admin.connect()
await admin.createTopics({
waitForLeaders: true,
topics: [{ topic, numPartitions: 1, replicationFactor: 1, configEntries: [] }]
})
} finally {
await admin.disconnect()
}
}

utils.waitForConsumersToJoinGroup = (consumer, { maxWait = 10000, label = '' } = {}) =>
new Promise((resolve, reject) => {
const timeoutId = setTimeout(() => {
consumer.disconnect().then(() => {
reject(new Error(`Timeout ${label}`.trim()))
})
}, maxWait)
consumer.on(consumer.events.GROUP_JOIN, (event) => {
clearTimeout(timeoutId)
resolve(event)
})
consumer.on(consumer.events.CRASH, (event) => {
clearTimeout(timeoutId)
consumer.disconnect().then(() => {
reject(event.payload.error)
})
})
})