Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
WIP
  • Loading branch information
akrambek committed Sep 17, 2024
commit 22aff20a39e7672f53308d05930e09043c7e7e46
2 changes: 1 addition & 1 deletion runtime/binding-kafka/src/main/zilla/protocol.idl
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ scope protocol
uint8 includeAuthorizedOperations;
}

struct ClusterBroker
struct ClusterBroker
{
int32 brokerId;
string16 host;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import io.aklivity.zilla.specs.binding.kafka.internal.types.stream.KafkaApi;
import io.aklivity.zilla.specs.binding.kafka.internal.types.stream.KafkaBeginExFW;
import io.aklivity.zilla.specs.binding.kafka.internal.types.stream.KafkaBootstrapBeginExFW;
import io.aklivity.zilla.specs.binding.kafka.internal.types.stream.KafkaClusterBrokerFW;
import io.aklivity.zilla.specs.binding.kafka.internal.types.stream.KafkaConsumerAssignmentFW;
import io.aklivity.zilla.specs.binding.kafka.internal.types.stream.KafkaConsumerBeginExFW;
import io.aklivity.zilla.specs.binding.kafka.internal.types.stream.KafkaConsumerDataExFW;
Expand All @@ -81,6 +82,8 @@
import io.aklivity.zilla.specs.binding.kafka.internal.types.stream.KafkaDeleteTopicsRequestBeginExFW;
import io.aklivity.zilla.specs.binding.kafka.internal.types.stream.KafkaDeleteTopicsResponseBeginExFW;
import io.aklivity.zilla.specs.binding.kafka.internal.types.stream.KafkaDescribeBeginExFW;
import io.aklivity.zilla.specs.binding.kafka.internal.types.stream.KafkaDescribeClusterRequestBeginExFW;
import io.aklivity.zilla.specs.binding.kafka.internal.types.stream.KafkaDescribeClusterResponseBeginExFW;
import io.aklivity.zilla.specs.binding.kafka.internal.types.stream.KafkaDescribeDataExFW;
import io.aklivity.zilla.specs.binding.kafka.internal.types.stream.KafkaFetchBeginExFW;
import io.aklivity.zilla.specs.binding.kafka.internal.types.stream.KafkaFetchDataExFW;
Expand Down Expand Up @@ -1056,6 +1059,13 @@ public KafkaRequestAlterConfigsBeginExBuilder alterConfigs()
return new KafkaRequestAlterConfigsBeginExBuilder();
}

public KafkaDescribeClusterRequestBeginExBuilder describeCluster()
{
requestBeginExRW.kind(KafkaApi.DESCRIBE_CLUSTER.value());

return new KafkaDescribeClusterRequestBeginExBuilder();
}

public KafkaBeginExBuilder build()
{
final KafkaRequestBeginExFW requestBeginEx = requestBeginExRW.build();
Expand Down Expand Up @@ -1275,6 +1285,34 @@ public KafkaRequestAlterConfigsBeginExBuilder build()
}
}
}

public final class KafkaDescribeClusterRequestBeginExBuilder
{
private final KafkaDescribeClusterRequestBeginExFW.Builder describeClusterBeginExRW =
new KafkaDescribeClusterRequestBeginExFW.Builder();

private KafkaDescribeClusterRequestBeginExBuilder()
{
describeClusterBeginExRW.wrap(
writeBuffer,
KafkaBeginExFW.FIELD_OFFSET_REQUEST + KafkaRequestBeginExFW.FIELD_OFFSET_DESCRIBE_CLUSTER,
writeBuffer.capacity());
}

public KafkaDescribeClusterRequestBeginExBuilder includeAuthorizedOperations(
String include)
{
describeClusterBeginExRW.includeAuthorizedOperations(Boolean.parseBoolean(include) ? 1 : 0);
return this;
}

public KafkaBeginExBuilder build()
{
final KafkaDescribeClusterRequestBeginExFW requestBeginEx = describeClusterBeginExRW.build();
beginExRO.wrap(writeBuffer, 0, requestBeginEx.limit());
return KafkaBeginExBuilder.this;
}
}
}

public final class KafkaResponseBeginExBuilder
Expand Down Expand Up @@ -1307,6 +1345,13 @@ public KafkaAlterConfigsResponseBeginExBuilder alterConfigs()
return new KafkaAlterConfigsResponseBeginExBuilder();
}

public KafkaDescribeClusterResponseBeginExBuilder describeCluster()
{
responseBeginExRW.kind(KafkaApi.DESCRIBE_CLUSTER.value());

return new KafkaDescribeClusterResponseBeginExBuilder();
}

public KafkaBeginExBuilder build()
{
final KafkaResponseBeginExFW responseBeginEx = responseBeginExRW.build();
Expand Down Expand Up @@ -1541,6 +1586,125 @@ public KafkaAlterConfigsResponseBeginExBuilder build()
}
}
}

public final class KafkaDescribeClusterResponseBeginExBuilder
{
private final KafkaDescribeClusterResponseBeginExFW.Builder describeClusterBeginExRW =
new KafkaDescribeClusterResponseBeginExFW.Builder();

private KafkaDescribeClusterResponseBeginExBuilder()
{
describeClusterBeginExRW.wrap(
writeBuffer,
KafkaBeginExFW.FIELD_OFFSET_REQUEST + KafkaResponseBeginExFW.FIELD_OFFSET_DESCRIBE_CLUSTER,
writeBuffer.capacity());
}

public KafkaDescribeClusterResponseBeginExBuilder throttle(
int throttle)
{
describeClusterBeginExRW.throttle(throttle);
return this;
}

public KafkaDescribeClusterResponseBeginExBuilder error(
short error)
{
describeClusterBeginExRW.error(error);
return this;
}

public KafkaDescribeClusterResponseBeginExBuilder message(
String message)
{
describeClusterBeginExRW.message(message);
return this;
}

public KafkaDescribeClusterResponseBeginExBuilder clusterId(
String clusterId)
{
describeClusterBeginExRW.clusterId(clusterId);
return this;
}

public KafkaDescribeClusterResponseBeginExBuilder controllerId(
int controllerId)
{
describeClusterBeginExRW.controllerId(controllerId);
return this;
}

public KafkaBrokerBuilder broker()
{
return new KafkaBrokerBuilder();
}

public KafkaDescribeClusterResponseBeginExBuilder authorizedOperations(
int authorizedOperations)
{
describeClusterBeginExRW.authorizedOperations(authorizedOperations);
return this;
}

public KafkaBeginExBuilder build()
{
final KafkaDescribeClusterResponseBeginExFW responseBeginEx = describeClusterBeginExRW.build();
beginExRO.wrap(writeBuffer, 0, responseBeginEx.limit());
return KafkaBeginExBuilder.this;
}

public final class KafkaBrokerBuilder
{
private final KafkaClusterBrokerFW.Builder brokerRW = new KafkaClusterBrokerFW.Builder();

KafkaBrokerBuilder()
{
MutableDirectBuffer topicBuffer = new UnsafeBuffer(new byte[1024 * 8]);
brokerRW.wrap(topicBuffer, 0, topicBuffer.capacity());
}

public KafkaBrokerBuilder brokerId(
int brokerId)
{
brokerRW.brokerId(brokerId);
return this;
}

public KafkaBrokerBuilder host(
String host)
{
brokerRW.host(host);
return this;
}

public KafkaBrokerBuilder port(
int port)
{
brokerRW.port(port);
return this;
}

public KafkaBrokerBuilder rack(
String rack)
{
brokerRW.rack(rack);
return this;
}

public KafkaDescribeClusterResponseBeginExBuilder build()
{
KafkaClusterBrokerFW broker = brokerRW.build();
describeClusterBeginExRW.brokersItem(b -> b
.brokerId(broker.brokerId())
.host(broker.host())
.port(broker.port())
.rack(broker.rack()));

return KafkaDescribeClusterResponseBeginExBuilder.this;
}
}
}
}

public final class KafkaBootstrapBeginExBuilder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ scope kafka
GROUP (253),
BOOTSTRAP (254),
MERGED (255),
DESCRIBE_CLUSTER (60),
ALTER_CONFIGS (33),
INIT_PRODUCER_ID (22),
DELETE_TOPICS (20),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,26 @@ public void shouldGenerateAlterConfigsRequestBeginExtension()
assertEquals(requestBeginEx.alterConfigs().resources().fieldCount(), 2);
}

@Test
public void shouldGenerateDescribeClusterRequestBeginExtension()
{
byte[] build = KafkaFunctions.beginEx()
.typeId(0x01)
.request()
.describeCluster()
.includeAuthorizedOperations("false")
.build()
.build();

DirectBuffer buffer = new UnsafeBuffer(build);
KafkaBeginExFW beginEx = new KafkaBeginExFW().wrap(buffer, 0, buffer.capacity());
assertEquals(0x1, beginEx.typeId());

final KafkaRequestBeginExFW requestBeginEx = beginEx.request();

assertEquals(requestBeginEx.describeCluster().includeAuthorizedOperations(), 0);
}

@Test
public void shouldGenerateCreateTopicsResponseBeginExtension()
{
Expand Down Expand Up @@ -510,6 +530,35 @@ public void shouldGenerateAlterConfigsResponseBeginExtension()
assertEquals(responseBeginEx.alterConfigs().resources().fieldCount(), 2);
}

@Test
public void shouldGenerateDescribeClusterResponseBeginExtension()
{
byte[] build = KafkaFunctions.beginEx()
.typeId(0x01)
.response()
.describeCluster()
.throttle(0)
.error((short) 0)
.clusterId("cluster-0")
.controllerId(0)
.broker()
.brokerId(1)
.host("broker1.example.com")
.port(9092)
.build()
.authorizedOperations(0)
.build()
.build();

DirectBuffer buffer = new UnsafeBuffer(build);
KafkaBeginExFW beginEx = new KafkaBeginExFW().wrap(buffer, 0, buffer.capacity());
assertEquals(0x1, beginEx.typeId());

final KafkaResponseBeginExFW responseBeginEx = beginEx.response();

assertEquals(responseBeginEx.describeCluster().brokers().fieldCount(), 1);
}

@Test
public void shouldGenerateMergedBeginExtension()
{
Expand Down