From 597dcd2c6b579575aed217053e3e5cfc4c9dc446 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Mon, 4 Dec 2023 08:31:30 -0800 Subject: [PATCH 1/3] Start from historical offset if the group has no commit history --- .../binding/kafka/internal/stream/KafkaMergedFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaMergedFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaMergedFactory.java index 73144010de..be8adaab4f 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaMergedFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaMergedFactory.java @@ -1961,7 +1961,7 @@ private void onTopicOffsetFetchDataChanged( partitions.forEach(p -> offsetsByPartitionId.put(p.partitionId(), new KafkaPartitionOffset( p.partitionId(), - p.partitionOffset(), + p.partitionOffset() == LIVE.value() ? HISTORICAL.value() : p.partitionOffset(), 0, p.leaderEpoch(), p.metadata().asString()))); From 38d32ad9786351a4a0961fb063f35c9fa6bc4a36 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Mon, 4 Dec 2023 11:39:19 -0800 Subject: [PATCH 2/3] Rename the protocol --- .../kafka/internal/stream/KafkaCacheServerConsumerFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheServerConsumerFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheServerConsumerFactory.java index ce0de2592b..b47dd66691 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheServerConsumerFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheServerConsumerFactory.java @@ -652,7 +652,7 @@ private void doConsumerInitialBegin( .typeId(kafkaTypeId) .group(g -> g.groupId(groupId) - .protocol("highlander") + .protocol("rebalance") .timeout(timeout) .metadataLen(metadata.sizeof()) .metadata(metadata.buffer(), 0, metadata.sizeof())) From d471fd0b9343db5a780b8ed9169bc77df1bb6ac2 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Mon, 4 Dec 2023 12:03:59 -0800 Subject: [PATCH 3/3] Update scripts --- .../consumer/commit.acknowledge.message.offset/client.rpt | 4 ++-- .../consumer/commit.acknowledge.message.offset/server.rpt | 4 ++-- .../streams/application/group/partition.assignment/client.rpt | 2 +- .../streams/application/group/partition.assignment/server.rpt | 2 +- .../streams/application/group/reassign.new.topic/client.rpt | 4 ++-- .../streams/application/group/reassign.new.topic/server.rpt | 4 ++-- .../merged/unmerged.group.fetch.message.ack/client.rpt | 4 ++-- .../merged/unmerged.group.fetch.message.ack/server.rpt | 4 ++-- .../merged/unmerged.group.fetch.message.value/client.rpt | 4 ++-- .../merged/unmerged.group.fetch.message.value/server.rpt | 4 ++-- .../unmerged.group.produce.invalid.partition/client.rpt | 4 ++-- .../unmerged.group.produce.invalid.partition/server.rpt | 4 ++-- .../merged/unmerged.group.produce.message.value/client.rpt | 4 ++-- .../merged/unmerged.group.produce.message.value/server.rpt | 4 ++-- 14 files changed, 26 insertions(+), 26 deletions(-) diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/consumer/commit.acknowledge.message.offset/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/consumer/commit.acknowledge.message.offset/client.rpt index 99dc8ecdd7..3db9611ec3 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/consumer/commit.acknowledge.message.offset/client.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/consumer/commit.acknowledge.message.offset/client.rpt @@ -22,7 +22,7 @@ write zilla:begin.ext ${kafka:beginEx() .typeId(zilla:id("kafka")) .group() .groupId("client-1") - .protocol("highlander") + .protocol("rebalance") .timeout(45000) .metadata(kafka:memberMetadata() .consumerId("consumer-1") @@ -40,7 +40,7 @@ read zilla:begin.ext ${kafka:matchBeginEx() .typeId(zilla:id("kafka")) .group() .groupId("client-1") - .protocol("highlander") + .protocol("rebalance") .instanceId("zilla") .timeout(30000) .build() diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/consumer/commit.acknowledge.message.offset/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/consumer/commit.acknowledge.message.offset/server.rpt index 7d40aa7138..5174434b63 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/consumer/commit.acknowledge.message.offset/server.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/consumer/commit.acknowledge.message.offset/server.rpt @@ -26,7 +26,7 @@ read zilla:begin.ext ${kafka:matchBeginEx() .typeId(zilla:id("kafka")) .group() .groupId("client-1") - .protocol("highlander") + .protocol("rebalance") .timeout(45000) .metadata(kafka:memberMetadata() .consumerId("consumer-1") @@ -44,7 +44,7 @@ write zilla:begin.ext ${kafka:beginEx() .typeId(zilla:id("kafka")) .group() .groupId("client-1") - .protocol("highlander") + .protocol("rebalance") .instanceId("zilla") .timeout(30000) .build() diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/group/partition.assignment/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/group/partition.assignment/client.rpt index e7e5184c68..988d1a1100 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/group/partition.assignment/client.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/group/partition.assignment/client.rpt @@ -22,7 +22,7 @@ write zilla:begin.ext ${kafka:beginEx() .typeId(zilla:id("kafka")) .group() .groupId("client-1") - .protocol("highlander") + .protocol("rebalance") .timeout(45000) .metadata(kafka:memberMetadata() .consumerId("consumer-1") diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/group/partition.assignment/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/group/partition.assignment/server.rpt index 66ffcd34bb..b102c31df0 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/group/partition.assignment/server.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/group/partition.assignment/server.rpt @@ -26,7 +26,7 @@ read zilla:begin.ext ${kafka:matchBeginEx() .typeId(zilla:id("kafka")) .group() .groupId("client-1") - .protocol("highlander") + .protocol("rebalance") .timeout(45000) .metadata(kafka:memberMetadata() .consumerId("consumer-1") diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/group/reassign.new.topic/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/group/reassign.new.topic/client.rpt index 80c8bb119e..ccc26b884e 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/group/reassign.new.topic/client.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/group/reassign.new.topic/client.rpt @@ -22,7 +22,7 @@ write zilla:begin.ext ${kafka:beginEx() .typeId(zilla:id("kafka")) .group() .groupId("client-1") - .protocol("highlander") + .protocol("rebalance") .timeout(45000) .metadata(kafka:memberMetadata() .consumerId("consumer-1") @@ -40,7 +40,7 @@ read zilla:begin.ext ${kafka:matchBeginEx() .typeId(zilla:id("kafka")) .group() .groupId("client-1") - .protocol("highlander") + .protocol("rebalance") .instanceId("zilla") .timeout(30000) .build() diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/group/reassign.new.topic/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/group/reassign.new.topic/server.rpt index cd525e312f..cf1a74abf1 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/group/reassign.new.topic/server.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/group/reassign.new.topic/server.rpt @@ -26,7 +26,7 @@ read zilla:begin.ext ${kafka:matchBeginEx() .typeId(zilla:id("kafka")) .group() .groupId("client-1") - .protocol("highlander") + .protocol("rebalance") .timeout(45000) .metadata(kafka:memberMetadata() .consumerId("consumer-1") @@ -44,7 +44,7 @@ write zilla:begin.ext ${kafka:beginEx() .typeId(zilla:id("kafka")) .group() .groupId("client-1") - .protocol("highlander") + .protocol("rebalance") .instanceId("zilla") .timeout(30000) .build() diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.fetch.message.ack/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.fetch.message.ack/client.rpt index 8002081eeb..608dc73d7a 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.fetch.message.ack/client.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.fetch.message.ack/client.rpt @@ -116,7 +116,7 @@ write zilla:begin.ext ${kafka:beginEx() .typeId(zilla:id("kafka")) .group() .groupId("client-1") - .protocol("highlander") + .protocol("rebalance") .timeout(45000) .metadata(kafka:memberMetadata() .consumerId("consumer-1") @@ -134,7 +134,7 @@ read zilla:begin.ext ${kafka:matchBeginEx() .typeId(zilla:id("kafka")) .group() .groupId("client-1") - .protocol("highlander") + .protocol("rebalance") .timeout(30000) .build() .build()} diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.fetch.message.ack/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.fetch.message.ack/server.rpt index cc1e23fe81..71b5c0264b 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.fetch.message.ack/server.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.fetch.message.ack/server.rpt @@ -114,7 +114,7 @@ read zilla:begin.ext ${kafka:matchBeginEx() .typeId(zilla:id("kafka")) .group() .groupId("client-1") - .protocol("highlander") + .protocol("rebalance") .timeout(45000) .metadata(kafka:memberMetadata() .consumerId("consumer-1") @@ -132,7 +132,7 @@ write zilla:begin.ext ${kafka:beginEx() .typeId(zilla:id("kafka")) .group() .groupId("client-1") - .protocol("highlander") + .protocol("rebalance") .instanceId("zilla") .timeout(30000) .build() diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.fetch.message.value/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.fetch.message.value/client.rpt index 43e8de36b9..e42199851b 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.fetch.message.value/client.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.fetch.message.value/client.rpt @@ -116,7 +116,7 @@ write zilla:begin.ext ${kafka:beginEx() .typeId(zilla:id("kafka")) .group() .groupId("client-1") - .protocol("highlander") + .protocol("rebalance") .timeout(45000) .metadata(kafka:memberMetadata() .consumerId("consumer-1") @@ -134,7 +134,7 @@ read zilla:begin.ext ${kafka:matchBeginEx() .typeId(zilla:id("kafka")) .group() .groupId("client-1") - .protocol("highlander") + .protocol("rebalance") .instanceId("zilla") .timeout(30000) .build() diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.fetch.message.value/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.fetch.message.value/server.rpt index d6dbc0bd41..b95174c38d 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.fetch.message.value/server.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.fetch.message.value/server.rpt @@ -115,7 +115,7 @@ read zilla:begin.ext ${kafka:matchBeginEx() .typeId(zilla:id("kafka")) .group() .groupId("client-1") - .protocol("highlander") + .protocol("rebalance") .timeout(45000) .metadata(kafka:memberMetadata() .consumerId("consumer-1") @@ -133,7 +133,7 @@ write zilla:begin.ext ${kafka:beginEx() .typeId(zilla:id("kafka")) .group() .groupId("client-1") - .protocol("highlander") + .protocol("rebalance") .instanceId("zilla") .timeout(30000) .build() diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.produce.invalid.partition/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.produce.invalid.partition/client.rpt index 6d4bb4e42a..f43f28565c 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.produce.invalid.partition/client.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.produce.invalid.partition/client.rpt @@ -114,7 +114,7 @@ write zilla:begin.ext ${kafka:beginEx() .typeId(zilla:id("kafka")) .group() .groupId("client-1") - .protocol("highlander") + .protocol("rebalance") .timeout(0) .metadata(kafka:memberMetadata() .consumerId("consumer-1") @@ -132,7 +132,7 @@ read zilla:begin.ext ${kafka:matchBeginEx() .typeId(zilla:id("kafka")) .group() .groupId("client-1") - .protocol("highlander") + .protocol("rebalance") .timeout(30000) .build() .build()} diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.produce.invalid.partition/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.produce.invalid.partition/server.rpt index d6cb1fba53..4e31b9f587 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.produce.invalid.partition/server.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.produce.invalid.partition/server.rpt @@ -115,7 +115,7 @@ read zilla:begin.ext ${kafka:matchBeginEx() .typeId(zilla:id("kafka")) .group() .groupId("client-1") - .protocol("highlander") + .protocol("rebalance") .timeout(0) .metadata(kafka:memberMetadata() .consumerId("consumer-1") @@ -133,7 +133,7 @@ write zilla:begin.ext ${kafka:beginEx() .typeId(zilla:id("kafka")) .group() .groupId("client-1") - .protocol("highlander") + .protocol("rebalance") .timeout(30000) .build() .build()} diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.produce.message.value/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.produce.message.value/client.rpt index 066d56c375..791e2589b6 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.produce.message.value/client.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.produce.message.value/client.rpt @@ -114,7 +114,7 @@ write zilla:begin.ext ${kafka:beginEx() .typeId(zilla:id("kafka")) .group() .groupId("client-1") - .protocol("highlander") + .protocol("rebalance") .timeout(0) .metadata(kafka:memberMetadata() .consumerId("consumer-1") @@ -132,7 +132,7 @@ read zilla:begin.ext ${kafka:matchBeginEx() .typeId(zilla:id("kafka")) .group() .groupId("client-1") - .protocol("highlander") + .protocol("rebalance") .timeout(30000) .build() .build()} diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.produce.message.value/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.produce.message.value/server.rpt index d5ba82e974..9edfd5f2d7 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.produce.message.value/server.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.produce.message.value/server.rpt @@ -115,7 +115,7 @@ read zilla:begin.ext ${kafka:matchBeginEx() .typeId(zilla:id("kafka")) .group() .groupId("client-1") - .protocol("highlander") + .protocol("rebalance") .timeout(0) .metadata(kafka:memberMetadata() .consumerId("consumer-1") @@ -133,7 +133,7 @@ write zilla:begin.ext ${kafka:beginEx() .typeId(zilla:id("kafka")) .group() .groupId("client-1") - .protocol("highlander") + .protocol("rebalance") .timeout(30000) .build() .build()}