diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/cache/KafkaCachePartition.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/cache/KafkaCachePartition.java index 6576475afa..c5ed23f91c 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/cache/KafkaCachePartition.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/cache/KafkaCachePartition.java @@ -72,6 +72,8 @@ public final class KafkaCachePartition private static final long NO_ANCESTOR_OFFSET = -1L; private static final long NO_DESCENDANT_OFFSET = -1L; private static final int NO_SEQUENCE = -1; + private static final short NO_PRODUCER_ID = -1; + private static final short NO_PRODUCER_EPOCH = -1; private static final int NO_ACKNOWLEDGE = 0; private static final int NO_DELTA_POSITION = -1; @@ -98,7 +100,8 @@ public final class KafkaCachePartition private final KafkaCacheEntryFW logEntryRO = new KafkaCacheEntryFW(); private final KafkaCacheDeltaFW deltaEntryRO = new KafkaCacheDeltaFW(); - private final MutableDirectBuffer entryInfo = new UnsafeBuffer(new byte[6 * Long.BYTES + 3 * Integer.BYTES + Short.BYTES]); + private final MutableDirectBuffer entryInfo = + new UnsafeBuffer(new byte[7 * Long.BYTES + 3 * Integer.BYTES + 2 * Short.BYTES]); private final MutableDirectBuffer valueInfo = new UnsafeBuffer(new byte[Integer.BYTES]); private final Array32FW headersRO = new Array32FW(new KafkaHeaderFW()); @@ -374,12 +377,14 @@ public void writeEntryStart( entryInfo.putLong(Long.BYTES, timestamp); entryInfo.putLong(2 * Long.BYTES, producerId); entryInfo.putLong(3 * Long.BYTES, NO_ACKNOWLEDGE); - entryInfo.putInt(4 * Long.BYTES, NO_SEQUENCE); - entryInfo.putLong(4 * Long.BYTES + Integer.BYTES, ancestorOffset); - entryInfo.putLong(5 * Long.BYTES + Integer.BYTES, NO_DESCENDANT_OFFSET); - entryInfo.putInt(6 * Long.BYTES + Integer.BYTES, entryFlags); - entryInfo.putInt(6 * Long.BYTES + 2 * Integer.BYTES, deltaPosition); - entryInfo.putShort(6 * Long.BYTES + 3 * Integer.BYTES, KafkaAckMode.NONE.value()); + entryInfo.putLong(4 * Long.BYTES, NO_PRODUCER_ID); + entryInfo.putShort(5 * Long.BYTES, NO_PRODUCER_EPOCH); + entryInfo.putInt(5 * Long.BYTES + Short.BYTES, NO_SEQUENCE); + entryInfo.putLong(5 * Long.BYTES + Integer.BYTES + Short.BYTES, ancestorOffset); + entryInfo.putLong(6 * Long.BYTES + Integer.BYTES + Short.BYTES, NO_DESCENDANT_OFFSET); + entryInfo.putInt(7 * Long.BYTES + Integer.BYTES + Short.BYTES, entryFlags); + entryInfo.putInt(7 * Long.BYTES + 2 * Integer.BYTES + Short.BYTES, deltaPosition); + entryInfo.putShort(7 * Long.BYTES + 3 * Integer.BYTES + Short.BYTES, KafkaAckMode.NONE.value()); logFile.appendBytes(entryInfo); logFile.appendBytes(key); @@ -554,13 +559,14 @@ public void writeProduceEntryStart( entryInfo.putLong(Long.BYTES, timestamp); entryInfo.putLong(2 * Long.BYTES, ownerId); entryInfo.putLong(3 * Long.BYTES, NO_ACKNOWLEDGE); - entryInfo.putInt(4 * Long.BYTES, sequence); - entryInfo.putLong(4 * Long.BYTES + Integer.BYTES, NO_ANCESTOR_OFFSET); - entryInfo.putLong(5 * Long.BYTES + Integer.BYTES, NO_DESCENDANT_OFFSET); - entryInfo.putInt(6 * Long.BYTES + Integer.BYTES, 0x00); - entryInfo.putInt(6 * Long.BYTES + 2 * Integer.BYTES, NO_DELTA_POSITION); - entryInfo.putShort(6 * Long.BYTES + 3 * Integer.BYTES, ackMode.value()); - + entryInfo.putLong(4 * Long.BYTES, producerId); + entryInfo.putShort(5 * Long.BYTES, producerEpoch); + entryInfo.putInt(5 * Long.BYTES + Short.BYTES, sequence); + entryInfo.putLong(5 * Long.BYTES + Integer.BYTES + Short.BYTES, NO_ANCESTOR_OFFSET); + entryInfo.putLong(6 * Long.BYTES + Integer.BYTES + Short.BYTES, NO_DESCENDANT_OFFSET); + entryInfo.putInt(7 * Long.BYTES + Integer.BYTES + Short.BYTES, 0x00); + entryInfo.putInt(7 * Long.BYTES + 2 * Integer.BYTES + Short.BYTES, NO_DELTA_POSITION); + entryInfo.putShort(7 * Long.BYTES + 3 * Integer.BYTES + Short.BYTES, ackMode.value()); logFile.appendBytes(entryInfo); logFile.appendBytes(key); diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheServerProduceFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheServerProduceFactory.java index eb07549b80..416cb47711 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheServerProduceFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheServerProduceFactory.java @@ -1171,6 +1171,8 @@ private void doProduceInitialData( { final long partitionOffset = nextEntry.offset$(); final long timestamp = nextEntry.timestamp(); + final long producerId = nextEntry.producerId(); + final short producerEpoch = nextEntry.producerEpoch(); final int sequence = nextEntry.sequence(); final KafkaAckMode ackMode = KafkaAckMode.valueOf(nextEntry.ackMode()); final KafkaKeyFW key = nextEntry.key(); @@ -1234,11 +1236,11 @@ private void doProduceInitialData( switch (flags) { case FLAG_INIT | FLAG_FIN: - doServerInitialDataFull(traceId, timestamp, sequence, checksum, + doServerInitialDataFull(traceId, timestamp, producerId, producerEpoch, sequence, checksum, ackMode, key, headers, trailers, fragment, reserved, flags); break; case FLAG_INIT: - doServerInitialDataInit(traceId, deferred, timestamp, sequence, + doServerInitialDataInit(traceId, deferred, timestamp, producerId, producerEpoch, sequence, checksum, ackMode, key, headers, trailers, fragment, reserved, flags); break; case FLAG_NONE: @@ -1277,6 +1279,8 @@ private void doProduceInitialData( private void doServerInitialDataFull( long traceId, long timestamp, + long producerId, + short produceEpoch, int sequence, long checksum, KafkaAckMode ackMode, @@ -1291,6 +1295,8 @@ private void doServerInitialDataFull( ex -> ex.set((b, o, l) -> kafkaDataExRW.wrap(b, o, l) .typeId(kafkaTypeId) .produce(f -> f.timestamp(timestamp) + .producerId(producerId) + .producerEpoch(produceEpoch) .sequence(sequence) .crc32c(checksum) .ackMode(a -> a.set(ackMode)) @@ -1308,6 +1314,8 @@ private void doServerInitialDataInit( long traceId, int deferred, long timestamp, + long producerId, + short produceEpoch, int sequence, long checksum, KafkaAckMode ackMode, @@ -1323,6 +1331,8 @@ private void doServerInitialDataInit( .typeId(kafkaTypeId) .produce(f -> f.deferred(deferred) .timestamp(timestamp) + .producerId(producerId) + .producerEpoch(produceEpoch) .sequence(sequence) .crc32c(checksum) .ackMode(a -> a.set(ackMode)) diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientProduceFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientProduceFactory.java index b2fb009d15..13ef521647 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientProduceFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientProduceFactory.java @@ -93,9 +93,10 @@ public final class KafkaClientProduceFactory extends KafkaClientSaslHandshaker i private static final byte RECORD_BATCH_MAGIC = 2; private static final short RECORD_BATCH_ATTRIBUTES_NONE = 0; private static final short RECORD_BATCH_ATTRIBUTES_NO_TIMESTAMP = 0x08; - private static final int RECORD_BATCH_PRODUCER_ID_NONE = -1; + private static final long RECORD_BATCH_PRODUCER_ID_NONE = -1; private static final short RECORD_BATCH_PRODUCER_EPOCH_NONE = -1; - private static final short RECORD_BATCH_SEQUENCE_NONE = -1; + private static final int RECORD_BATCH_BASE_SEQUENCE_NONE = -1; + private static final int RECORD_SEQUENCE_NONE = -1; private static final byte RECORD_ATTRIBUTES_NONE = 0; private static final String TRANSACTION_ID_NONE = null; @@ -531,6 +532,9 @@ private int flushRecordInit( assert kafkaDataEx.kind() == KafkaDataExFW.KIND_PRODUCE; final KafkaProduceDataExFW kafkaProduceDataEx = kafkaDataEx.produce(); final long timestamp = kafkaProduceDataEx.timestamp(); + final long producerId = kafkaProduceDataEx.producerId(); + final short producerEpoch = kafkaProduceDataEx.producerEpoch(); + final int sequence = kafkaProduceDataEx.sequence(); final KafkaAckMode ackMode = kafkaProduceDataEx.ackMode().get(); final KafkaKeyFW key = kafkaProduceDataEx.key(); final Array32FW headers = kafkaProduceDataEx.headers(); @@ -542,11 +546,23 @@ private int flushRecordInit( final int maxEncodeableBytes = client.encodeSlotLimit + client.valueCompleteSize + produceRecordFramingSize; if (client.encodeSlot != NO_SLOT && - maxEncodeableBytes > encodePool.slotCapacity()) + (maxEncodeableBytes > encodePool.slotCapacity() || + client.producerId != producerId || + client.producerEpoch != producerEpoch || + sequence <= client.sequence)) { client.doEncodeRequestIfNecessary(traceId, budgetId); } + if (client.producerId == RECORD_BATCH_PRODUCER_ID_NONE) + { + client.baseSequence = sequence; + } + + client.producerId = producerId; + client.producerEpoch = producerEpoch; + client.sequence = sequence; + client.doEncodeRecordInit(traceId, timestamp, ackMode, key, payload, headers); if (client.encodeSlot != NO_SLOT) { @@ -1237,6 +1253,11 @@ private final class KafkaProduceClient extends KafkaSaslClient private LongLongConsumer encoder; private boolean flushable; + private long producerId = RECORD_BATCH_PRODUCER_ID_NONE; + private short producerEpoch = RECORD_BATCH_PRODUCER_EPOCH_NONE; + private int baseSequence = RECORD_BATCH_BASE_SEQUENCE_NONE; + private int sequence = RECORD_SEQUENCE_NONE; + KafkaProduceClient( KafkaProduceStream stream, long resolvedId, @@ -1878,6 +1899,9 @@ private void doEncodeProduceRequest( ? RECORD_BATCH_ATTRIBUTES_NO_TIMESTAMP : RECORD_BATCH_ATTRIBUTES_NONE; + final int baseSequence = client.producerId == RECORD_BATCH_PRODUCER_ID_NONE ? RECORD_BATCH_BASE_SEQUENCE_NONE : + client.baseSequence; + final RecordBatchFW recordBatch = recordBatchRW.wrap(encodeBuffer, encodeProgress, encodeLimit) .baseOffset(0) .length(recordBatchLength) @@ -1888,9 +1912,9 @@ private void doEncodeProduceRequest( .lastOffsetDelta(encodeableRecordCount - 1) .firstTimestamp(encodeableRecordBatchTimestamp) .maxTimestamp(encodeableRecordBatchTimestampMax) - .producerId(RECORD_BATCH_PRODUCER_ID_NONE) - .producerEpoch(RECORD_BATCH_PRODUCER_EPOCH_NONE) - .baseSequence(RECORD_BATCH_SEQUENCE_NONE) + .producerId(client.producerId) + .producerEpoch(client.producerEpoch) + .baseSequence(baseSequence) .recordCount(encodeableRecordCount) .build(); @@ -1922,6 +1946,10 @@ private void doEncodeProduceRequest( encodeableRecordBatchTimestamp = TIMESTAMP_NONE; encodedAckMode = encodeableAckMode; encodeableAckMode = KafkaAckMode.NONE; + client.producerId = RECORD_BATCH_PRODUCER_ID_NONE; + client.producerEpoch = RECORD_BATCH_PRODUCER_EPOCH_NONE; + client.baseSequence = RECORD_BATCH_BASE_SEQUENCE_NONE; + client.sequence = RECORD_SEQUENCE_NONE; assert encodeSlot != NO_SLOT; final MutableDirectBuffer encodeSlotBuffer = encodePool.buffer(encodeSlot); diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaMergedFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaMergedFactory.java index e07ed96424..1178b0b24c 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaMergedFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaMergedFactory.java @@ -87,6 +87,7 @@ import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaMergedConsumerFlushExFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaMergedFlushExFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaMergedProduceDataExFW; +import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaMergedProduceFlushExFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaMetaDataExFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaOffsetFetchDataExFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaResetExFW; @@ -126,6 +127,7 @@ public final class KafkaMergedFactory implements BindingHandler private static final DirectBuffer EMPTY_BUFFER = new UnsafeBuffer(); private static final OctetsFW EMPTY_OCTETS = new OctetsFW().wrap(EMPTY_BUFFER, 0, 0); + private static final KafkaKeyFW EMPTY_KEY = new KafkaKeyFW(); private static final Consumer EMPTY_EXTENSION = ex -> {}; private static final MessageConsumer NO_RECEIVER = (m, b, i, l) -> {}; @@ -1362,7 +1364,6 @@ private void onMergedInitialFlush( { final long traceId = flush.traceId(); final long sequence = flush.sequence(); - final long acknowledge = flush.acknowledge(); final OctetsFW extension = flush.extension(); final int reserved = flush.reserved(); final ExtensionFW flushEx = extension.get(extensionRO::tryWrap); @@ -1377,6 +1378,9 @@ private void onMergedInitialFlush( switch (kafkaMergedFlushEx.kind()) { + case KafkaMergedFlushExFW.KIND_PRODUCE: + onMergedProduceFlush(kafkaMergedFlushEx, traceId); + break; case KafkaMergedFlushExFW.KIND_FETCH: onMergedFetchFlush(kafkaMergedFlushEx, traceId, sequence, reserved); break; @@ -1386,6 +1390,18 @@ private void onMergedInitialFlush( } } + private void onMergedProduceFlush( + KafkaMergedFlushExFW kafkaMergedFlushEx, + long traceId) + { + final KafkaMergedProduceFlushExFW produce = kafkaMergedFlushEx.produce(); + final KafkaKeyFW hashKey = produce.hashKey(); + + final int partitionId = nextPartitionData(hashKey, EMPTY_KEY); + + doMergedProduceReplyFlush(traceId, partitionId); + } + private void onMergedFetchFlush( KafkaMergedFlushExFW kafkaMergedFlushEx, long traceId, @@ -1592,28 +1608,34 @@ private void doMergedReplyBegin( if (capabilities == FETCH_ONLY) { doBegin(sender, originId, routedId, replyId, replySeq, replyAck, replyMax, - traceId, authorization, affinity, beginExToKafka()); + traceId, authorization, affinity, beginExToKafka(beginExToKafkaMergedFetchOnly())); + } + else if (capabilities == PRODUCE_ONLY) + { + doBegin(sender, originId, routedId, replyId, replySeq, replyAck, replyMax, + traceId, authorization, affinity, beginExToKafka(beginExToKafkaMergedProduceOnly())); } else { doBegin(sender, originId, routedId, replyId, replySeq, replyAck, replyMax, - traceId, authorization, affinity, EMPTY_EXTENSION); + traceId, authorization, affinity, EMPTY_EXTENSION); } doUnmergedFetchReplyWindowsIfNecessary(traceId); } - private Flyweight.Builder.Visitor beginExToKafka() + private Flyweight.Builder.Visitor beginExToKafka( + Consumer beginExToKafkaMerged) { return (buffer, offset, maxLimit) -> kafkaBeginExRW.wrap(buffer, offset, maxLimit) .typeId(kafkaTypeId) - .merged(beginExToKafkaMerged()) + .merged(beginExToKafkaMerged) .build() .limit() - offset; } - private Consumer beginExToKafkaMerged() + private Consumer beginExToKafkaMergedFetchOnly() { return builder -> { @@ -1640,6 +1662,15 @@ private Consumer beginExToKafkaMerged() }; } + private Consumer beginExToKafkaMergedProduceOnly() + { + return builder -> + { + builder.capabilities(c -> c.set(PRODUCE_ONLY)).topic(topic); + leadersByPartitionId.intForEach((k, v) -> builder.partitionsItem(i -> i.partitionId(k))); + }; + } + private void doMergedReplyData( long traceId, int flags, @@ -1806,6 +1837,19 @@ private void doMergedConsumerReplyFlush( traceId, authorization, 0, kafkaFlushExFW); } + private void doMergedProduceReplyFlush( + long traceId, + int partitionId) + { + final KafkaFlushExFW kafkaFlushExFW = kafkaFlushExRW.wrap(extBuffer, 0, extBuffer.capacity()) + .typeId(kafkaTypeId) + .merged(mc -> mc.produce(c -> c.partitionId(partitionId))) + .build(); + + doFlush(sender, originId, routedId, replyId, replySeq, replyAck, replyMax, + traceId, authorization, 0, kafkaFlushExFW); + } + private void doMergedFetchReplyFlush( long traceId, int reserved, diff --git a/runtime/binding-kafka/src/main/zilla/internal.idl b/runtime/binding-kafka/src/main/zilla/internal.idl index c6ae057f28..fc7a31a91a 100644 --- a/runtime/binding-kafka/src/main/zilla/internal.idl +++ b/runtime/binding-kafka/src/main/zilla/internal.idl @@ -23,6 +23,8 @@ scope internal int64 timestamp; int64 ownerId; int64 acknowledge = 0; + int64 producerId = -1; + int16 producerEpoch = -1; int32 sequence = -1; int64 ancestor; int64 descendant; diff --git a/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/CacheMergedIT.java b/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/CacheMergedIT.java index e6091b5d26..26398f0cd9 100644 --- a/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/CacheMergedIT.java +++ b/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/CacheMergedIT.java @@ -341,6 +341,26 @@ public void shouldProduceMergedMessageValuesByDefault() throws Exception k3po.finish(); } + @Test + @Configuration("cache.yaml") + @Specification({ + "${app}/merged.produce.message.values.producer.id/client", + "${app}/unmerged.produce.message.values.producer.id/server"}) + public void shouldProduceMergedMessageValuesWithProducerId() throws Exception + { + k3po.finish(); + } + + @Test + @Configuration("cache.yaml") + @Specification({ + "${app}/merged.produce.message.value.partition.id/client", + "${app}/unmerged.produce.message.value.partition.id/server"}) + public void shouldProduceMergedMessageValueByGettingPartitionId() throws Exception + { + k3po.finish(); + } + @Test @Configuration("cache.options.merged.yaml") @Specification({ diff --git a/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientInitProducerIdSaslIT.java b/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientInitProducerIdSaslIT.java new file mode 100644 index 0000000000..723c93b22d --- /dev/null +++ b/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientInitProducerIdSaslIT.java @@ -0,0 +1,79 @@ +/* + * Copyright 2021-2023 Aklivity Inc. + * + * Aklivity licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.aklivity.zilla.runtime.binding.kafka.internal.stream; + +import static io.aklivity.zilla.runtime.binding.kafka.internal.KafkaConfigurationTest.KAFKA_CLIENT_SASL_SCRAM_NONCE_NAME; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.junit.rules.RuleChain.outerRule; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.DisableOnDebug; +import org.junit.rules.TestRule; +import org.junit.rules.Timeout; +import org.kaazing.k3po.junit.annotation.Specification; +import org.kaazing.k3po.junit.rules.K3poRule; + +import io.aklivity.zilla.runtime.engine.test.EngineRule; +import io.aklivity.zilla.runtime.engine.test.annotation.Configuration; +import io.aklivity.zilla.runtime.engine.test.annotation.Configure; + +public class ClientInitProducerIdSaslIT +{ + private final K3poRule k3po = new K3poRule() + .addScriptRoot("net", "io/aklivity/zilla/specs/binding/kafka/streams/network/init.producer.id.v4.sasl.handshake.v1") + .addScriptRoot("app", "io/aklivity/zilla/specs/binding/kafka/streams/application/init.producer.id"); + + private final TestRule timeout = new DisableOnDebug(new Timeout(15, SECONDS)); + + private final EngineRule engine = new EngineRule() + .directory("target/zilla-itests") + .countersBufferCapacity(8192) + .configurationRoot("io/aklivity/zilla/specs/binding/kafka/config") + .external("net0") + .clean(); + + @Rule + public final TestRule chain = outerRule(engine).around(k3po).around(timeout); + + + @Test + @Configuration("client.options.sasl.plain.yaml") + @Specification({ + "${app}/produce.new.id/client", + "${net}/produce.new.id.sasl.plain/server"}) + public void shouldGenerateNewProducerIdWithSaslPlain() throws Exception + { + k3po.finish(); + } + + @Test + @Configuration("client.options.sasl.scram.yaml") + @Specification({ + "${app}/produce.new.id/client", + "${net}/produce.new.id.sasl.scram/server"}) + @Configure(name = KAFKA_CLIENT_SASL_SCRAM_NONCE_NAME, + value = "io.aklivity.zilla.runtime.binding.kafka.internal.stream.ClientInitProducerIdSaslIT::supplyNonce") + public void shouldGenerateNewProducerIdWithSaslScram() throws Exception + { + k3po.finish(); + } + + public static String supplyNonce() + { + return "fyko+d2lbbFgONRv9qkxdawL"; + } +} diff --git a/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientProduceIT.java b/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientProduceIT.java index 5a22d21243..980b7c084f 100644 --- a/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientProduceIT.java +++ b/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientProduceIT.java @@ -152,6 +152,37 @@ public void shouldSendMessageKeyDistinct() throws Exception k3po.finish(); } + @Test + @Configuration("client.when.topic.yaml") + @Specification({ + "${app}/message.producer.id/client", + "${net}/message.producer.id/server"}) + public void shouldSendMessageValueWithProducerId() throws Exception + { + k3po.finish(); + } + + @Test + @Configuration("client.when.topic.yaml") + @Specification({ + "${app}/message.values.producer.id/client", + "${net}/message.values.producer.id/server"}) + @Configure(name = KafkaConfigurationTest.KAFKA_CLIENT_PRODUCE_MAX_REQUEST_MILLIS_NAME, value = "200") + public void shouldSendMessageValuesWithProducerId() throws Exception + { + k3po.finish(); + } + + @Test + @Configuration("client.when.topic.yaml") + @Specification({ + "${app}/message.values.producer.id.replay/client", + "${net}/message.values.producer.id.replay/server"}) + public void shouldReplyMessageValuesWithProducerId() throws Exception + { + k3po.finish(); + } + @Test @Configuration("client.when.topic.yaml") @Specification({ diff --git a/specs/binding-kafka.spec/src/main/java/io/aklivity/zilla/specs/binding/kafka/internal/KafkaFunctions.java b/specs/binding-kafka.spec/src/main/java/io/aklivity/zilla/specs/binding/kafka/internal/KafkaFunctions.java index 57a391e68c..9d53ec0521 100644 --- a/specs/binding-kafka.spec/src/main/java/io/aklivity/zilla/specs/binding/kafka/internal/KafkaFunctions.java +++ b/specs/binding-kafka.spec/src/main/java/io/aklivity/zilla/specs/binding/kafka/internal/KafkaFunctions.java @@ -89,6 +89,7 @@ import io.aklivity.zilla.specs.binding.kafka.internal.types.stream.KafkaMergedFetchFlushExFW; import io.aklivity.zilla.specs.binding.kafka.internal.types.stream.KafkaMergedFlushExFW; import io.aklivity.zilla.specs.binding.kafka.internal.types.stream.KafkaMergedProduceDataExFW; +import io.aklivity.zilla.specs.binding.kafka.internal.types.stream.KafkaMergedProduceFlushExFW; import io.aklivity.zilla.specs.binding.kafka.internal.types.stream.KafkaMetaBeginExFW; import io.aklivity.zilla.specs.binding.kafka.internal.types.stream.KafkaMetaDataExFW; import io.aklivity.zilla.specs.binding.kafka.internal.types.stream.KafkaOffsetCommitBeginExFW; @@ -2691,6 +2692,13 @@ private KafkaMergedFlushExBuilder() mergedFlushExRW.wrap(writeBuffer, KafkaFlushExFW.FIELD_OFFSET_MERGED, writeBuffer.capacity()); } + public KafkaMergedProduceFlushExBuilder produce() + { + mergedFlushExRW.kind(KafkaApi.PRODUCE.value()); + + return new KafkaMergedProduceFlushExBuilder(); + } + public KafkaMergedFetchFlushExBuilder fetch() { mergedFlushExRW.kind(KafkaApi.FETCH.value()); @@ -2826,6 +2834,50 @@ public KafkaFlushExBuilder build() } } + public final class KafkaMergedProduceFlushExBuilder + { + private final KafkaMergedProduceFlushExFW.Builder mergedProduceFlushExRW = + new KafkaMergedProduceFlushExFW.Builder(); + + private KafkaMergedProduceFlushExBuilder() + { + mergedProduceFlushExRW.wrap(writeBuffer, + KafkaFlushExFW.FIELD_OFFSET_MERGED + KafkaMergedFlushExFW.FIELD_OFFSET_PRODUCE, + writeBuffer.capacity()); + } + + public KafkaMergedProduceFlushExBuilder hashKey( + String hashKey) + { + if (hashKey == null) + { + mergedProduceFlushExRW.hashKey(m -> m.length(-1) + .value((OctetsFW) null)); + } + else + { + keyRO.wrap(hashKey.getBytes(UTF_8)); + mergedProduceFlushExRW.hashKey(k -> k.length(keyRO.capacity()) + .value(keyRO, 0, keyRO.capacity())); + } + return this; + } + + public KafkaMergedProduceFlushExBuilder partitionId( + int partitionId) + { + mergedProduceFlushExRW.partitionId(partitionId); + return this; + } + + public KafkaFlushExBuilder build() + { + final KafkaMergedProduceFlushExFW mergedProduceFlushEx = mergedProduceFlushExRW.build(); + flushExRO.wrap(writeBuffer, 0, mergedProduceFlushEx.limit()); + return KafkaFlushExBuilder.this; + } + } + public final class KafkaMergedConsumerFlushExBuilder { private final KafkaMergedConsumerFlushExFW.Builder mergedConsumerFlushExRW = @@ -4025,6 +4077,8 @@ public final class KafkaMergedProduceDataExMatcherBuilder { private Integer deferred; private Long timestamp; + private Long producerId; + private Short producerEpoch; private Long filters; private KafkaOffsetFW.Builder partitionRW; private Array32FW.Builder progressRW; @@ -4051,6 +4105,20 @@ public KafkaMergedProduceDataExMatcherBuilder timestamp( return this; } + public KafkaMergedProduceDataExMatcherBuilder producerId( + long producerId) + { + this.producerId = producerId; + return this; + } + + public KafkaMergedProduceDataExMatcherBuilder producerEpoch( + short producerEpoch) + { + this.producerEpoch = producerEpoch; + return this; + } + public KafkaMergedProduceDataExMatcherBuilder filters( long filters) { @@ -4318,6 +4386,8 @@ private boolean match( return matchPartition(produce) && matchDeferred(produce) && matchTimestamp(produce) && + matchProducerId(produce) && + matchProducerEpoch(produce) && matchKey(produce) && matchHashKey(produce) && matchHeaders(produce); @@ -4341,6 +4411,18 @@ private boolean matchTimestamp( return timestamp == null || timestamp == mergedProduceDataEx.timestamp(); } + private boolean matchProducerId( + final KafkaMergedProduceDataExFW mergedProduceDataEx) + { + return producerId == null || producerId == mergedProduceDataEx.producerId(); + } + + private boolean matchProducerEpoch( + final KafkaMergedProduceDataExFW mergedProduceDataEx) + { + return producerEpoch == null || producerEpoch == mergedProduceDataEx.producerEpoch(); + } + private boolean matchKey( final KafkaMergedProduceDataExFW mergedProduceDataEx) { diff --git a/specs/binding-kafka.spec/src/main/resources/META-INF/zilla/kafka.idl b/specs/binding-kafka.spec/src/main/resources/META-INF/zilla/kafka.idl index 13636f639b..30e63478a9 100644 --- a/specs/binding-kafka.spec/src/main/resources/META-INF/zilla/kafka.idl +++ b/specs/binding-kafka.spec/src/main/resources/META-INF/zilla/kafka.idl @@ -279,6 +279,7 @@ scope kafka { case 252: kafka::stream::KafkaMergedConsumerFlushEx consumer; case 1: kafka::stream::KafkaMergedFetchFlushEx fetch; + case 0: kafka::stream::KafkaMergedProduceFlushEx produce; } struct KafkaMergedConsumerFlushEx @@ -296,6 +297,12 @@ scope kafka KafkaKey key; } + struct KafkaMergedProduceFlushEx + { + KafkaKey hashKey; + int32 partitionId = -1; + } + struct KafkaMetaBeginEx { string16 topic; diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/init.producer.id/produce.new.id/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/init.producer.id/produce.new.id/server.rpt index b711605d9e..8b4631548b 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/init.producer.id/produce.new.id/server.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/init.producer.id/produce.new.id/server.rpt @@ -41,3 +41,4 @@ write zilla:begin.ext ${kafka:beginEx() .producerEpoch(2) .build() .build()} +write flush diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/merged.group.produce.message.value/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/merged.group.produce.message.value/client.rpt index c7de6e3bfc..b4e579768d 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/merged.group.produce.message.value/client.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/merged.group.produce.message.value/client.rpt @@ -31,6 +31,16 @@ write zilla:begin.ext ${kafka:beginEx() connected +read zilla:begin.ext ${kafka:matchBeginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("PRODUCE_ONLY") + .topic("test") + .partition(0, -1) + .partition(1, -1) + .build() + .build()} + write zilla:data.ext ${kafka:dataEx() .typeId(zilla:id("kafka")) .merged() diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/merged.group.produce.message.value/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/merged.group.produce.message.value/server.rpt index 7ef71f8311..097224b9e0 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/merged.group.produce.message.value/server.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/merged.group.produce.message.value/server.rpt @@ -36,6 +36,17 @@ read zilla:begin.ext ${kafka:beginEx() connected +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("PRODUCE_ONLY") + .topic("test") + .partition(0, -1) + .partition(1, -1) + .build() + .build()} +write flush + read zilla:data.ext ${kafka:dataEx() .typeId(zilla:id("kafka")) .merged() diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/merged.produce.message.value.partition.id/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/merged.produce.message.value.partition.id/client.rpt new file mode 100644 index 0000000000..dbb2c9f467 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/merged.produce.message.value.partition.id/client.rpt @@ -0,0 +1,72 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +connect "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "half-duplex" + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("PRODUCE_ONLY") + .topic("test") + .ackMode("LEADER_ONLY") + .build() + .build()} + +connected + +read zilla:begin.ext ${kafka:matchBeginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("PRODUCE_ONLY") + .topic("test") + .partition(0, -1) + .build() + .build()} + +write advise zilla:flush ${kafka:flushEx() + .typeId(zilla:id("kafka")) + .merged() + .produce() + .hashKey("key7") + .build() + .build()} + +read advised zilla:flush ${kafka:flushEx() + .typeId(zilla:id("kafka")) + .merged() + .produce() + .partitionId(0) + .build() + .build()} + + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .merged() + .produce() + .timestamp(newTimestamp) + .producerId(1) + .producerEpoch(1) + .partition(0, 0) + .key("a") + .hashKey("key7") + .build() + .build()} +write "Hello, world #A1" +write flush + diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/merged.produce.message.value.partition.id/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/merged.produce.message.value.partition.id/server.rpt new file mode 100644 index 0000000000..58abdd81f4 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/merged.produce.message.value.partition.id/server.rpt @@ -0,0 +1,75 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property deltaMillis 0L +property newTimestamp ${kafka:timestamp() + deltaMillis} + +accept "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "half-duplex" + +accepted + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("PRODUCE_ONLY") + .topic("test") + .ackMode("LEADER_ONLY") + .build() + .build()} + +connected + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("PRODUCE_ONLY") + .topic("test") + .partition(0, -1) + .build() + .build()} +write flush + +read advised zilla:flush ${kafka:flushEx() + .typeId(zilla:id("kafka")) + .merged() + .produce() + .hashKey("key7") + .build() + .build()} + +write advise zilla:flush ${kafka:flushEx() + .typeId(zilla:id("kafka")) + .merged() + .produce() + .partitionId(0) + .build() + .build()} + +read zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .merged() + .produce() + .timestamp(newTimestamp) + .producerId(1) + .producerEpoch(1) + .partition(0, 0) + .key("a") + .hashKey("key7") + .build() + .build()} +read "Hello, world #A1" diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/merged.produce.message.values.producer.id/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/merged.produce.message.values.producer.id/client.rpt new file mode 100644 index 0000000000..0d31cddaf3 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/merged.produce.message.values.producer.id/client.rpt @@ -0,0 +1,111 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property deltaMillis 0L +property newTimestamp ${kafka:timestamp() + deltaMillis} + +connect "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "half-duplex" + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("PRODUCE_ONLY") + .topic("test") + .ackMode("LEADER_ONLY") + .build() + .build()} + +connected + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .merged() + .produce() + .timestamp(newTimestamp) + .producerId(1) + .producerEpoch(1) + .partition(0, 1) + .build() + .build()} +write "Hello, world #A1" +write flush + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .merged() + .produce() + .timestamp(newTimestamp) + .producerId(1) + .producerEpoch(1) + .partition(1, 1) + .build() + .build()} +write "Hello, world #B1" +write flush + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .merged() + .produce() + .timestamp(newTimestamp) + .producerId(1) + .producerEpoch(1) + .partition(0, 2) + .build() + .build()} +write "Hello, world #A2" +write flush + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .merged() + .produce() + .timestamp(newTimestamp) + .producerId(1) + .producerEpoch(1) + .partition(1, 2) + .build() + .build()} +write "Hello, world #B2" +write flush + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .merged() + .produce() + .timestamp(newTimestamp) + .producerId(1) + .producerEpoch(1) + .partition(2, 1) + .build() + .build()} +write "Hello, world #C1" +write flush + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .merged() + .produce() + .timestamp(newTimestamp) + .producerId(1) + .producerEpoch(1) + .partition(2, 2) + .build() + .build()} +write "Hello, world #C2" +write flush diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/merged.produce.message.values.producer.id/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/merged.produce.message.values.producer.id/server.rpt new file mode 100644 index 0000000000..a790cc43fe --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/merged.produce.message.values.producer.id/server.rpt @@ -0,0 +1,98 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +accept "zilla://streams/app0" + option zilla:window 16 + option zilla:transmission "half-duplex" + +accepted + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("PRODUCE_ONLY") + .topic("test") + .ackMode("LEADER_ONLY") + .build() + .build()} + +connected + +read zilla:data.ext ${kafka:matchDataEx() + .typeId(zilla:id("kafka")) + .merged() + .produce() + .producerId(1) + .producerEpoch(1) + .partition(0, 1) + .build() + .build()} +read "Hello, world #A1" + +read zilla:data.ext ${kafka:matchDataEx() + .typeId(zilla:id("kafka")) + .merged() + .produce() + .producerId(1) + .producerEpoch(1) + .partition(1, 1) + .build() + .build()} +read "Hello, world #B1" + +read zilla:data.ext ${kafka:matchDataEx() + .typeId(zilla:id("kafka")) + .merged() + .produce() + .producerId(1) + .producerEpoch(1) + .partition(0, 2) + .build() + .build()} +read "Hello, world #A2" + +read zilla:data.ext ${kafka:matchDataEx() + .typeId(zilla:id("kafka")) + .merged() + .produce() + .producerId(1) + .producerEpoch(1) + .partition(1, 2) + .build() + .build()} +read "Hello, world #B2" + +read zilla:data.ext ${kafka:matchDataEx() + .typeId(zilla:id("kafka")) + .merged() + .produce() + .producerId(1) + .producerEpoch(1) + .partition(2, 1) + .build() + .build()} +read "Hello, world #C1" + +read zilla:data.ext ${kafka:matchDataEx() + .typeId(zilla:id("kafka")) + .merged() + .produce() + .producerId(1) + .producerEpoch(1) + .partition(2, 2) + .build() + .build()} +read "Hello, world #C2" diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.produce.message.value.partition.id/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.produce.message.value.partition.id/client.rpt new file mode 100644 index 0000000000..1c777a55bc --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.produce.message.value.partition.id/client.rpt @@ -0,0 +1,143 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +connect "zilla://streams/app1" + option zilla:window 8192 + option zilla:transmission "half-duplex" + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .describe() + .topic("test") + .config("cleanup.policy") + .config("max.message.bytes") + .config("segment.bytes") + .config("segment.index.bytes") + .config("segment.ms") + .config("retention.bytes") + .config("retention.ms") + .config("delete.retention.ms") + .config("min.compaction.lag.ms") + .config("max.compaction.lag.ms") + .config("min.cleanable.dirty.ratio") + .build() + .build()} + +connected + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .describe() + .topic("test") + .config("cleanup.policy") + .config("max.message.bytes") + .config("segment.bytes") + .config("segment.index.bytes") + .config("segment.ms") + .config("retention.bytes") + .config("retention.ms") + .config("delete.retention.ms") + .config("min.compaction.lag.ms") + .config("max.compaction.lag.ms") + .config("min.cleanable.dirty.ratio") + .build() + .build()} + +read zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .describe() + .config("cleanup.policy", "delete") + .config("max.message.bytes", 1000012) + .config("segment.bytes", 1073741824) + .config("segment.index.bytes", 10485760) + .config("segment.ms", 604800000) + .config("retention.bytes", -1) + .config("retention.ms", 604800000) + .config("delete.retention.ms", 86400000) + .config("min.compaction.lag.ms", 0) + .config("max.compaction.lag.ms", 9223372036854775807) + .config("min.cleanable.dirty.ratio", 0.5) + .build() + .build()} + +read notify RECEIVED_CONFIG + +connect await RECEIVED_CONFIG + "zilla://streams/app1" + option zilla:window 8192 + option zilla:transmission "half-duplex" + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .meta() + .topic("test") + .build() + .build()} + +connected + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .meta() + .topic("test") + .build() + .build()} + +read zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .meta() + .partition(0, 1) + .build() + .build()} +read notify PARTITION_COUNT_1 + +connect await PARTITION_COUNT_1 + "zilla://streams/app1" + option zilla:window 8192 + option zilla:transmission "half-duplex" + option zilla:affinity 1 + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .produce() + .topic("test") + .partition(0) + .build() + .build()} + +connected + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .produce() + .topic("test") + .partition(0) + .build() + .build()} + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .produce() + .timestamp(newTimestamp) + .producerId(1) + .producerEpoch(1) + .sequence(0) + .ackMode("LEADER_ONLY") + .key("a") + .build() + .build()} +write "Hello, world #A1" +write flush diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.produce.message.value.partition.id/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.produce.message.value.partition.id/server.rpt new file mode 100644 index 0000000000..61d830eabd --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.produce.message.value.partition.id/server.rpt @@ -0,0 +1,144 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property deltaMillis 0L +property newTimestamp ${kafka:timestamp() + deltaMillis} +property padding 0 + +accept "zilla://streams/app1" + option zilla:window 8192 + option zilla:transmission "half-duplex" + +accepted + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .describe() + .topic("test") + .config("cleanup.policy") + .config("max.message.bytes") + .config("segment.bytes") + .config("segment.index.bytes") + .config("segment.ms") + .config("retention.bytes") + .config("retention.ms") + .config("delete.retention.ms") + .config("min.compaction.lag.ms") + .config("max.compaction.lag.ms") + .config("min.cleanable.dirty.ratio") + .build() + .build()} + +connected + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .describe() + .topic("test") + .config("cleanup.policy") + .config("max.message.bytes") + .config("segment.bytes") + .config("segment.index.bytes") + .config("segment.ms") + .config("retention.bytes") + .config("retention.ms") + .config("delete.retention.ms") + .config("min.compaction.lag.ms") + .config("max.compaction.lag.ms") + .config("min.cleanable.dirty.ratio") + .build() + .build()} +write flush + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .describe() + .config("cleanup.policy", "delete") + .config("max.message.bytes", 1000012) + .config("segment.bytes", 1073741824) + .config("segment.index.bytes", 10485760) + .config("segment.ms", 604800000) + .config("retention.bytes", -1) + .config("retention.ms", 604800000) + .config("delete.retention.ms", 86400000) + .config("min.compaction.lag.ms", 0) + .config("max.compaction.lag.ms", 9223372036854775807) + .config("min.cleanable.dirty.ratio", 0.5) + .build() + .build()} +write flush + +accepted + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .meta() + .topic("test") + .build() + .build()} + +connected + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .meta() + .topic("test") + .build() + .build()} +write flush + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .meta() + .partition(0, 1) + .build() + .build()} +write flush + +accepted + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .produce() + .topic("test") + .partition(0) + .build() + .build()} + +connected + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .produce() + .topic("test") + .partition(0) + .build() + .build()} +write flush + + +read zilla:data.ext ${kafka:matchDataEx() + .typeId(zilla:id("kafka")) + .produce() + .timestamp(newTimestamp) + .producerId(1) + .producerEpoch(1) + .sequence(0) + .ackMode("LEADER_ONLY") + .key("a") + .build() + .build()} +read "Hello, world #A1" diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.produce.message.values.producer.id/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.produce.message.values.producer.id/client.rpt new file mode 100644 index 0000000000..67ba1546ee --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.produce.message.values.producer.id/client.rpt @@ -0,0 +1,257 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +connect "zilla://streams/app1" + option zilla:window 8192 + option zilla:transmission "half-duplex" + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .describe() + .topic("test") + .config("cleanup.policy") + .config("max.message.bytes") + .config("segment.bytes") + .config("segment.index.bytes") + .config("segment.ms") + .config("retention.bytes") + .config("retention.ms") + .config("delete.retention.ms") + .config("min.compaction.lag.ms") + .config("max.compaction.lag.ms") + .config("min.cleanable.dirty.ratio") + .build() + .build()} + +connected + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .describe() + .topic("test") + .config("cleanup.policy") + .config("max.message.bytes") + .config("segment.bytes") + .config("segment.index.bytes") + .config("segment.ms") + .config("retention.bytes") + .config("retention.ms") + .config("delete.retention.ms") + .config("min.compaction.lag.ms") + .config("max.compaction.lag.ms") + .config("min.cleanable.dirty.ratio") + .build() + .build()} + +read zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .describe() + .config("cleanup.policy", "delete") + .config("max.message.bytes", 1000012) + .config("segment.bytes", 1073741824) + .config("segment.index.bytes", 10485760) + .config("segment.ms", 604800000) + .config("retention.bytes", -1) + .config("retention.ms", 604800000) + .config("delete.retention.ms", 86400000) + .config("min.compaction.lag.ms", 0) + .config("max.compaction.lag.ms", 9223372036854775807) + .config("min.cleanable.dirty.ratio", 0.5) + .build() + .build()} + +read notify RECEIVED_CONFIG + +connect await RECEIVED_CONFIG + "zilla://streams/app1" + option zilla:window 8192 + option zilla:transmission "half-duplex" + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .meta() + .topic("test") + .build() + .build()} + +connected + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .meta() + .topic("test") + .build() + .build()} + +read zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .meta() + .partition(0, 1) + .partition(1, 2) + .partition(2, 3) + .build() + .build()} +read notify PARTITION_COUNT_3 + +connect await PARTITION_COUNT_3 + "zilla://streams/app1" + option zilla:window 8192 + option zilla:transmission "half-duplex" + option zilla:affinity 1 + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .produce() + .topic("test") + .partition(0) + .build() + .build()} + +connected + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .produce() + .topic("test") + .partition(0) + .build() + .build()} + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .produce() + .timestamp(newTimestamp) + .producerId(1) + .producerEpoch(1) + .sequence(1) + .ackMode("LEADER_ONLY") + .build() + .build()} +write "Hello, world #A1" +write flush + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .produce() + .timestamp(newTimestamp) + .producerId(1) + .producerEpoch(1) + .sequence(2) + .ackMode("LEADER_ONLY") + .build() + .build()} +write "Hello, world #A2" +write flush + +connect await PARTITION_COUNT_3 + "zilla://streams/app1" + option zilla:window 8192 + option zilla:transmission "half-duplex" + option zilla:affinity 2 + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .produce() + .topic("test") + .partition(1) + .build() + .build()} + +connected + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .produce() + .topic("test") + .partition(1) + .build() + .build()} + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .produce() + .timestamp(newTimestamp) + .producerId(1) + .producerEpoch(1) + .sequence(1) + .ackMode("LEADER_ONLY") + .build() + .build()} +write "Hello, world #B1" +write flush + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .produce() + .timestamp(newTimestamp) + .producerId(1) + .producerEpoch(1) + .sequence(2) + .ackMode("LEADER_ONLY") + .build() + .build()} +write "Hello, world #B2" +write flush + +connect await PARTITION_COUNT_3 + "zilla://streams/app1" + option zilla:window 8192 + option zilla:transmission "half-duplex" + option zilla:affinity 3 + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .produce() + .topic("test") + .partition(2) + .build() + .build()} + +connected + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .produce() + .topic("test") + .partition(2) + .build() + .build()} + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .produce() + .timestamp(newTimestamp) + .producerId(1) + .producerEpoch(1) + .sequence(1) + .ackMode("LEADER_ONLY") + .build() + .build()} +write "Hello, world #C1" +write flush + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .produce() + .timestamp(newTimestamp) + .producerId(1) + .producerEpoch(1) + .sequence(2) + .ackMode("LEADER_ONLY") + .build() + .build()} +write "Hello, world #C2" +write flush diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.produce.message.values.producer.id/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.produce.message.values.producer.id/server.rpt new file mode 100644 index 0000000000..84238ff270 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.produce.message.values.producer.id/server.rpt @@ -0,0 +1,245 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property deltaMillis 0L +property newTimestamp ${kafka:timestamp() + deltaMillis} + +accept "zilla://streams/app1" + option zilla:window 64 + option zilla:transmission "half-duplex" + +accepted + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .describe() + .topic("test") + .config("cleanup.policy") + .config("max.message.bytes") + .config("segment.bytes") + .config("segment.index.bytes") + .config("segment.ms") + .config("retention.bytes") + .config("retention.ms") + .config("delete.retention.ms") + .config("min.compaction.lag.ms") + .config("max.compaction.lag.ms") + .config("min.cleanable.dirty.ratio") + .build() + .build()} + +connected + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .describe() + .topic("test") + .config("cleanup.policy") + .config("max.message.bytes") + .config("segment.bytes") + .config("segment.index.bytes") + .config("segment.ms") + .config("retention.bytes") + .config("retention.ms") + .config("delete.retention.ms") + .config("min.compaction.lag.ms") + .config("max.compaction.lag.ms") + .config("min.cleanable.dirty.ratio") + .build() + .build()} +write flush + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .describe() + .config("cleanup.policy", "delete") + .config("max.message.bytes", 1000012) + .config("segment.bytes", 1073741824) + .config("segment.index.bytes", 10485760) + .config("segment.ms", 604800000) + .config("retention.bytes", -1) + .config("retention.ms", 604800000) + .config("delete.retention.ms", 86400000) + .config("min.compaction.lag.ms", 0) + .config("max.compaction.lag.ms", 9223372036854775807) + .config("min.cleanable.dirty.ratio", 0.5) + .build() + .build()} +write flush + +accepted + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .meta() + .topic("test") + .build() + .build()} + +connected + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .meta() + .topic("test") + .build() + .build()} +write flush + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .meta() + .partition(0, 1) + .partition(1, 2) + .partition(2, 3) + .build() + .build()} +write flush + +accepted + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .produce() + .topic("test") + .partition(0) + .build() + .build()} + +connected + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .produce() + .topic("test") + .partition(0) + .build() + .build()} +write flush + +read zilla:data.ext ${kafka:matchDataEx() + .typeId(zilla:id("kafka")) + .produce() + .timestamp(newTimestamp) + .producerId(1) + .producerEpoch(1) + .sequence(1) + .ackMode("LEADER_ONLY") + .build() + .build()} +read "Hello, world #A1" + +read zilla:data.ext ${kafka:matchDataEx() + .typeId(zilla:id("kafka")) + .produce() + .timestamp(newTimestamp) + .producerId(1) + .producerEpoch(1) + .sequence(2) + .ackMode("LEADER_ONLY") + .build() + .build()} +read "Hello, world #A2" + +accepted + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .produce() + .topic("test") + .partition(1) + .build() + .build()} + +connected + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .produce() + .topic("test") + .partition(1) + .build() + .build()} +write flush + +read zilla:data.ext ${kafka:matchDataEx() + .typeId(zilla:id("kafka")) + .produce() + .timestamp(newTimestamp) + .producerId(1) + .producerEpoch(1) + .sequence(1) + .ackMode("LEADER_ONLY") + .build() + .build()} +read "Hello, world #B1" + +read zilla:data.ext ${kafka:matchDataEx() + .typeId(zilla:id("kafka")) + .produce() + .timestamp(newTimestamp) + .producerId(1) + .producerEpoch(1) + .sequence(2) + .ackMode("LEADER_ONLY") + .build() + .build()} +read "Hello, world #B2" + +accepted + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .produce() + .topic("test") + .partition(2) + .build() + .build()} + +connected + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .produce() + .topic("test") + .partition(2) + .build() + .build()} +write flush + +read zilla:data.ext ${kafka:matchDataEx() + .typeId(zilla:id("kafka")) + .produce() + .timestamp(newTimestamp) + .producerId(1) + .producerEpoch(1) + .sequence(1) + .ackMode("LEADER_ONLY") + .build() + .build()} +read "Hello, world #C1" + +read zilla:data.ext ${kafka:matchDataEx() + .typeId(zilla:id("kafka")) + .produce() + .timestamp(newTimestamp) + .producerId(1) + .producerEpoch(1) + .sequence(2) + .ackMode("LEADER_ONLY") + .build() + .build()} +read "Hello, world #C2" diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.producer.id/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.producer.id/client.rpt new file mode 100644 index 0000000000..483b0570ce --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.producer.id/client.rpt @@ -0,0 +1,83 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property deltaMillis 0L +property newTimestamp ${kafka:timestamp() + deltaMillis} + +connect "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "half-duplex" + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .meta() + .topic("test") + .build() + .build()} + +connected + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .meta() + .topic("test") + .build() + .build()} + +read zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .meta() + .partition(0, 177) + .build() + .build()} + +read notify ROUTED_BROKER_CLIENT + +connect await ROUTED_BROKER_CLIENT + "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "half-duplex" + option zilla:affinity 0xb1 + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .produce() + .topic("test") + .partition(0) + .build() + .build()} + +connected + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .produce() + .topic("test") + .partition(0) + .build() + .build()} + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .produce() + .timestamp(newTimestamp) + .producerId(8) + .producerEpoch(1) + .sequence(0) + .build() + .build()} +write "Hello, world" +write flush diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.producer.id/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.producer.id/server.rpt new file mode 100644 index 0000000000..2338b82799 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.producer.id/server.rpt @@ -0,0 +1,78 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property serverAddress "zilla://streams/app0" + +accept ${serverAddress} + option zilla:window 8192 + option zilla:transmission "half-duplex" + +accepted + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .meta() + .topic("test") + .build() + .build()} + +connected + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .meta() + .topic("test") + .build() + .build()} +write flush + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .meta() + .partition(0, 177) + .build() + .build()} +write flush + +accepted + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .produce() + .topic("test") + .partition(0) + .build() + .build()} + +connected + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .produce() + .topic("test") + .partition(0) + .build() + .build()} + +read zilla:data.ext ${kafka:matchDataEx() + .typeId(zilla:id("kafka")) + .produce() + .producerId(8) + .producerEpoch(1) + .sequence(0) + .build() + .build()} +read "Hello, world" diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.repeated/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.repeated/client.rpt index 980c49a933..ce3393ea3b 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.repeated/client.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.repeated/client.rpt @@ -74,6 +74,7 @@ write zilla:data.ext ${kafka:dataEx() .typeId(zilla:id("kafka")) .produce() .timestamp(newTimestamp) + .sequence(0) .build() .build()} write "Hello, world" @@ -83,6 +84,7 @@ write zilla:data.ext ${kafka:dataEx() .typeId(zilla:id("kafka")) .produce() .timestamp(newTimestamp) + .sequence(1) .build() .build()} write "Hello, world" diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.values.producer.id.changes/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.values.producer.id.changes/client.rpt new file mode 100644 index 0000000000..86f89d8a07 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.values.producer.id.changes/client.rpt @@ -0,0 +1,95 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property deltaMillis 0L +property newTimestamp ${kafka:timestamp() + deltaMillis} + +connect "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "half-duplex" + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .meta() + .topic("test") + .build() + .build()} + +connected + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .meta() + .topic("test") + .build() + .build()} + +read zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .meta() + .partition(0, 177) + .build() + .build()} + +read notify ROUTED_BROKER_CLIENT + +connect await ROUTED_BROKER_CLIENT + "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "half-duplex" + option zilla:affinity 0xb1 + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .produce() + .topic("test") + .partition(0) + .build() + .build()} + +connected + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .produce() + .topic("test") + .partition(0) + .build() + .build()} + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .produce() + .timestamp(newTimestamp) + .producerId(1) + .producerEpoch(1) + .sequence(0) + .build() + .build()} +write "Hello, world" +write flush + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .produce() + .timestamp(newTimestamp) + .producerId(2) + .producerEpoch(2) + .sequence(0) + .build() + .build()} +write "Hello, again" +write flush diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.values.producer.id.changes/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.values.producer.id.changes/server.rpt new file mode 100644 index 0000000000..80a632c830 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.values.producer.id.changes/server.rpt @@ -0,0 +1,91 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property serverAddress "zilla://streams/app0" + +accept ${serverAddress} + option zilla:window 8192 + option zilla:padding 512 + option zilla:transmission "half-duplex" + +accepted + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .meta() + .topic("test") + .build() + .build()} + +connected + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .meta() + .topic("test") + .build() + .build()} +write flush + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .meta() + .partition(0, 177) + .build() + .build()} +write flush + +accepted + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .produce() + .topic("test") + .partition(0) + .build() + .build()} + +connected + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .produce() + .topic("test") + .partition(0) + .build() + .build()} + +read zilla:data.ext ${kafka:matchDataEx() + .typeId(zilla:id("kafka")) + .produce() + .producerId(1) + .producerEpoch(1) + .sequence(0) + .build() + .build()} + +read "Hello, world" + +read zilla:data.ext ${kafka:matchDataEx() + .typeId(zilla:id("kafka")) + .produce() + .producerId(2) + .producerEpoch(2) + .sequence(0) + .build() + .build()} + +read "Hello, again" diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.values.producer.id.replay/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.values.producer.id.replay/client.rpt new file mode 100644 index 0000000000..d6d7147ec0 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.values.producer.id.replay/client.rpt @@ -0,0 +1,95 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property deltaMillis 0L +property newTimestamp ${kafka:timestamp() + deltaMillis} + +connect "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "half-duplex" + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .meta() + .topic("test") + .build() + .build()} + +connected + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .meta() + .topic("test") + .build() + .build()} + +read zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .meta() + .partition(0, 177) + .build() + .build()} + +read notify ROUTED_BROKER_CLIENT + +connect await ROUTED_BROKER_CLIENT + "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "half-duplex" + option zilla:affinity 0xb1 + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .produce() + .topic("test") + .partition(0) + .build() + .build()} + +connected + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .produce() + .topic("test") + .partition(0) + .build() + .build()} + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .produce() + .timestamp(newTimestamp) + .producerId(1) + .producerEpoch(1) + .sequence(2) + .build() + .build()} +write "Hello, world" +write flush + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .produce() + .timestamp(newTimestamp) + .producerId(1) + .producerEpoch(1) + .sequence(1) + .build() + .build()} +write "Hello, again" +write flush diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.values.producer.id.replay/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.values.producer.id.replay/server.rpt new file mode 100644 index 0000000000..3d5d3309ba --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.values.producer.id.replay/server.rpt @@ -0,0 +1,91 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property serverAddress "zilla://streams/app0" + +accept ${serverAddress} + option zilla:window 8192 + option zilla:padding 512 + option zilla:transmission "half-duplex" + +accepted + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .meta() + .topic("test") + .build() + .build()} + +connected + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .meta() + .topic("test") + .build() + .build()} +write flush + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .meta() + .partition(0, 177) + .build() + .build()} +write flush + +accepted + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .produce() + .topic("test") + .partition(0) + .build() + .build()} + +connected + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .produce() + .topic("test") + .partition(0) + .build() + .build()} + +read zilla:data.ext ${kafka:matchDataEx() + .typeId(zilla:id("kafka")) + .produce() + .producerId(1) + .producerEpoch(1) + .sequence(2) + .build() + .build()} + +read "Hello, world" + +read zilla:data.ext ${kafka:matchDataEx() + .typeId(zilla:id("kafka")) + .produce() + .producerId(1) + .producerEpoch(1) + .sequence(1) + .build() + .build()} + +read "Hello, again" diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.values.producer.id/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.values.producer.id/client.rpt new file mode 100644 index 0000000000..042e1058a1 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.values.producer.id/client.rpt @@ -0,0 +1,95 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property deltaMillis 0L +property newTimestamp ${kafka:timestamp() + deltaMillis} + +connect "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "half-duplex" + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .meta() + .topic("test") + .build() + .build()} + +connected + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .meta() + .topic("test") + .build() + .build()} + +read zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .meta() + .partition(0, 177) + .build() + .build()} + +read notify ROUTED_BROKER_CLIENT + +connect await ROUTED_BROKER_CLIENT + "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "half-duplex" + option zilla:affinity 0xb1 + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .produce() + .topic("test") + .partition(0) + .build() + .build()} + +connected + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .produce() + .topic("test") + .partition(0) + .build() + .build()} + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .produce() + .timestamp(newTimestamp) + .producerId(1) + .producerEpoch(1) + .sequence(0) + .build() + .build()} +write "Hello, world" +write flush + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .produce() + .timestamp(newTimestamp) + .producerId(1) + .producerEpoch(1) + .sequence(1) + .build() + .build()} +write "Hello, again" +write flush diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.values.producer.id/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.values.producer.id/server.rpt new file mode 100644 index 0000000000..073007b705 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.values.producer.id/server.rpt @@ -0,0 +1,91 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property serverAddress "zilla://streams/app0" + +accept ${serverAddress} + option zilla:window 8192 + option zilla:padding 512 + option zilla:transmission "half-duplex" + +accepted + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .meta() + .topic("test") + .build() + .build()} + +connected + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .meta() + .topic("test") + .build() + .build()} +write flush + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .meta() + .partition(0, 177) + .build() + .build()} +write flush + +accepted + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .produce() + .topic("test") + .partition(0) + .build() + .build()} + +connected + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .produce() + .topic("test") + .partition(0) + .build() + .build()} + +read zilla:data.ext ${kafka:matchDataEx() + .typeId(zilla:id("kafka")) + .produce() + .producerId(1) + .producerEpoch(1) + .sequence(0) + .build() + .build()} + +read "Hello, world" + +read zilla:data.ext ${kafka:matchDataEx() + .typeId(zilla:id("kafka")) + .produce() + .producerId(1) + .producerEpoch(1) + .sequence(1) + .build() + .build()} + +read "Hello, again" diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/init.producer.id.v4.sasl.handshake.v1/produce.new.id.sasl.plain/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/init.producer.id.v4.sasl.handshake.v1/produce.new.id.sasl.plain/client.rpt new file mode 100644 index 0000000000..4c5d98077f --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/init.producer.id.v4.sasl.handshake.v1/produce.new.id.sasl.plain/client.rpt @@ -0,0 +1,76 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property networkConnectWindow 8192 + +property newRequestId ${kafka:newRequestId()} +property fetchWaitMax 500 +property fetchBytesMax 65535 +property partitionBytesMax 8192 + +connect "zilla://streams/net0" + option zilla:window ${networkConnectWindow} + option zilla:transmission "duplex" + option zilla:byteorder "network" + +connected + +write 22 # size + 17s # sasl.handshake + 1s # v1 + ${newRequestId} + 5s "zilla" # client id + 5s "PLAIN" # mechanism + +read 17 # size + ${newRequestId} + 0s # no error + 1 # mechanisms + 5s "PLAIN" # PLAIN + +write 37 # size + 36s # sasl.authenticate + 1s # v1 + ${newRequestId} + 5s "zilla" # client id + 18 + [0x00] "username" # authentication bytes + [0x00] "password" + +read 20 # size + ${newRequestId} + 0s # no error + -1 + -1s # authentication bytes + 0L # session lifetime + +write 31 # size + 22s # init producer id + 4s # v4 + ${newRequestId} + 5s "zilla" # client id + -1s # transaction + 60000 # transaction timeout ms + -1L # producer id + -1s # producer epoch + + +read 20 # size + (int:newRequestId) + 0 # throttle time ms + 0s # no error + 1L # producer id + 2s # producer epoch diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/init.producer.id.v4.sasl.handshake.v1/produce.new.id.sasl.plain/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/init.producer.id.v4.sasl.handshake.v1/produce.new.id.sasl.plain/server.rpt new file mode 100644 index 0000000000..784b3d2b8e --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/init.producer.id.v4.sasl.handshake.v1/produce.new.id.sasl.plain/server.rpt @@ -0,0 +1,72 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property networkAcceptWindow 8192 + +accept "zilla://streams/net0" + option zilla:window ${networkAcceptWindow} + option zilla:transmission "duplex" + option zilla:byteorder "network" + +accepted + +connected + +read 22 # size + 17s # sasl.handshake + 1s # v1 + (int:requestId) + 5s "zilla" # client id + 5s "PLAIN" # mechanism + +write 17 # size + ${requestId} + 0s # no error + 1 # mechanisms + 5s "PLAIN" # PLAIN + +read 37 # size + 36s # sasl.authenticate + 1s # v1 + (int:requestId) + 5s "zilla" # client id + 18 + [0x00] "username" # authentication bytes + [0x00] "password" + +write 20 # size + ${requestId} + 0s # no error + -1 + -1s # authentication bytes + 0L # session lifetime + +read 31 # size + 22s # init producer id + 4s # v4 + (int:newRequestId) + 5s "zilla" # client id + -1s # transaction + 60000 # transaction timeout ms + -1L # producer id + -1s # producer epoch + +write 20 # size + ${newRequestId} + 0 # throttle time ms + 0s # no error + 1L # producer id + 2s # producer epoch diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/init.producer.id.v4.sasl.handshake.v1/produce.new.id.sasl.scram/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/init.producer.id.v4.sasl.handshake.v1/produce.new.id.sasl.scram/client.rpt new file mode 100644 index 0000000000..572339d077 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/init.producer.id.v4.sasl.handshake.v1/produce.new.id.sasl.scram/client.rpt @@ -0,0 +1,90 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property networkConnectWindow 8192 + +property newRequestId ${kafka:newRequestId()} +property fetchWaitMax 500 +property fetchBytesMax 65535 +property partitionBytesMax 8192 + +connect "zilla://streams/net0" + option zilla:window ${networkConnectWindow} + option zilla:transmission "duplex" + option zilla:byteorder "network" + +connected + +write 28 # size + 17s # sasl.handshake + 1s # v1 + ${newRequestId} + 5s "zilla" # client id + 11s "SCRAM-SHA-1" # mechanism + +read 23 # size + ${newRequestId} + 0s # no error + 1 # mechanisms + 11s "SCRAM-SHA-1" # SCRAM + +write 55 # size + 36s # sasl.authenticate + 1s # v1 + ${newRequestId} + 5s "zilla" # client id + 36 # authentication bytes + "n,,n=user,r=fyko+d2lbbFgONRv9qkxdawL" + +read 92 # size + ${newRequestId} + 0s # no error + -1s + 70 "r=fyko+d2lbbFgONRv9qkxdawL3rfcNHYJY1ZVvWVs7j,s=QSXCR+Q6sek8bf92,i=4096" + 0L # session lifetime + +write 101 # size + 36s # sasl.authenticate + 1s # v1 + ${newRequestId} + 5s "zilla" # client id + 82 # authentication bytes + "c=biws,r=fyko+d2lbbFgONRv9qkxdawL3rfcNHYJY1ZVvWVs7j,p=v0X8v3Bz2T0CJGbJQyF0X+HI4Ts=" + +read 52 # size + ${newRequestId} + 0s # no error + -1s + 30 "v=rmF9pqV8S7suAoZWja4dJRkFsKQ=" + 0L # session lifetime + +write 31 # size + 22s # init producer id + 4s # v4 + ${newRequestId} + 5s "zilla" # client id + -1s # transaction + 60000 # transaction timeout ms + -1L # producer id + -1s # producer epoch + + +read 20 # size + (int:newRequestId) + 0 # throttle time ms + 0s # no error + 1L # producer id + 2s # producer epoch diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/init.producer.id.v4.sasl.handshake.v1/produce.new.id.sasl.scram/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/init.producer.id.v4.sasl.handshake.v1/produce.new.id.sasl.scram/server.rpt new file mode 100644 index 0000000000..6c63bb07c8 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/init.producer.id.v4.sasl.handshake.v1/produce.new.id.sasl.scram/server.rpt @@ -0,0 +1,86 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property networkAcceptWindow 8192 + +accept "zilla://streams/net0" + option zilla:window ${networkAcceptWindow} + option zilla:transmission "duplex" + option zilla:byteorder "network" + +accepted + +connected + +read 28 # size + 17s # sasl.handshake + 1s # v1 + (int:requestId) + 5s "zilla" # client id + 11s "SCRAM-SHA-1" # mechanism + +write 23 # size + ${requestId} + 0s # no error + 1 # mechanisms + 11s "SCRAM-SHA-1" # SCRAM + +read 55 # size + 36s # sasl.authenticate + 1s # v1 + (int:requestId) + 5s "zilla" # client id + 36 # authentication bytes + "n,,n=user,r=fyko+d2lbbFgONRv9qkxdawL" + +write 92 # size + ${requestId} + 0s # no error + -1s + 70 "r=fyko+d2lbbFgONRv9qkxdawL3rfcNHYJY1ZVvWVs7j,s=QSXCR+Q6sek8bf92,i=4096" # authentication bytes + 0L # session lifetime + +read 101 # size + 36s # sasl.authenticate + 1s # v1 + (int:requestId) + 5s "zilla" # client id + 82 # authentication bytes + "c=biws,r=fyko+d2lbbFgONRv9qkxdawL3rfcNHYJY1ZVvWVs7j,p=v0X8v3Bz2T0CJGbJQyF0X+HI4Ts=" + +write 52 # size + ${requestId} + 0s # no error + -1s + 30 "v=rmF9pqV8S7suAoZWja4dJRkFsKQ=" # authentication bytes + 0L # session lifetime + +read 31 # size + 22s # init producer id + 4s # v4 + (int:newRequestId) + 5s "zilla" # client id + -1s # transaction + 60000 # transaction timeout ms + -1L # producer id + -1s # producer epoch + +write 20 # size + ${newRequestId} + 0 # throttle time ms + 0s # no error + 1L # producer id + 2s # producer epoch diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.producer.id/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.producer.id/client.rpt new file mode 100644 index 0000000000..188fb0e957 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.producer.id/client.rpt @@ -0,0 +1,128 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property networkConnectWindow 8192 + +property newRequestId ${kafka:newRequestId()} +property produceWaitMax 500 + +connect "zilla://streams/net0" + option zilla:window ${networkConnectWindow} + option zilla:transmission "duplex" + option zilla:byteorder "network" + +connected + +write 26 # size + 3s # metadata + 5s # v5 + ${newRequestId} + 5s "zilla" # client id + 1 # topics + 4s "test" # "test" topic + [0x00] # allow_auto_topic_creation + +read 97 # size + ${newRequestId} + [0..4] + 1 # brokers + 0xb1 # broker id + 19s "broker1.example.com" # host name + 9092 # port + -1s # no rack + 9s "cluster 1" # cluster id + 1 # controller id + 1 # topics + 0s # no error + 4s "test" # "test" topic + [0x00] # not internal + 1 # partitions + 0s # no error + 0 # partition + 0xb1 # leader + 0 # no replicas + -1 # no in-sync replicas + 0 # offline replicas + +read notify ROUTED_BROKER_SERVER + +connect await ROUTED_BROKER_SERVER + "zilla://streams/net0" + option zilla:window ${networkConnectWindow} + option zilla:transmission "duplex" + option zilla:byteorder "network" + +write zilla:begin.ext ${proxy:beginEx() + .typeId(zilla:id("proxy")) + .addressInet() + .protocol("stream") + .source("0.0.0.0") + .destination("broker1.example.com") + .sourcePort(0) + .destinationPort(9092) + .build() + .info() + .authority("broker1.example.com") + .build() + .build()} + +connected + +write 125 # size + 0s # produce + 3s # v3 + ${newRequestId} + 5s "zilla" # client id + -1s # transactional id + 0s # acks + ${produceWaitMax} + 1 + 4s "test" + 1 + 0 # partition + 80 # record set size + 0L # first offset + 68 # length + -1 + [0x02] + 0x4e8723aa + 0s + 0 # last offset delta + ${newTimestamp} # first timestamp + ${newTimestamp} # last timestamp + 8L + 1s + 0 + 1 # records + ${kafka:varint(18)} + [0x00] + ${kafka:varint(0)} + ${kafka:varint(0)} + ${kafka:varint(-1)} # key + ${kafka:varint(12)} # value + "Hello, world" + ${kafka:varint(0)} # headers + +read 44 + ${newRequestId} + 1 # topics + 4s "test" + 1 # partitions + 0 # partition + 0s # no error + 20L # base offset + [0..8] # log append time + [0..4] # throttle ms diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.producer.id/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.producer.id/server.rpt new file mode 100644 index 0000000000..ff81148e96 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.producer.id/server.rpt @@ -0,0 +1,124 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property networkAcceptWindow 8192 + +property deltaMillis 0L +property newTimestamp ${kafka:timestamp() + deltaMillis} + +accept "zilla://streams/net0" + option zilla:window ${networkAcceptWindow} + option zilla:transmission "duplex" + option zilla:byteorder "network" + +accepted + +connected + +read 26 # size + 3s # metadata + 5s # v5 + (int:requestId) + 5s "zilla" # client id + 1 # topics + 4s "test" # "test" topic + [0x00] # allow_auto_topic_creation + +write 97 # size + ${requestId} + 0 + 1 # brokers + 0xb1 # broker id + 19s "broker1.example.com" # host name + 9092 # port + -1s # no rack + 9s "cluster 1" # cluster id + 1 # controller id + 1 # topics + 0s # no error + 4s "test" # "test" topic + [0x00] # not internal + 1 # partitions + 0s # no error + 0 # partition + 0xb1 # leader + 0 # no replicas + -1 # no in-sync replicas + 0 # offline replicas + +accepted + +read zilla:begin.ext ${proxy:matchBeginEx() + .typeId(zilla:id("proxy")) + .addressInet() + .protocol("stream") + .source("0.0.0.0") + .destination("broker1.example.com") + .sourcePort(0) + .destinationPort(9092) + .build() + .info() + .authority("broker1.example.com") + .build() + .build()} + +connected + +read 125 + 0s + 3s + (int:requestId) + 5s "zilla" # client id + -1s + [0..2] + [0..4] + 1 + 4s "test" + 1 + 0 + 80 # record set size + 0L # first offset + 68 # length + -1 + [0x02] + [0..4] + 0s + 0 # last offset delta + (long:timestamp) # first timestamp + ${timestamp} # last timestamp + 8L + 1s + 0 + 1 # records + ${kafka:varint(18)} + [0x00] + ${kafka:varint(0)} + ${kafka:varint(0)} + ${kafka:varint(-1)} # key + ${kafka:varint(12)} # value + "Hello, world" + ${kafka:varint(0)} # headers + +write 44 + ${requestId} + 1 # topics + 4s "test" + 1 # partitions + 0 # partition 0 + 0s # no error + 20L # base offset + 0L # log append time + 0 # throttle diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.producer.id.changes/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.producer.id.changes/client.rpt new file mode 100644 index 0000000000..01d3e62f46 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.producer.id.changes/client.rpt @@ -0,0 +1,174 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property networkConnectWindow 8192 + +property newRequestId ${kafka:newRequestId()} +property produceWaitMax 500 + +connect "zilla://streams/net0" + option zilla:window ${networkConnectWindow} + option zilla:transmission "duplex" + option zilla:byteorder "network" + +connected + +write 26 # size + 3s # metadata + 5s # v5 + ${newRequestId} + 5s "zilla" # client id + 1 # topics + 4s "test" # "test" topic + [0x00] # allow_auto_topic_creation + +read 97 # size + ${newRequestId} + [0..4] + 1 # brokers + 0xb1 # broker id + 19s "broker1.example.com" # host name + 9092 # port + -1s # no rack + 9s "cluster 1" # cluster id + 1 # controller id + 1 # topics + 0s # no error + 4s "test" # "test" topic + [0x00] # not internal + 1 # partitions + 0s # no error + 0 # partition + 0xb1 # leader + 0 # no replicas + -1 # no in-sync replicas + 0 # offline replicas + +read notify ROUTED_BROKER_SERVER + +connect await ROUTED_BROKER_SERVER + "zilla://streams/net0" + option zilla:window ${networkConnectWindow} + option zilla:transmission "duplex" + option zilla:byteorder "network" + +write zilla:begin.ext ${proxy:beginEx() + .typeId(zilla:id("proxy")) + .addressInet() + .protocol("stream") + .source("0.0.0.0") + .destination("broker1.example.com") + .sourcePort(0) + .destinationPort(9092) + .build() + .info() + .authority("broker1.example.com") + .build() + .build()} + +connected + +write 125 # size + 0s # produce + 3s # v3 + ${newRequestId} + 5s "zilla" # client id + -1s # transactional id + 0s # acks + ${produceWaitMax} + 1 + 4s "test" + 1 + 0 # partition + 80 # record set size + 0L # first offset + 68 # length + -1 + [0x02] + 0x4e8723aa + 0s + 0 # last offset delta + ${newTimestamp} # first timestamp + ${newTimestamp} # last timestamp + 1L + 1s + 0 + 1 # records + ${kafka:varint(18)} + [0x00] + ${kafka:varint(0)} + ${kafka:varint(0)} + ${kafka:varint(-1)} # key + ${kafka:varint(12)} # value + "Hello, world" + ${kafka:varint(0)} # headers + +read 44 + ${newRequestId} + 1 # topics + 4s "test" + 1 # partitions + 0 # partition + 0s # no error + 20L # base offset + [0..8] # log append time + [0..4] # throttle ms + +write 125 # size + 0s # produce + 3s # v3 + ${newRequestId} + 5s "zilla" # client id + -1s # transactional id + 0s # acks + ${produceWaitMax} + 1 + 4s "test" + 1 + 0 # partition + 80 # record set size + 0L # first offset + 68 # length + -1 + [0x02] + 0x4e8723aa + 0s + 0 # last offset delta + ${newTimestamp} # first timestamp + ${newTimestamp} # last timestamp + 2L + 2s + 0 + 1 # records + ${kafka:varint(18)} + [0x00] + ${kafka:varint(0)} + ${kafka:varint(0)} + ${kafka:varint(-1)} # key + ${kafka:varint(12)} # value + "Hello, again" + ${kafka:varint(0)} # headers + +read 44 + ${newRequestId} + 1 # topics + 4s "test" + 1 # partitions + 0 # partition + 0s # no error + 20L # base offset + [0..8] # log append time + [0..4] # throttle ms diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.producer.id.changes/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.producer.id.changes/server.rpt new file mode 100644 index 0000000000..2d8dc04b47 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.producer.id.changes/server.rpt @@ -0,0 +1,170 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property networkAcceptWindow 8192 + +property deltaMillis 0L +property newTimestamp ${kafka:timestamp() + deltaMillis} + +accept "zilla://streams/net0" + option zilla:window ${networkAcceptWindow} + option zilla:transmission "duplex" + option zilla:byteorder "network" + +accepted + +connected + +read 26 # size + 3s # metadata + 5s # v5 + (int:requestId) + 5s "zilla" # client id + 1 # topics + 4s "test" # "test" topic + [0x00] # allow_auto_topic_creation + +write 97 # size + ${requestId} + 0 + 1 # brokers + 0xb1 # broker id + 19s "broker1.example.com" # host name + 9092 # port + -1s # no rack + 9s "cluster 1" # cluster id + 1 # controller id + 1 # topics + 0s # no error + 4s "test" # "test" topic + [0x00] # not internal + 1 # partitions + 0s # no error + 0 # partition + 0xb1 # leader + 0 # no replicas + -1 # no in-sync replicas + 0 # offline replicas + +accepted + +read zilla:begin.ext ${proxy:matchBeginEx() + .typeId(zilla:id("proxy")) + .addressInet() + .protocol("stream") + .source("0.0.0.0") + .destination("broker1.example.com") + .sourcePort(0) + .destinationPort(9092) + .build() + .info() + .authority("broker1.example.com") + .build() + .build()} + +connected + +read 125 + 0s + 3s + (int:requestId) + 5s "zilla" # client id + -1s + [0..2] + [0..4] + 1 + 4s "test" + 1 + 0 + 80 # record set size + 0L # first offset + 68 # length + -1 + [0x02] + [0..4] + 0s + 0 # last offset delta + (long:timestamp) # first timestamp + ${timestamp} # last timestamp + 1L + 1s + 0 + 1 # records + ${kafka:varint(18)} + [0x00] + ${kafka:varint(0)} + ${kafka:varint(0)} + ${kafka:varint(-1)} # key + ${kafka:varint(12)} # value + "Hello, world" + ${kafka:varint(0)} # headers + +write 44 + ${requestId} + 1 # topics + 4s "test" + 1 # partitions + 0 # partition 0 + 0s # no error + 20L # base offset + 0L # log append time + 0 # throttle + +read 125 + 0s + 3s + (int:requestId) + 5s "zilla" # client id + -1s + [0..2] + [0..4] + 1 + 4s "test" + 1 + 0 + 80 # record set size + 0L # first offset + 68 # length + -1 + [0x02] + [0..4] + 0s + 0 # last offset delta + (long:timestamp) # first timestamp + ${timestamp} # last timestamp + 2L + 2s + 0 + 1 # records + ${kafka:varint(18)} + [0x00] + ${kafka:varint(0)} + ${kafka:varint(0)} + ${kafka:varint(-1)} # key + ${kafka:varint(12)} # value + "Hello, again" + ${kafka:varint(0)} # headers + +write 44 + ${requestId} + 1 # topics + 4s "test" + 1 # partitions + 0 # partition 0 + 0s # no error + 20L # base offset + 0L # log append time + 0 # throttle diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.producer.id.replay/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.producer.id.replay/client.rpt new file mode 100644 index 0000000000..e182fd23d7 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.producer.id.replay/client.rpt @@ -0,0 +1,174 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property networkConnectWindow 8192 + +property newRequestId ${kafka:newRequestId()} +property produceWaitMax 500 + +connect "zilla://streams/net0" + option zilla:window ${networkConnectWindow} + option zilla:transmission "duplex" + option zilla:byteorder "network" + +connected + +write 26 # size + 3s # metadata + 5s # v5 + ${newRequestId} + 5s "zilla" # client id + 1 # topics + 4s "test" # "test" topic + [0x00] # allow_auto_topic_creation + +read 97 # size + ${newRequestId} + [0..4] + 1 # brokers + 0xb1 # broker id + 19s "broker1.example.com" # host name + 9092 # port + -1s # no rack + 9s "cluster 1" # cluster id + 1 # controller id + 1 # topics + 0s # no error + 4s "test" # "test" topic + [0x00] # not internal + 1 # partitions + 0s # no error + 0 # partition + 0xb1 # leader + 0 # no replicas + -1 # no in-sync replicas + 0 # offline replicas + +read notify ROUTED_BROKER_SERVER + +connect await ROUTED_BROKER_SERVER + "zilla://streams/net0" + option zilla:window ${networkConnectWindow} + option zilla:transmission "duplex" + option zilla:byteorder "network" + +write zilla:begin.ext ${proxy:beginEx() + .typeId(zilla:id("proxy")) + .addressInet() + .protocol("stream") + .source("0.0.0.0") + .destination("broker1.example.com") + .sourcePort(0) + .destinationPort(9092) + .build() + .info() + .authority("broker1.example.com") + .build() + .build()} + +connected + +write 125 # size + 0s # produce + 3s # v3 + ${newRequestId} + 5s "zilla" # client id + -1s # transactional id + 0s # acks + ${produceWaitMax} + 1 + 4s "test" + 1 + 0 # partition + 80 # record set size + 0L # first offset + 68 # length + -1 + [0x02] + 0x4e8723aa + 0s + 0 # last offset delta + ${newTimestamp} # first timestamp + ${newTimestamp} # last timestamp + 1L + 1s + 2 + 1 # records + ${kafka:varint(18)} + [0x00] + ${kafka:varint(0)} + ${kafka:varint(0)} + ${kafka:varint(-1)} # key + ${kafka:varint(12)} # value + "Hello, world" + ${kafka:varint(0)} # headers + +read 44 + ${newRequestId} + 1 # topics + 4s "test" + 1 # partitions + 0 # partition + 0s # no error + 20L # base offset + [0..8] # log append time + [0..4] # throttle ms + +write 125 # size + 0s # produce + 3s # v3 + ${newRequestId} + 5s "zilla" # client id + -1s # transactional id + 0s # acks + ${produceWaitMax} + 1 + 4s "test" + 1 + 0 # partition + 80 # record set size + 0L # first offset + 68 # length + -1 + [0x02] + 0x4e8723aa + 0s + 0 # last offset delta + ${newTimestamp} # first timestamp + ${newTimestamp} # last timestamp + 1L + 1s + 1 + 1 # records + ${kafka:varint(18)} + [0x00] + ${kafka:varint(0)} + ${kafka:varint(0)} + ${kafka:varint(-1)} # key + ${kafka:varint(12)} # value + "Hello, again" + ${kafka:varint(0)} # headers + +read 44 + ${newRequestId} + 1 # topics + 4s "test" + 1 # partitions + 0 # partition + 0s # no error + 20L # base offset + [0..8] # log append time + [0..4] # throttle ms diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.producer.id.replay/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.producer.id.replay/server.rpt new file mode 100644 index 0000000000..bc36eb5f18 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.producer.id.replay/server.rpt @@ -0,0 +1,170 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property networkAcceptWindow 8192 + +property deltaMillis 0L +property newTimestamp ${kafka:timestamp() + deltaMillis} + +accept "zilla://streams/net0" + option zilla:window ${networkAcceptWindow} + option zilla:transmission "duplex" + option zilla:byteorder "network" + +accepted + +connected + +read 26 # size + 3s # metadata + 5s # v5 + (int:requestId) + 5s "zilla" # client id + 1 # topics + 4s "test" # "test" topic + [0x00] # allow_auto_topic_creation + +write 97 # size + ${requestId} + 0 + 1 # brokers + 0xb1 # broker id + 19s "broker1.example.com" # host name + 9092 # port + -1s # no rack + 9s "cluster 1" # cluster id + 1 # controller id + 1 # topics + 0s # no error + 4s "test" # "test" topic + [0x00] # not internal + 1 # partitions + 0s # no error + 0 # partition + 0xb1 # leader + 0 # no replicas + -1 # no in-sync replicas + 0 # offline replicas + +accepted + +read zilla:begin.ext ${proxy:matchBeginEx() + .typeId(zilla:id("proxy")) + .addressInet() + .protocol("stream") + .source("0.0.0.0") + .destination("broker1.example.com") + .sourcePort(0) + .destinationPort(9092) + .build() + .info() + .authority("broker1.example.com") + .build() + .build()} + +connected + +read 125 + 0s + 3s + (int:requestId) + 5s "zilla" # client id + -1s + [0..2] + [0..4] + 1 + 4s "test" + 1 + 0 + 80 # record set size + 0L # first offset + 68 # length + -1 + [0x02] + [0..4] + 0s + 0 # last offset delta + (long:timestamp) # first timestamp + ${timestamp} # last timestamp + 1L + 1s + 2 + 1 # records + ${kafka:varint(18)} + [0x00] + ${kafka:varint(0)} + ${kafka:varint(0)} + ${kafka:varint(-1)} # key + ${kafka:varint(12)} # value + "Hello, world" + ${kafka:varint(0)} # headers + +write 44 + ${requestId} + 1 # topics + 4s "test" + 1 # partitions + 0 # partition 0 + 0s # no error + 20L # base offset + 0L # log append time + 0 # throttle + +read 125 + 0s + 3s + (int:requestId) + 5s "zilla" # client id + -1s + [0..2] + [0..4] + 1 + 4s "test" + 1 + 0 + 80 # record set size + 0L # first offset + 68 # length + -1 + [0x02] + [0..4] + 0s + 0 # last offset delta + (long:timestamp) # first timestamp + ${timestamp} # last timestamp + 1L + 1s + 1 + 1 # records + ${kafka:varint(18)} + [0x00] + ${kafka:varint(0)} + ${kafka:varint(0)} + ${kafka:varint(-1)} # key + ${kafka:varint(12)} # value + "Hello, again" + ${kafka:varint(0)} # headers + +write 44 + ${requestId} + 1 # topics + 4s "test" + 1 # partitions + 0 # partition 0 + 0s # no error + 20L # base offset + 0L # log append time + 0 # throttle diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.producer.id/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.producer.id/client.rpt new file mode 100644 index 0000000000..c0652f5969 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.producer.id/client.rpt @@ -0,0 +1,136 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property networkConnectWindow 8192 + +property newRequestId ${kafka:newRequestId()} +property produceWaitMax 500 + +connect "zilla://streams/net0" + option zilla:window ${networkConnectWindow} + option zilla:transmission "duplex" + option zilla:byteorder "network" + +connected + +write 26 # size + 3s # metadata + 5s # v5 + ${newRequestId} + 5s "zilla" # client id + 1 # topics + 4s "test" # "test" topic + [0x00] # allow_auto_topic_creation + +read 97 # size + ${newRequestId} + [0..4] + 1 # brokers + 0xb1 # broker id + 19s "broker1.example.com" # host name + 9092 # port + -1s # no rack + 9s "cluster 1" # cluster id + 1 # controller id + 1 # topics + 0s # no error + 4s "test" # "test" topic + [0x00] # not internal + 1 # partitions + 0s # no error + 0 # partition + 0xb1 # leader + 0 # no replicas + -1 # no in-sync replicas + 0 # offline replicas + +read notify ROUTED_BROKER_SERVER + +connect await ROUTED_BROKER_SERVER + "zilla://streams/net0" + option zilla:window ${networkConnectWindow} + option zilla:transmission "duplex" + option zilla:byteorder "network" + +write zilla:begin.ext ${proxy:beginEx() + .typeId(zilla:id("proxy")) + .addressInet() + .protocol("stream") + .source("0.0.0.0") + .destination("broker1.example.com") + .sourcePort(0) + .destinationPort(9092) + .build() + .info() + .authority("broker1.example.com") + .build() + .build()} + +connected + +write 144 # size + 0s # produce + 3s # v3 + ${newRequestId} + 5s "zilla" # client id + -1s # transactional id + 0s # acks + ${produceWaitMax} + 1 + 4s "test" + 1 + 0 # partition + 99 # record set size + 0L # first offset + 87 # length + -1 + [0x02] + 0x4e8723aa + 0s + 1 # last offset delta + ${newTimestamp} # first timestamp + ${newTimestamp} # last timestamp + 1L + 1s + 0 + 2 # records + ${kafka:varint(18)} + [0x00] + ${kafka:varint(0)} + ${kafka:varint(0)} + ${kafka:varint(-1)} # key + ${kafka:varint(12)} # value + "Hello, world" + ${kafka:varint(0)} # headers + ${kafka:varint(18)} + [0x00] + ${kafka:varint(0)} + ${kafka:varint(1)} + ${kafka:varint(-1)} # key + ${kafka:varint(12)} # value + "Hello, again" + ${kafka:varint(0)} # headers + +read 44 + ${newRequestId} + 1 # topics + 4s "test" + 1 # partitions + 0 # partition + 0s # no error + 20L # base offset + [0..8] # log append time + [0..4] # throttle ms diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.producer.id/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.producer.id/server.rpt new file mode 100644 index 0000000000..1a28c212c0 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.producer.id/server.rpt @@ -0,0 +1,133 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property networkAcceptWindow 8192 + +property deltaMillis 0L +property newTimestamp ${kafka:timestamp() + deltaMillis} + +accept "zilla://streams/net0" + option zilla:window ${networkAcceptWindow} + option zilla:transmission "duplex" + option zilla:byteorder "network" + +accepted + +connected + +read 26 # size + 3s # metadata + 5s # v5 + (int:requestId) + 5s "zilla" # client id + 1 # topics + 4s "test" # "test" topic + [0x00] # allow_auto_topic_creation + +write 97 # size + ${requestId} + 0 + 1 # brokers + 0xb1 # broker id + 19s "broker1.example.com" # host name + 9092 # port + -1s # no rack + 9s "cluster 1" # cluster id + 1 # controller id + 1 # topics + 0s # no error + 4s "test" # "test" topic + [0x00] # not internal + 1 # partitions + 0s # no error + 0 # partition + 0xb1 # leader + 0 # no replicas + -1 # no in-sync replicas + 0 # offline replicas + +accepted + +read zilla:begin.ext ${proxy:matchBeginEx() + .typeId(zilla:id("proxy")) + .addressInet() + .protocol("stream") + .source("0.0.0.0") + .destination("broker1.example.com") + .sourcePort(0) + .destinationPort(9092) + .build() + .info() + .authority("broker1.example.com") + .build() + .build()} + +connected + +read 144 + 0s + 3s + (int:requestId) + 5s "zilla" # client id + -1s + [0..2] + [0..4] + 1 + 4s "test" + 1 + 0 + 99 # record set size + 0L # first offset + 87 # length + -1 + [0x02] + [0..4] + 0s + 1 # last offset delta + (long:timestamp) # first timestamp + ${timestamp} # last timestamp + 1L + 1s + 0 + 2 # records + ${kafka:varint(18)} + [0x00] + ${kafka:varint(0)} + ${kafka:varint(0)} + ${kafka:varint(-1)} # key + ${kafka:varint(12)} # value + "Hello, world" + ${kafka:varint(0)} # headers + ${kafka:varint(18)} + [0x00] + ${kafka:varint(0)} + ${kafka:varint(1)} + ${kafka:varint(-1)} # key + ${kafka:varint(12)} # value + "Hello, again" + ${kafka:varint(0)} # headers + + +write 44 + ${requestId} + 1 # topics + 4s "test" + 1 # partitions + 0 # partition 0 + 0s # no error + 20L # base offset + 0L # log append time + 0 # throttle diff --git a/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/internal/KafkaFunctionsTest.java b/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/internal/KafkaFunctionsTest.java index a8e198acd4..4b36b4164d 100644 --- a/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/internal/KafkaFunctionsTest.java +++ b/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/internal/KafkaFunctionsTest.java @@ -1124,6 +1124,27 @@ public void shouldGenerateMergedFetchFlushExtensionWithStableOffset() .get((b, o, m) -> b.getStringWithoutLengthUtf8(o, m - o)))) != null)); } + @Test + public void shouldGenerateMergedProduceFlushExtension() + { + byte[] build = KafkaFunctions.flushEx() + .typeId(0x01) + .merged() + .produce() + .hashKey("hashTopic") + .partitionId(0) + .build() + .build(); + + DirectBuffer buffer = new UnsafeBuffer(build); + KafkaFlushExFW flushEx = new KafkaFlushExFW().wrap(buffer, 0, buffer.capacity()); + assertEquals(0x01, flushEx.typeId()); + + assertEquals("hashTopic", flushEx.merged().produce().hashKey() + .value() + .get((b, o, m) -> b.getStringWithoutLengthUtf8(o, m - o))); + } + @Test public void shouldGenerateMergedConsumerFlushExtension() { @@ -1156,6 +1177,8 @@ public void shouldMatchProduceMergedDataExtension() throws Exception .partition(0, 0L) .progress(0, 1L) .timestamp(12345678L) + .producerId(1L) + .producerEpoch((short) 1) .key("match") .header("name", "value") .headerNull("name-n") @@ -1170,6 +1193,8 @@ public void shouldMatchProduceMergedDataExtension() throws Exception .merged(m -> m.produce(mp -> mp .deferred(100) .timestamp(12345678L) + .producerId(1L) + .producerEpoch((short) 1) .partition(p -> p.partitionId(0).partitionOffset(0L)) .key(k -> k.length(5) .value(v -> v.set("match".getBytes(UTF_8)))) diff --git a/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/application/InitProducerIdIT.java b/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/application/InitProducerIdIT.java new file mode 100644 index 0000000000..794acf430c --- /dev/null +++ b/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/application/InitProducerIdIT.java @@ -0,0 +1,47 @@ +/* + * Copyright 2021-2023 Aklivity Inc. + * + * Aklivity licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.aklivity.zilla.specs.binding.kafka.streams.application; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.junit.rules.RuleChain.outerRule; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.DisableOnDebug; +import org.junit.rules.TestRule; +import org.junit.rules.Timeout; +import org.kaazing.k3po.junit.annotation.Specification; +import org.kaazing.k3po.junit.rules.K3poRule; + +public class InitProducerIdIT +{ + private final K3poRule k3po = new K3poRule() + .addScriptRoot("app", "io/aklivity/zilla/specs/binding/kafka/streams/application/init.producer.id"); + + private final TestRule timeout = new DisableOnDebug(new Timeout(5, SECONDS)); + + @Rule + public final TestRule chain = outerRule(k3po).around(timeout); + + @Test + @Specification({ + "${app}/produce.new.id/client", + "${app}/produce.new.id/server"}) + public void shouldGenerateNewProducerId() throws Exception + { + k3po.finish(); + } +} diff --git a/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/application/MergedIT.java b/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/application/MergedIT.java index 44cae789c0..baed1b7cc8 100644 --- a/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/application/MergedIT.java +++ b/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/application/MergedIT.java @@ -746,4 +746,41 @@ public void shouldAckUnmergedFetchMessage() throws Exception { k3po.finish(); } + + @Test + @Specification({ + "${app}/merged.produce.message.values.producer.id/client", + "${app}/merged.produce.message.values.producer.id/server"}) + public void shouldProduceMergedMessageValuesWithProducerId() throws Exception + { + k3po.finish(); + } + + @Test + @Specification({ + "${app}/unmerged.produce.message.values.producer.id/client", + "${app}/unmerged.produce.message.values.producer.id/server"}) + public void shouldProduceUnmergedMessageValuesWithProducerId() throws Exception + { + k3po.finish(); + } + + @Test + @Specification({ + "${app}/merged.produce.message.value.partition.id/client", + "${app}/merged.produce.message.value.partition.id/server"}) + public void shouldProduceMergedMessageValueByGettingPartitionId() throws Exception + { + k3po.finish(); + } + + @Test + @Specification({ + "${app}/unmerged.produce.message.value.partition.id/client", + "${app}/unmerged.produce.message.value.partition.id/server"}) + public void shouldProduceUnmergedMessageValueByGettingPartitionId() throws Exception + { + k3po.finish(); + } + } diff --git a/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/application/ProduceIT.java b/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/application/ProduceIT.java index d15b907eb0..68c95e2df5 100644 --- a/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/application/ProduceIT.java +++ b/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/application/ProduceIT.java @@ -145,6 +145,42 @@ public void shouldSendMessageValue() throws Exception k3po.finish(); } + @Test + @Specification({ + "${app}/message.producer.id/client", + "${app}/message.producer.id/server"}) + public void shouldSendMessageValueWithProducerId() throws Exception + { + k3po.finish(); + } + + @Test + @Specification({ + "${app}/message.values.producer.id/client", + "${app}/message.values.producer.id/server"}) + public void shouldSendMessageValuesWithProducerId() throws Exception + { + k3po.finish(); + } + + @Test + @Specification({ + "${app}/message.values.producer.id.changes/client", + "${app}/message.values.producer.id.changes/server"}) + public void shouldSendMessageValuesWithProducerIdThatChanges() throws Exception + { + k3po.finish(); + } + + @Test + @Specification({ + "${app}/message.values.producer.id.replay/client", + "${app}/message.values.producer.id.replay/server"}) + public void shouldReplyMessageValuesWithProducerId() throws Exception + { + k3po.finish(); + } + @Test @Specification({ "${app}/message.value.null/client", diff --git a/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/network/InitProducerIdIT.java b/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/network/InitProducerIdIT.java new file mode 100644 index 0000000000..db9e626f68 --- /dev/null +++ b/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/network/InitProducerIdIT.java @@ -0,0 +1,47 @@ +/* + * Copyright 2021-2023 Aklivity Inc. + * + * Aklivity licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.aklivity.zilla.specs.binding.kafka.streams.network; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.junit.rules.RuleChain.outerRule; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.DisableOnDebug; +import org.junit.rules.TestRule; +import org.junit.rules.Timeout; +import org.kaazing.k3po.junit.annotation.Specification; +import org.kaazing.k3po.junit.rules.K3poRule; + +public class InitProducerIdIT +{ + private final K3poRule k3po = new K3poRule() + .addScriptRoot("net", "io/aklivity/zilla/specs/binding/kafka/streams/network/init.producer.id.v4"); + + private final TestRule timeout = new DisableOnDebug(new Timeout(5, SECONDS)); + + @Rule + public final TestRule chain = outerRule(k3po).around(timeout); + + @Test + @Specification({ + "${net}/produce.new.id/client", + "${net}/produce.new.id/server"}) + public void shouldGenerateNewProducerId() throws Exception + { + k3po.finish(); + } +} diff --git a/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/network/InitProducerIdSaslIT.java b/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/network/InitProducerIdSaslIT.java new file mode 100644 index 0000000000..a3c795dfd5 --- /dev/null +++ b/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/network/InitProducerIdSaslIT.java @@ -0,0 +1,57 @@ +/* + * Copyright 2021-2023 Aklivity Inc. + * + * Aklivity licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.aklivity.zilla.specs.binding.kafka.streams.network; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.junit.rules.RuleChain.outerRule; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.DisableOnDebug; +import org.junit.rules.TestRule; +import org.junit.rules.Timeout; +import org.kaazing.k3po.junit.annotation.Specification; +import org.kaazing.k3po.junit.rules.K3poRule; + +public class InitProducerIdSaslIT +{ + private final K3poRule k3po = new K3poRule() + .addScriptRoot("net", + "io/aklivity/zilla/specs/binding/kafka/streams/network/init.producer.id.v4.sasl.handshake.v1"); + + private final TestRule timeout = new DisableOnDebug(new Timeout(5, SECONDS)); + + @Rule + public final TestRule chain = outerRule(k3po).around(timeout); + + @Test + @Specification({ + "${net}/produce.new.id.sasl.plain/client", + "${net}/produce.new.id.sasl.plain/server"}) + public void shouldGenerateNewProducerIdWithSaslPlain() throws Exception + { + k3po.finish(); + } + + @Test + @Specification({ + "${net}/produce.new.id.sasl.scram/client", + "${net}/produce.new.id.sasl.scram/server"}) + public void shouldGenerateNewProducerIdWithSaslScram() throws Exception + { + k3po.finish(); + } +} diff --git a/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/network/ProduceIT.java b/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/network/ProduceIT.java index 5ec40e1f68..f62ffb3baf 100644 --- a/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/network/ProduceIT.java +++ b/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/network/ProduceIT.java @@ -118,6 +118,42 @@ public void shouldSendMessageValue() throws Exception k3po.finish(); } + @Test + @Specification({ + "${net}/message.producer.id/client", + "${net}/message.producer.id/server"}) + public void shouldSendMessageValueWithProducerId() throws Exception + { + k3po.finish(); + } + + @Test + @Specification({ + "${net}/message.values.producer.id/client", + "${net}/message.values.producer.id/server"}) + public void shouldSendMessageValuesWithProducerId() throws Exception + { + k3po.finish(); + } + + @Test + @Specification({ + "${net}/message.values.producer.id.changes/client", + "${net}/message.values.producer.id.changes/server"}) + public void shouldSendMessageValuesWithProducerIdThatChanges() throws Exception + { + k3po.finish(); + } + + @Test + @Specification({ + "${net}/message.values.producer.id.replay/client", + "${net}/message.values.producer.id.replay/server"}) + public void shouldReplyMessageValuesWithProducerId() throws Exception + { + k3po.finish(); + } + @Test @Specification({ "${net}/message.value.null/client",