Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public final class KafkaClientProduceFactory extends KafkaClientSaslHandshaker i

private final KafkaProduceClientFlusher flushRecord = this::flushRecord;
private final KafkaProduceClientFlusher flushRecordInit = this::flushRecordInit;
private final KafkaProduceClientFlusher frameProduceRecordContFin = this::flushRecordContFin;
private final KafkaProduceClientFlusher flushRecordContFin = this::flushRecordContFin;
private final KafkaProduceClientFlusher flushRecordIgnoreAll = this::flushRecordIgnoreAll;

private final KafkaProduceClientDecoder decodeSaslHandshakeResponse = this::decodeSaslHandshakeResponse;
Expand Down Expand Up @@ -546,10 +546,9 @@ private int flushRecordInit(
final KafkaAckMode ackMode = kafkaProduceDataEx.ackMode().get();
final KafkaKeyFW key = kafkaProduceDataEx.key();
final Array32FW<KafkaHeaderFW> headers = kafkaProduceDataEx.headers();
client.encodeableRecordBytesDeferred = kafkaProduceDataEx.deferred();
client.valueChecksum = kafkaProduceDataEx.crc32c();
final int deferred = kafkaProduceDataEx.deferred();
final int valueSize = payload != null ? payload.sizeof() : 0;
client.valueCompleteSize = valueSize + client.encodeableRecordBytesDeferred;
final int valueCompleteSize = valueSize + deferred;

final int maxEncodeableBytes = client.encodeSlotLimit + client.valueCompleteSize + produceRecordFramingSize;

Expand All @@ -562,6 +561,10 @@ private int flushRecordInit(
client.doEncodeRequestIfNecessary(traceId, budgetId);
}

client.valueChecksum = kafkaProduceDataEx.crc32c();
client.encodeableRecordBytesDeferred = deferred;
client.valueCompleteSize = valueCompleteSize;

if (client.producerId == RECORD_BATCH_PRODUCER_ID_NONE)
{
client.baseSequence = sequence;
Expand All @@ -574,8 +577,7 @@ private int flushRecordInit(
client.doEncodeRecordInit(traceId, timestamp, ackMode, key, payload, headers);
if (client.encodeSlot != NO_SLOT)
{
client.flusher = frameProduceRecordContFin;
client.flushFlags = FLAGS_INIT;
client.flusher = flushRecordContFin;
}
else
{
Expand Down Expand Up @@ -610,6 +612,7 @@ private int flushRecordContFin(
assert progress == limit;
client.flusher = flushRecord;
client.flushFlags = FLAGS_FIN;
client.encodeableRecordBytesDeferred = 0;
}

return progress;
Expand Down Expand Up @@ -1967,7 +1970,7 @@ private void doEncodeProduceRequest(

final ByteBuffer encodeSlotByteBuffer = encodePool.byteBuffer(encodeSlot);
final int encodeSlotBytePosition = encodeSlotByteBuffer.position();
final int partialValueSize = flushFlags != FLAGS_FIN ? encodeableRecordValueBytes : 0;
final int partialValueSize = encodeableRecordBytesDeferred > 0 ? encodeableRecordValueBytes : 0;
encodeSlotByteBuffer.limit(encodeSlotBytePosition + encodeSlotLimit - partialValueSize);
encodeSlotByteBuffer.position(encodeSlotBytePosition + encodeSlotOffset + crcLimit);

Expand All @@ -1976,7 +1979,7 @@ private void doEncodeProduceRequest(
crc.update(encodeSlotByteBuffer);

long checksum = crc.getValue();
if (flushFlags != FLAGS_FIN)
if (partialValueSize != 0)
{
checksum = computeChecksum(encodeBuffer, encodeLimit, encodeProgress, encodeSlotBuffer, checksum);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
#

property deltaMillis 0L
property newTimestamp ${kafka:timestamp() + deltaMillis}
property timestamp 1715191875046L
property newTimestamp ${timestamp + deltaMillis}

connect "zilla://streams/app0"
option zilla:window 8192
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ write 140 # size
83 # length
-1
[0x02]
0x4e8723aa
0x460c54e9
0s
0 # last offset delta
${newTimestamp} # first timestamp
Expand Down Expand Up @@ -148,7 +148,7 @@ write 140 # size
83 # length
-1
[0x02]
0x4e8723aa
0x26cd6cf

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add the leading 0 for readability.

0s
0 # last offset delta
${newTimestamp} # first timestamp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@

property networkAcceptWindow 8192

property deltaMillis 0L
property newTimestamp ${kafka:timestamp() + deltaMillis}

accept "zilla://streams/net0"
option zilla:window ${networkAcceptWindow}
option zilla:transmission "duplex"
Expand Down Expand Up @@ -94,7 +91,7 @@ read 140
83 # length
-1
[0x02]
[0..4]
0x460c54e9
0s
0 # last offset delta
(long:timestamp) # first timestamp
Expand Down Expand Up @@ -144,7 +141,7 @@ read 140
83 # length
-1
[0x02]
[0..4]
0x26cd6cf

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add the leading 0 for readability.

0s
0 # last offset delta
(long:timestamp) # first timestamp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ property networkConnectWindow 8192
property newRequestId ${kafka:newRequestId()}
property produceWaitMax 500

property deltaMillis 0L
property newTimestamp ${kafka:timestamp() + deltaMillis}

connect "zilla://streams/net0"
option zilla:window ${networkConnectWindow}
option zilla:transmission "duplex"
Expand Down