Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion core/src/main/scala/kafka/server/ControllerApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1059,7 +1059,10 @@ class ControllerApis(
def handleDescribeCluster(request: RequestChannel.Request): CompletableFuture[Unit] = {
// Nearly all RPCs should check MetadataVersion inside the QuorumController. However, this
// RPC is consulting a cache which lives outside the QC. So we check MetadataVersion here.
if (!apiVersionManager.features.metadataVersion().isControllerRegistrationSupported) {
if (apiVersionManager.features.metadataVersion().isEmpty) {
throw new UnsupportedVersionException("There is no finalized MetadataVersion, so " +
"direct-to-controller communication is not supported.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please make a separate if/else if clause for if the metadata version is not finalized. We should not change this existing check.

} else if (!apiVersionManager.features.metadataVersion().get.isControllerRegistrationSupported) {
throw new UnsupportedVersionException("Direct-to-controller communication is not " +
"supported with the current MetadataVersion.")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package kafka.server.metadata

import java.util.Optional
import java.util.OptionalInt
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.log.LogManager
Expand Down Expand Up @@ -253,7 +254,7 @@ class BrokerMetadataPublisher(

if (delta.featuresDelta != null) {
try {
val newFinalizedFeatures = new FinalizedFeatures(newImage.features.metadataVersionOrThrow, newImage.features.finalizedVersions, newImage.provenance.lastContainedOffset)
val newFinalizedFeatures = new FinalizedFeatures(Optional.of(newImage.features.metadataVersionOrThrow), newImage.features.finalizedVersions, newImage.provenance.lastContainedOffset)
val newFinalizedShareVersion = newFinalizedFeatures.finalizedFeatures().getOrDefault(ShareVersion.FEATURE_NAME, 0.toShort)
// Share version feature has been toggled.
if (newFinalizedShareVersion != finalizedShareVersion) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.apache.kafka.metadata.{BrokerRegistration, LeaderAndIsr, MetadataCach
import org.apache.kafka.server.common.{FinalizedFeatures, KRaftVersion, MetadataVersion}

import java.util
import java.util.Optional
import java.util.concurrent.ThreadLocalRandom
import java.util.function.{Predicate, Supplier}
import java.util.stream.Collectors
Expand Down Expand Up @@ -471,7 +472,7 @@ class KRaftMetadataCache(
finalizedFeatures.put(KRaftVersion.FEATURE_NAME, kraftVersionLevel)
}
new FinalizedFeatures(
image.features().metadataVersionOrThrow(),
Optional.of(image.features().metadataVersionOrThrow()),
finalizedFeatures,
image.highestOffsetAndEpoch().offset)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren
).thenReturn(Optional.of(brokerNode))
when(metadataCache.features()).thenReturn {
new FinalizedFeatures(
MetadataVersion.latestTesting(),
Optional.of(MetadataVersion.latestTesting()),
util.Map.of(TransactionVersion.FEATURE_NAME, TransactionVersion.TV_2.featureLevel()),
0)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import org.mockito.ArgumentMatchers.{any, anyInt, anyLong, anyShort}
import org.mockito.Mockito.{atLeastOnce, mock, reset, times, verify, when}

import java.util
import java.util.Optional
import scala.collection.{Map, mutable}
import scala.jdk.CollectionConverters._

Expand All @@ -71,7 +72,7 @@ class TransactionStateManagerTest {

when(metadataCache.features()).thenReturn {
new FinalizedFeatures(
MetadataVersion.latestTesting(),
Optional.of(MetadataVersion.latestTesting()),
util.Map.of(TransactionVersion.FEATURE_NAME, TransactionVersion.TV_2.featureLevel()),
0)
}
Expand Down Expand Up @@ -1359,7 +1360,7 @@ class TransactionStateManagerTest {
val metadataCache = mock(classOf[MetadataCache])
when(metadataCache.features()).thenReturn {
new FinalizedFeatures(
MetadataVersion.latestTesting(),
Optional.of(MetadataVersion.latestTesting()),
util.Map.of(TransactionVersion.FEATURE_NAME, transactionVersion.featureLevel()),
0)
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/unit/kafka/network/ProcessorTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class ProcessorTest {
val requestHeader = RequestTestUtils.serializeRequestHeader(
new RequestHeader(ApiKeys.INIT_PRODUCER_ID, 0, "clientid", 0))
val apiVersionManager = new SimpleApiVersionManager(ListenerType.CONTROLLER, true,
() => new FinalizedFeatures(MetadataVersion.latestTesting(), util.Map.of[String, java.lang.Short], 0))
() => new FinalizedFeatures(Optional.of(MetadataVersion.latestTesting()), util.Map.of[String, java.lang.Short], 0))
val e = assertThrows(classOf[InvalidRequestException],
(() => Processor.parseRequestHeader(apiVersionManager, requestHeader)): Executable,
"INIT_PRODUCER_ID with listener type CONTROLLER should throw InvalidRequestException exception")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import java.nio.channels.{SelectionKey, SocketChannel}
import java.nio.charset.StandardCharsets
import java.security.cert.X509Certificate
import java.util
import java.util.Optional
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent._
import java.util.{Properties, Random}
Expand Down Expand Up @@ -85,7 +86,7 @@ class SocketServerTest {
TestUtils.clearYammerMetrics()

private val apiVersionManager = new SimpleApiVersionManager(ListenerType.BROKER, true,
() => new FinalizedFeatures(MetadataVersion.latestTesting(), util.Map.of[String, java.lang.Short], 0))
() => new FinalizedFeatures(Optional.of(MetadataVersion.latestTesting()), util.Map.of[String, java.lang.Short], 0))
var server: SocketServer = _
val sockets = new ArrayBuffer[Socket]

Expand Down
4 changes: 2 additions & 2 deletions core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ class KafkaApisTest extends Logging {
val apiVersionManager = new SimpleApiVersionManager(
ListenerType.BROKER,
true,
() => new FinalizedFeatures(MetadataVersion.latestTesting(), util.Map.of[String, java.lang.Short], 0))
() => new FinalizedFeatures(Optional.of(MetadataVersion.latestTesting()), util.Map.of[String, java.lang.Short], 0))

setupFeatures(featureVersions)

Expand Down Expand Up @@ -220,7 +220,7 @@ class KafkaApisTest extends Logging {

when(metadataCache.features()).thenReturn {
new FinalizedFeatures(
MetadataVersion.latestTesting,
Optional.of(MetadataVersion.latestTesting),
featureVersions.map { featureVersion =>
featureVersion.featureName -> featureVersion.featureLevel.asInstanceOf[java.lang.Short]
}.toMap.asJava,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@

import org.slf4j.Logger;

import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_VERSION;
import java.util.Optional;


public class FeaturesPublisher implements MetadataPublisher {
private final Logger log;
private final FaultHandler faultHandler;
private volatile FinalizedFeatures finalizedFeatures = FinalizedFeatures.fromKRaftVersion(MINIMUM_VERSION);
private volatile FinalizedFeatures finalizedFeatures = FinalizedFeatures.UNKNOWN_FINALIZED_FEATURES;

public FeaturesPublisher(
LogContext logContext,
Expand All @@ -60,7 +60,8 @@ public void onMetadataUpdate(
) {
try {
if (delta.featuresDelta() != null) {
FinalizedFeatures newFinalizedFeatures = new FinalizedFeatures(newImage.features().metadataVersionOrThrow(),
FinalizedFeatures newFinalizedFeatures = new FinalizedFeatures(
Optional.of(newImage.features().metadataVersionOrThrow()),
newImage.features().finalizedVersions(),
newImage.provenance().lastContainedOffset()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,30 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

public record FinalizedFeatures(
MetadataVersion metadataVersion,
Optional<MetadataVersion> metadataVersion,
Map<String, Short> finalizedFeatures,
long finalizedFeaturesEpoch
) {
public static final FinalizedFeatures UNKNOWN_FINALIZED_FEATURES =
new FinalizedFeatures(Optional.empty(), Map.of(), -1);

public static FinalizedFeatures fromKRaftVersion(MetadataVersion version) {
return new FinalizedFeatures(version, Map.of(), -1);
return new FinalizedFeatures(Optional.of(version), Map.of(), -1);
}

public FinalizedFeatures(
MetadataVersion metadataVersion,
Optional<MetadataVersion> metadataVersion,
Map<String, Short> finalizedFeatures,
long finalizedFeaturesEpoch
) {
this.metadataVersion = Objects.requireNonNull(metadataVersion);
this.finalizedFeatures = new HashMap<>(finalizedFeatures);
this.finalizedFeaturesEpoch = finalizedFeaturesEpoch;
this.finalizedFeatures.put(MetadataVersion.FEATURE_NAME, metadataVersion.featureLevel());
metadataVersion.ifPresent(mv ->
this.finalizedFeatures.put(MetadataVersion.FEATURE_NAME, mv.featureLevel()));
}

public FinalizedFeatures setFinalizedLevel(String key, short level) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.junit.jupiter.api.Test;

import java.util.Map;
import java.util.Optional;

import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME;
import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_VERSION;
Expand All @@ -29,7 +30,7 @@
class FinalizedFeaturesTest {
@Test
public void testKRaftModeFeatures() {
FinalizedFeatures finalizedFeatures = new FinalizedFeatures(MINIMUM_VERSION,
FinalizedFeatures finalizedFeatures = new FinalizedFeatures(Optional.of(MINIMUM_VERSION),
Map.of("foo", (short) 2), 123);
assertEquals(MINIMUM_VERSION.featureLevel(),
finalizedFeatures.finalizedFeatures().get(FEATURE_NAME));
Expand All @@ -41,7 +42,7 @@ public void testKRaftModeFeatures() {
@Test
public void testSetFinalizedLevel() {
FinalizedFeatures finalizedFeatures = new FinalizedFeatures(
MINIMUM_VERSION,
Optional.of(MINIMUM_VERSION),
Map.of("foo", (short) 2),
123
);
Expand Down