Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions core/src/main/scala/kafka/server/ControllerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,10 @@ class ControllerServer(
val apiVersionManager = new SimpleApiVersionManager(
ListenerType.CONTROLLER,
config.unstableApiVersionsEnabled,
() => featuresPublisher.features().setFinalizedLevel(
KRaftVersion.FEATURE_NAME,
raftManager.client.kraftVersion().featureLevel())
() => featuresPublisher.features().map(f =>
Copy link
Contributor

@kevin-wu24 kevin-wu24 Dec 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a subtlety here with kraft.version. Previously, featuresPublisher.features was always "present", so we can always set the "finalized" kraft version known by the local KafkaRaftClient. However, now the featurePublisher.features() might be an empty optional, in which case the function in the .map call won't be applied until after featuresPublisher.features().isPresent().

This means if a controller node has kraft.version=1, but has not committed the bootstrap metadata version record (i.e. featuresPublisher.features().isEmpty()), the controller will not show that it is kraft.version=1 in the ApiVersionsResponse it sends, even though it previously would have.

I think the intention is, for better or for worse, that the "finalizedFeatures" section of the ApiVersionsResponse to show the local node's kraft.version level, even if it may be uncommitted. Is that correct @jsancio?

Copy link
Contributor

@kevin-wu24 kevin-wu24 Dec 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If so, one way to fix this is to add a NOT_FINALIZED enum value to MetadataVersion. This option makes the most sense to me, but it is weird that FinalizedFeatures has a NOT_FINALIZED value for metadata version...

Or we can make FinalizedFeatures#metadataVersion field an optional, and keep it so that FinalizedFeatures does not have to be an optional (although this seems confusing when the object is called "FinalizedFeatures").

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also it is weird we have FinalizedFeatures and FinalizedControllerFeatures records.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this behavior is not intentional: KAFKA-18865

f.setFinalizedLevel(
KRaftVersion.FEATURE_NAME,
raftManager.client.kraftVersion().featureLevel()))
)

// metrics will be set to null when closing a controller, so we should recreate it for testing
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/tools/TestRaftServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package kafka.tools

import java.net.InetSocketAddress
import java.util.Optional
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
import java.util.concurrent.{CompletableFuture, CountDownLatch, LinkedBlockingDeque, TimeUnit}
import joptsimple.{OptionException, OptionSpec}
Expand Down Expand Up @@ -83,7 +84,7 @@ class TestRaftServer(
val apiVersionManager = new SimpleApiVersionManager(
ListenerType.CONTROLLER,
true,
() => FinalizedFeatures.fromKRaftVersion(MetadataVersion.MINIMUM_VERSION))
() => Optional.of(FinalizedFeatures.fromKRaftVersion(MetadataVersion.MINIMUM_VERSION)))
socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager)

val endpoints = Endpoints.fromInetSocketAddresses(
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/unit/kafka/network/ProcessorTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class ProcessorTest {
val requestHeader = RequestTestUtils.serializeRequestHeader(
new RequestHeader(ApiKeys.INIT_PRODUCER_ID, 0, "clientid", 0))
val apiVersionManager = new SimpleApiVersionManager(ListenerType.CONTROLLER, true,
() => new FinalizedFeatures(MetadataVersion.latestTesting(), util.Map.of[String, java.lang.Short], 0))
() => Optional.of(new FinalizedFeatures(MetadataVersion.latestTesting(), util.Map.of[String, java.lang.Short], 0)))
val e = assertThrows(classOf[InvalidRequestException],
(() => Processor.parseRequestHeader(apiVersionManager, requestHeader)): Executable,
"INIT_PRODUCER_ID with listener type CONTROLLER should throw InvalidRequestException exception")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import java.nio.channels.{SelectionKey, SocketChannel}
import java.nio.charset.StandardCharsets
import java.security.cert.X509Certificate
import java.util
import java.util.Optional
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent._
import java.util.{Properties, Random}
Expand Down Expand Up @@ -85,7 +86,7 @@ class SocketServerTest {
TestUtils.clearYammerMetrics()

private val apiVersionManager = new SimpleApiVersionManager(ListenerType.BROKER, true,
() => new FinalizedFeatures(MetadataVersion.latestTesting(), util.Map.of[String, java.lang.Short], 0))
() => Optional.of(new FinalizedFeatures(MetadataVersion.latestTesting(), util.Map.of[String, java.lang.Short], 0)))
var server: SocketServer = _
val sockets = new ArrayBuffer[Socket]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ class ControllerApisTest {
new SimpleApiVersionManager(
ListenerType.CONTROLLER,
true,
() => FinalizedFeatures.fromKRaftVersion(MetadataVersion.latestTesting())),
() => Optional.of(FinalizedFeatures.fromKRaftVersion(MetadataVersion.latestTesting()))),
metadataCache
)
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ class KafkaApisTest extends Logging {
val apiVersionManager = new SimpleApiVersionManager(
ListenerType.BROKER,
true,
() => new FinalizedFeatures(MetadataVersion.latestTesting(), util.Map.of[String, java.lang.Short], 0))
() => Optional.of(new FinalizedFeatures(MetadataVersion.latestTesting(), util.Map.of[String, java.lang.Short], 0)))

setupFeatures(featureVersions)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ private KafkaApis createKafkaApis() {
setApiVersionManager(new SimpleApiVersionManager(
ApiMessageType.ListenerType.BROKER,
false,
() -> FinalizedFeatures.fromKRaftVersion(MetadataVersion.latestTesting()))).
() -> Optional.of(FinalizedFeatures.fromKRaftVersion(MetadataVersion.latestTesting())))).
setGroupConfigManager(groupConfigManager).
build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@

import org.slf4j.Logger;

import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_VERSION;
import java.util.Optional;


public class FeaturesPublisher implements MetadataPublisher {
private final Logger log;
private final FaultHandler faultHandler;
private volatile FinalizedFeatures finalizedFeatures = FinalizedFeatures.fromKRaftVersion(MINIMUM_VERSION);
private volatile Optional<FinalizedFeatures> finalizedFeatures = Optional.empty();

public FeaturesPublisher(
LogContext logContext,
Expand All @@ -43,7 +43,7 @@ public FeaturesPublisher(
this.faultHandler = faultHandler;
}

public FinalizedFeatures features() {
public Optional<FinalizedFeatures> features() {
return finalizedFeatures;
}

Expand All @@ -64,9 +64,9 @@ public void onMetadataUpdate(
newImage.features().finalizedVersions(),
newImage.provenance().lastContainedOffset()
);
if (!newFinalizedFeatures.equals(finalizedFeatures)) {
if (!finalizedFeatures.isPresent() || !newFinalizedFeatures.equals(finalizedFeatures.get())) {
log.info("Loaded new metadata {}.", newFinalizedFeatures);
finalizedFeatures = newFinalizedFeatures;
finalizedFeatures = Optional.of(newFinalizedFeatures);
}
}
} catch (Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.server.common.FinalizedFeatures;

import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

/**
Expand All @@ -35,19 +37,19 @@ public class SimpleApiVersionManager implements ApiVersionManager {
private final ApiMessageType.ListenerType listenerType;
private final Features<SupportedVersionRange> brokerFeatures;
private final boolean enableUnstableLastVersion;
private final Supplier<FinalizedFeatures> featuresProvider;
private final Supplier<Optional<FinalizedFeatures>> featuresProvider;
private final ApiVersionsResponseData.ApiVersionCollection apiVersions;

/**
* SimpleApiVersionManager constructor
* @param listenerType the listener type
* @param enableUnstableLastVersion whether to enable unstable last version, see
* {@link org.apache.kafka.server.config.ServerConfigs#UNSTABLE_API_VERSIONS_ENABLE_CONFIG}
* @param featuresProvider a provider to the finalized features supported
* @param featuresProvider a provider to the finalized features supported (may return Optional.empty() if not yet initialized)
*/
public SimpleApiVersionManager(ApiMessageType.ListenerType listenerType,
boolean enableUnstableLastVersion,
Supplier<FinalizedFeatures> featuresProvider) {
Supplier<Optional<FinalizedFeatures>> featuresProvider) {
this.listenerType = listenerType;
this.brokerFeatures = BrokerFeatures.defaultSupportedFeatures(enableUnstableLastVersion);
this.enableUnstableLastVersion = enableUnstableLastVersion;
Expand All @@ -67,19 +69,19 @@ public ApiMessageType.ListenerType listenerType() {

@Override
public ApiVersionsResponse apiVersionResponse(int throttleTimeMs, boolean alterFeatureLevel0) {
FinalizedFeatures currentFeatures = features();
Optional<FinalizedFeatures> currentFeatures = featuresProvider.get();
return new ApiVersionsResponse.Builder()
.setThrottleTimeMs(throttleTimeMs)
.setApiVersions(apiVersions)
.setSupportedFeatures(brokerFeatures)
.setFinalizedFeatures(currentFeatures.finalizedFeatures())
.setFinalizedFeaturesEpoch(currentFeatures.finalizedFeaturesEpoch())
.setFinalizedFeatures(currentFeatures.map(FinalizedFeatures::finalizedFeatures).orElse(Map.of()))
Copy link
Contributor

@kevin-wu24 kevin-wu24 Dec 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, if we pass the kraft.version information here as a Supplier, we can set the finalized features field with the kraft.version here whenever currentFeatures.isEmpty() (i.e. FeaturesPublisher#finalizedFeatures.isEmpty()).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

val apiVersionManager = new SimpleApiVersionManager(
        ListenerType.CONTROLLER,
        config.unstableApiVersionsEnabled,
        () => featuresPublisher.features().setFinalizedLevel(
          KRaftVersion.FEATURE_NAME,
          raftManager.client.kraftVersion().featureLevel())
      )

This code in ControllerServer.scala already adds the kraft.version. It might not appear in the output if kraft.version is 0 and hence gets filtered out.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the only other places SimpleApiVersionManager gets called are from tests and KRaftMetadataRequestBenchmark so changing the default to FinalizedFeatures.UNKNOWN_FINALIZED_FEATURES shouldn't change any behaviors here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if we want to expose the latest committed kraft version, we should do it in a separate PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code in ControllerServer.scala already adds the kraft.version. It might not appear in the output if kraft.version is 0 and hence gets filtered out.

Yes but the current code will expose kraft.version=1 in the APIVersionsResponse even if it has not been committed yet. We can slightly improve on this by not adding the kraft.version if the metadata.version does not exist, but the feature's value in the API versions may still be uncommitted when doing upgrade.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.setFinalizedFeaturesEpoch(currentFeatures.map(FinalizedFeatures::finalizedFeaturesEpoch).orElse(-1L))
.setAlterFeatureLevel0(alterFeatureLevel0)
.build();
}

@Override
public FinalizedFeatures features() {
return featuresProvider.get();
return featuresProvider.get().orElse(null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should avoid ever returning null, because this is not safe. Instead, we can change the ApiVersionManager interface's features() method to return an Optional<FinalizedFeatures>. The implementors can return the wrapped optional, and check things based on its presence.

For example, ControllerApis#handleDescribeCluster can now return a more useful error message if the optional is not present. We can also add a test for that in ControllerApisTest.

}
}