Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
KAFKA-19331: Fix misleading log messages when follower's leader is no…
…t in metadata image
  • Loading branch information
geek-bit committed Dec 9, 2025
commit f2c8ced6ac8aefdfe29f91c2eecf72d8027853e7
16 changes: 13 additions & 3 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2520,6 +2520,7 @@ class ReplicaManager(val config: KafkaConfig,

val listenerName = config.interBrokerListenerName.value
val partitionAndOffsets = new mutable.HashMap[TopicPartition, InitialFetchState]
val partitionsWithoutLeader = new mutable.HashMap[TopicPartition, Option[Int]]

partitionsToStartFetching.foreachEntry { (topicPartition, partition) =>
val nodeOpt = partition.leaderReplicaIdOpt
Expand All @@ -2536,13 +2537,22 @@ class ReplicaManager(val config: KafkaConfig,
initialFetchOffset(log)
))
case None =>
stateChangeLogger.trace(s"Unable to start fetching $topicPartition with topic ID ${partition.topicId} " +
s"from leader ${partition.leaderReplicaIdOpt} because it is not alive.")
partitionsWithoutLeader.put(topicPartition, partition.leaderReplicaIdOpt)
}
}

// Log partitions whose leader is not alive in the metadata image.
if (partitionsWithoutLeader.nonEmpty) {
partitionsWithoutLeader.foreachEntry { (topicPartition, leaderIdOpt) =>
stateChangeLogger.warn(s"Unable to start fetching $topicPartition " +
s"from leader $leaderIdOpt because it is not alive.")
}
}

replicaFetcherManager.addFetcherForPartitions(partitionAndOffsets)
stateChangeLogger.info(s"Started fetchers as part of become-follower for ${partitionsToStartFetching.size} partitions")
if (partitionAndOffsets.nonEmpty) {
stateChangeLogger.info(s"Started fetchers as part of become-follower for ${partitionAndOffsets.size} partitions")
}

partitionsToStartFetching.foreach{ case (topicPartition, partition) =>
completeDelayedOperationsWhenNotPartitionLeader(topicPartition, partition.topicId)}
Expand Down
42 changes: 42 additions & 0 deletions core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4629,6 +4629,48 @@ class ReplicaManagerTest {
}
}

@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testDeltaFollowerWhenLeaderNotInClusterImage(enableRemoteStorage: Boolean): Unit = {
val localId = 1
val leaderId = 99
val topicPartition = new TopicPartition("foo", 0)
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId, enableRemoteStorage = enableRemoteStorage)

try {
// Create a delta where localId is follower and leaderId (99) is the leader
// The leader ID 99 does NOT exist in ClusterImageTest.IMAGE1
val delta = new TopicsDelta(TopicsImage.EMPTY)
delta.replay(new TopicRecord().setName("foo").setTopicId(FOO_UUID))
delta.replay(new PartitionRecord()
.setPartitionId(0)
.setTopicId(FOO_UUID)
.setReplicas(util.Arrays.asList(localId, leaderId))
.setIsr(util.Arrays.asList(localId, leaderId))
.setLeader(leaderId) // Leader is broker 99
.setLeaderEpoch(0)
.setPartitionEpoch(0))

// Use the standard cluster image which does NOT contain broker 99
val metadataImage = imageFromTopics(delta.apply())

replicaManager.applyDelta(delta, metadataImage)

// Verify the partition was created and is a follower
val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition)
assertFalse(followerPartition.isLeader)
assertEquals(0, followerPartition.getLeaderEpoch)
assertEquals(Some(leaderId), followerPartition.leaderReplicaIdOpt)

// Verify no fetcher was started since the leader (99) is not in the cluster image
val fetcher = replicaManager.replicaFetcherManager.getFetcher(topicPartition)
assertEquals(None, fetcher, "No fetcher should be started when leader is not in cluster image")

} finally {
replicaManager.shutdown(checkpointHW = false)
}
}

@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testDeltaFollowerToNotReplica(enableRemoteStorage: Boolean): Unit = {
Expand Down