Skip to content

Commit 615da8f

Browse files
authored
Describe cluster API Support (#1250)
1 parent 1ecb3e2 commit 615da8f

13 files changed

Lines changed: 2479 additions & 5 deletions

File tree

runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientDescribeClusterFactory.java

Lines changed: 1531 additions & 0 deletions
Large diffs are not rendered by default.

runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientRequestFactory.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,12 @@ public KafkaClientRequestFactory(
5757
{
5858
final KafkaClientCreateTopicsFactory clientCreateTopicsFactory = new KafkaClientCreateTopicsFactory(
5959
config, context, supplyBinding, supplyDebitor, signaler, streamFactory, resolveSasl);
60+
final KafkaClientDescribeClusterFactory clientDescribeClusterFactory = new KafkaClientDescribeClusterFactory(
61+
config, context, supplyBinding, supplyDebitor, signaler, streamFactory, resolveSasl);
6062

6163
final Int2ObjectHashMap<BindingHandler> factories = new Int2ObjectHashMap<>();
6264
factories.put(KafkaApi.CREATE_TOPICS.value(), clientCreateTopicsFactory);
65+
factories.put(KafkaApi.DESCRIBE_CLUSTER.value(), clientDescribeClusterFactory);
6366

6467
this.kafkaTypeId = context.supplyTypeId(KafkaBinding.NAME);
6568
this.factories = factories;

runtime/binding-kafka/src/main/zilla/protocol.idl

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -581,6 +581,38 @@ scope protocol
581581
}
582582
}
583583

584+
scope describe_cluster
585+
{
586+
struct DescribeClusterRequest // v0
587+
{
588+
uint8 includeAuthorizedOperations;
589+
}
590+
591+
struct DescribeClusterResponse
592+
{
593+
int32 correlationId;
594+
int32 throttle;
595+
int16 error;
596+
string16 message;
597+
string16 clusterId = null;
598+
int32 controllerId;
599+
int32 brokerCount;
600+
}
601+
602+
struct ClusterBroker
603+
{
604+
int32 brokerId;
605+
string16 host;
606+
int32 port;
607+
string16 rack = null;
608+
}
609+
610+
struct DescribeClusterResponsePart2
611+
{
612+
int32 authorizedOperations;
613+
}
614+
}
615+
584616
scope create_topics
585617
{
586618
struct CreateTopicsRequest // v3
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright 2021-2023 Aklivity Inc.
3+
*
4+
* Aklivity licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
package io.aklivity.zilla.runtime.binding.kafka.internal.stream;
17+
18+
import static java.util.concurrent.TimeUnit.SECONDS;
19+
import static org.junit.rules.RuleChain.outerRule;
20+
21+
import org.junit.Rule;
22+
import org.junit.Test;
23+
import org.junit.rules.DisableOnDebug;
24+
import org.junit.rules.TestRule;
25+
import org.junit.rules.Timeout;
26+
27+
import io.aklivity.k3po.runtime.junit.annotation.Specification;
28+
import io.aklivity.k3po.runtime.junit.rules.K3poRule;
29+
import io.aklivity.zilla.runtime.engine.test.EngineRule;
30+
import io.aklivity.zilla.runtime.engine.test.annotation.Configuration;
31+
32+
public class DescribeClusterIT
33+
{
34+
private final K3poRule k3po = new K3poRule()
35+
.addScriptRoot("app", "io/aklivity/zilla/specs/binding/kafka/streams/application/describe.cluster")
36+
.addScriptRoot("net", "io/aklivity/zilla/specs/binding/kafka/streams/network/describe.cluster.v0");
37+
38+
private final TestRule timeout = new DisableOnDebug(new Timeout(5, SECONDS));
39+
40+
private final EngineRule engine = new EngineRule()
41+
.directory("target/zilla-itests")
42+
.countersBufferCapacity(8192)
43+
.configurationRoot("io/aklivity/zilla/specs/binding/kafka/config")
44+
.external("net0")
45+
.clean();
46+
47+
@Rule
48+
public final TestRule chain = outerRule(engine).around(k3po).around(timeout);
49+
50+
@Test
51+
@Configuration("client.yaml")
52+
@Specification({
53+
"${app}/cluster.brokers.info/client",
54+
"${net}/cluster.brokers.info/server"})
55+
public void shouldDescribeClusterBrokerInfo() throws Exception
56+
{
57+
k3po.finish();
58+
}
59+
}

0 commit comments

Comments
 (0)