diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientProduceFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientProduceFactory.java index 385209cc6d..70c89311fc 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientProduceFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientProduceFactory.java @@ -159,7 +159,7 @@ public final class KafkaClientProduceFactory extends KafkaClientSaslHandshaker i private final KafkaProduceClientFlusher flushRecord = this::flushRecord; private final KafkaProduceClientFlusher flushRecordInit = this::flushRecordInit; - private final KafkaProduceClientFlusher frameProduceRecordContFin = this::flushRecordContFin; + private final KafkaProduceClientFlusher flushRecordContFin = this::flushRecordContFin; private final KafkaProduceClientFlusher flushRecordIgnoreAll = this::flushRecordIgnoreAll; private final KafkaProduceClientDecoder decodeSaslHandshakeResponse = this::decodeSaslHandshakeResponse; @@ -546,10 +546,9 @@ private int flushRecordInit( final KafkaAckMode ackMode = kafkaProduceDataEx.ackMode().get(); final KafkaKeyFW key = kafkaProduceDataEx.key(); final Array32FW headers = kafkaProduceDataEx.headers(); - client.encodeableRecordBytesDeferred = kafkaProduceDataEx.deferred(); - client.valueChecksum = kafkaProduceDataEx.crc32c(); + final int deferred = kafkaProduceDataEx.deferred(); final int valueSize = payload != null ? payload.sizeof() : 0; - client.valueCompleteSize = valueSize + client.encodeableRecordBytesDeferred; + final int valueCompleteSize = valueSize + deferred; final int maxEncodeableBytes = client.encodeSlotLimit + client.valueCompleteSize + produceRecordFramingSize; @@ -562,6 +561,10 @@ private int flushRecordInit( client.doEncodeRequestIfNecessary(traceId, budgetId); } + client.valueChecksum = kafkaProduceDataEx.crc32c(); + client.encodeableRecordBytesDeferred = deferred; + client.valueCompleteSize = valueCompleteSize; + if (client.producerId == RECORD_BATCH_PRODUCER_ID_NONE) { client.baseSequence = sequence; @@ -574,8 +577,7 @@ private int flushRecordInit( client.doEncodeRecordInit(traceId, timestamp, ackMode, key, payload, headers); if (client.encodeSlot != NO_SLOT) { - client.flusher = frameProduceRecordContFin; - client.flushFlags = FLAGS_INIT; + client.flusher = flushRecordContFin; } else { @@ -610,6 +612,7 @@ private int flushRecordContFin( assert progress == limit; client.flusher = flushRecord; client.flushFlags = FLAGS_FIN; + client.encodeableRecordBytesDeferred = 0; } return progress; @@ -1967,7 +1970,7 @@ private void doEncodeProduceRequest( final ByteBuffer encodeSlotByteBuffer = encodePool.byteBuffer(encodeSlot); final int encodeSlotBytePosition = encodeSlotByteBuffer.position(); - final int partialValueSize = flushFlags != FLAGS_FIN ? encodeableRecordValueBytes : 0; + final int partialValueSize = encodeableRecordBytesDeferred > 0 ? encodeableRecordValueBytes : 0; encodeSlotByteBuffer.limit(encodeSlotBytePosition + encodeSlotLimit - partialValueSize); encodeSlotByteBuffer.position(encodeSlotBytePosition + encodeSlotOffset + crcLimit); @@ -1976,7 +1979,7 @@ private void doEncodeProduceRequest( crc.update(encodeSlotByteBuffer); long checksum = crc.getValue(); - if (flushFlags != FLAGS_FIN) + if (encodeableRecordBytesDeferred > 0) { checksum = computeChecksum(encodeBuffer, encodeLimit, encodeProgress, encodeSlotBuffer, checksum); } diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.values.producer.id.changes/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.values.producer.id.changes/client.rpt index 4b980edead..b2cec8629d 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.values.producer.id.changes/client.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.values.producer.id.changes/client.rpt @@ -15,7 +15,8 @@ # property deltaMillis 0L -property newTimestamp ${kafka:timestamp() + deltaMillis} +property timestamp 1715191875046L +property newTimestamp ${timestamp + deltaMillis} connect "zilla://streams/app0" option zilla:window 8192 diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.producer.id.changes/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.producer.id.changes/client.rpt index f452e74075..acff5e148e 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.producer.id.changes/client.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.producer.id.changes/client.rpt @@ -19,6 +19,9 @@ property networkConnectWindow 8192 property newRequestId ${kafka:newRequestId()} property produceWaitMax 500 +property deltaMillis 0L +property newTimestamp ${kafka:timestamp() + deltaMillis} + connect "zilla://streams/net0" option zilla:window ${networkConnectWindow} option zilla:transmission "duplex" @@ -98,7 +101,7 @@ write 140 # size 83 # length -1 [0x02] - 0x4e8723aa + 0x0460c54e9 0s 0 # last offset delta ${newTimestamp} # first timestamp @@ -148,7 +151,7 @@ write 140 # size 83 # length -1 [0x02] - 0x4e8723aa + 0x026cd6cf 0s 0 # last offset delta ${newTimestamp} # first timestamp diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.producer.id.changes/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.producer.id.changes/server.rpt index f9741db993..f0bbfa17ef 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.producer.id.changes/server.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.producer.id.changes/server.rpt @@ -16,9 +16,6 @@ property networkAcceptWindow 8192 -property deltaMillis 0L -property newTimestamp ${kafka:timestamp() + deltaMillis} - accept "zilla://streams/net0" option zilla:window ${networkAcceptWindow} option zilla:transmission "duplex" @@ -94,7 +91,7 @@ read 140 83 # length -1 [0x02] - [0..4] + 0x0460c54e9 0s 0 # last offset delta (long:timestamp) # first timestamp @@ -144,7 +141,7 @@ read 140 83 # length -1 [0x02] - [0..4] + 0x026cd6cf 0s 0 # last offset delta (long:timestamp) # first timestamp diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.producer.id/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.producer.id/client.rpt index c0652f5969..7c13c115d4 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.producer.id/client.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.values.producer.id/client.rpt @@ -19,6 +19,9 @@ property networkConnectWindow 8192 property newRequestId ${kafka:newRequestId()} property produceWaitMax 500 +property deltaMillis 0L +property newTimestamp ${kafka:timestamp() + deltaMillis} + connect "zilla://streams/net0" option zilla:window ${networkConnectWindow} option zilla:transmission "duplex"