From 2cef977324c9b78508de72b0eb49c6eff88346c0 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Wed, 25 Oct 2023 13:35:29 -0700 Subject: [PATCH 1/3] Adjust padding to accommodate good enough headers and don't include partial data frame while computing crc32c value --- .../internal/stream/KafkaClientProduceFactory.java | 9 +++++++-- .../kafka/internal/stream/ClientMergedIT.java | 6 +++--- .../produce/message.value.100k/client.rpt | 2 +- .../produce/message.value.100k/server.rpt | 4 ++-- .../application/produce/message.value.10k/client.rpt | 2 +- .../application/produce/message.value.10k/server.rpt | 4 ++-- .../produce/message.values.sequential/client.rpt | 2 +- .../produce/message.values.sequential/server.rpt | 2 +- .../produce.v3/message.values.sequential/client.rpt | 12 ++++++------ .../produce.v3/message.values.sequential/server.rpt | 12 ++++++------ 10 files changed, 30 insertions(+), 25 deletions(-) diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientProduceFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientProduceFactory.java index d6c2cb2b5b..9690473ba5 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientProduceFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientProduceFactory.java @@ -86,7 +86,7 @@ public final class KafkaClientProduceFactory extends KafkaClientSaslHandshaker i { private static final int PRODUCE_REQUEST_RECORDS_OFFSET_MAX = 512; - private static final int KAFKA_RECORD_FRAMING = 100; // TODO + private static final int KAFKA_RECORD_FRAMING = 512; // TODO private static final int FLAGS_CON = 0x00; private static final int FLAGS_FIN = 0x01; @@ -539,6 +539,7 @@ private int flushRecordInit( final int valueSize = payload != null ? payload.sizeof() : 0; client.valueCompleteSize = valueSize + client.encodeableRecordBytesDeferred; + final int maxEncodeableBytes = client.encodeSlotLimit + client.valueCompleteSize + KAFKA_RECORD_FRAMING; if (client.encodeSlot != NO_SLOT && maxEncodeableBytes > encodePool.slotCapacity()) @@ -1191,6 +1192,7 @@ private final class KafkaProduceClient extends KafkaSaslClient private int encodeableRecordCount; private int encodeableRecordBytes; private int encodeableRecordBytesDeferred; + private int encodeableRecordValueBytes; private int flushableRequestBytes; private int decodeSlot = NO_SLOT; @@ -1652,6 +1654,7 @@ private void doEncodeRecordInit( encodeSlotBuffer.putBytes(encodeSlotLimit, encodeBuffer, 0, encodeProgress); encodeSlotLimit += encodeProgress; + encodeableRecordValueBytes = 0; if (headersCount > 0) { @@ -1689,6 +1692,7 @@ private void doEncodeRecordCont( encodeSlotBuffer.putBytes(encodeSlotLimit, value.buffer(), value.offset(), length); encodeSlotLimit += length; + encodeableRecordValueBytes += length; if ((flags & FLAGS_FIN) == 0) { @@ -1893,7 +1897,8 @@ private void doEncodeProduceRequest( final ByteBuffer encodeSlotByteBuffer = encodePool.byteBuffer(encodeSlot); final int encodeSlotBytePosition = encodeSlotByteBuffer.position(); - encodeSlotByteBuffer.limit(encodeSlotBytePosition + encodeSlotLimit); + final int partialValueSize = flushFlags != FLAGS_FIN ? encodeableRecordValueBytes : 0; + encodeSlotByteBuffer.limit(encodeSlotBytePosition + encodeSlotLimit - partialValueSize); encodeSlotByteBuffer.position(encodeSlotBytePosition + encodeSlotOffset + crcLimit); final CRC32C crc = crc32c; diff --git a/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientMergedIT.java b/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientMergedIT.java index 8ae2d6fd51..7803a7ce1f 100644 --- a/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientMergedIT.java +++ b/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientMergedIT.java @@ -49,7 +49,7 @@ public class ClientMergedIT .countersBufferCapacity(8192) .configure(ENGINE_BUFFER_SLOT_CAPACITY, 8192) .configure(KAFKA_CLIENT_META_MAX_AGE_MILLIS, 1000) - .configure(KAFKA_CLIENT_PRODUCE_MAX_BYTES, 116) + .configure(KAFKA_CLIENT_PRODUCE_MAX_BYTES, 528) .configurationRoot("io/aklivity/zilla/specs/binding/kafka/config") .external("net0") .clean(); @@ -234,7 +234,7 @@ public void shouldProduceMergedMessageValues() throws Exception @Configure( name = "zilla.binding.kafka.client.produce.max.bytes", value = "200000") - @ScriptProperty("padding ${512 + 100}") + @ScriptProperty("padding ${512 + 512}") public void shouldProduceMergedMessageValue10k() throws Exception { k3po.finish(); @@ -248,7 +248,7 @@ public void shouldProduceMergedMessageValue10k() throws Exception @Configure( name = "zilla.binding.kafka.client.produce.max.bytes", value = "200000") - @ScriptProperty("padding ${512 + 100}") + @ScriptProperty("padding ${512 + 512}") public void shouldProduceMergedMessageValue100k() throws Exception { k3po.finish(); diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.100k/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.100k/client.rpt index b21502d3b5..cd5237ac4a 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.100k/client.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.100k/client.rpt @@ -73,7 +73,7 @@ read zilla:begin.ext ${kafka:beginEx() write zilla:data.ext ${kafka:dataEx() .typeId(zilla:id("kafka")) .produce() - .deferred(102400 - 8192 + 512 + 100) + .deferred(102400 - 8192 + 512 + 512) .timestamp(newTimestamp) .build() .build()} diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.100k/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.100k/server.rpt index cf93c76e87..6cd3a8d747 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.100k/server.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.100k/server.rpt @@ -18,7 +18,7 @@ property serverAddress "zilla://streams/app0" accept ${serverAddress} option zilla:window 8192 - option zilla:padding 612 + option zilla:padding 1024 option zilla:transmission "half-duplex" accepted @@ -71,7 +71,7 @@ write zilla:begin.ext ${kafka:beginEx() read zilla:data.ext ${kafka:matchDataEx() .typeId(zilla:id("kafka")) .produce() - .deferred(102400 - 8192 + 512 + 100) + .deferred(102400 - 8192 + 512 + 512) .build() .build()} diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.10k/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.10k/client.rpt index dcd8f9d0de..b054e49965 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.10k/client.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.10k/client.rpt @@ -73,7 +73,7 @@ read zilla:begin.ext ${kafka:beginEx() write zilla:data.ext ${kafka:dataEx() .typeId(zilla:id("kafka")) .produce() - .deferred(10240 - 8192 + 512 + 100) + .deferred(10240 - 8192 + 512 + 512) .timestamp(newTimestamp) .build() .build()} diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.10k/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.10k/server.rpt index c248b9e910..6b3eaeb77b 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.10k/server.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.value.10k/server.rpt @@ -18,7 +18,7 @@ property serverAddress "zilla://streams/app0" accept ${serverAddress} option zilla:window 8192 - option zilla:padding 612 + option zilla:padding 1024 option zilla:transmission "half-duplex" accepted @@ -71,7 +71,7 @@ write zilla:begin.ext ${kafka:beginEx() read zilla:data.ext ${kafka:matchDataEx() .typeId(zilla:id("kafka")) .produce() - .deferred(10240 - 8192 + 512 + 100) + .deferred(10240 - 8192 + 512 + 512) .build() .build()} read zilla:data.ext ${kafka:matchDataEx() diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.values.sequential/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.values.sequential/client.rpt index 8ddf6911ff..5b8050c2df 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.values.sequential/client.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.values.sequential/client.rpt @@ -81,7 +81,7 @@ write zilla:data.ext ${kafka:dataEx() .produce() .build() .build()} -write ${kafka:randomBytes(7580)} +write ${kafka:randomBytes(8192-(512+512))} write flush write zilla:data.ext ${kafka:dataEx() diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.values.sequential/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.values.sequential/server.rpt index b9d3f4903c..5a2d505c1c 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.values.sequential/server.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.values.sequential/server.rpt @@ -79,7 +79,7 @@ read zilla:data.ext ${kafka:matchDataEx() .produce() .build() .build()} -read [0..7580] +read [0..7168] read zilla:data.ext ${kafka:matchDataEx() .typeId(zilla:id("kafka")) diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.sequential/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.sequential/client.rpt index 6e967c671d..9b48621d87 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.sequential/client.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.sequential/client.rpt @@ -78,7 +78,7 @@ write zilla:begin.ext ${proxy:beginEx() connected -write 7690 +write 7278 0s 3s ${newRequestId} @@ -90,9 +90,9 @@ write 7690 4s "test" 1 0 - 7650 # record set size + 7238 # record set size 0L # first offset - 7638 # length + 7226 # length -1 [0x02] 0x4e8723aa @@ -104,13 +104,13 @@ write 7690 -1s -1 1 # records - ${kafka:varint(7587)} + ${kafka:varint(7175)} [0x00] ${kafka:varint(0)} ${kafka:varint(0)} ${kafka:varint(-1)} # key - ${kafka:varint(7580)} # value - ${kafka:randomBytes(7580)} + ${kafka:varint(7168)} # value + ${kafka:randomBytes(7168)} ${kafka:varint(0)} # headers read 44 diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.sequential/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.sequential/server.rpt index 0c2dae01c2..a6aaebb5a4 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.sequential/server.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.sequential/server.rpt @@ -74,7 +74,7 @@ read zilla:begin.ext ${proxy:beginEx() connected -read 7690 +read 7278 0s 3s (int:requestId) @@ -86,9 +86,9 @@ read 7690 4s "test" 1 0 - 7650 # record set size + 7238 # record set size 0L # first offset - 7638 # length + 7226 # length -1 [0x02] [0..4] @@ -100,13 +100,13 @@ read 7690 -1s -1 1 # records - ${kafka:varint(7587)} + ${kafka:varint(7175)} [0x00] ${kafka:varint(0)} ${kafka:varint(0)} ${kafka:varint(-1)} # key - ${kafka:varint(7580)} # value - [0..7580] + ${kafka:varint(7168)} # value + [0..7168] ${kafka:varint(0)} # headers write 44 From 21ac36741abd05bc7a33d19f54acd837451980f1 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Wed, 10 Apr 2024 14:12:38 -0700 Subject: [PATCH 2/3] Fix schema validation parsing --- .../internal/config/OpenapiClientNamespaceGenerator.java | 6 +++++- .../openapi/internal/config/OpenapiNamespaceGenerator.java | 4 +++- .../internal/config/OpenapiServerNamespaceGenerator.java | 5 ++++- 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiClientNamespaceGenerator.java b/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiClientNamespaceGenerator.java index 646753900e..b19cfb1c8f 100644 --- a/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiClientNamespaceGenerator.java +++ b/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiClientNamespaceGenerator.java @@ -32,6 +32,7 @@ import io.aklivity.zilla.runtime.binding.openapi.internal.model.OpenapiHeader; import io.aklivity.zilla.runtime.binding.openapi.internal.model.OpenapiResponse; import io.aklivity.zilla.runtime.binding.openapi.internal.model.OpenapiResponseByContentType; +import io.aklivity.zilla.runtime.binding.openapi.internal.model.OpenapiSchema; import io.aklivity.zilla.runtime.binding.openapi.internal.model.OpenapiServer; import io.aklivity.zilla.runtime.binding.openapi.internal.view.OpenapiOperationView; import io.aklivity.zilla.runtime.binding.openapi.internal.view.OpenapiOperationsView; @@ -202,7 +203,10 @@ private HttpResponseConfigBuilder injectResponseHeaders( for (Map.Entry header : responses.headers.entrySet()) { String name = header.getKey(); - ModelConfig model = models.get(header.getValue().schema.type); + OpenapiSchema schema = header.getValue().schema; + String modelName = schema.format != null ? String.format("%s:%s", schema.type, schema.format) : + schema.type; + ModelConfig model = models.get(modelName); if (model != null) { response diff --git a/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiNamespaceGenerator.java b/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiNamespaceGenerator.java index c02fec1caf..45423b5710 100644 --- a/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiNamespaceGenerator.java +++ b/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiNamespaceGenerator.java @@ -46,7 +46,9 @@ public abstract class OpenapiNamespaceGenerator protected final Matcher jsonContentType = JSON_CONTENT_TYPE.matcher(""); protected final Map models = Map.of( "string", StringModelConfig.builder().build(), - "integer", Int32ModelConfig.builder().build() + "integer", Int32ModelConfig.builder().build(), + "integer:int32", Int32ModelConfig.builder().build(), + "integer:int64", Int32ModelConfig.builder().build() ); public abstract NamespaceConfig generate( diff --git a/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiServerNamespaceGenerator.java b/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiServerNamespaceGenerator.java index 5d04383cfc..3a456bb859 100644 --- a/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiServerNamespaceGenerator.java +++ b/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiServerNamespaceGenerator.java @@ -252,7 +252,10 @@ private HttpRequestConfigBuilder injectParams( { if (parameter.schema != null && parameter.schema.type != null) { - ModelConfig model = models.get(parameter.schema.type); + OpenapiSchema schema = parameter.schema; + String modelName = schema.format != null ? String.format("%s:%s", schema.type, schema.format) : + schema.type; + ModelConfig model = models.get(modelName); if (model != null) { switch (parameter.in) From fabb521478b7ae97a25698fdaccb152a613edbb4 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Wed, 10 Apr 2024 14:24:10 -0700 Subject: [PATCH 3/3] Fix typo --- .../openapi/internal/config/OpenapiNamespaceGenerator.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiNamespaceGenerator.java b/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiNamespaceGenerator.java index 45423b5710..5b1b86b4e4 100644 --- a/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiNamespaceGenerator.java +++ b/incubator/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiNamespaceGenerator.java @@ -33,6 +33,7 @@ import io.aklivity.zilla.runtime.engine.config.NamespaceConfigBuilder; import io.aklivity.zilla.runtime.engine.config.TelemetryRefConfigBuilder; import io.aklivity.zilla.runtime.model.core.config.Int32ModelConfig; +import io.aklivity.zilla.runtime.model.core.config.Int64ModelConfig; import io.aklivity.zilla.runtime.model.core.config.StringModelConfig; public abstract class OpenapiNamespaceGenerator @@ -48,7 +49,7 @@ public abstract class OpenapiNamespaceGenerator "string", StringModelConfig.builder().build(), "integer", Int32ModelConfig.builder().build(), "integer:int32", Int32ModelConfig.builder().build(), - "integer:int64", Int32ModelConfig.builder().build() + "integer:int64", Int64ModelConfig.builder().build() ); public abstract NamespaceConfig generate(