From 1973db44a9bd530413ccc0982ecd25248ade1db2 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Wed, 8 May 2024 11:38:55 -0700 Subject: [PATCH 1/3] Generate correct crc32c value for the messages with different produceId --- .../stream/KafkaClientProduceFactory.java | 19 +++++++++++-------- .../client.rpt | 3 ++- .../client.rpt | 4 ++-- .../server.rpt | 7 ++----- .../message.values.producer.id/client.rpt | 3 +++ 5 files changed, 20 insertions(+), 16 deletions(-) diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientProduceFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientProduceFactory.java index 385209cc6d..9b01f8f2ee 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientProduceFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientProduceFactory.java @@ -159,7 +159,7 @@ public final class KafkaClientProduceFactory extends KafkaClientSaslHandshaker i private final KafkaProduceClientFlusher flushRecord = this::flushRecord; private final KafkaProduceClientFlusher flushRecordInit = this::flushRecordInit; - private final KafkaProduceClientFlusher frameProduceRecordContFin = this::flushRecordContFin; + private final KafkaProduceClientFlusher flushRecordContFin = this::flushRecordContFin; private final KafkaProduceClientFlusher flushRecordIgnoreAll = this::flushRecordIgnoreAll; private final KafkaProduceClientDecoder decodeSaslHandshakeResponse = this::decodeSaslHandshakeResponse; @@ -546,10 +546,9 @@ private int flushRecordInit( final KafkaAckMode ackMode = kafkaProduceDataEx.ackMode().get(); final KafkaKeyFW key = kafkaProduceDataEx.key(); final Array32FW headers = kafkaProduceDataEx.headers(); - client.encodeableRecordBytesDeferred = kafkaProduceDataEx.deferred(); - client.valueChecksum = kafkaProduceDataEx.crc32c(); + final int deferred = kafkaProduceDataEx.deferred(); final int valueSize = payload != null ? payload.sizeof() : 0; - client.valueCompleteSize = valueSize + client.encodeableRecordBytesDeferred; + final int valueCompleteSize = valueSize + deferred; final int maxEncodeableBytes = client.encodeSlotLimit + client.valueCompleteSize + produceRecordFramingSize; @@ -562,6 +561,10 @@ private int flushRecordInit( client.doEncodeRequestIfNecessary(traceId, budgetId); } + client.valueChecksum = kafkaProduceDataEx.crc32c(); + client.encodeableRecordBytesDeferred = deferred; + client.valueCompleteSize = valueCompleteSize; + if (client.producerId == RECORD_BATCH_PRODUCER_ID_NONE) { client.baseSequence = sequence; @@ -574,8 +577,7 @@ private int flushRecordInit( client.doEncodeRecordInit(traceId, timestamp, ackMode, key, payload, headers); if (client.encodeSlot != NO_SLOT) { - client.flusher = frameProduceRecordContFin; - client.flushFlags = FLAGS_INIT; + client.flusher = flushRecordContFin; } else { @@ -610,6 +612,7 @@ private int flushRecordContFin( assert progress == limit; client.flusher = flushRecord; client.flushFlags = FLAGS_FIN; + client.encodeableRecordBytesDeferred = 0; } return progress; @@ -1967,7 +1970,7 @@ private void doEncodeProduceRequest( final ByteBuffer encodeSlotByteBuffer = encodePool.byteBuffer(encodeSlot); final int encodeSlotBytePosition = encodeSlotByteBuffer.position(); - final int partialValueSize = flushFlags != FLAGS_FIN ? encodeableRecordValueBytes : 0; + final int partialValueSize = encodeableRecordBytesDeferred > 0 ? encodeableRecordValueBytes : 0; encodeSlotByteBuffer.limit(encodeSlotBytePosition + encodeSlotLimit - partialValueSize); encodeSlotByteBuffer.position(encodeSlotBytePosition + encodeSlotOffset + crcLimit); @@ -1976,7 +1979,7 @@ private void doEncodeProduceRequest( crc.update(encodeSlotByteBuffer); long checksum = crc.getValue(); - if (flushFlags != FLAGS_FIN) + if (partialValueSize != 0) { checksum = computeChecksum(encodeBuffer, encodeLimit, encodeProgress, encodeSlotBuffer, checksum); } diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.values.producer.id.changes/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.values.producer.id.changes/client.rpt index 4b980edead..b2cec8629d 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.values.producer.id.changes/client.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.values.producer.id.changes/client.rpt @@ -15,7 +15,8 @@ # property deltaMillis 0L -property newTimestamp ${kafka:timestamp() + deltaMillis} +property timestamp 1715191875046L +property newTimestamp ${timestamp + deltaMillis} connect "zilla://streams/app0" option zilla:window 8192 diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.producer.id.changes/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.producer.id.changes/client.rpt index f452e74075..7fefefedd0 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.producer.id.changes/client.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.producer.id.changes/client.rpt @@ -98,7 +98,7 @@ write 140 # size 83 # length -1 [0x02] - 0x4e8723aa + 0x460c54e9 0s 0 # last offset delta ${newTimestamp} # first timestamp @@ -148,7 +148,7 @@ write 140 # size 83 # length -1 [0x02] - 0x4e8723aa + 0x26cd6cf 0s 0 # last offset delta ${newTimestamp} # first timestamp diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.producer.id.changes/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.producer.id.changes/server.rpt index f9741db993..826ad6242d 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.producer.id.changes/server.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.producer.id.changes/server.rpt @@ -16,9 +16,6 @@ property networkAcceptWindow 8192 -property deltaMillis 0L -property newTimestamp ${kafka:timestamp() + deltaMillis} - accept "zilla://streams/net0" option zilla:window ${networkAcceptWindow} option zilla:transmission "duplex" @@ -94,7 +91,7 @@ read 140 83 # length -1 [0x02] - [0..4] + 0x460c54e9 0s 0 # last offset delta (long:timestamp) # first timestamp @@ -144,7 +141,7 @@ read 140 83 # length -1 [0x02] - [0..4] + 0x26cd6cf 0s 0 # last offset delta (long:timestamp) # first timestamp diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.producer.id/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.producer.id/client.rpt index c0652f5969..7c13c115d4 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.producer.id/client.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.producer.id/client.rpt @@ -19,6 +19,9 @@ property networkConnectWindow 8192 property newRequestId ${kafka:newRequestId()} property produceWaitMax 500 +property deltaMillis 0L +property newTimestamp ${kafka:timestamp() + deltaMillis} + connect "zilla://streams/net0" option zilla:window ${networkConnectWindow} option zilla:transmission "duplex" From 3e82d226aa4328b1ebc17526a9da6308df9d1900 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Wed, 8 May 2024 12:05:22 -0700 Subject: [PATCH 2/3] Fix script --- .../message.values.producer.id.changes/client.rpt | 7 +++++-- .../message.values.producer.id.changes/server.rpt | 4 ++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.producer.id.changes/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.producer.id.changes/client.rpt index 7fefefedd0..acff5e148e 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.producer.id.changes/client.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.producer.id.changes/client.rpt @@ -19,6 +19,9 @@ property networkConnectWindow 8192 property newRequestId ${kafka:newRequestId()} property produceWaitMax 500 +property deltaMillis 0L +property newTimestamp ${kafka:timestamp() + deltaMillis} + connect "zilla://streams/net0" option zilla:window ${networkConnectWindow} option zilla:transmission "duplex" @@ -98,7 +101,7 @@ write 140 # size 83 # length -1 [0x02] - 0x460c54e9 + 0x0460c54e9 0s 0 # last offset delta ${newTimestamp} # first timestamp @@ -148,7 +151,7 @@ write 140 # size 83 # length -1 [0x02] - 0x26cd6cf + 0x026cd6cf 0s 0 # last offset delta ${newTimestamp} # first timestamp diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.producer.id.changes/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.producer.id.changes/server.rpt index 826ad6242d..f0bbfa17ef 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.producer.id.changes/server.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.producer.id.changes/server.rpt @@ -91,7 +91,7 @@ read 140 83 # length -1 [0x02] - 0x460c54e9 + 0x0460c54e9 0s 0 # last offset delta (long:timestamp) # first timestamp @@ -141,7 +141,7 @@ read 140 83 # length -1 [0x02] - 0x26cd6cf + 0x026cd6cf 0s 0 # last offset delta (long:timestamp) # first timestamp From aa8c13b6c17f27e35ebdd61f85a6667284912702 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Wed, 8 May 2024 12:11:29 -0700 Subject: [PATCH 3/3] FIx checksum condition --- .../kafka/internal/stream/KafkaClientProduceFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientProduceFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientProduceFactory.java index 9b01f8f2ee..70c89311fc 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientProduceFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientProduceFactory.java @@ -1979,7 +1979,7 @@ private void doEncodeProduceRequest( crc.update(encodeSlotByteBuffer); long checksum = crc.getValue(); - if (partialValueSize != 0) + if (encodeableRecordBytesDeferred > 0) { checksum = computeChecksum(encodeBuffer, encodeLimit, encodeProgress, encodeSlotBuffer, checksum); }