Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/ControllerApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1059,7 +1059,7 @@ class ControllerApis(
def handleDescribeCluster(request: RequestChannel.Request): CompletableFuture[Unit] = {
// Nearly all RPCs should check MetadataVersion inside the QuorumController. However, this
// RPC is consulting a cache which lives outside the QC. So we check MetadataVersion here.
if (!apiVersionManager.features.metadataVersion().isControllerRegistrationSupported) {
if (!apiVersionManager.features.metadataVersion().map(_.isControllerRegistrationSupported).orElse(false)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better if we could return a more accurate error message in the case where metadataVersion().isEmpty(). Returning a message like "There is no finalized MetadataVersion, so direct-to-controller communication is not supported" is more accurate in this case.

throw new UnsupportedVersionException("Direct-to-controller communication is not " +
"supported with the current MetadataVersion.")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package kafka.server.metadata

import java.util.Optional
import java.util.OptionalInt
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.log.LogManager
Expand Down Expand Up @@ -253,7 +254,7 @@ class BrokerMetadataPublisher(

if (delta.featuresDelta != null) {
try {
val newFinalizedFeatures = new FinalizedFeatures(newImage.features.metadataVersionOrThrow, newImage.features.finalizedVersions, newImage.provenance.lastContainedOffset)
val newFinalizedFeatures = new FinalizedFeatures(Optional.of(newImage.features.metadataVersionOrThrow), newImage.features.finalizedVersions, newImage.provenance.lastContainedOffset)
val newFinalizedShareVersion = newFinalizedFeatures.finalizedFeatures().getOrDefault(ShareVersion.FEATURE_NAME, 0.toShort)
// Share version feature has been toggled.
if (newFinalizedShareVersion != finalizedShareVersion) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.apache.kafka.metadata.{BrokerRegistration, LeaderAndIsr, MetadataCach
import org.apache.kafka.server.common.{FinalizedFeatures, KRaftVersion, MetadataVersion}

import java.util
import java.util.Optional
import java.util.concurrent.ThreadLocalRandom
import java.util.function.{Predicate, Supplier}
import java.util.stream.Collectors
Expand Down Expand Up @@ -471,7 +472,7 @@ class KRaftMetadataCache(
finalizedFeatures.put(KRaftVersion.FEATURE_NAME, kraftVersionLevel)
}
new FinalizedFeatures(
image.features().metadataVersionOrThrow(),
Optional.of(image.features().metadataVersionOrThrow()),
finalizedFeatures,
image.highestOffsetAndEpoch().offset)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren
).thenReturn(Optional.of(brokerNode))
when(metadataCache.features()).thenReturn {
new FinalizedFeatures(
MetadataVersion.latestTesting(),
Optional.of(MetadataVersion.latestTesting()),
util.Map.of(TransactionVersion.FEATURE_NAME, TransactionVersion.TV_2.featureLevel()),
0)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import org.mockito.ArgumentMatchers.{any, anyInt, anyLong, anyShort}
import org.mockito.Mockito.{atLeastOnce, mock, reset, times, verify, when}

import java.util
import java.util.Optional
import scala.collection.{Map, mutable}
import scala.jdk.CollectionConverters._

Expand All @@ -71,7 +72,7 @@ class TransactionStateManagerTest {

when(metadataCache.features()).thenReturn {
new FinalizedFeatures(
MetadataVersion.latestTesting(),
Optional.of(MetadataVersion.latestTesting()),
util.Map.of(TransactionVersion.FEATURE_NAME, TransactionVersion.TV_2.featureLevel()),
0)
}
Expand Down Expand Up @@ -1359,7 +1360,7 @@ class TransactionStateManagerTest {
val metadataCache = mock(classOf[MetadataCache])
when(metadataCache.features()).thenReturn {
new FinalizedFeatures(
MetadataVersion.latestTesting(),
Optional.of(MetadataVersion.latestTesting()),
util.Map.of(TransactionVersion.FEATURE_NAME, transactionVersion.featureLevel()),
0)
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/unit/kafka/network/ProcessorTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class ProcessorTest {
val requestHeader = RequestTestUtils.serializeRequestHeader(
new RequestHeader(ApiKeys.INIT_PRODUCER_ID, 0, "clientid", 0))
val apiVersionManager = new SimpleApiVersionManager(ListenerType.CONTROLLER, true,
() => new FinalizedFeatures(MetadataVersion.latestTesting(), util.Map.of[String, java.lang.Short], 0))
() => new FinalizedFeatures(Optional.of(MetadataVersion.latestTesting()), util.Map.of[String, java.lang.Short], 0))
val e = assertThrows(classOf[InvalidRequestException],
(() => Processor.parseRequestHeader(apiVersionManager, requestHeader)): Executable,
"INIT_PRODUCER_ID with listener type CONTROLLER should throw InvalidRequestException exception")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import java.nio.channels.{SelectionKey, SocketChannel}
import java.nio.charset.StandardCharsets
import java.security.cert.X509Certificate
import java.util
import java.util.Optional
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent._
import java.util.{Properties, Random}
Expand Down Expand Up @@ -85,7 +86,7 @@ class SocketServerTest {
TestUtils.clearYammerMetrics()

private val apiVersionManager = new SimpleApiVersionManager(ListenerType.BROKER, true,
() => new FinalizedFeatures(MetadataVersion.latestTesting(), util.Map.of[String, java.lang.Short], 0))
() => new FinalizedFeatures(Optional.of(MetadataVersion.latestTesting()), util.Map.of[String, java.lang.Short], 0))
var server: SocketServer = _
val sockets = new ArrayBuffer[Socket]

Expand Down
4 changes: 2 additions & 2 deletions core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ class KafkaApisTest extends Logging {
val apiVersionManager = new SimpleApiVersionManager(
ListenerType.BROKER,
true,
() => new FinalizedFeatures(MetadataVersion.latestTesting(), util.Map.of[String, java.lang.Short], 0))
() => new FinalizedFeatures(Optional.of(MetadataVersion.latestTesting()), util.Map.of[String, java.lang.Short], 0))

setupFeatures(featureVersions)

Expand Down Expand Up @@ -220,7 +220,7 @@ class KafkaApisTest extends Logging {

when(metadataCache.features()).thenReturn {
new FinalizedFeatures(
MetadataVersion.latestTesting,
Optional.of(MetadataVersion.latestTesting),
featureVersions.map { featureVersion =>
featureVersion.featureName -> featureVersion.featureLevel.asInstanceOf[java.lang.Short]
}.toMap.asJava,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@

import org.slf4j.Logger;

import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_VERSION;
import java.util.Map;
import java.util.Optional;


public class FeaturesPublisher implements MetadataPublisher {
private final Logger log;
private final FaultHandler faultHandler;
private volatile FinalizedFeatures finalizedFeatures = FinalizedFeatures.fromKRaftVersion(MINIMUM_VERSION);
private volatile FinalizedFeatures finalizedFeatures = new FinalizedFeatures(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should make new FinalizedFeatures(Optional.empty()...) a static constant in FinalizedFeatures. Maybe something like UNKNOWN_FINALIZED_FEATURES.

Optional.empty(), Map.of(), -1);

public FeaturesPublisher(
LogContext logContext,
Expand All @@ -60,7 +62,8 @@ public void onMetadataUpdate(
) {
try {
if (delta.featuresDelta() != null) {
FinalizedFeatures newFinalizedFeatures = new FinalizedFeatures(newImage.features().metadataVersionOrThrow(),
FinalizedFeatures newFinalizedFeatures = new FinalizedFeatures(
Optional.of(newImage.features().metadataVersionOrThrow()),
newImage.features().finalizedVersions(),
newImage.provenance().lastContainedOffset()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,27 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

public record FinalizedFeatures(
MetadataVersion metadataVersion,
Optional<MetadataVersion> metadataVersion,
Map<String, Short> finalizedFeatures,
long finalizedFeaturesEpoch
) {
public static FinalizedFeatures fromKRaftVersion(MetadataVersion version) {
return new FinalizedFeatures(version, Map.of(), -1);
return new FinalizedFeatures(Optional.of(version), Map.of(), -1);
}

public FinalizedFeatures(
MetadataVersion metadataVersion,
Optional<MetadataVersion> metadataVersion,
Map<String, Short> finalizedFeatures,
long finalizedFeaturesEpoch
) {
this.metadataVersion = Objects.requireNonNull(metadataVersion);
this.finalizedFeatures = new HashMap<>(finalizedFeatures);
this.finalizedFeaturesEpoch = finalizedFeaturesEpoch;
this.finalizedFeatures.put(MetadataVersion.FEATURE_NAME, metadataVersion.featureLevel());
metadataVersion.ifPresent(mv ->
this.finalizedFeatures.put(MetadataVersion.FEATURE_NAME, mv.featureLevel()));
}

public FinalizedFeatures setFinalizedLevel(String key, short level) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.junit.jupiter.api.Test;

import java.util.Map;
import java.util.Optional;

import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME;
import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_VERSION;
Expand All @@ -29,7 +30,7 @@
class FinalizedFeaturesTest {
@Test
public void testKRaftModeFeatures() {
FinalizedFeatures finalizedFeatures = new FinalizedFeatures(MINIMUM_VERSION,
FinalizedFeatures finalizedFeatures = new FinalizedFeatures(Optional.of(MINIMUM_VERSION),
Map.of("foo", (short) 2), 123);
assertEquals(MINIMUM_VERSION.featureLevel(),
finalizedFeatures.finalizedFeatures().get(FEATURE_NAME));
Expand All @@ -41,7 +42,7 @@ public void testKRaftModeFeatures() {
@Test
public void testSetFinalizedLevel() {
FinalizedFeatures finalizedFeatures = new FinalizedFeatures(
MINIMUM_VERSION,
Optional.of(MINIMUM_VERSION),
Map.of("foo", (short) 2),
123
);
Expand Down