Skip to content
Merged
Show file tree
Hide file tree
Changes from 40 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
2cef977
Adjust padding to accommodate good enough headers and don't include …
akrambek Oct 25, 2023
d201582
Merge branch 'develop' into feature/consumer-group-cont
akrambek Oct 25, 2023
76bf9de
Merge branch 'feature/consumer-group-cont' into develop
akrambek Oct 26, 2023
29ae79c
Merge branch 'aklivity:develop' into develop
akrambek Oct 30, 2023
ec1b39e
Merge branch 'aklivity:develop' into develop
akrambek Oct 30, 2023
51a9f0e
Merge branch 'aklivity:develop' into develop
akrambek Oct 31, 2023
4394783
Merge branch 'aklivity:develop' into develop
akrambek Oct 31, 2023
e8696ce
Merge branch 'aklivity:develop' into develop
akrambek Nov 2, 2023
51c37b1
Merge branch 'aklivity:develop' into develop
akrambek Nov 2, 2023
5da5f04
Merge branch 'aklivity:develop' into develop
akrambek Nov 2, 2023
db1e17c
Merge branch 'aklivity:develop' into develop
akrambek Nov 4, 2023
40f73dc
Merge branch 'aklivity:develop' into develop
akrambek Nov 6, 2023
d1a0492
Merge branch 'aklivity:develop' into develop
akrambek Nov 23, 2023
45799ce
Merge branch 'aklivity:develop' into develop
akrambek Nov 29, 2023
b6f7884
WIP
akrambek Nov 29, 2023
3660eec
Fix offset commit
akrambek Nov 29, 2023
b26ea7f
Merge branch 'bug/commit' into feature/kafka-grpc-group
akrambek Nov 29, 2023
d98fb23
Adjust the scripts
akrambek Nov 29, 2023
cf2cbb9
Refactor
akrambek Nov 30, 2023
ef40877
Merge branch 'bug/commit' into feature/kafka-grpc-group
akrambek Nov 30, 2023
a613924
WIP
akrambek Nov 30, 2023
1e55162
Merge branch 'aklivity:develop' into develop
akrambek Nov 30, 2023
946f01d
WIP
akrambek Nov 30, 2023
45dc9e8
Merge branch 'develop' into feature/kafka-grpc-group
akrambek Nov 30, 2023
211d7c7
WIP
akrambek Nov 30, 2023
7281108
Fix typo
akrambek Nov 30, 2023
2a07377
WIP
akrambek Dec 2, 2023
fedc41f
Merge branch 'aklivity:develop' into develop
akrambek Dec 4, 2023
18a8d74
Merge branch 'aklivity:develop' into develop
akrambek Dec 4, 2023
f160aad
Merge branch 'aklivity:develop' into develop
akrambek Dec 4, 2023
e0e7d5a
Merge branch 'aklivity:develop' into develop
akrambek Dec 6, 2023
9f4a8a6
Merge branch 'aklivity:develop' into develop
akrambek Dec 8, 2023
c0d096f
Merge branch 'develop' into feature/kafka-grpc-group
akrambek Dec 8, 2023
512bedc
Apply feedback from PR
akrambek Dec 8, 2023
456f111
Merge branch 'aklivity:develop' into develop
akrambek Dec 8, 2023
d466096
Merge branch 'develop' into feature/kafka-grpc-group
akrambek Dec 8, 2023
8c2be5c
Apply feedback from PR
akrambek Dec 9, 2023
0d27262
Merge branch 'aklivity:develop' into develop
akrambek Dec 9, 2023
4436e2d
Merge branch 'develop' into feature/kafka-grpc-group
akrambek Dec 9, 2023
0457c75
Apply feedback from PR
akrambek Dec 9, 2023
1c0a33c
Fix formatting
akrambek Dec 9, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,19 @@
*/
package io.aklivity.zilla.runtime.binding.kafka.grpc.internal;


import io.aklivity.zilla.runtime.engine.Configuration;

public class KafkaGrpcConfiguration extends Configuration
{
private static final ConfigurationDef KAFKA_GRPC_CONFIG;
public static final PropertyDef<String> KAFKA_GROUP_ID;


static
{
final ConfigurationDef config = new ConfigurationDef("zilla.binding.kafka.grpc");
KAFKA_GROUP_ID = config.property("group.id.format", "zilla:%s-%s");
KAFKA_GRPC_CONFIG = config;
}

Expand All @@ -31,4 +35,9 @@ public KafkaGrpcConfiguration(
{
super(KAFKA_GRPC_CONFIG, config);
}

public String groupIdFormat()
{
return KAFKA_GROUP_ID.get(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.aklivity.zilla.runtime.binding.kafka.grpc.config.KafkaGrpcCorrelationConfig;
import io.aklivity.zilla.runtime.binding.kafka.grpc.internal.types.Array32FW;
import io.aklivity.zilla.runtime.binding.kafka.grpc.internal.types.KafkaHeaderFW;
import io.aklivity.zilla.runtime.binding.kafka.grpc.internal.types.KafkaOffsetFW;
import io.aklivity.zilla.runtime.binding.kafka.grpc.internal.types.OctetsFW;
import io.aklivity.zilla.runtime.binding.kafka.grpc.internal.types.stream.KafkaDataExFW;
import io.aklivity.zilla.runtime.binding.kafka.grpc.internal.types.stream.KafkaMergedFetchDataExFW;
Expand All @@ -32,6 +33,8 @@ public final class KafkaGrpcFetchHeaderHelper
private final OctetsFW methodRO = new OctetsFW();
private final OctetsFW replyToRO = new OctetsFW();
private final OctetsFW correlatedIdRO = new OctetsFW();
public int partitionId;
public long partitionOffset;

public OctetsFW service;
public OctetsFW method;
Expand Down Expand Up @@ -65,6 +68,11 @@ public void visit(
{
final KafkaMergedFetchDataExFW kafkaMergedFetchDataEx = dataEx.merged().fetch();
final Array32FW<KafkaHeaderFW> headers = kafkaMergedFetchDataEx.headers();
final KafkaOffsetFW partition = kafkaMergedFetchDataEx.partition();

partitionId = partition.partitionId();
partitionOffset = partition.partitionOffset();

headers.forEach(this::dispatch);
}
}
Expand Down

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions runtime/binding-kafka-grpc/src/main/zilla/internal.idl
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ scope internal
octets[methodLength] method = null;
int64 traceId;
int64 authorization;
int32 partitionId;
int64 partitionOffset;
uint8 flags = 3; // 0x01 FIN, 0x02 INIT, 0x04 INCOMPLETE, 0x08 SKIP
int32 reserved;
int32 valueLength;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1591,7 +1591,7 @@ private void doMergedReplyBegin(
if (capabilities == FETCH_ONLY)
{
doBegin(sender, originId, routedId, replyId, replySeq, replyAck, replyMax,
traceId, authorization, affinity, httpBeginExToKafka());
traceId, authorization, affinity, beginExToKafka());
}
else
{
Expand All @@ -1602,7 +1602,7 @@ private void doMergedReplyBegin(
doUnmergedFetchReplyWindowsIfNecessary(traceId);
}

private Flyweight.Builder.Visitor httpBeginExToKafka()
private Flyweight.Builder.Visitor beginExToKafka()
{
return (buffer, offset, maxLimit) ->
kafkaBeginExRW.wrap(buffer, offset, maxLimit)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ write zilla:begin.ext ${kafka:beginEx()
.merged()
.capabilities("FETCH_ONLY")
.topic("requests")
.groupId("zilla-test-remote_server0")
.partition(-1, -2)
.filter()
.key("59410e57-3e0f-4b61-9328-f645a7968ac8-d41d8cd98f00b204e9800998ecf8427e")
Expand Down Expand Up @@ -75,7 +76,7 @@ read zilla:data.ext ${kafka:matchDataEx()
.typeId(zilla:id("kafka"))
.merged()
.fetch()
.partition(0, 3, 2)
.partition(0, 3, 3)
.progress(0, 4)
.progress(1, 1)
.key("59410e57-3e0f-4b61-9328-f645a7968ac8-d41d8cd98f00b204e9800998ecf8427e")
Expand All @@ -88,6 +89,30 @@ read zilla:data.ext ${kafka:matchDataEx()
.build()}
read zilla:data.null

write advise zilla:flush ${kafka:flushEx()
.typeId(zilla:id("kafka"))
.merged()
.consumer()
.progress(0, 2)
.build()
.build()}

write advise zilla:flush ${kafka:flushEx()
.typeId(zilla:id("kafka"))
.merged()
.consumer()
.progress(0, 3)
.build()
.build()}

write advise zilla:flush ${kafka:flushEx()
.typeId(zilla:id("kafka"))
.merged()
.consumer()
.progress(0, 4)
.build()
.build()}

write notify SENT_ASYNC_REQUEST

connect await SENT_ASYNC_REQUEST
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ read zilla:begin.ext ${kafka:beginEx()
.merged()
.capabilities("FETCH_ONLY")
.topic("requests")
.groupId("zilla-test-remote_server0")
.partition(-1, -2)
.filter()
.key("59410e57-3e0f-4b61-9328-f645a7968ac8-d41d8cd98f00b204e9800998ecf8427e")
Expand Down Expand Up @@ -82,7 +83,7 @@ write zilla:data.ext ${kafka:dataEx()
.merged()
.fetch()
.timestamp(kafka:timestamp())
.partition(0, 3, 2)
.partition(0, 3, 3)
.progress(0, 4)
.progress(1, 1)
.key("59410e57-3e0f-4b61-9328-f645a7968ac8-d41d8cd98f00b204e9800998ecf8427e")
Expand All @@ -95,6 +96,30 @@ write zilla:data.ext ${kafka:dataEx()
.build()}
write flush

read advised zilla:flush ${kafka:flushEx()
.typeId(zilla:id("kafka"))
.merged()
.consumer()
.progress(0, 2)
.build()
.build()}

read advised zilla:flush ${kafka:flushEx()
.typeId(zilla:id("kafka"))
.merged()
.consumer()
.progress(0, 3)
.build()
.build()}

read advised zilla:flush ${kafka:flushEx()
.typeId(zilla:id("kafka"))
.merged()
.consumer()
.progress(0, 4)
.build()
.build()}

accepted

read zilla:begin.ext ${kafka:beginEx()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ write zilla:begin.ext ${kafka:beginEx()
.merged()
.capabilities("FETCH_ONLY")
.topic("requests")
.groupId("zilla-test-remote_server0")
.partition(-1, -2)
.filter()
.key("59410e57-3e0f-4b61-9328-f645a7968ac8-d41d8cd98f00b204e9800998ecf8427e")
Expand Down Expand Up @@ -57,7 +58,7 @@ read zilla:data.ext ${kafka:matchDataEx()
.merged()
.fetch()
.deferred(0)
.partition(-1, -1)
.partition(0, 2, 2)
.key("59410e57-3e0f-4b61-9328-f645a7968ac8-d41d8cd98f00b204e9800998ecf8427e")
.header("zilla:identity", "test")
.header("zilla:service", "example.EchoService")
Expand All @@ -69,6 +70,22 @@ read zilla:data.ext ${kafka:matchDataEx()
read zilla:data.null


write advise zilla:flush ${kafka:flushEx()
.typeId(zilla:id("kafka"))
.merged()
.consumer()
.progress(0, 2)
.build()
.build()}

write advise zilla:flush ${kafka:flushEx()
.typeId(zilla:id("kafka"))
.merged()
.consumer()
.progress(0, 3)
.build()
.build()}

write notify SENT_ASYNC_REQUEST

connect await SENT_ASYNC_REQUEST
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ read zilla:begin.ext ${kafka:beginEx()
.merged()
.capabilities("FETCH_ONLY")
.topic("requests")
.groupId("zilla-test-remote_server0")
.partition(-1, -2)
.filter()
.key("59410e57-3e0f-4b61-9328-f645a7968ac8-d41d8cd98f00b204e9800998ecf8427e")
Expand Down Expand Up @@ -61,7 +62,7 @@ write zilla:data.ext ${kafka:dataEx()
.merged()
.fetch()
.deferred(0)
.partition(-1, -1)
.partition(0, 2, 2)
.key("59410e57-3e0f-4b61-9328-f645a7968ac8-d41d8cd98f00b204e9800998ecf8427e")
.header("zilla:identity", "test")
.header("zilla:service", "example.EchoService")
Expand All @@ -73,6 +74,22 @@ write zilla:data.ext ${kafka:dataEx()

write flush

read advised zilla:flush ${kafka:flushEx()
.typeId(zilla:id("kafka"))
.merged()
.consumer()
.progress(0, 2)
.build()
.build()}

read advised zilla:flush ${kafka:flushEx()
.typeId(zilla:id("kafka"))
.merged()
.consumer()
.progress(0, 3)
.build()
.build()}

accepted

read zilla:begin.ext ${kafka:beginEx()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ write zilla:begin.ext ${kafka:beginEx()
.merged()
.capabilities("FETCH_ONLY")
.topic("requests")
.groupId("zilla-test-remote_server0")
.partition(-1, -2)
.filter()
.key("59410e57-3e0f-4b61-9328-f645a7968ac8-d41d8cd98f00b204e9800998ecf8427e")
Expand Down Expand Up @@ -75,7 +76,7 @@ read zilla:data.ext ${kafka:matchDataEx()
.typeId(zilla:id("kafka"))
.merged()
.fetch()
.partition(0, 3, 2)
.partition(0, 3, 3)
.progress(0, 4)
.progress(1, 1)
.key("59410e57-3e0f-4b61-9328-f645a7968ac8-d41d8cd98f00b204e9800998ecf8427e")
Expand All @@ -88,6 +89,30 @@ read zilla:data.ext ${kafka:matchDataEx()
.build()}
read zilla:data.null

write advise zilla:flush ${kafka:flushEx()
.typeId(zilla:id("kafka"))
.merged()
.consumer()
.progress(0, 2)
.build()
.build()}

write advise zilla:flush ${kafka:flushEx()
.typeId(zilla:id("kafka"))
.merged()
.consumer()
.progress(0, 3)
.build()
.build()}

write advise zilla:flush ${kafka:flushEx()
.typeId(zilla:id("kafka"))
.merged()
.consumer()
.progress(0, 4)
.build()
.build()}

write notify SENT_ASYNC_REQUEST

connect await SENT_ASYNC_REQUEST
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ read zilla:begin.ext ${kafka:beginEx()
.merged()
.capabilities("FETCH_ONLY")
.topic("requests")
.groupId("zilla-test-remote_server0")
.partition(-1, -2)
.filter()
.key("59410e57-3e0f-4b61-9328-f645a7968ac8-d41d8cd98f00b204e9800998ecf8427e")
Expand Down Expand Up @@ -82,7 +83,7 @@ write zilla:data.ext ${kafka:dataEx()
.merged()
.fetch()
.timestamp(kafka:timestamp())
.partition(0, 3, 2)
.partition(0, 3, 3)
.progress(0, 4)
.progress(1, 1)
.key("59410e57-3e0f-4b61-9328-f645a7968ac8-d41d8cd98f00b204e9800998ecf8427e")
Expand All @@ -95,6 +96,31 @@ write zilla:data.ext ${kafka:dataEx()
.build()}
write flush

read advised zilla:flush ${kafka:flushEx()
.typeId(zilla:id("kafka"))
.merged()
.consumer()
.progress(0, 2)
.build()
.build()}

read advised zilla:flush ${kafka:flushEx()
.typeId(zilla:id("kafka"))
.merged()
.consumer()
.progress(0, 3)
.build()
.build()}


read advised zilla:flush ${kafka:flushEx()
.typeId(zilla:id("kafka"))
.merged()
.consumer()
.progress(0, 4)
.build()
.build()}

accepted

read zilla:begin.ext ${kafka:beginEx()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ write zilla:begin.ext ${kafka:beginEx()
.merged()
.capabilities("FETCH_ONLY")
.topic("requests")
.groupId("zilla-test-remote_server0")
.partition(-1, -2)
.filter()
.key("59410e57-3e0f-4b61-9328-f645a7968ac8-d41d8cd98f00b204e9800998ecf8427e")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ read zilla:begin.ext ${kafka:beginEx()
.merged()
.capabilities("FETCH_ONLY")
.topic("requests")
.groupId("zilla-test-remote_server0")
.partition(-1, -2)
.filter()
.key("59410e57-3e0f-4b61-9328-f645a7968ac8-d41d8cd98f00b204e9800998ecf8427e")
Expand Down
Loading