diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientDescribeClusterFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientDescribeClusterFactory.java new file mode 100644 index 0000000000..4c1dd223cc --- /dev/null +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientDescribeClusterFactory.java @@ -0,0 +1,1531 @@ +/* + * Copyright 2021-2023 Aklivity Inc. + * + * Aklivity licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.aklivity.zilla.runtime.binding.kafka.internal.stream; + +import static io.aklivity.zilla.runtime.binding.kafka.internal.types.ProxyAddressProtocol.STREAM; +import static io.aklivity.zilla.runtime.engine.budget.BudgetCreditor.NO_BUDGET_ID; +import static io.aklivity.zilla.runtime.engine.budget.BudgetDebitor.NO_DEBITOR_INDEX; +import static io.aklivity.zilla.runtime.engine.buffer.BufferPool.NO_SLOT; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; +import java.util.function.LongFunction; +import java.util.function.UnaryOperator; + +import org.agrona.DirectBuffer; +import org.agrona.MutableDirectBuffer; +import org.agrona.collections.LongLongConsumer; +import org.agrona.concurrent.UnsafeBuffer; + +import io.aklivity.zilla.runtime.binding.kafka.config.KafkaSaslConfig; +import io.aklivity.zilla.runtime.binding.kafka.config.KafkaServerConfig; +import io.aklivity.zilla.runtime.binding.kafka.internal.KafkaBinding; +import io.aklivity.zilla.runtime.binding.kafka.internal.KafkaConfiguration; +import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaBindingConfig; +import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaRouteConfig; +import io.aklivity.zilla.runtime.binding.kafka.internal.types.Flyweight; +import io.aklivity.zilla.runtime.binding.kafka.internal.types.OctetsFW; +import io.aklivity.zilla.runtime.binding.kafka.internal.types.String16FW; +import io.aklivity.zilla.runtime.binding.kafka.internal.types.codec.RequestHeaderFW; +import io.aklivity.zilla.runtime.binding.kafka.internal.types.codec.ResponseHeaderFW; +import io.aklivity.zilla.runtime.binding.kafka.internal.types.codec.describe_cluster.ClusterBrokerFW; +import io.aklivity.zilla.runtime.binding.kafka.internal.types.codec.describe_cluster.DescribeClusterRequestFW; +import io.aklivity.zilla.runtime.binding.kafka.internal.types.codec.describe_cluster.DescribeClusterResponseFW; +import io.aklivity.zilla.runtime.binding.kafka.internal.types.codec.describe_cluster.DescribeClusterResponsePart2FW; +import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.AbortFW; +import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.BeginFW; +import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.DataFW; +import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.EndFW; +import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.ExtensionFW; +import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaBeginExFW; +import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaDescribeClusterRequestBeginExFW; +import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaResetExFW; +import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.ProxyBeginExFW; +import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.ResetFW; +import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.SignalFW; +import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.WindowFW; +import io.aklivity.zilla.runtime.engine.EngineContext; +import io.aklivity.zilla.runtime.engine.binding.BindingHandler; +import io.aklivity.zilla.runtime.engine.binding.function.MessageConsumer; +import io.aklivity.zilla.runtime.engine.budget.BudgetDebitor; +import io.aklivity.zilla.runtime.engine.buffer.BufferPool; +import io.aklivity.zilla.runtime.engine.concurrent.Signaler; + +public final class KafkaClientDescribeClusterFactory extends KafkaClientSaslHandshaker implements BindingHandler +{ + private static final int ERROR_NONE = 0; + private static final int SIGNAL_NEXT_REQUEST = 1; + + private static final DirectBuffer EMPTY_BUFFER = new UnsafeBuffer(); + private static final OctetsFW EMPTY_OCTETS = new OctetsFW().wrap(EMPTY_BUFFER, 0, 0); + private static final Consumer EMPTY_EXTENSION = ex -> {}; + + private static final short CREATE_TOPICS_API_KEY = 60; + private static final short CREATE_TOPICS_API_VERSION = 0; + + private final BeginFW beginRO = new BeginFW(); + private final DataFW dataRO = new DataFW(); + private final EndFW endRO = new EndFW(); + private final AbortFW abortRO = new AbortFW(); + private final ResetFW resetRO = new ResetFW(); + private final WindowFW windowRO = new WindowFW(); + private final SignalFW signalRO = new SignalFW(); + private final ExtensionFW extensionRO = new ExtensionFW(); + private final KafkaBeginExFW kafkaBeginExRO = new KafkaBeginExFW(); + + private final BeginFW.Builder beginRW = new BeginFW.Builder(); + private final DataFW.Builder dataRW = new DataFW.Builder(); + private final EndFW.Builder endRW = new EndFW.Builder(); + private final AbortFW.Builder abortRW = new AbortFW.Builder(); + private final ResetFW.Builder resetRW = new ResetFW.Builder(); + private final WindowFW.Builder windowRW = new WindowFW.Builder(); + private final ProxyBeginExFW.Builder proxyBeginExRW = new ProxyBeginExFW.Builder(); + private final KafkaBeginExFW.Builder kafkaBeginExRW = new KafkaBeginExFW.Builder(); + private final KafkaResetExFW.Builder kafkaResetExRW = new KafkaResetExFW.Builder(); + + private final RequestHeaderFW.Builder requestHeaderRW = new RequestHeaderFW.Builder(); + private final DescribeClusterRequestFW.Builder describeClusterRequestRW = new DescribeClusterRequestFW.Builder(); + + private final ResponseHeaderFW responseHeaderRO = new ResponseHeaderFW(); + private final DescribeClusterResponseFW describeClusterResponseRO = new DescribeClusterResponseFW(); + private final ClusterBrokerFW brokerResponseRO = new ClusterBrokerFW(); + private final DescribeClusterResponsePart2FW describeClusterResponsePart2RO = new DescribeClusterResponsePart2FW(); + + private final KafkaDescribeClusterClientDecoder decodeSaslHandshakeResponse = this::decodeSaslHandshakeResponse; + private final KafkaDescribeClusterClientDecoder decodeSaslHandshake = this::decodeSaslHandshake; + private final KafkaDescribeClusterClientDecoder decodeSaslHandshakeMechanisms = this::decodeSaslHandshakeMechanisms; + private final KafkaDescribeClusterClientDecoder decodeSaslHandshakeMechanism = this::decodeSaslHandshakeMechanism; + private final KafkaDescribeClusterClientDecoder decodeSaslAuthenticateResponse = this::decodeSaslAuthenticateResponse; + private final KafkaDescribeClusterClientDecoder decodeSaslAuthenticate = this::decodeSaslAuthenticate; + private final KafkaDescribeClusterClientDecoder decodeDescribeClusterResponse = this::decodeDescribeClusterResponse; + private final KafkaDescribeClusterClientDecoder decodeIgnoreAll = this::decodeIgnoreAll; + private final KafkaDescribeClusterClientDecoder decodeReject = this::decodeReject; + + private final int kafkaTypeId; + private final int proxyTypeId; + private final MutableDirectBuffer writeBuffer; + private final MutableDirectBuffer extBuffer; + private final BufferPool decodePool; + private final BufferPool encodePool; + private final Signaler signaler; + private final BindingHandler streamFactory; + private final UnaryOperator resolveSasl; + private final LongFunction supplyBinding; + private final LongFunction supplyDebitor; + private final List brokers; + + public KafkaClientDescribeClusterFactory( + KafkaConfiguration config, + EngineContext context, + LongFunction supplyBinding, + LongFunction supplyDebitor, + Signaler signaler, + BindingHandler streamFactory, + UnaryOperator resolveSasl) + { + super(config, context); + this.kafkaTypeId = context.supplyTypeId(KafkaBinding.NAME); + this.proxyTypeId = context.supplyTypeId("proxy"); + this.signaler = signaler; + this.streamFactory = streamFactory; + this.resolveSasl = resolveSasl; + this.writeBuffer = new UnsafeBuffer(new byte[context.writeBuffer().capacity()]); + this.extBuffer = new UnsafeBuffer(new byte[context.writeBuffer().capacity()]); + this.decodePool = context.bufferPool(); + this.encodePool = context.bufferPool(); + this.supplyBinding = supplyBinding; + this.supplyDebitor = supplyDebitor; + this.brokers = new ArrayList<>(); + } + + @Override + public MessageConsumer newStream( + int msgTypeId, + DirectBuffer buffer, + int index, + int length, + MessageConsumer application) + { + final BeginFW begin = beginRO.wrap(buffer, index, index + length); + final long originId = begin.originId(); + final long routedId = begin.routedId(); + final long initialId = begin.streamId(); + final long affinity = begin.affinity(); + final long authorization = begin.authorization(); + final OctetsFW extension = begin.extension(); + final ExtensionFW beginEx = extensionRO.tryWrap(extension.buffer(), extension.offset(), extension.limit()); + final KafkaBeginExFW kafkaBeginEx = beginEx != null && beginEx.typeId() == kafkaTypeId ? + kafkaBeginExRO.tryWrap(extension.buffer(), extension.offset(), extension.limit()) : null; + + assert kafkaBeginEx.kind() == KafkaBeginExFW.KIND_REQUEST; + final KafkaDescribeClusterRequestBeginExFW kafkaDescribeClusterBeginEx = kafkaBeginEx.request().describeCluster(); + + MessageConsumer newStream = null; + + final KafkaBindingConfig binding = supplyBinding.apply(routedId); + final KafkaRouteConfig resolved = binding != null + ? binding.resolve(authorization, null) + : null; + + if (resolved != null) + { + final long resolvedId = resolved.id; + final KafkaSaslConfig sasl = resolveSasl.apply(binding.sasl()); + + int authorizedOperations = kafkaDescribeClusterBeginEx.includeAuthorizedOperations(); + newStream = new KafkaDescribeClusterStream( + application, + originId, + routedId, + initialId, + affinity, + resolvedId, + authorizedOperations, + binding.servers(), + sasl)::onApplication; + } + + return newStream; + } + + private MessageConsumer newStream( + MessageConsumer sender, + long originId, + long routedId, + long streamId, + long sequence, + long acknowledge, + int maximum, + long traceId, + long authorization, + long affinity, + Consumer extension) + { + final BeginFW begin = beginRW.wrap(writeBuffer, 0, writeBuffer.capacity()) + .originId(originId) + .routedId(routedId) + .streamId(streamId) + .sequence(sequence) + .acknowledge(acknowledge) + .maximum(maximum) + .traceId(traceId) + .authorization(authorization) + .affinity(affinity) + .extension(extension) + .build(); + + final MessageConsumer receiver = + streamFactory.newStream(begin.typeId(), begin.buffer(), begin.offset(), begin.sizeof(), sender); + + receiver.accept(begin.typeId(), begin.buffer(), begin.offset(), begin.sizeof()); + + return receiver; + } + + private void doBegin( + MessageConsumer receiver, + long originId, + long routedId, + long streamId, + long sequence, + long acknowledge, + int maximum, + long traceId, + long authorization, + long affinity, + Consumer extension) + { + final BeginFW begin = beginRW.wrap(writeBuffer, 0, writeBuffer.capacity()) + .originId(originId) + .routedId(routedId) + .streamId(streamId) + .sequence(sequence) + .acknowledge(acknowledge) + .maximum(maximum) + .traceId(traceId) + .authorization(authorization) + .affinity(affinity) + .extension(extension) + .build(); + + receiver.accept(begin.typeId(), begin.buffer(), begin.offset(), begin.sizeof()); + } + + private void doData( + MessageConsumer receiver, + long originId, + long routedId, + long streamId, + long sequence, + long acknowledge, + int maximum, + long traceId, + long authorization, + long budgetId, + int reserved, + DirectBuffer payload, + int offset, + int length, + Consumer extension) + { + final DataFW data = dataRW.wrap(writeBuffer, 0, writeBuffer.capacity()) + .originId(originId) + .routedId(routedId) + .streamId(streamId) + .sequence(sequence) + .acknowledge(acknowledge) + .maximum(maximum) + .traceId(traceId) + .authorization(authorization) + .budgetId(budgetId) + .reserved(reserved) + .payload(payload, offset, length) + .extension(extension) + .build(); + + receiver.accept(data.typeId(), data.buffer(), data.offset(), data.sizeof()); + } + + private void doData( + MessageConsumer receiver, + long originId, + long routedId, + long streamId, + long sequence, + long acknowledge, + int maximum, + long traceId, + long authorization, + long budgetId, + int reserved, + DirectBuffer payload, + int offset, + int length, + Flyweight extension) + { + final DataFW data = dataRW.wrap(writeBuffer, 0, writeBuffer.capacity()) + .originId(originId) + .routedId(routedId) + .streamId(streamId) + .sequence(sequence) + .acknowledge(acknowledge) + .maximum(maximum) + .traceId(traceId) + .authorization(authorization) + .budgetId(budgetId) + .reserved(reserved) + .payload(payload, offset, length) + .extension(extension.buffer(), extension.offset(), extension.sizeof()) + .build(); + + receiver.accept(data.typeId(), data.buffer(), data.offset(), data.sizeof()); + } + + private void doEnd( + MessageConsumer receiver, + long originId, + long routedId, + long streamId, + long sequence, + long acknowledge, + int maximum, + long traceId, + long authorization, + Consumer extension) + { + final EndFW end = endRW.wrap(writeBuffer, 0, writeBuffer.capacity()) + .originId(originId) + .routedId(routedId) + .streamId(streamId) + .sequence(sequence) + .acknowledge(acknowledge) + .maximum(maximum) + .traceId(traceId) + .authorization(authorization) + .extension(extension) + .build(); + + receiver.accept(end.typeId(), end.buffer(), end.offset(), end.sizeof()); + } + + private void doAbort( + MessageConsumer receiver, + long originId, + long routedId, + long streamId, + long sequence, + long acknowledge, + int maximum, + long traceId, + long authorization, + Consumer extension) + { + final AbortFW abort = abortRW.wrap(writeBuffer, 0, writeBuffer.capacity()) + .originId(originId) + .routedId(routedId) + .streamId(streamId) + .sequence(sequence) + .acknowledge(acknowledge) + .maximum(maximum) + .traceId(traceId) + .authorization(authorization) + .extension(extension) + .build(); + + receiver.accept(abort.typeId(), abort.buffer(), abort.offset(), abort.sizeof()); + } + + private void doWindow( + MessageConsumer sender, + long originId, + long routedId, + long streamId, + long sequence, + long acknowledge, + int maximum, + long traceId, + long authorization, + long budgetId, + int padding) + { + final WindowFW window = windowRW.wrap(writeBuffer, 0, writeBuffer.capacity()) + .originId(originId) + .routedId(routedId) + .streamId(streamId) + .sequence(sequence) + .acknowledge(acknowledge) + .maximum(maximum) + .traceId(traceId) + .authorization(authorization) + .budgetId(budgetId) + .padding(padding) + .build(); + + sender.accept(window.typeId(), window.buffer(), window.offset(), window.sizeof()); + } + + private void doReset( + MessageConsumer sender, + long originId, + long routedId, + long streamId, + long sequence, + long acknowledge, + int maximum, + long traceId, + long authorization, + Flyweight extension) + { + final ResetFW reset = resetRW.wrap(writeBuffer, 0, writeBuffer.capacity()) + .originId(originId) + .routedId(routedId) + .streamId(streamId) + .sequence(sequence) + .acknowledge(acknowledge) + .maximum(maximum) + .traceId(traceId) + .authorization(authorization) + .extension(extension.buffer(), extension.offset(), extension.sizeof()) + .build(); + + sender.accept(reset.typeId(), reset.buffer(), reset.offset(), reset.sizeof()); + } + + @FunctionalInterface + private interface KafkaDescribeClusterClientDecoder + { + int decode( + KafkaDescribeClusterClient client, + long traceId, + long authorization, + long budgetId, + int reserved, + MutableDirectBuffer buffer, + int offset, + int progress, + int limit); + } + + private int decodeDescribeClusterResponse( + KafkaDescribeClusterClient client, + long traceId, + long authorization, + long budgetId, + int reserved, + DirectBuffer buffer, + int offset, + int progress, + int limit) + { + final int length = limit - progress; + + decode: + if (length != 0) + { + final ResponseHeaderFW responseHeader = responseHeaderRO.tryWrap(buffer, progress, limit); + if (responseHeader == null) + { + break decode; + } + + progress = responseHeader.limit(); + + final DescribeClusterResponseFW describeClusterResponse = describeClusterResponseRO.tryWrap(buffer, progress, limit); + if (describeClusterResponse == null) + { + break decode; + } + + progress = describeClusterResponse.limit(); + + final int throttle = describeClusterResponse.throttle(); + final short error = describeClusterResponse.error(); + final String16FW message = describeClusterResponse.message(); + final String16FW clusterId = describeClusterResponse.clusterId(); + final int controllerId = describeClusterResponse.controllerId(); + final int brokerCount = describeClusterResponse.brokerCount(); + + brokers.clear(); + for (int brokerIndex = 0; brokerIndex < brokerCount; brokerIndex++) + { + final ClusterBrokerFW broker = brokerResponseRO.tryWrap(buffer, progress, limit); + if (broker == null) + { + client.decoder = decodeIgnoreAll; + break decode; + } + + progress = broker.limit(); + + brokers.add(new ClusterBrokerInfo( + broker.brokerId(), broker.host().asString(), broker.port(), broker.rack().asString())); + } + + final DescribeClusterResponsePart2FW responsePart2 = + describeClusterResponsePart2RO.wrap(buffer, progress, limit); + final int authorizedOperations = responsePart2.authorizedOperations(); + + client.onDecodeDescribeClusterResponse(traceId, authorization, throttle, + error, message, clusterId, controllerId, brokers, authorizedOperations); + } + + return progress; + } + + private int decodeReject( + KafkaDescribeClusterClient client, + long traceId, + long authorization, + long budgetId, + int reserved, + DirectBuffer buffer, + int offset, + int progress, + int limit) + { + client.cleanupNetwork(traceId); + client.decoder = decodeIgnoreAll; + return limit; + } + + private int decodeIgnoreAll( + KafkaDescribeClusterClient client, + long traceId, + long authorization, + long budgetId, + int reserved, + DirectBuffer buffer, + int offset, + int progress, + int limit) + { + return limit; + } + + private final class KafkaDescribeClusterStream + { + private final MessageConsumer application; + private final long originId; + private final long routedId; + private final long initialId; + private final long replyId; + private final long affinity; + private final KafkaDescribeClusterClient client; + + private int state; + + private long initialSeq; + private long initialAck; + private int initialMax; + + private long replySeq; + private long replyAck; + private int replyMax; + private int replyPad; + + private long replyBudgetId; + + KafkaDescribeClusterStream( + MessageConsumer application, + long originId, + long routedId, + long initialId, + long affinity, + long resolvedId, + int authorizedOperations, + List servers, + KafkaSaslConfig sasl) + { + this.application = application; + this.originId = originId; + this.routedId = routedId; + this.initialId = initialId; + this.replyId = supplyReplyId.applyAsLong(initialId); + this.affinity = affinity; + this.client = new KafkaDescribeClusterClient(this, routedId, resolvedId, authorizedOperations, servers, sasl); + } + + private void onApplication( + int msgTypeId, + DirectBuffer buffer, + int index, + int length) + { + switch (msgTypeId) + { + case BeginFW.TYPE_ID: + final BeginFW begin = beginRO.wrap(buffer, index, index + length); + onApplicationBegin(begin); + break; + case DataFW.TYPE_ID: + final DataFW data = dataRO.wrap(buffer, index, index + length); + onApplicationData(data); + break; + case EndFW.TYPE_ID: + final EndFW end = endRO.wrap(buffer, index, index + length); + onApplicationEnd(end); + break; + case AbortFW.TYPE_ID: + final AbortFW abort = abortRO.wrap(buffer, index, index + length); + onApplicationAbort(abort); + break; + case WindowFW.TYPE_ID: + final WindowFW window = windowRO.wrap(buffer, index, index + length); + onApplicationWindow(window); + break; + case ResetFW.TYPE_ID: + final ResetFW reset = resetRO.wrap(buffer, index, index + length); + onApplicationReset(reset); + break; + default: + break; + } + } + + private void onApplicationBegin( + BeginFW begin) + { + final long traceId = begin.traceId(); + final long authorization = begin.authorization(); + + state = KafkaState.openingInitial(state); + + client.doNetworkBegin(traceId, authorization, affinity); + + doApplicationWindow(traceId, 0L, 0, 0, 0); + } + + private void onApplicationData( + DataFW data) + { + final long traceId = data.traceId(); + + client.cleanupNetwork(traceId); + } + + private void onApplicationEnd( + EndFW end) + { + final long traceId = end.traceId(); + final long authorization = end.authorization(); + + state = KafkaState.closedInitial(state); + + client.doNetworkEnd(traceId, authorization); + } + + private void onApplicationAbort( + AbortFW abort) + { + final long traceId = abort.traceId(); + + state = KafkaState.closedInitial(state); + + client.doNetworkAbort(traceId); + } + + private void onApplicationWindow( + WindowFW window) + { + final long sequence = window.sequence(); + final long acknowledge = window.acknowledge(); + final int maximum = window.maximum(); + final long budgetId = window.budgetId(); + final int padding = window.padding(); + + assert acknowledge <= sequence; + assert sequence <= replySeq; + assert acknowledge >= replyAck; + assert maximum >= replyMax; + + this.replyAck = acknowledge; + this.replyMax = maximum; + this.replyPad = padding; + this.replyBudgetId = budgetId; + + assert replyAck <= replySeq; + } + + private void onApplicationReset( + ResetFW reset) + { + final long traceId = reset.traceId(); + + state = KafkaState.closedReply(state); + + client.doNetworkReset(traceId); + } + + private boolean isApplicationReplyOpen() + { + return KafkaState.replyOpening(state); + } + + private void doApplicationBegin( + long traceId, + long authorization, + int throttle, + short error, + String16FW message, + String16FW clusterId, + int controllerId, + List brokers, + int authorizedOperations) + { + if (!KafkaState.replyOpening(state)) + { + state = KafkaState.openingReply(state); + + doBegin(application, originId, routedId, replyId, replySeq, replyAck, replyMax, + traceId, authorization, affinity, + ex -> ex.set((b, o, l) -> kafkaBeginExRW.wrap(b, o, l) + .typeId(kafkaTypeId) + .response(r -> r + .describeCluster( + dc -> dc + .throttle(throttle) + .error(error) + .message(message) + .clusterId(clusterId) + .controllerId(controllerId) + .brokers(t -> + brokers.forEach(bs -> + t.item(i -> i + .brokerId(bs.brokerId) + .host(bs.host) + .port(bs.port) + .rack(bs.rack)))) + .authorizedOperations(authorizedOperations))) + .build() + .sizeof())); + } + } + + + private void doApplicationEnd( + long traceId) + { + state = KafkaState.closedReply(state); + doEnd(application, originId, routedId, replyId, replySeq, replyAck, replyMax, + traceId, client.authorization, EMPTY_EXTENSION); + } + + private void doApplicationAbort( + long traceId) + { + if (KafkaState.replyOpening(state) && !KafkaState.replyClosed(state)) + { + state = KafkaState.closedReply(state); + doAbort(application, originId, routedId, replyId, replySeq, replyAck, replyMax, + traceId, client.authorization, EMPTY_EXTENSION); + } + } + + private void doApplicationWindow( + long traceId, + long budgetId, + int minInitialNoAck, + int minInitialPad, + int minInitialMax) + { + final long newInitialAck = Math.max(initialSeq - minInitialNoAck, initialAck); + + if (newInitialAck > initialAck || minInitialMax > initialMax || !KafkaState.initialOpened(state)) + { + initialAck = newInitialAck; + assert initialAck <= initialSeq; + + initialMax = minInitialMax; + + state = KafkaState.openedInitial(state); + + doWindow(application, originId, routedId, initialId, initialSeq, initialAck, initialMax, + traceId, client.authorization, budgetId, minInitialPad); + } + } + + private void doApplicationReset( + long traceId, + Flyweight extension) + { + if (KafkaState.initialOpening(state) && !KafkaState.initialClosed(state)) + { + state = KafkaState.closedInitial(state); + + doReset(application, originId, routedId, initialId, initialSeq, initialAck, initialMax, + traceId, client.authorization, extension); + } + } + + private void cleanupApplication( + long traceId, + int error) + { + final KafkaResetExFW kafkaResetEx = kafkaResetExRW.wrap(extBuffer, 0, extBuffer.capacity()) + .typeId(kafkaTypeId) + .error(error) + .build(); + + cleanupApplication(traceId, kafkaResetEx); + } + + private void cleanupApplication( + long traceId, + Flyweight extension) + { + doApplicationReset(traceId, extension); + doApplicationAbort(traceId); + } + } + + private final class KafkaDescribeClusterClient extends KafkaSaslClient + { + private final LongLongConsumer encodeSaslHandshakeRequest = this::doEncodeSaslHandshakeRequest; + private final LongLongConsumer encodeSaslAuthenticateRequest = this::doEncodeSaslAuthenticateRequest; + private final LongLongConsumer encodeDescribeClusterRequest = this::doEncodeDescribeClusterRequest; + + private final KafkaDescribeClusterStream delegate; + private final int authorizedOperations; + + private MessageConsumer network; + private int state; + private long authorization; + + private long initialSeq; + private long initialAck; + private int initialMax; + private int initialMin; + private int initialPad; + private long initialBudgetId = NO_BUDGET_ID; + private long initialDebIndex = NO_DEBITOR_INDEX; + + private long replySeq; + private long replyAck; + private int replyMax; + + private int encodeSlot = NO_SLOT; + private int encodeSlotOffset; + private long encodeSlotTraceId; + + private int decodeSlot = NO_SLOT; + private int decodeSlotOffset; + private int decodeSlotReserved; + + private int nextResponseId; + + private BudgetDebitor initialDeb; + private KafkaDescribeClusterClientDecoder decoder; + private LongLongConsumer encoder; + + KafkaDescribeClusterClient( + KafkaDescribeClusterStream delegate, + long originId, + long routedId, + int authorizedOperations, + List servers, + KafkaSaslConfig sasl) + { + super(servers, sasl, originId, routedId); + this.delegate = delegate; + this.authorizedOperations = authorizedOperations; + this.encoder = sasl != null ? encodeSaslHandshakeRequest : encodeDescribeClusterRequest; + + this.decoder = decodeReject; + } + + private void onNetwork( + int msgTypeId, + DirectBuffer buffer, + int index, + int length) + { + switch (msgTypeId) + { + case BeginFW.TYPE_ID: + final BeginFW begin = beginRO.wrap(buffer, index, index + length); + onNetworkBegin(begin); + break; + case DataFW.TYPE_ID: + final DataFW data = dataRO.wrap(buffer, index, index + length); + onNetworkData(data); + break; + case EndFW.TYPE_ID: + final EndFW end = endRO.wrap(buffer, index, index + length); + onNetworkEnd(end); + break; + case AbortFW.TYPE_ID: + final AbortFW abort = abortRO.wrap(buffer, index, index + length); + onNetworkAbort(abort); + break; + case ResetFW.TYPE_ID: + final ResetFW reset = resetRO.wrap(buffer, index, index + length); + onNetworkReset(reset); + break; + case WindowFW.TYPE_ID: + final WindowFW window = windowRO.wrap(buffer, index, index + length); + onNetworkWindow(window); + break; + case SignalFW.TYPE_ID: + final SignalFW signal = signalRO.wrap(buffer, index, index + length); + onNetworkSignal(signal); + break; + default: + break; + } + } + + private void onNetworkBegin( + BeginFW begin) + { + final long traceId = begin.traceId(); + + authorization = begin.authorization(); + state = KafkaState.openingReply(state); + + doNetworkWindow(traceId, 0L, 0, 0, decodePool.slotCapacity()); + } + + private void onNetworkData( + DataFW data) + { + final long sequence = data.sequence(); + final long acknowledge = data.acknowledge(); + final long traceId = data.traceId(); + final long budgetId = data.budgetId(); + + assert acknowledge <= sequence; + assert sequence >= replySeq; + + replySeq = sequence + data.reserved(); + authorization = data.authorization(); + + assert replyAck <= replySeq; + + if (replySeq > replyAck + replyMax) + { + cleanupNetwork(traceId); + } + else + { + if (decodeSlot == NO_SLOT) + { + decodeSlot = decodePool.acquire(initialId); + } + + if (decodeSlot == NO_SLOT) + { + cleanupNetwork(traceId); + } + else + { + final OctetsFW payload = data.payload(); + int reserved = data.reserved(); + int offset = payload.offset(); + int limit = payload.limit(); + + final MutableDirectBuffer buffer = decodePool.buffer(decodeSlot); + buffer.putBytes(decodeSlotOffset, payload.buffer(), offset, limit - offset); + decodeSlotOffset += limit - offset; + decodeSlotReserved += reserved; + + offset = 0; + limit = decodeSlotOffset; + reserved = decodeSlotReserved; + + decodeNetwork(traceId, authorization, budgetId, reserved, buffer, offset, limit); + } + } + } + + private void onNetworkEnd( + EndFW end) + { + final long traceId = end.traceId(); + + state = KafkaState.closedReply(state); + + cleanupDecodeSlotIfNecessary(); + + if (!delegate.isApplicationReplyOpen()) + { + cleanupNetwork(traceId); + } + else if (decodeSlot == NO_SLOT) + { + delegate.doApplicationEnd(traceId); + } + } + + private void onNetworkAbort( + AbortFW abort) + { + final long traceId = abort.traceId(); + + state = KafkaState.closedReply(state); + + cleanupNetwork(traceId); + } + + private void onNetworkReset( + ResetFW reset) + { + final long traceId = reset.traceId(); + + state = KafkaState.closedInitial(state); + + cleanupNetwork(traceId); + } + + private void onNetworkWindow( + WindowFW window) + { + final long sequence = window.sequence(); + final long acknowledge = window.acknowledge(); + final int minimum = window.minimum(); + final int maximum = window.maximum(); + final long traceId = window.traceId(); + final long budgetId = window.budgetId(); + final int padding = window.padding(); + + assert acknowledge <= sequence; + assert sequence <= initialSeq; + assert acknowledge >= initialAck; + assert maximum + acknowledge >= initialMax + initialAck; + + this.initialAck = acknowledge; + this.initialMax = maximum; + this.initialPad = padding; + this.initialMin = minimum; + this.initialBudgetId = budgetId; + + assert initialAck <= initialSeq; + + this.authorization = window.authorization(); + + state = KafkaState.openedInitial(state); + + if (initialBudgetId != NO_BUDGET_ID && initialDebIndex == NO_DEBITOR_INDEX) + { + initialDeb = supplyDebitor.apply(initialBudgetId); + initialDebIndex = initialDeb.acquire(initialBudgetId, initialId, this::doNetworkData); + assert initialDebIndex != NO_DEBITOR_INDEX; + } + + doNetworkData(budgetId); + + doEncodeRequest(traceId, budgetId); + } + + private void doNetworkData( + long traceId) + { + if (encodeSlot != NO_SLOT) + { + final MutableDirectBuffer buffer = encodePool.buffer(encodeSlot); + final int limit = encodeSlotOffset; + + encodeNetwork(traceId, authorization, initialBudgetId, buffer, 0, limit); + } + } + + private void onNetworkSignal( + SignalFW signal) + { + final long traceId = signal.traceId(); + final int signalId = signal.signalId(); + + if (signalId == SIGNAL_NEXT_REQUEST) + { + doEncodeRequest(traceId, initialBudgetId); + } + } + + private void doNetworkBegin( + long traceId, + long authorization, + long affinity) + { + state = KafkaState.openingInitial(state); + + Consumer extension = EMPTY_EXTENSION; + + if (server != null) + { + extension = e -> e.set((b, o, l) -> proxyBeginExRW.wrap(b, o, l) + .typeId(proxyTypeId) + .address(a -> a.inet(i -> i.protocol(p -> p.set(STREAM)) + .source("0.0.0.0") + .destination(server.host) + .sourcePort(0) + .destinationPort(server.port))) + .infos(i -> i.item(ii -> ii.authority(server.host))) + .build() + .sizeof()); + } + + network = newStream(this::onNetwork, originId, routedId, initialId, initialSeq, initialAck, initialMax, + traceId, authorization, affinity, extension); + } + + @Override + protected void doNetworkData( + long traceId, + long budgetId, + DirectBuffer buffer, + int offset, + int limit) + { + if (encodeSlot != NO_SLOT) + { + final MutableDirectBuffer encodeBuffer = encodePool.buffer(encodeSlot); + encodeBuffer.putBytes(encodeSlotOffset, buffer, offset, limit - offset); + encodeSlotOffset += limit - offset; + encodeSlotTraceId = traceId; + + buffer = encodeBuffer; + offset = 0; + limit = encodeSlotOffset; + } + + encodeNetwork(traceId, authorization, budgetId, buffer, offset, limit); + } + + private void doNetworkEnd( + long traceId, + long authorization) + { + state = KafkaState.closedInitial(state); + + cleanupEncodeSlotIfNecessary(); + cleanupBudgetIfNecessary(); + + doEnd(network, originId, routedId, initialId, initialSeq, initialAck, initialMax, + traceId, authorization, EMPTY_EXTENSION); + } + + private void doNetworkAbort( + long traceId) + { + if (!KafkaState.initialClosed(state)) + { + doAbort(network, originId, routedId, initialId, initialSeq, initialAck, initialMax, + traceId, authorization, EMPTY_EXTENSION); + state = KafkaState.closedInitial(state); + } + + cleanupEncodeSlotIfNecessary(); + cleanupBudgetIfNecessary(); + } + + private void doNetworkReset( + long traceId) + { + if (!KafkaState.replyClosed(state)) + { + doReset(network, originId, routedId, replyId, replySeq, replyAck, replyMax, + traceId, authorization, EMPTY_OCTETS); + state = KafkaState.closedReply(state); + } + + cleanupDecodeSlotIfNecessary(); + } + + private void doNetworkWindow( + long traceId, + long budgetId, + int minReplyNoAck, + int minReplyPad, + int minReplyMax) + { + final long newReplyAck = Math.max(replySeq - minReplyNoAck, replyAck); + + if (newReplyAck > replyAck || minReplyMax > replyMax || !KafkaState.replyOpened(state)) + { + replyAck = newReplyAck; + assert replyAck <= replySeq; + + replyMax = minReplyMax; + + state = KafkaState.openedReply(state); + + doWindow(network, originId, routedId, replyId, replySeq, replyAck, replyMax, + traceId, authorization, budgetId, minReplyPad); + } + } + + private void doEncodeRequest( + long traceId, + long budgetId) + { + if (nextRequestId == nextResponseId) + { + encoder.accept(traceId, budgetId); + } + } + + private void doEncodeDescribeClusterRequest( + long traceId, + long budgetId) + { + final MutableDirectBuffer encodeBuffer = writeBuffer; + final int encodeOffset = DataFW.FIELD_OFFSET_PAYLOAD; + final int encodeLimit = encodeBuffer.capacity(); + + int encodeProgress = encodeOffset; + + final RequestHeaderFW requestHeader = requestHeaderRW.wrap(encodeBuffer, encodeProgress, encodeLimit) + .length(0) + .apiKey(CREATE_TOPICS_API_KEY) + .apiVersion(CREATE_TOPICS_API_VERSION) + .correlationId(0) + .clientId(clientId) + .build(); + + encodeProgress = requestHeader.limit(); + + final DescribeClusterRequestFW describeClusterRequest = + describeClusterRequestRW.wrap(encodeBuffer, encodeProgress, encodeLimit) + .includeAuthorizedOperations(authorizedOperations) + .build(); + + encodeProgress = describeClusterRequest.limit(); + + final int requestId = nextRequestId++; + final int requestSize = encodeProgress - encodeOffset - RequestHeaderFW.FIELD_OFFSET_API_KEY; + + requestHeaderRW.wrap(encodeBuffer, requestHeader.offset(), requestHeader.limit()) + .length(requestSize) + .apiKey(requestHeader.apiKey()) + .apiVersion(requestHeader.apiVersion()) + .correlationId(requestId) + .clientId(requestHeader.clientId()) + .build(); + + doNetworkData(traceId, budgetId, encodeBuffer, encodeOffset, encodeProgress); + + decoder = decodeDescribeClusterResponse; + } + + private void encodeNetwork( + long traceId, + long authorization, + long budgetId, + DirectBuffer buffer, + int offset, + int limit) + { + final int length = limit - offset; + final int initialBudget = Math.max(initialMax - (int)(initialSeq - initialAck), 0); + final int reservedMax = Math.max(Math.min(length + initialPad, initialBudget), initialMin); + + int reserved = reservedMax; + + flush: + if (reserved > 0) + { + boolean claimed = false; + + if (initialDebIndex != NO_DEBITOR_INDEX) + { + reserved = initialDeb.claim(traceId, initialDebIndex, initialId, reserved, reserved, 0); + claimed = reserved > 0; + } + + if (reserved < initialPad || reserved == initialPad && length > 0) + { + break flush; + } + + doData(network, originId, routedId, initialId, initialSeq, initialAck, initialMax, + traceId, authorization, budgetId, reserved, buffer, offset, length, EMPTY_EXTENSION); + + initialSeq += reserved; + + assert initialAck <= initialSeq; + } + + final int flushed = Math.max(reserved - initialPad, 0); + final int remaining = length - flushed; + if (remaining > 0) + { + if (encodeSlot == NO_SLOT) + { + encodeSlot = encodePool.acquire(initialId); + } + + if (encodeSlot == NO_SLOT) + { + cleanupNetwork(traceId); + } + else + { + final MutableDirectBuffer encodeBuffer = encodePool.buffer(encodeSlot); + encodeBuffer.putBytes(0, buffer, offset + flushed, remaining); + encodeSlotOffset = remaining; + } + } + else + { + cleanupEncodeSlotIfNecessary(); + } + } + + private void decodeNetwork( + long traceId, + long authorization, + long budgetId, + int reserved, + MutableDirectBuffer buffer, + int offset, + int limit) + { + KafkaDescribeClusterClientDecoder previous = null; + int progress = offset; + while (progress <= limit && previous != decoder) + { + previous = decoder; + progress = decoder.decode(this, traceId, authorization, budgetId, reserved, buffer, offset, progress, limit); + } + + if (progress < limit) + { + if (decodeSlot == NO_SLOT) + { + decodeSlot = decodePool.acquire(initialId); + } + + if (decodeSlot == NO_SLOT) + { + cleanupNetwork(traceId); + } + else + { + final MutableDirectBuffer decodeBuffer = decodePool.buffer(decodeSlot); + decodeBuffer.putBytes(0, buffer, progress, limit - progress); + decodeSlotOffset = limit - progress; + decodeSlotReserved = (limit - progress) * reserved / (limit - offset); + } + + doNetworkWindow(traceId, budgetId, decodeSlotOffset, 0, replyMax); + } + else + { + cleanupDecodeSlotIfNecessary(); + + if (KafkaState.replyClosing(state)) + { + delegate.doApplicationEnd(traceId); + } + else if (reserved > 0) + { + doNetworkWindow(traceId, budgetId, 0, 0, replyMax); + } + } + } + + @Override + protected void doDecodeSaslHandshakeResponse( + long traceId) + { + decoder = decodeSaslHandshakeResponse; + } + + @Override + protected void doDecodeSaslHandshake( + long traceId) + { + decoder = decodeSaslHandshake; + } + + @Override + protected void doDecodeSaslHandshakeMechanisms( + long traceId) + { + decoder = decodeSaslHandshakeMechanisms; + } + + @Override + protected void doDecodeSaslHandshakeMechansim( + long traceId) + { + decoder = decodeSaslHandshakeMechanism; + } + + @Override + protected void doDecodeSaslAuthenticateResponse( + long traceId) + { + decoder = decodeSaslAuthenticateResponse; + } + + @Override + protected void doDecodeSaslAuthenticate( + long traceId) + { + decoder = decodeSaslAuthenticate; + } + + @Override + protected void onDecodeSaslHandshakeResponse( + long traceId, + long authorization, + int errorCode) + { + switch (errorCode) + { + case ERROR_NONE: + encoder = encodeSaslAuthenticateRequest; + decoder = decodeSaslAuthenticateResponse; + break; + default: + delegate.cleanupApplication(traceId, errorCode); + doNetworkEnd(traceId, authorization); + break; + } + } + + @Override + protected void onDecodeSaslAuthenticateResponse( + long traceId, + long authorization, + int errorCode) + { + switch (errorCode) + { + case ERROR_NONE: + encoder = encodeDescribeClusterRequest; + decoder = decodeDescribeClusterResponse; + break; + default: + delegate.cleanupApplication(traceId, errorCode); + doNetworkEnd(traceId, authorization); + break; + } + } + + @Override + protected void onDecodeSaslResponse( + long traceId) + { + nextResponseId++; + signaler.signalNow(originId, routedId, initialId, traceId, SIGNAL_NEXT_REQUEST, 0); + } + + private void onDecodeDescribeClusterResponse( + long traceId, + long authorization, + int throttle, + short error, + String16FW message, + String16FW clusterId, + int controllerId, + List brokers, + int authorizedOperations) + { + delegate.doApplicationBegin(traceId, authorization, throttle, error, message, + clusterId, controllerId, brokers, authorizedOperations); + } + + private void cleanupNetwork( + long traceId) + { + doNetworkReset(traceId); + doNetworkAbort(traceId); + + delegate.cleanupApplication(traceId, ERROR_NONE); + } + + private void cleanupDecodeSlotIfNecessary() + { + if (decodeSlot != NO_SLOT) + { + decodePool.release(decodeSlot); + decodeSlot = NO_SLOT; + decodeSlotOffset = 0; + decodeSlotReserved = 0; + } + } + + private void cleanupEncodeSlotIfNecessary() + { + if (encodeSlot != NO_SLOT) + { + encodePool.release(encodeSlot); + encodeSlot = NO_SLOT; + encodeSlotOffset = 0; + encodeSlotTraceId = 0; + } + } + + private void cleanupBudgetIfNecessary() + { + if (initialDebIndex != NO_DEBITOR_INDEX) + { + initialDeb.release(initialDebIndex, initialId); + initialDebIndex = NO_DEBITOR_INDEX; + } + } + } + + private record ClusterBrokerInfo( + int brokerId, + String host, + int port, + String rack) + { + } +} diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientRequestFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientRequestFactory.java index f21da275fe..67e585d9fe 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientRequestFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientRequestFactory.java @@ -57,9 +57,12 @@ public KafkaClientRequestFactory( { final KafkaClientCreateTopicsFactory clientCreateTopicsFactory = new KafkaClientCreateTopicsFactory( config, context, supplyBinding, supplyDebitor, signaler, streamFactory, resolveSasl); + final KafkaClientDescribeClusterFactory clientDescribeClusterFactory = new KafkaClientDescribeClusterFactory( + config, context, supplyBinding, supplyDebitor, signaler, streamFactory, resolveSasl); final Int2ObjectHashMap factories = new Int2ObjectHashMap<>(); factories.put(KafkaApi.CREATE_TOPICS.value(), clientCreateTopicsFactory); + factories.put(KafkaApi.DESCRIBE_CLUSTER.value(), clientDescribeClusterFactory); this.kafkaTypeId = context.supplyTypeId(KafkaBinding.NAME); this.factories = factories; diff --git a/runtime/binding-kafka/src/main/zilla/protocol.idl b/runtime/binding-kafka/src/main/zilla/protocol.idl index e13b379a4b..ff606c0107 100644 --- a/runtime/binding-kafka/src/main/zilla/protocol.idl +++ b/runtime/binding-kafka/src/main/zilla/protocol.idl @@ -581,6 +581,38 @@ scope protocol } } + scope describe_cluster + { + struct DescribeClusterRequest // v0 + { + uint8 includeAuthorizedOperations; + } + + struct DescribeClusterResponse + { + int32 correlationId; + int32 throttle; + int16 error; + string16 message; + string16 clusterId = null; + int32 controllerId; + int32 brokerCount; + } + + struct ClusterBroker + { + int32 brokerId; + string16 host; + int32 port; + string16 rack = null; + } + + struct DescribeClusterResponsePart2 + { + int32 authorizedOperations; + } + } + scope create_topics { struct CreateTopicsRequest // v3 diff --git a/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/DescribeClusterIT.java b/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/DescribeClusterIT.java new file mode 100644 index 0000000000..fbf5662d5d --- /dev/null +++ b/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/DescribeClusterIT.java @@ -0,0 +1,59 @@ +/* + * Copyright 2021-2023 Aklivity Inc. + * + * Aklivity licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.aklivity.zilla.runtime.binding.kafka.internal.stream; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.junit.rules.RuleChain.outerRule; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.DisableOnDebug; +import org.junit.rules.TestRule; +import org.junit.rules.Timeout; + +import io.aklivity.k3po.runtime.junit.annotation.Specification; +import io.aklivity.k3po.runtime.junit.rules.K3poRule; +import io.aklivity.zilla.runtime.engine.test.EngineRule; +import io.aklivity.zilla.runtime.engine.test.annotation.Configuration; + +public class DescribeClusterIT +{ + private final K3poRule k3po = new K3poRule() + .addScriptRoot("app", "io/aklivity/zilla/specs/binding/kafka/streams/application/describe.cluster") + .addScriptRoot("net", "io/aklivity/zilla/specs/binding/kafka/streams/network/describe.cluster.v0"); + + private final TestRule timeout = new DisableOnDebug(new Timeout(5, SECONDS)); + + private final EngineRule engine = new EngineRule() + .directory("target/zilla-itests") + .countersBufferCapacity(8192) + .configurationRoot("io/aklivity/zilla/specs/binding/kafka/config") + .external("net0") + .clean(); + + @Rule + public final TestRule chain = outerRule(engine).around(k3po).around(timeout); + + @Test + @Configuration("client.yaml") + @Specification({ + "${app}/cluster.brokers.info/client", + "${net}/cluster.brokers.info/server"}) + public void shouldDescribeClusterBrokerInfo() throws Exception + { + k3po.finish(); + } +} diff --git a/specs/binding-kafka.spec/src/main/java/io/aklivity/zilla/specs/binding/kafka/internal/KafkaFunctions.java b/specs/binding-kafka.spec/src/main/java/io/aklivity/zilla/specs/binding/kafka/internal/KafkaFunctions.java index c02d6bd0cc..a9b873a47e 100644 --- a/specs/binding-kafka.spec/src/main/java/io/aklivity/zilla/specs/binding/kafka/internal/KafkaFunctions.java +++ b/specs/binding-kafka.spec/src/main/java/io/aklivity/zilla/specs/binding/kafka/internal/KafkaFunctions.java @@ -68,6 +68,7 @@ import io.aklivity.zilla.specs.binding.kafka.internal.types.stream.KafkaApi; import io.aklivity.zilla.specs.binding.kafka.internal.types.stream.KafkaBeginExFW; import io.aklivity.zilla.specs.binding.kafka.internal.types.stream.KafkaBootstrapBeginExFW; +import io.aklivity.zilla.specs.binding.kafka.internal.types.stream.KafkaClusterBrokerFW; import io.aklivity.zilla.specs.binding.kafka.internal.types.stream.KafkaConsumerAssignmentFW; import io.aklivity.zilla.specs.binding.kafka.internal.types.stream.KafkaConsumerBeginExFW; import io.aklivity.zilla.specs.binding.kafka.internal.types.stream.KafkaConsumerDataExFW; @@ -81,6 +82,8 @@ import io.aklivity.zilla.specs.binding.kafka.internal.types.stream.KafkaDeleteTopicsRequestBeginExFW; import io.aklivity.zilla.specs.binding.kafka.internal.types.stream.KafkaDeleteTopicsResponseBeginExFW; import io.aklivity.zilla.specs.binding.kafka.internal.types.stream.KafkaDescribeBeginExFW; +import io.aklivity.zilla.specs.binding.kafka.internal.types.stream.KafkaDescribeClusterRequestBeginExFW; +import io.aklivity.zilla.specs.binding.kafka.internal.types.stream.KafkaDescribeClusterResponseBeginExFW; import io.aklivity.zilla.specs.binding.kafka.internal.types.stream.KafkaDescribeDataExFW; import io.aklivity.zilla.specs.binding.kafka.internal.types.stream.KafkaFetchBeginExFW; import io.aklivity.zilla.specs.binding.kafka.internal.types.stream.KafkaFetchDataExFW; @@ -1056,6 +1059,13 @@ public KafkaRequestAlterConfigsBeginExBuilder alterConfigs() return new KafkaRequestAlterConfigsBeginExBuilder(); } + public KafkaDescribeClusterRequestBeginExBuilder describeCluster() + { + requestBeginExRW.kind(KafkaApi.DESCRIBE_CLUSTER.value()); + + return new KafkaDescribeClusterRequestBeginExBuilder(); + } + public KafkaBeginExBuilder build() { final KafkaRequestBeginExFW requestBeginEx = requestBeginExRW.build(); @@ -1275,6 +1285,34 @@ public KafkaRequestAlterConfigsBeginExBuilder build() } } } + + public final class KafkaDescribeClusterRequestBeginExBuilder + { + private final KafkaDescribeClusterRequestBeginExFW.Builder describeClusterBeginExRW = + new KafkaDescribeClusterRequestBeginExFW.Builder(); + + private KafkaDescribeClusterRequestBeginExBuilder() + { + describeClusterBeginExRW.wrap( + writeBuffer, + KafkaBeginExFW.FIELD_OFFSET_REQUEST + KafkaRequestBeginExFW.FIELD_OFFSET_DESCRIBE_CLUSTER, + writeBuffer.capacity()); + } + + public KafkaDescribeClusterRequestBeginExBuilder includeAuthorizedOperations( + String include) + { + describeClusterBeginExRW.includeAuthorizedOperations(Boolean.parseBoolean(include) ? 1 : 0); + return this; + } + + public KafkaBeginExBuilder build() + { + final KafkaDescribeClusterRequestBeginExFW requestBeginEx = describeClusterBeginExRW.build(); + beginExRO.wrap(writeBuffer, 0, requestBeginEx.limit()); + return KafkaBeginExBuilder.this; + } + } } public final class KafkaResponseBeginExBuilder @@ -1307,6 +1345,13 @@ public KafkaAlterConfigsResponseBeginExBuilder alterConfigs() return new KafkaAlterConfigsResponseBeginExBuilder(); } + public KafkaDescribeClusterResponseBeginExBuilder describeCluster() + { + responseBeginExRW.kind(KafkaApi.DESCRIBE_CLUSTER.value()); + + return new KafkaDescribeClusterResponseBeginExBuilder(); + } + public KafkaBeginExBuilder build() { final KafkaResponseBeginExFW responseBeginEx = responseBeginExRW.build(); @@ -1541,6 +1586,125 @@ public KafkaAlterConfigsResponseBeginExBuilder build() } } } + + public final class KafkaDescribeClusterResponseBeginExBuilder + { + private final KafkaDescribeClusterResponseBeginExFW.Builder describeClusterBeginExRW = + new KafkaDescribeClusterResponseBeginExFW.Builder(); + + private KafkaDescribeClusterResponseBeginExBuilder() + { + describeClusterBeginExRW.wrap( + writeBuffer, + KafkaBeginExFW.FIELD_OFFSET_REQUEST + KafkaResponseBeginExFW.FIELD_OFFSET_DESCRIBE_CLUSTER, + writeBuffer.capacity()); + } + + public KafkaDescribeClusterResponseBeginExBuilder throttle( + int throttle) + { + describeClusterBeginExRW.throttle(throttle); + return this; + } + + public KafkaDescribeClusterResponseBeginExBuilder error( + short error) + { + describeClusterBeginExRW.error(error); + return this; + } + + public KafkaDescribeClusterResponseBeginExBuilder message( + String message) + { + describeClusterBeginExRW.message(message); + return this; + } + + public KafkaDescribeClusterResponseBeginExBuilder clusterId( + String clusterId) + { + describeClusterBeginExRW.clusterId(clusterId); + return this; + } + + public KafkaDescribeClusterResponseBeginExBuilder controllerId( + int controllerId) + { + describeClusterBeginExRW.controllerId(controllerId); + return this; + } + + public KafkaBrokerBuilder broker() + { + return new KafkaBrokerBuilder(); + } + + public KafkaDescribeClusterResponseBeginExBuilder authorizedOperations( + int authorizedOperations) + { + describeClusterBeginExRW.authorizedOperations(authorizedOperations); + return this; + } + + public KafkaBeginExBuilder build() + { + final KafkaDescribeClusterResponseBeginExFW responseBeginEx = describeClusterBeginExRW.build(); + beginExRO.wrap(writeBuffer, 0, responseBeginEx.limit()); + return KafkaBeginExBuilder.this; + } + + public final class KafkaBrokerBuilder + { + private final KafkaClusterBrokerFW.Builder brokerRW = new KafkaClusterBrokerFW.Builder(); + + KafkaBrokerBuilder() + { + MutableDirectBuffer topicBuffer = new UnsafeBuffer(new byte[1024 * 8]); + brokerRW.wrap(topicBuffer, 0, topicBuffer.capacity()); + } + + public KafkaBrokerBuilder brokerId( + int brokerId) + { + brokerRW.brokerId(brokerId); + return this; + } + + public KafkaBrokerBuilder host( + String host) + { + brokerRW.host(host); + return this; + } + + public KafkaBrokerBuilder port( + int port) + { + brokerRW.port(port); + return this; + } + + public KafkaBrokerBuilder rack( + String rack) + { + brokerRW.rack(rack); + return this; + } + + public KafkaDescribeClusterResponseBeginExBuilder build() + { + KafkaClusterBrokerFW broker = brokerRW.build(); + describeClusterBeginExRW.brokersItem(b -> b + .brokerId(broker.brokerId()) + .host(broker.host()) + .port(broker.port()) + .rack(broker.rack())); + + return KafkaDescribeClusterResponseBeginExBuilder.this; + } + } + } } public final class KafkaBootstrapBeginExBuilder @@ -6633,13 +6797,20 @@ public KafkaAlterConfigsRequestMatcherBuilder alterConfigs() return alterConfigsMatcher; } + public KafkaDescribeClusterRequestMatcherBuilder describeCluster() + { + KafkaDescribeClusterRequestMatcherBuilder describeClusterMatcher = + new KafkaDescribeClusterRequestMatcherBuilder(); + KafkaBeginExMatcherBuilder.this.caseMatcher = describeClusterMatcher::match; + return describeClusterMatcher; + } + public final class KafkaCreateTopicsRequestMatcherBuilder { private Array32FW.Builder topicsRW; private Integer timeout; private Boolean validateOnly; - private KafkaCreateTopicsRequestMatcherBuilder() { } @@ -6918,6 +7089,41 @@ private boolean matchValidateOnly( return validateOnly == null || validateOnly == (alterConfigsRequestBeginEx.validateOnly() != 0); } } + + public final class KafkaDescribeClusterRequestMatcherBuilder + { + private Boolean includeAuthorizedOperations; + + private KafkaDescribeClusterRequestMatcherBuilder() + { + } + + public KafkaDescribeClusterRequestMatcherBuilder includeAuthorizedOperations( + String includeAuthorizedOperations) + { + this.includeAuthorizedOperations = Boolean.valueOf(includeAuthorizedOperations); + return this; + } + + public KafkaBeginExMatcherBuilder build() + { + return KafkaBeginExMatcherBuilder.this; + } + + private boolean match( + KafkaBeginExFW beginEx) + { + KafkaDescribeClusterRequestBeginExFW describeCluster = beginEx.request().describeCluster(); + return matchIncludeAuthorizedOperations(describeCluster); + } + + private boolean matchIncludeAuthorizedOperations( + final KafkaDescribeClusterRequestBeginExFW describeClusterRequestBeginEx) + { + return includeAuthorizedOperations == null || + includeAuthorizedOperations == (describeClusterRequestBeginEx.includeAuthorizedOperations() != 0); + } + } } public final class KafkaResponseBeginExMatcherBuilder @@ -6952,6 +7158,14 @@ public KafkaAlterConfigsResponseMatcherBuilder alterConfigs() return alterConfigsMatcher; } + public KafkaDescribeClusterResponseMatcherBuilder describeCluster() + { + KafkaDescribeClusterResponseMatcherBuilder describeClusterMatcher = + new KafkaDescribeClusterResponseMatcherBuilder(); + KafkaBeginExMatcherBuilder.this.caseMatcher = describeClusterMatcher::match; + return describeClusterMatcher; + } + public final class KafkaCreateTopicsResponseMatcherBuilder { private Array32FW.Builder topicsRW; @@ -7231,6 +7445,186 @@ private boolean matchResources( return resourcesRW == null || resourcesRW.build().equals(alterConfigsResponseBeginEx.resources()); } } + + public final class KafkaDescribeClusterResponseMatcherBuilder + { + private Integer throttle; + private Short error; + private String16FW message; + private String16FW clusterId; + private Integer controllerId; + private Array32FW.Builder brokersRW; + private Integer authorizedOperations; + + private KafkaDescribeClusterResponseMatcherBuilder() + { + } + + public KafkaDescribeClusterResponseMatcherBuilder throttle( + int throttle) + { + this.throttle = throttle; + return this; + } + + public KafkaDescribeClusterResponseMatcherBuilder error( + short error) + { + this.error = error; + return this; + } + + public KafkaDescribeClusterResponseMatcherBuilder message( + String message) + { + this.message = new String16FW(message); + return this; + } + + public KafkaDescribeClusterResponseMatcherBuilder clusterId( + String clusterId) + { + this.clusterId = new String16FW(clusterId); + return this; + } + + public KafkaDescribeClusterResponseMatcherBuilder controllerId( + int controllerId) + { + this.controllerId = controllerId; + return this; + } + + public KafkaBrokerBuilder broker() + { + if (brokersRW == null) + { + brokersRW = new Array32FW.Builder<>(new KafkaClusterBrokerFW.Builder(), new KafkaClusterBrokerFW()) + .wrap(new UnsafeBuffer(new byte[1024]), 0, 1024); + } + + return new KafkaBrokerBuilder(); + } + + public KafkaDescribeClusterResponseMatcherBuilder authorizedOperations( + int authorizedOperations) + { + this.authorizedOperations = authorizedOperations; + return this; + } + + public KafkaBeginExMatcherBuilder build() + { + return KafkaBeginExMatcherBuilder.this; + } + + public final class KafkaBrokerBuilder + { + private final KafkaClusterBrokerFW.Builder brokerRW = new KafkaClusterBrokerFW.Builder(); + + KafkaBrokerBuilder() + { + MutableDirectBuffer topicBuffer = new UnsafeBuffer(new byte[1024 * 8]); + brokerRW.wrap(topicBuffer, 0, topicBuffer.capacity()); + } + + public KafkaBrokerBuilder brokerId( + int brokerId) + { + brokerRW.brokerId(brokerId); + return this; + } + + public KafkaBrokerBuilder host( + String host) + { + brokerRW.host(host); + return this; + } + + public KafkaBrokerBuilder port( + int port) + { + brokerRW.port(port); + return this; + } + + public KafkaBrokerBuilder rack( + String rack) + { + brokerRW.rack(rack); + return this; + } + + public KafkaDescribeClusterResponseMatcherBuilder build() + { + KafkaClusterBrokerFW broker = brokerRW.build(); + brokersRW.item(b -> b + .brokerId(broker.brokerId()) + .host(broker.host()) + .port(broker.port()) + .rack(broker.rack())); + + return KafkaDescribeClusterResponseMatcherBuilder.this; + } + } + + private boolean match( + KafkaBeginExFW beginEx) + { + final KafkaDescribeClusterResponseBeginExFW describeCluster = beginEx.response().describeCluster(); + return matchThrottle(describeCluster) && + matchError(describeCluster) && + matchMessage(describeCluster) && + matchClusterId(describeCluster) && + matchControllerId(describeCluster) && + matchBrokers(describeCluster) && + matchAuthorizedOperations(describeCluster); + } + + private boolean matchThrottle( + final KafkaDescribeClusterResponseBeginExFW describeClusterResponseBeginEx) + { + return throttle == null || throttle == describeClusterResponseBeginEx.throttle(); + } + + private boolean matchError( + final KafkaDescribeClusterResponseBeginExFW describeClusterResponseBeginEx) + { + return error == null || error == describeClusterResponseBeginEx.error(); + } + + private boolean matchMessage( + final KafkaDescribeClusterResponseBeginExFW describeClusterResponseBeginEx) + { + return message == null || message.equals(describeClusterResponseBeginEx.message()); + } + + private boolean matchClusterId( + final KafkaDescribeClusterResponseBeginExFW describeClusterResponseBeginEx) + { + return clusterId == null || clusterId.equals(describeClusterResponseBeginEx.clusterId()); + } + + private boolean matchControllerId( + final KafkaDescribeClusterResponseBeginExFW describeClusterResponseBeginEx) + { + return controllerId == null || controllerId == describeClusterResponseBeginEx.controllerId(); + } + + private boolean matchBrokers( + final KafkaDescribeClusterResponseBeginExFW describeClusterResponseBeginEx) + { + return brokersRW == null || brokersRW.build().equals(describeClusterResponseBeginEx.brokers()); + } + + private boolean matchAuthorizedOperations( + final KafkaDescribeClusterResponseBeginExFW describeClusterResponseBeginEx) + { + return authorizedOperations == null || + authorizedOperations == describeClusterResponseBeginEx.authorizedOperations(); + } + } } } diff --git a/specs/binding-kafka.spec/src/main/resources/META-INF/zilla/kafka.idl b/specs/binding-kafka.spec/src/main/resources/META-INF/zilla/kafka.idl index 0dec073605..5d14ff4db1 100644 --- a/specs/binding-kafka.spec/src/main/resources/META-INF/zilla/kafka.idl +++ b/specs/binding-kafka.spec/src/main/resources/META-INF/zilla/kafka.idl @@ -179,6 +179,7 @@ scope kafka GROUP (253), BOOTSTRAP (254), MERGED (255), + DESCRIBE_CLUSTER (60), ALTER_CONFIGS (33), INIT_PRODUCER_ID (22), DELETE_TOPICS (20), @@ -550,11 +551,17 @@ scope kafka uint8 validateOnly; } + struct KafkaDescribeClusterRequestBeginEx + { + uint8 includeAuthorizedOperations; + } + union KafkaRequestBeginEx switch (uint8) { - case 33: kafka::stream::KafkaAlterConfigsRequestBeginEx alterConfigs; - case 20: kafka::stream::KafkaDeleteTopicsRequestBeginEx deleteTopics; case 19: kafka::stream::KafkaCreateTopicsRequestBeginEx createTopics; + case 20: kafka::stream::KafkaDeleteTopicsRequestBeginEx deleteTopics; + case 33: kafka::stream::KafkaAlterConfigsRequestBeginEx alterConfigs; + case 60: kafka::stream::KafkaDescribeClusterRequestBeginEx describeCluster; } struct KafkaCreateTopicStatus @@ -596,11 +603,31 @@ scope kafka KafkaResourceStatus[] resources; } + struct KafkaClusterBroker + { + int32 brokerId; + string16 host; + int32 port; + string16 rack = null; + } + + struct KafkaDescribeClusterResponseBeginEx + { + int32 throttle; + int16 error; + string16 message = null; + string16 clusterId = null; + int32 controllerId; + KafkaClusterBroker[] brokers; + int32 authorizedOperations; + } + union KafkaResponseBeginEx switch (uint8) { - case 33: kafka::stream::KafkaAlterConfigsResponseBeginEx alterConfigs; - case 20: kafka::stream::KafkaDeleteTopicsResponseBeginEx deleteTopics; case 19: kafka::stream::KafkaCreateTopicsResponseBeginEx createTopics; + case 20: kafka::stream::KafkaDeleteTopicsResponseBeginEx deleteTopics; + case 33: kafka::stream::KafkaAlterConfigsResponseBeginEx alterConfigs; + case 60: kafka::stream::KafkaDescribeClusterResponseBeginEx describeCluster; } } diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/describe.cluster/cluster.brokers.info/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/describe.cluster/cluster.brokers.info/client.rpt new file mode 100644 index 0000000000..9b4a8a9db9 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/describe.cluster/cluster.brokers.info/client.rpt @@ -0,0 +1,54 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +connect "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "half-duplex" + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .request() + .describeCluster() + .includeAuthorizedOperations("false") + .build() + .build()} + +connected + +read zilla:begin.ext ${kafka:matchBeginEx() + .typeId(zilla:id("kafka")) + .response() + .describeCluster() + .throttle(0) + .error(0) + .clusterId("cluster-0") + .controllerId(0) + .broker() + .brokerId(1) + .host("broker1.example.com") + .port(9092) + .build() + .broker() + .brokerId(2) + .host("broker2.example.com") + .port(9092) + .build() + .authorizedOperations(0) + .build() + .build()} + +write close +read closed diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/describe.cluster/cluster.brokers.info/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/describe.cluster/cluster.brokers.info/server.rpt new file mode 100644 index 0000000000..7dbb348328 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/describe.cluster/cluster.brokers.info/server.rpt @@ -0,0 +1,59 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property serverAddress "zilla://streams/app0" + +accept ${serverAddress} + option zilla:window 8192 + option zilla:transmission "half-duplex" + +accepted + +read zilla:begin.ext ${kafka:matchBeginEx() + .typeId(zilla:id("kafka")) + .request() + .describeCluster() + .includeAuthorizedOperations("false") + .build() + .build()} + +connected + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .response() + .describeCluster() + .throttle(0) + .error(0) + .clusterId("cluster-0") + .controllerId(0) + .broker() + .brokerId(1) + .host("broker1.example.com") + .port(9092) + .build() + .broker() + .brokerId(2) + .host("broker2.example.com") + .port(9092) + .build() + .authorizedOperations(0) + .build() + .build()} +write flush + +read closed +write close diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/describe.cluster.v0/cluster.brokers.info/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/describe.cluster.v0/cluster.brokers.info/client.rpt new file mode 100644 index 0000000000..37fe52f5a9 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/describe.cluster.v0/cluster.brokers.info/client.rpt @@ -0,0 +1,54 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property networkConnectWindow 8192 + +property newRequestId ${kafka:newRequestId()} +property fetchWaitMax 500 +property fetchBytesMax 65535 +property partitionBytesMax 8192 + +connect "zilla://streams/net0" + option zilla:window ${networkConnectWindow} + option zilla:transmission "duplex" + option zilla:byteorder "network" + +connected + +write 16 # size + 60s # describe cluster + 0s # v0 + ${newRequestId} + 5s "zilla" # client id + [0x00] # resources + +read 97 # size + (int:newRequestId) + 0 # throttle time ms + 0s # error code + -1s # error message + 9s "cluster-0" # cluster id + 0 # controller id + 2 # brokers + 1 # broker id + 19s "broker1.example.com" # host + 9092 # port + -1s # rack + 2 # broker id + 19s "broker2.example.com" # host + 9092 # port + -1s # rack + 0 # cluster authorized operations diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/describe.cluster.v0/cluster.brokers.info/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/describe.cluster.v0/cluster.brokers.info/server.rpt new file mode 100644 index 0000000000..473762e7dc --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/describe.cluster.v0/cluster.brokers.info/server.rpt @@ -0,0 +1,52 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property networkAcceptWindow 8192 + +accept "zilla://streams/net0" + option zilla:window ${networkAcceptWindow} + option zilla:transmission "duplex" + option zilla:byteorder "network" + +accepted + +connected + +read 16 # size + 60s # describe cluster + 0s # v0 + (int:newRequestId) + 5s "zilla" # client id + [0x00] # resources + + +write 97 # size + ${newRequestId} + 0 # throttle time ms + 0s # error code + -1s # error message + 9s "cluster-0" # cluster id + 0 # controller id + 2 # brokers + 1 # broker id + 19s "broker1.example.com" # host + 9092 # port + -1s # rack + 2 # broker id + 19s "broker2.example.com" # host + 9092 # port + -1s # rack + 0 # cluster authorized operations diff --git a/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/internal/KafkaFunctionsTest.java b/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/internal/KafkaFunctionsTest.java index 615fb3f255..d74c7d9fbd 100644 --- a/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/internal/KafkaFunctionsTest.java +++ b/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/internal/KafkaFunctionsTest.java @@ -424,6 +424,26 @@ public void shouldGenerateAlterConfigsRequestBeginExtension() assertEquals(requestBeginEx.alterConfigs().resources().fieldCount(), 2); } + @Test + public void shouldGenerateDescribeClusterRequestBeginExtension() + { + byte[] build = KafkaFunctions.beginEx() + .typeId(0x01) + .request() + .describeCluster() + .includeAuthorizedOperations("false") + .build() + .build(); + + DirectBuffer buffer = new UnsafeBuffer(build); + KafkaBeginExFW beginEx = new KafkaBeginExFW().wrap(buffer, 0, buffer.capacity()); + assertEquals(0x1, beginEx.typeId()); + + final KafkaRequestBeginExFW requestBeginEx = beginEx.request(); + + assertEquals(requestBeginEx.describeCluster().includeAuthorizedOperations(), 0); + } + @Test public void shouldGenerateCreateTopicsResponseBeginExtension() { @@ -510,6 +530,35 @@ public void shouldGenerateAlterConfigsResponseBeginExtension() assertEquals(responseBeginEx.alterConfigs().resources().fieldCount(), 2); } + @Test + public void shouldGenerateDescribeClusterResponseBeginExtension() + { + byte[] build = KafkaFunctions.beginEx() + .typeId(0x01) + .response() + .describeCluster() + .throttle(0) + .error((short) 0) + .clusterId("cluster-0") + .controllerId(0) + .broker() + .brokerId(1) + .host("broker1.example.com") + .port(9092) + .build() + .authorizedOperations(0) + .build() + .build(); + + DirectBuffer buffer = new UnsafeBuffer(build); + KafkaBeginExFW beginEx = new KafkaBeginExFW().wrap(buffer, 0, buffer.capacity()); + assertEquals(0x1, beginEx.typeId()); + + final KafkaResponseBeginExFW responseBeginEx = beginEx.response(); + + assertEquals(responseBeginEx.describeCluster().brokers().fieldCount(), 1); + } + @Test public void shouldGenerateMergedBeginExtension() { @@ -2827,6 +2876,29 @@ public void shouldMatchAlterConfigsRequestBeginExtension() throws Exception assertNotNull(matcher.match(byteBuf)); } + @Test + public void shouldMatchDescribeClusterRequestBeginExtension() throws Exception + { + BytesMatcher matcher = KafkaFunctions.matchBeginEx() + .typeId(0x01) + .request() + .describeCluster() + .includeAuthorizedOperations("false") + .build() + .build(); + + ByteBuffer byteBuf = ByteBuffer.allocate(1024); + + new KafkaBeginExFW.Builder() + .wrap(new UnsafeBuffer(byteBuf), 0, byteBuf.capacity()) + .typeId(0x01) + .request(r -> r + .describeCluster(d -> d.includeAuthorizedOperations(0)) + .build()); + + assertNotNull(matcher.match(byteBuf)); + } + @Test public void shouldMatchCreateTopicsResponseBeginExtension() throws Exception { @@ -2925,6 +2997,47 @@ public void shouldMatchAlterConfigsResponseBeginExtension() throws Exception assertNotNull(matcher.match(byteBuf)); } + @Test + public void shouldMatchDescribeClusterResponseBeginExtension() throws Exception + { + BytesMatcher matcher = KafkaFunctions.matchBeginEx() + .typeId(0x01) + .response() + .describeCluster() + .throttle(0) + .error((short) 0) + .clusterId("cluster-0") + .controllerId(0) + .broker() + .brokerId(1) + .host("broker1.example.com") + .port(9092) + .build() + .authorizedOperations(0) + .build() + .build(); + + ByteBuffer byteBuf = ByteBuffer.allocate(1024); + + new KafkaBeginExFW.Builder() + .wrap(new UnsafeBuffer(byteBuf), 0, byteBuf.capacity()) + .typeId(0x01) + .response(r -> r + .describeCluster(d -> d + .throttle(0) + .error((short) 0) + .clusterId("cluster-0") + .controllerId(0) + .brokers(t -> t.item(i -> i + .brokerId(1) + .host("broker1.example.com") + .port(9092))) + .authorizedOperations(0))) + .build(); + + assertNotNull(matcher.match(byteBuf)); + } + @Test public void shouldMatchFetchBeginExtensionTopic() throws Exception { diff --git a/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/application/DescribeClusterIT.java b/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/application/DescribeClusterIT.java new file mode 100644 index 0000000000..3aeecf8123 --- /dev/null +++ b/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/application/DescribeClusterIT.java @@ -0,0 +1,48 @@ +/* + * Copyright 2021-2023 Aklivity Inc. + * + * Aklivity licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.aklivity.zilla.specs.binding.kafka.streams.application; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.junit.rules.RuleChain.outerRule; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.DisableOnDebug; +import org.junit.rules.TestRule; +import org.junit.rules.Timeout; + +import io.aklivity.k3po.runtime.junit.annotation.Specification; +import io.aklivity.k3po.runtime.junit.rules.K3poRule; + +public class DescribeClusterIT +{ + private final K3poRule k3po = new K3poRule() + .addScriptRoot("app", "io/aklivity/zilla/specs/binding/kafka/streams/application/describe.cluster"); + + private final TestRule timeout = new DisableOnDebug(new Timeout(5, SECONDS)); + + @Rule + public final TestRule chain = outerRule(k3po).around(timeout); + + @Test + @Specification({ + "${app}/cluster.brokers.info/client", + "${app}/cluster.brokers.info/server"}) + public void shouldDescribeClusterBrokerInfo() throws Exception + { + k3po.finish(); + } +} diff --git a/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/network/DescribeClusterIT.java b/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/network/DescribeClusterIT.java new file mode 100644 index 0000000000..416d2e8bf1 --- /dev/null +++ b/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/network/DescribeClusterIT.java @@ -0,0 +1,48 @@ +/* + * Copyright 2021-2023 Aklivity Inc. + * + * Aklivity licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.aklivity.zilla.specs.binding.kafka.streams.network; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.junit.rules.RuleChain.outerRule; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.DisableOnDebug; +import org.junit.rules.TestRule; +import org.junit.rules.Timeout; + +import io.aklivity.k3po.runtime.junit.annotation.Specification; +import io.aklivity.k3po.runtime.junit.rules.K3poRule; + +public class DescribeClusterIT +{ + private final K3poRule k3po = new K3poRule() + .addScriptRoot("net", "io/aklivity/zilla/specs/binding/kafka/streams/network/describe.cluster.v0"); + + private final TestRule timeout = new DisableOnDebug(new Timeout(5, SECONDS)); + + @Rule + public final TestRule chain = outerRule(k3po).around(timeout); + + @Test + @Specification({ + "${net}/cluster.brokers.info/client", + "${net}/cluster.brokers.info/server"}) + public void shouldDescribeClusterBrokerInfo() throws Exception + { + k3po.finish(); + } +}