From f2c8ced6ac8aefdfe29f91c2eecf72d8027853e7 Mon Sep 17 00:00:00 2001 From: pawandeep Date: Tue, 9 Dec 2025 06:33:52 +0530 Subject: [PATCH 1/4] KAFKA-19331: Fix misleading log messages when follower's leader is not in metadata image --- .../scala/kafka/server/ReplicaManager.scala | 16 +++++-- .../kafka/server/ReplicaManagerTest.scala | 42 +++++++++++++++++++ 2 files changed, 55 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 4ee24f2e41428..57c8c35561b2f 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -2520,6 +2520,7 @@ class ReplicaManager(val config: KafkaConfig, val listenerName = config.interBrokerListenerName.value val partitionAndOffsets = new mutable.HashMap[TopicPartition, InitialFetchState] + val partitionsWithoutLeader = new mutable.HashMap[TopicPartition, Option[Int]] partitionsToStartFetching.foreachEntry { (topicPartition, partition) => val nodeOpt = partition.leaderReplicaIdOpt @@ -2536,13 +2537,22 @@ class ReplicaManager(val config: KafkaConfig, initialFetchOffset(log) )) case None => - stateChangeLogger.trace(s"Unable to start fetching $topicPartition with topic ID ${partition.topicId} " + - s"from leader ${partition.leaderReplicaIdOpt} because it is not alive.") + partitionsWithoutLeader.put(topicPartition, partition.leaderReplicaIdOpt) + } + } + + // Log partitions whose leader is not alive in the metadata image. + if (partitionsWithoutLeader.nonEmpty) { + partitionsWithoutLeader.foreachEntry { (topicPartition, leaderIdOpt) => + stateChangeLogger.warn(s"Unable to start fetching $topicPartition " + + s"from leader $leaderIdOpt because it is not alive.") } } replicaFetcherManager.addFetcherForPartitions(partitionAndOffsets) - stateChangeLogger.info(s"Started fetchers as part of become-follower for ${partitionsToStartFetching.size} partitions") + if (partitionAndOffsets.nonEmpty) { + stateChangeLogger.info(s"Started fetchers as part of become-follower for ${partitionAndOffsets.size} partitions") + } partitionsToStartFetching.foreach{ case (topicPartition, partition) => completeDelayedOperationsWhenNotPartitionLeader(topicPartition, partition.topicId)} diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 753b831b594df..1cd5998789ed6 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -4629,6 +4629,48 @@ class ReplicaManagerTest { } } + @ParameterizedTest + @ValueSource(booleans = Array(true, false)) + def testDeltaFollowerWhenLeaderNotInClusterImage(enableRemoteStorage: Boolean): Unit = { + val localId = 1 + val leaderId = 99 + val topicPartition = new TopicPartition("foo", 0) + val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId, enableRemoteStorage = enableRemoteStorage) + + try { + // Create a delta where localId is follower and leaderId (99) is the leader + // The leader ID 99 does NOT exist in ClusterImageTest.IMAGE1 + val delta = new TopicsDelta(TopicsImage.EMPTY) + delta.replay(new TopicRecord().setName("foo").setTopicId(FOO_UUID)) + delta.replay(new PartitionRecord() + .setPartitionId(0) + .setTopicId(FOO_UUID) + .setReplicas(util.Arrays.asList(localId, leaderId)) + .setIsr(util.Arrays.asList(localId, leaderId)) + .setLeader(leaderId) // Leader is broker 99 + .setLeaderEpoch(0) + .setPartitionEpoch(0)) + + // Use the standard cluster image which does NOT contain broker 99 + val metadataImage = imageFromTopics(delta.apply()) + + replicaManager.applyDelta(delta, metadataImage) + + // Verify the partition was created and is a follower + val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition) + assertFalse(followerPartition.isLeader) + assertEquals(0, followerPartition.getLeaderEpoch) + assertEquals(Some(leaderId), followerPartition.leaderReplicaIdOpt) + + // Verify no fetcher was started since the leader (99) is not in the cluster image + val fetcher = replicaManager.replicaFetcherManager.getFetcher(topicPartition) + assertEquals(None, fetcher, "No fetcher should be started when leader is not in cluster image") + + } finally { + replicaManager.shutdown(checkpointHW = false) + } + } + @ParameterizedTest @ValueSource(booleans = Array(true, false)) def testDeltaFollowerToNotReplica(enableRemoteStorage: Boolean): Unit = { From 4bfb5c25b5372fbf3c9f17f15e9a7512367d3069 Mon Sep 17 00:00:00 2001 From: pawandeep Date: Tue, 9 Dec 2025 06:46:40 +0530 Subject: [PATCH 2/4] Minor refactoring --- .../src/main/scala/kafka/server/ReplicaManager.scala | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 57c8c35561b2f..924dea795c397 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -2520,7 +2520,6 @@ class ReplicaManager(val config: KafkaConfig, val listenerName = config.interBrokerListenerName.value val partitionAndOffsets = new mutable.HashMap[TopicPartition, InitialFetchState] - val partitionsWithoutLeader = new mutable.HashMap[TopicPartition, Option[Int]] partitionsToStartFetching.foreachEntry { (topicPartition, partition) => val nodeOpt = partition.leaderReplicaIdOpt @@ -2537,15 +2536,8 @@ class ReplicaManager(val config: KafkaConfig, initialFetchOffset(log) )) case None => - partitionsWithoutLeader.put(topicPartition, partition.leaderReplicaIdOpt) - } - } - - // Log partitions whose leader is not alive in the metadata image. - if (partitionsWithoutLeader.nonEmpty) { - partitionsWithoutLeader.foreachEntry { (topicPartition, leaderIdOpt) => - stateChangeLogger.warn(s"Unable to start fetching $topicPartition " + - s"from leader $leaderIdOpt because it is not alive.") + stateChangeLogger.trace(s"Unable to start fetching $topicPartition " + + s"from leader ${partition.leaderReplicaIdOpt} because it is not alive.") } } From 1f6efc90eb1f407d7e1c1692598a70ff3a52d638 Mon Sep 17 00:00:00 2001 From: pawandeep Date: Thu, 11 Dec 2025 05:32:21 +0530 Subject: [PATCH 3/4] nit --- core/src/main/scala/kafka/server/ReplicaManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 924dea795c397..777df3b125c22 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -2536,7 +2536,7 @@ class ReplicaManager(val config: KafkaConfig, initialFetchOffset(log) )) case None => - stateChangeLogger.trace(s"Unable to start fetching $topicPartition " + + stateChangeLogger.trace(s"Unable to start fetching $topicPartition with topic ID ${partition.topicId} " + s"from leader ${partition.leaderReplicaIdOpt} because it is not alive.") } } From 791a9c17629839a6632a033c20f6d37d9dfe57fe Mon Sep 17 00:00:00 2001 From: pawandeep Date: Thu, 11 Dec 2025 05:37:47 +0530 Subject: [PATCH 4/4] Remove UT --- .../kafka/server/ReplicaManagerTest.scala | 42 ------------------- 1 file changed, 42 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 1cd5998789ed6..753b831b594df 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -4629,48 +4629,6 @@ class ReplicaManagerTest { } } - @ParameterizedTest - @ValueSource(booleans = Array(true, false)) - def testDeltaFollowerWhenLeaderNotInClusterImage(enableRemoteStorage: Boolean): Unit = { - val localId = 1 - val leaderId = 99 - val topicPartition = new TopicPartition("foo", 0) - val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId, enableRemoteStorage = enableRemoteStorage) - - try { - // Create a delta where localId is follower and leaderId (99) is the leader - // The leader ID 99 does NOT exist in ClusterImageTest.IMAGE1 - val delta = new TopicsDelta(TopicsImage.EMPTY) - delta.replay(new TopicRecord().setName("foo").setTopicId(FOO_UUID)) - delta.replay(new PartitionRecord() - .setPartitionId(0) - .setTopicId(FOO_UUID) - .setReplicas(util.Arrays.asList(localId, leaderId)) - .setIsr(util.Arrays.asList(localId, leaderId)) - .setLeader(leaderId) // Leader is broker 99 - .setLeaderEpoch(0) - .setPartitionEpoch(0)) - - // Use the standard cluster image which does NOT contain broker 99 - val metadataImage = imageFromTopics(delta.apply()) - - replicaManager.applyDelta(delta, metadataImage) - - // Verify the partition was created and is a follower - val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition) - assertFalse(followerPartition.isLeader) - assertEquals(0, followerPartition.getLeaderEpoch) - assertEquals(Some(leaderId), followerPartition.leaderReplicaIdOpt) - - // Verify no fetcher was started since the leader (99) is not in the cluster image - val fetcher = replicaManager.replicaFetcherManager.getFetcher(topicPartition) - assertEquals(None, fetcher, "No fetcher should be started when leader is not in cluster image") - - } finally { - replicaManager.shutdown(checkpointHW = false) - } - } - @ParameterizedTest @ValueSource(booleans = Array(true, false)) def testDeltaFollowerToNotReplica(enableRemoteStorage: Boolean): Unit = {