-
Notifications
You must be signed in to change notification settings - Fork 14.8k
KAFKA-19964: API Versions Response returns minimum metadata version for finalized level when no quorum exists #21122
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Conversation
kevin-wu24
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the changes @mannoopj. Left a review of src/main. Can we also add a FeaturesPublisher unit test to verify this new behavior?
| @Override | ||
| public FinalizedFeatures features() { | ||
| return featuresProvider.get(); | ||
| return featuresProvider.get().orElse(null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should avoid ever returning null, because this is not safe. Instead, we can change the ApiVersionManager interface's features() method to return an Optional<FinalizedFeatures>. The implementors can return the wrapped optional, and check things based on its presence.
For example, ControllerApis#handleDescribeCluster can now return a more useful error message if the optional is not present. We can also add a test for that in ControllerApisTest.
| () => featuresPublisher.features().setFinalizedLevel( | ||
| KRaftVersion.FEATURE_NAME, | ||
| raftManager.client.kraftVersion().featureLevel()) | ||
| () => featuresPublisher.features().map(f => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a subtlety here with kraft.version. Previously, featuresPublisher.features was always "present", so we can always set the "finalized" kraft version known by the local KafkaRaftClient. However, now the featurePublisher.features() might be an empty optional, in which case the function in the .map call won't be applied until after featuresPublisher.features().isPresent().
This means if a controller node has kraft.version=1, but has not committed the bootstrap metadata version record (i.e. featuresPublisher.features().isEmpty()), the controller will not show that it is kraft.version=1 in the ApiVersionsResponse it sends, even though it previously would have.
I think the intention is, for better or for worse, that the "finalizedFeatures" section of the ApiVersionsResponse to show the local node's kraft.version level, even if it may be uncommitted. Is that correct @jsancio?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If so, one way to fix this is to add a NOT_FINALIZED enum value to MetadataVersion. This option makes the most sense to me, but it is weird that FinalizedFeatures has a NOT_FINALIZED value for metadata version...
Or we can make FinalizedFeatures#metadataVersion field an optional, and keep it so that FinalizedFeatures does not have to be an optional (although this seems confusing when the object is called "FinalizedFeatures").
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also it is weird we have FinalizedFeatures and FinalizedControllerFeatures records.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like this behavior is not intentional: KAFKA-18865
| .setSupportedFeatures(brokerFeatures) | ||
| .setFinalizedFeatures(currentFeatures.finalizedFeatures()) | ||
| .setFinalizedFeaturesEpoch(currentFeatures.finalizedFeaturesEpoch()) | ||
| .setFinalizedFeatures(currentFeatures.map(FinalizedFeatures::finalizedFeatures).orElse(Map.of())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, if we pass the kraft.version information here as a Supplier, we can set the finalized features field with the kraft.version here whenever currentFeatures.isEmpty() (i.e. FeaturesPublisher#finalizedFeatures.isEmpty()).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
val apiVersionManager = new SimpleApiVersionManager(
ListenerType.CONTROLLER,
config.unstableApiVersionsEnabled,
() => featuresPublisher.features().setFinalizedLevel(
KRaftVersion.FEATURE_NAME,
raftManager.client.kraftVersion().featureLevel())
)
This code in ControllerServer.scala already adds the kraft.version. It might not appear in the output if kraft.version is 0 and hence gets filtered out.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the only other places SimpleApiVersionManager gets called are from tests and KRaftMetadataRequestBenchmark so changing the default to FinalizedFeatures.UNKNOWN_FINALIZED_FEATURES shouldn't change any behaviors here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think if we want to expose the latest committed kraft version, we should do it in a separate PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code in ControllerServer.scala already adds the kraft.version. It might not appear in the output if kraft.version is 0 and hence gets filtered out.
Yes but the current code will expose kraft.version=1 in the APIVersionsResponse even if it has not been committed yet. We can slightly improve on this by not adding the kraft.version if the metadata.version does not exist, but the feature's value in the API versions may still be uncommitted when doing upgrade.
kevin-wu24
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the changes @mannoopj . Left another review of src/main.
Can we add the suggestion from: #21122 (comment)? That way we can call setFinalizedFeatureLevel for KRaft in the APIVersionsResponse only when the FinalizedFeatures is is not "empty."
| private final Logger log; | ||
| private final FaultHandler faultHandler; | ||
| private volatile FinalizedFeatures finalizedFeatures = FinalizedFeatures.fromKRaftVersion(MINIMUM_VERSION); | ||
| private volatile FinalizedFeatures finalizedFeatures = new FinalizedFeatures( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should make new FinalizedFeatures(Optional.empty()...) a static constant in FinalizedFeatures. Maybe something like UNKNOWN_FINALIZED_FEATURES.
| // Nearly all RPCs should check MetadataVersion inside the QuorumController. However, this | ||
| // RPC is consulting a cache which lives outside the QC. So we check MetadataVersion here. | ||
| if (!apiVersionManager.features.metadataVersion().isControllerRegistrationSupported) { | ||
| if (!apiVersionManager.features.metadataVersion().map(_.isControllerRegistrationSupported).orElse(false)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be better if we could return a more accurate error message in the case where metadataVersion().isEmpty(). Returning a message like "There is no finalized MetadataVersion, so direct-to-controller communication is not supported" is more accurate in this case.
| "supported with the current MetadataVersion.") | ||
| if (!apiVersionManager.features.metadataVersion().map(_.isControllerRegistrationSupported).orElse(false)) { | ||
| throw new UnsupportedVersionException("There is no finalized MetadataVersion, so " + | ||
| "direct-to-controller communication is not supported.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please make a separate if/else if clause for if the metadata version is not finalized. We should not change this existing check.
| .setSupportedFeatures(brokerFeatures) | ||
| .setFinalizedFeatures(currentFeatures.finalizedFeatures()) | ||
| .setFinalizedFeaturesEpoch(currentFeatures.finalizedFeaturesEpoch()) | ||
| .setFinalizedFeatures(currentFeatures.map(FinalizedFeatures::finalizedFeatures).orElse(Map.of())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think if we want to expose the latest committed kraft version, we should do it in a separate PR.
FeaturesPublisher's default finalizedFeatures assumes a metadata version of 7 when there is no finalized level for metadata.version. Instead of defaulting to 7, it should not report a value or report -1 indicating it is unknown.
See https://lists.apache.org/thread/ytmo7b9vmy46n6p47l1sxx6ftvdhht5b for the context as part of the KIP-1170 discussion.
Implemented by changing
finalizedFeaturesto Optional inFeaturesPublisher