Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Remove UT
  • Loading branch information
geek-bit committed Dec 11, 2025
commit 791a9c17629839a6632a033c20f6d37d9dfe57fe
42 changes: 0 additions & 42 deletions core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4629,48 +4629,6 @@ class ReplicaManagerTest {
}
}

@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testDeltaFollowerWhenLeaderNotInClusterImage(enableRemoteStorage: Boolean): Unit = {
val localId = 1
val leaderId = 99
val topicPartition = new TopicPartition("foo", 0)
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId, enableRemoteStorage = enableRemoteStorage)

try {
// Create a delta where localId is follower and leaderId (99) is the leader
// The leader ID 99 does NOT exist in ClusterImageTest.IMAGE1
val delta = new TopicsDelta(TopicsImage.EMPTY)
delta.replay(new TopicRecord().setName("foo").setTopicId(FOO_UUID))
delta.replay(new PartitionRecord()
.setPartitionId(0)
.setTopicId(FOO_UUID)
.setReplicas(util.Arrays.asList(localId, leaderId))
.setIsr(util.Arrays.asList(localId, leaderId))
.setLeader(leaderId) // Leader is broker 99
.setLeaderEpoch(0)
.setPartitionEpoch(0))

// Use the standard cluster image which does NOT contain broker 99
val metadataImage = imageFromTopics(delta.apply())

replicaManager.applyDelta(delta, metadataImage)

// Verify the partition was created and is a follower
val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition)
assertFalse(followerPartition.isLeader)
assertEquals(0, followerPartition.getLeaderEpoch)
assertEquals(Some(leaderId), followerPartition.leaderReplicaIdOpt)

// Verify no fetcher was started since the leader (99) is not in the cluster image
val fetcher = replicaManager.replicaFetcherManager.getFetcher(topicPartition)
assertEquals(None, fetcher, "No fetcher should be started when leader is not in cluster image")

} finally {
replicaManager.shutdown(checkpointHW = false)
}
}

@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testDeltaFollowerToNotReplica(enableRemoteStorage: Boolean): Unit = {
Expand Down