Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
240 changes: 240 additions & 0 deletions src/admin/__tests__/fetchConsumerGroupOffsets.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
const createAdmin = require('../index')
const createProducer = require('../../producer')
const createConsumer = require('../../consumer')
const {
secureRandom,
createCluster,
newLogger,
createTopic,
createModPartitioner,
waitForConsumerToJoinGroup,
generateMessages,
testIfKafkaAtLeast_0_11,
} = require('testHelpers')

// Integration tests for admin.fetchOffsets. They run against a live Kafka
// cluster: topics, group id and clients are created fresh for every test.
describe('Admin', () => {
  let admin, cluster, groupId, logger, topicName, anotherTopicName

  beforeEach(async () => {
    // Unique names per test so parallel/repeated runs don't interfere
    topicName = `test-topic-${secureRandom()}`
    anotherTopicName = `another-topic-${secureRandom()}`
    groupId = `consumer-group-id-${secureRandom()}`

    await createTopic({ topic: topicName })
    await createTopic({ topic: anotherTopicName })

    logger = newLogger()
    cluster = createCluster()
    admin = createAdmin({ cluster, logger })

    await admin.connect()
  })

  afterEach(async () => {
    admin && (await admin.disconnect())
  })

  describe('fetchOffsets', () => {
    test('throws an error if the groupId is invalid', async () => {
      await expect(admin.fetchOffsets({ groupId: null })).rejects.toHaveProperty(
        'message',
        'Invalid groupId null'
      )
    })

    // Nothing committed yet: the group reports the logical offset -1 (latest)
    test('returns unresolved consumer group offsets', async () => {
      const offsets = await admin.fetchOffsets({
        groupId,
        topic: topicName,
      })

      // NOTE(review): this expects the grouped { topic, partitions } shape, while
      // the single-topic tests below expect a flat partitions array for the same
      // call shape — confirm which shape fetchOffsets returns for a single topic.
      expect(offsets).toEqual([
        { topic: topicName, partitions: [{ partition: 0, offset: '-1', metadata: null }] },
      ])
    })

    test('returns the current consumer group offset', async () => {
      await admin.setOffsets({
        groupId,
        topic: topicName,
        partitions: [{ partition: 0, offset: 13 }],
      })

      const offsets = await admin.fetchOffsets({
        groupId,
        topic: topicName,
      })

      expect(offsets).toEqual([{ partition: 0, offset: '13', metadata: null }])
    })

    // Omitting "topic" should return offsets for every topic the group committed to
    test('returns consumer group offsets for all topics', async () => {
      await admin.setOffsets({
        groupId,
        topic: topicName,
        partitions: [{ partition: 0, offset: 13 }],
      })
      await admin.setOffsets({
        groupId,
        topic: anotherTopicName,
        partitions: [{ partition: 0, offset: 42 }],
      })

      const offsets = await admin.fetchOffsets({
        groupId,
      })

      expect(offsets).toEqual([
        { topic: topicName, partitions: [{ partition: 0, offset: '13', metadata: null }] },
        { topic: anotherTopicName, partitions: [{ partition: 0, offset: '42', metadata: null }] },
      ])
    })

    describe('when used with the resolvedOffsets option', () => {
      let producer, consumer

      // Produces two batches of 5 messages; the consumer commits only the first
      // batch before being stopped, so the group lags 5 messages behind the log end.
      // NOTE(review): combining an async function with the "done" callback is
      // rejected by Jest >= 27 — confirm the pinned Jest version supports this.
      beforeEach(async done => {
        producer = createProducer({
          cluster,
          createPartitioner: createModPartitioner,
          logger,
        })
        await producer.connect()

        consumer = createConsumer({
          cluster,
          groupId,
          maxWaitTimeInMs: 100,
          logger,
        })

        await consumer.connect()
        await consumer.subscribe({ topic: topicName, fromBeginning: true })
        consumer.run({ eachMessage: () => {} })
        await waitForConsumerToJoinGroup(consumer)

        consumer.on(consumer.events.END_BATCH_PROCESS, async () => {
          // stop the consumer after the first batch, so only 5 are committed
          await consumer.stop()
          // send batch #2
          await producer.send({
            acks: 1,
            topic: topicName,
            messages: generateMessages({ number: 5 }),
          })
          done()
        })

        // send batch #1
        await producer.send({
          acks: 1,
          topic: topicName,
          messages: generateMessages({ number: 5 }),
        })
      })

      afterEach(async () => {
        producer && (await producer.disconnect())
        consumer && (await consumer.disconnect())
      })

      // With a concrete committed offset there is nothing to resolve, so all
      // three fetches (before / with / after resolveOffsets) agree.
      test('no reset: returns latest *committed* consumer offsets', async () => {
        const offsetsBeforeResolving = await admin.fetchOffsets({
          groupId,
          topic: topicName,
        })
        const offsetsUponResolving = await admin.fetchOffsets({
          groupId,
          topic: topicName,
          resolveOffsets: true,
        })
        const offsetsAfterResolving = await admin.fetchOffsets({
          groupId,
          topic: topicName,
        })

        expect(offsetsBeforeResolving).toEqual([{ partition: 0, offset: '5', metadata: null }])
        expect(offsetsUponResolving).toEqual([{ partition: 0, offset: '5', metadata: null }])
        expect(offsetsAfterResolving).toEqual([{ partition: 0, offset: '5', metadata: null }])
      })

      // resetOffsets leaves the logical marker -1 (latest); resolveOffsets should
      // translate it to the topic high watermark (10) and commit it back.
      test('reset to latest: returns latest *topic* offsets after resolving', async () => {
        await admin.resetOffsets({ groupId, topic: topicName })

        const offsetsBeforeResolving = await admin.fetchOffsets({
          groupId,
          topic: topicName,
        })
        const offsetsUponResolving = await admin.fetchOffsets({
          groupId,
          topic: topicName,
          resolveOffsets: true,
        })
        const offsetsAfterResolving = await admin.fetchOffsets({
          groupId,
          topic: topicName,
        })

        expect(offsetsBeforeResolving).toEqual([{ partition: 0, offset: '-1', metadata: null }])
        expect(offsetsUponResolving).toEqual([{ partition: 0, offset: '10', metadata: null }])
        expect(offsetsAfterResolving).toEqual([{ partition: 0, offset: '10', metadata: null }])
      })

      // earliest reset leaves the logical marker -2; resolveOffsets should
      // translate it to the topic low watermark (0) and commit it back.
      test('reset to earliest: returns earliest *topic* offsets after resolving', async () => {
        await admin.resetOffsets({ groupId, topic: topicName, earliest: true })

        const offsetsBeforeResolving = await admin.fetchOffsets({
          groupId,
          topic: topicName,
        })
        const offsetsUponResolving = await admin.fetchOffsets({
          groupId,
          topic: topicName,
          resolveOffsets: true,
        })
        const offsetsAfterResolving = await admin.fetchOffsets({
          groupId,
          topic: topicName,
        })

        expect(offsetsBeforeResolving).toEqual([{ partition: 0, offset: '-2', metadata: null }])
        expect(offsetsUponResolving).toEqual([{ partition: 0, offset: '0', metadata: null }])
        expect(offsetsAfterResolving).toEqual([{ partition: 0, offset: '0', metadata: null }])
      })

      // deleteTopicRecords needs Kafka >= 0.11; guards against resolving
      // "earliest" to 0 when the log start offset has moved forward.
      testIfKafkaAtLeast_0_11(
        'will return the correct earliest offset when it is greater than 0',
        async () => {
          // simulate earliest offset = 7, by deleting first 7 messages from the topic
          const messagesToDelete = [
            {
              partition: 0,
              offset: '7',
            },
          ]

          await admin.deleteTopicRecords({ topic: topicName, partitions: messagesToDelete })
          await admin.resetOffsets({ groupId, topic: topicName, earliest: true })

          const offsetsBeforeResolving = await admin.fetchOffsets({
            groupId,
            topic: topicName,
          })
          const offsetsUponResolving = await admin.fetchOffsets({
            groupId,
            topic: topicName,
            resolveOffsets: true,
          })
          const offsetsAfterResolving = await admin.fetchOffsets({
            groupId,
            topic: topicName,
          })

          expect(offsetsBeforeResolving).toEqual([{ partition: 0, offset: '-2', metadata: null }])
          expect(offsetsUponResolving).toEqual([{ partition: 0, offset: '7', metadata: null }])
          expect(offsetsAfterResolving).toEqual([{ partition: 0, offset: '7', metadata: null }])
        }
      )
    })
  })
})
77 changes: 77 additions & 0 deletions src/admin/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,82 @@ module.exports = ({
.pop()
}

/**
 * Fetch the committed offsets of a consumer group.
 *
 * @param {string} groupId - consumer group to query; must be truthy
 * @param {string[]} [topics=[]] - topics to fetch offsets for; when empty, the
 *   coordinator returns offsets for every topic the group has committed to
 * @param {boolean} [resolveOffsets=false] - when true, logical offsets
 *   (EARLIEST_OFFSET / LATEST_OFFSET) are resolved against the topic's
 *   low/high watermarks and committed back to the group
 * @returns {Promise<Array<{topic: string, partitions: Array<{partition: number, offset: string, metadata: string | null}>}>>}
 * @throws {KafkaJSNonRetriableError} if groupId is falsy
 */
const fetchConsumerGroupOffsets = async ({ groupId, topics = [], resolveOffsets = false }) => {
  if (!groupId) {
    throw new KafkaJSNonRetriableError(`Invalid groupId ${groupId}`)
  }

  const coordinator = await cluster.findGroupCoordinator({ groupId })

  let consumerOffsets
  if (topics.length) {
    // Resolve the partition list of each requested topic before asking the
    // coordinator. The Promise.all must be awaited: offsetFetch needs the
    // materialized array, not a pending Promise.
    const topicsToFetch = await Promise.all(
      topics.map(async topic => {
        const partitions = await findTopicPartitions(cluster, topic)
        const partitionsToFetch = partitions.map(partition => ({ partition }))
        return { topic, partitions: partitionsToFetch }
      })
    )

    const { responses } = await coordinator.offsetFetch({
      groupId,
      topicsToFetch,
    })
    consumerOffsets = responses
  } else {
    // No topics requested: omit topicsToFetch so the coordinator returns
    // offsets for all topics the group has committed offsets for.
    const { responses } = await coordinator.offsetFetch({ groupId })
    consumerOffsets = responses
  }

  if (resolveOffsets) {
    // Replace logical offsets with the concrete topic watermarks. Awaited so
    // the subsequent commit and the final map operate on real arrays.
    consumerOffsets = await Promise.all(
      consumerOffsets.map(async ({ topic, partitions }) => {
        const indexedOffsets = indexByPartition(await fetchTopicOffsets(topic))
        const recalculatedPartitions = partitions.map(({ offset, partition, ...props }) => {
          let resolvedOffset = offset
          if (Number(offset) === EARLIEST_OFFSET) {
            resolvedOffset = indexedOffsets[partition].low
          }
          if (Number(offset) === LATEST_OFFSET) {
            resolvedOffset = indexedOffsets[partition].high
          }
          return {
            partition,
            offset: resolvedOffset,
            ...props,
          }
        })

        return {
          topic,
          partitions: recalculatedPartitions,
        }
      })
    )

    // Persist the resolved offsets so subsequent fetches return concrete values
    await Promise.all(
      consumerOffsets.map(({ topic, partitions }) => setOffsets({ groupId, topic, partitions }))
    )
  }

  // The coordinator only ever answers for the topics we requested, so no
  // filtering is needed here — just normalize missing metadata to null.
  return consumerOffsets.map(({ topic, partitions }) => {
    const completePartitions = partitions.map(({ partition, offset, metadata }) => ({
      partition,
      offset,
      metadata: metadata || null,
    }))

    return { topic, partitions: completePartitions }
  })
}

/**
* @param {string} groupId
* @param {string} topic
Expand Down Expand Up @@ -1459,6 +1535,7 @@ module.exports = ({
describeCluster,
events,
fetchOffsets,
fetchConsumerGroupOffsets,
fetchTopicOffsets,
fetchTopicOffsetsByTimestamp,
setOffsets,
Expand Down