diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala index f10b769d9c12f..aced661a641ae 100644 --- a/core/src/main/scala/kafka/server/ControllerApis.scala +++ b/core/src/main/scala/kafka/server/ControllerApis.scala @@ -1059,7 +1059,10 @@ class ControllerApis( def handleDescribeCluster(request: RequestChannel.Request): CompletableFuture[Unit] = { // Nearly all RPCs should check MetadataVersion inside the QuorumController. However, this // RPC is consulting a cache which lives outside the QC. So we check MetadataVersion here. - if (!apiVersionManager.features.metadataVersion().isControllerRegistrationSupported) { + if (apiVersionManager.features.metadataVersion().isEmpty) { + throw new UnsupportedVersionException("There is no finalized MetadataVersion, so " + + "direct-to-controller communication is not supported.") + } else if (!apiVersionManager.features.metadataVersion().get.isControllerRegistrationSupported) { throw new UnsupportedVersionException("Direct-to-controller communication is not " + "supported with the current MetadataVersion.") } diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala index c233d5f45dc41..52bd3ad723296 100644 --- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala +++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala @@ -17,6 +17,7 @@ package kafka.server.metadata +import java.util.Optional import java.util.OptionalInt import kafka.coordinator.transaction.TransactionCoordinator import kafka.log.LogManager @@ -253,7 +254,7 @@ class BrokerMetadataPublisher( if (delta.featuresDelta != null) { try { - val newFinalizedFeatures = new FinalizedFeatures(newImage.features.metadataVersionOrThrow, newImage.features.finalizedVersions, newImage.provenance.lastContainedOffset) + val newFinalizedFeatures = new FinalizedFeatures(Optional.of(newImage.features.metadataVersionOrThrow), newImage.features.finalizedVersions, newImage.provenance.lastContainedOffset) val newFinalizedShareVersion = newFinalizedFeatures.finalizedFeatures().getOrDefault(ShareVersion.FEATURE_NAME, 0.toShort) // Share version feature has been toggled. if (newFinalizedShareVersion != finalizedShareVersion) { diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala index 88b2cf07012a6..d6fe4df68c46a 100644 --- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala +++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala @@ -33,6 +33,7 @@ import org.apache.kafka.metadata.{BrokerRegistration, LeaderAndIsr, MetadataCach import org.apache.kafka.server.common.{FinalizedFeatures, KRaftVersion, MetadataVersion} import java.util +import java.util.Optional import java.util.concurrent.ThreadLocalRandom import java.util.function.{Predicate, Supplier} import java.util.stream.Collectors @@ -471,7 +472,7 @@ class KRaftMetadataCache( finalizedFeatures.put(KRaftVersion.FEATURE_NAME, kraftVersionLevel) } new FinalizedFeatures( - image.features().metadataVersionOrThrow(), + Optional.of(image.features().metadataVersionOrThrow()), finalizedFeatures, image.highestOffsetAndEpoch().offset) } diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala index 79957b01fb77b..77da2f01f73da 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala @@ -86,7 +86,7 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren ).thenReturn(Optional.of(brokerNode)) when(metadataCache.features()).thenReturn { new FinalizedFeatures( - MetadataVersion.latestTesting(), + Optional.of(MetadataVersion.latestTesting()), util.Map.of(TransactionVersion.FEATURE_NAME, TransactionVersion.TV_2.featureLevel()), 0) } diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala index f9953bae24ff4..85624a94cfbc2 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala @@ -50,6 +50,7 @@ import org.mockito.ArgumentMatchers.{any, anyInt, anyLong, anyShort} import org.mockito.Mockito.{atLeastOnce, mock, reset, times, verify, when} import java.util +import java.util.Optional import scala.collection.{Map, mutable} import scala.jdk.CollectionConverters._ @@ -71,7 +72,7 @@ class TransactionStateManagerTest { when(metadataCache.features()).thenReturn { new FinalizedFeatures( - MetadataVersion.latestTesting(), + Optional.of(MetadataVersion.latestTesting()), util.Map.of(TransactionVersion.FEATURE_NAME, TransactionVersion.TV_2.featureLevel()), 0) } @@ -1359,7 +1360,7 @@ class TransactionStateManagerTest { val metadataCache = mock(classOf[MetadataCache]) when(metadataCache.features()).thenReturn { new FinalizedFeatures( - MetadataVersion.latestTesting(), + Optional.of(MetadataVersion.latestTesting()), util.Map.of(TransactionVersion.FEATURE_NAME, transactionVersion.featureLevel()), 0) } diff --git a/core/src/test/scala/unit/kafka/network/ProcessorTest.scala b/core/src/test/scala/unit/kafka/network/ProcessorTest.scala index 54bbd0bf2018a..91fa339b9bbed 100644 --- a/core/src/test/scala/unit/kafka/network/ProcessorTest.scala +++ b/core/src/test/scala/unit/kafka/network/ProcessorTest.scala @@ -42,7 +42,7 @@ class ProcessorTest { val requestHeader = RequestTestUtils.serializeRequestHeader( new RequestHeader(ApiKeys.INIT_PRODUCER_ID, 0, "clientid", 0)) val apiVersionManager = new SimpleApiVersionManager(ListenerType.CONTROLLER, true, - () => new FinalizedFeatures(MetadataVersion.latestTesting(), util.Map.of[String, java.lang.Short], 0)) + () => new FinalizedFeatures(Optional.of(MetadataVersion.latestTesting()), util.Map.of[String, java.lang.Short], 0)) val e = assertThrows(classOf[InvalidRequestException], (() => Processor.parseRequestHeader(apiVersionManager, requestHeader)): Executable, "INIT_PRODUCER_ID with listener type CONTROLLER should throw InvalidRequestException exception") diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 6a4b8d8ca672e..e7ed55a3b5886 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -57,6 +57,7 @@ import java.nio.channels.{SelectionKey, SocketChannel} import java.nio.charset.StandardCharsets import java.security.cert.X509Certificate import java.util +import java.util.Optional import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent._ import java.util.{Properties, Random} @@ -85,7 +86,7 @@ class SocketServerTest { TestUtils.clearYammerMetrics() private val apiVersionManager = new SimpleApiVersionManager(ListenerType.BROKER, true, - () => new FinalizedFeatures(MetadataVersion.latestTesting(), util.Map.of[String, java.lang.Short], 0)) + () => new FinalizedFeatures(Optional.of(MetadataVersion.latestTesting()), util.Map.of[String, java.lang.Short], 0)) var server: SocketServer = _ val sockets = new ArrayBuffer[Socket] diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 3917438abbbc0..92c88114b0fcf 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -185,7 +185,7 @@ class KafkaApisTest extends Logging { val apiVersionManager = new SimpleApiVersionManager( ListenerType.BROKER, true, - () => new FinalizedFeatures(MetadataVersion.latestTesting(), util.Map.of[String, java.lang.Short], 0)) + () => new FinalizedFeatures(Optional.of(MetadataVersion.latestTesting()), util.Map.of[String, java.lang.Short], 0)) setupFeatures(featureVersions) @@ -220,7 +220,7 @@ class KafkaApisTest extends Logging { when(metadataCache.features()).thenReturn { new FinalizedFeatures( - MetadataVersion.latestTesting, + Optional.of(MetadataVersion.latestTesting), featureVersions.map { featureVersion => featureVersion.featureName -> featureVersion.featureLevel.asInstanceOf[java.lang.Short] }.toMap.asJava, diff --git a/metadata/src/main/java/org/apache/kafka/metadata/publisher/FeaturesPublisher.java b/metadata/src/main/java/org/apache/kafka/metadata/publisher/FeaturesPublisher.java index 9ccddbe810966..5907753781eb1 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/publisher/FeaturesPublisher.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/publisher/FeaturesPublisher.java @@ -27,13 +27,13 @@ import org.slf4j.Logger; -import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_VERSION; +import java.util.Optional; public class FeaturesPublisher implements MetadataPublisher { private final Logger log; private final FaultHandler faultHandler; - private volatile FinalizedFeatures finalizedFeatures = FinalizedFeatures.fromKRaftVersion(MINIMUM_VERSION); + private volatile FinalizedFeatures finalizedFeatures = FinalizedFeatures.UNKNOWN_FINALIZED_FEATURES; public FeaturesPublisher( LogContext logContext, @@ -60,7 +60,8 @@ public void onMetadataUpdate( ) { try { if (delta.featuresDelta() != null) { - FinalizedFeatures newFinalizedFeatures = new FinalizedFeatures(newImage.features().metadataVersionOrThrow(), + FinalizedFeatures newFinalizedFeatures = new FinalizedFeatures( + Optional.of(newImage.features().metadataVersionOrThrow()), newImage.features().finalizedVersions(), newImage.provenance().lastContainedOffset() ); diff --git a/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java b/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java index 9133f0db79853..3fc8f2ad30024 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java @@ -19,25 +19,30 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.Optional; public record FinalizedFeatures( - MetadataVersion metadataVersion, + Optional metadataVersion, Map finalizedFeatures, long finalizedFeaturesEpoch ) { + public static final FinalizedFeatures UNKNOWN_FINALIZED_FEATURES = + new FinalizedFeatures(Optional.empty(), Map.of(), -1); + public static FinalizedFeatures fromKRaftVersion(MetadataVersion version) { - return new FinalizedFeatures(version, Map.of(), -1); + return new FinalizedFeatures(Optional.of(version), Map.of(), -1); } public FinalizedFeatures( - MetadataVersion metadataVersion, + Optional metadataVersion, Map finalizedFeatures, long finalizedFeaturesEpoch ) { this.metadataVersion = Objects.requireNonNull(metadataVersion); this.finalizedFeatures = new HashMap<>(finalizedFeatures); this.finalizedFeaturesEpoch = finalizedFeaturesEpoch; - this.finalizedFeatures.put(MetadataVersion.FEATURE_NAME, metadataVersion.featureLevel()); + metadataVersion.ifPresent(mv -> + this.finalizedFeatures.put(MetadataVersion.FEATURE_NAME, mv.featureLevel())); } public FinalizedFeatures setFinalizedLevel(String key, short level) { diff --git a/server-common/src/test/java/org/apache/kafka/server/common/FinalizedFeaturesTest.java b/server-common/src/test/java/org/apache/kafka/server/common/FinalizedFeaturesTest.java index 1c4c9547cf07f..23d2096439808 100644 --- a/server-common/src/test/java/org/apache/kafka/server/common/FinalizedFeaturesTest.java +++ b/server-common/src/test/java/org/apache/kafka/server/common/FinalizedFeaturesTest.java @@ -20,6 +20,7 @@ import org.junit.jupiter.api.Test; import java.util.Map; +import java.util.Optional; import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME; import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_VERSION; @@ -29,7 +30,7 @@ class FinalizedFeaturesTest { @Test public void testKRaftModeFeatures() { - FinalizedFeatures finalizedFeatures = new FinalizedFeatures(MINIMUM_VERSION, + FinalizedFeatures finalizedFeatures = new FinalizedFeatures(Optional.of(MINIMUM_VERSION), Map.of("foo", (short) 2), 123); assertEquals(MINIMUM_VERSION.featureLevel(), finalizedFeatures.finalizedFeatures().get(FEATURE_NAME)); @@ -41,7 +42,7 @@ public void testKRaftModeFeatures() { @Test public void testSetFinalizedLevel() { FinalizedFeatures finalizedFeatures = new FinalizedFeatures( - MINIMUM_VERSION, + Optional.of(MINIMUM_VERSION), Map.of("foo", (short) 2), 123 );