Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
135 commits
Select commit Hold shift + click to select a range
2cef977
Adjust padding to accommodate good enough headers and don't include …
akrambek Oct 25, 2023
d201582
Merge branch 'develop' into feature/consumer-group-cont
akrambek Oct 25, 2023
76bf9de
Merge branch 'feature/consumer-group-cont' into develop
akrambek Oct 26, 2023
29ae79c
Merge branch 'aklivity:develop' into develop
akrambek Oct 30, 2023
ec1b39e
Merge branch 'aklivity:develop' into develop
akrambek Oct 30, 2023
51a9f0e
Merge branch 'aklivity:develop' into develop
akrambek Oct 31, 2023
4394783
Merge branch 'aklivity:develop' into develop
akrambek Oct 31, 2023
e8696ce
Merge branch 'aklivity:develop' into develop
akrambek Nov 2, 2023
51c37b1
Merge branch 'aklivity:develop' into develop
akrambek Nov 2, 2023
5da5f04
Merge branch 'aklivity:develop' into develop
akrambek Nov 2, 2023
db1e17c
Merge branch 'aklivity:develop' into develop
akrambek Nov 4, 2023
40f73dc
Merge branch 'aklivity:develop' into develop
akrambek Nov 6, 2023
d1a0492
Merge branch 'aklivity:develop' into develop
akrambek Nov 23, 2023
45799ce
Merge branch 'aklivity:develop' into develop
akrambek Nov 29, 2023
1e55162
Merge branch 'aklivity:develop' into develop
akrambek Nov 30, 2023
fedc41f
Merge branch 'aklivity:develop' into develop
akrambek Dec 4, 2023
18a8d74
Merge branch 'aklivity:develop' into develop
akrambek Dec 4, 2023
f160aad
Merge branch 'aklivity:develop' into develop
akrambek Dec 4, 2023
e0e7d5a
Merge branch 'aklivity:develop' into develop
akrambek Dec 6, 2023
9f4a8a6
Merge branch 'aklivity:develop' into develop
akrambek Dec 8, 2023
456f111
Merge branch 'aklivity:develop' into develop
akrambek Dec 8, 2023
0d27262
Merge branch 'aklivity:develop' into develop
akrambek Dec 9, 2023
9fe7a91
Merge branch 'aklivity:develop' into develop
akrambek Dec 11, 2023
7e3d237
Merge branch 'aklivity:develop' into develop
akrambek Dec 12, 2023
33c4411
Merge branch 'aklivity:develop' into develop
akrambek Dec 13, 2023
fe9e318
Merge branch 'aklivity:develop' into develop
akrambek Dec 14, 2023
d8b5e5c
Merge branch 'aklivity:develop' into develop
akrambek Dec 14, 2023
ebca7ef
Merge branch 'aklivity:develop' into develop
akrambek Dec 18, 2023
5e3e059
Merge branch 'aklivity:develop' into develop
akrambek Dec 22, 2023
ee71db9
Merge branch 'aklivity:develop' into develop
akrambek Dec 24, 2023
0b7a15a
Merge branch 'aklivity:develop' into develop
akrambek Dec 25, 2023
be13489
Merge branch 'aklivity:develop' into develop
akrambek Dec 26, 2023
95df84c
Merge branch 'aklivity:develop' into develop
akrambek Dec 26, 2023
3ebdbf5
Merge branch 'aklivity:develop' into develop
akrambek Dec 28, 2023
24ad9e1
Merge branch 'aklivity:develop' into develop
akrambek Dec 30, 2023
6d21fec
Merge branch 'aklivity:develop' into develop
akrambek Dec 31, 2023
368a0a6
Merge branch 'aklivity:develop' into develop
akrambek Dec 31, 2023
7069f1a
Merge branch 'aklivity:develop' into develop
akrambek Jan 2, 2024
09b7041
Merge branch 'aklivity:develop' into develop
akrambek Jan 3, 2024
98f1faa
Merge branch 'aklivity:develop' into develop
akrambek Jan 4, 2024
371391a
Merge branch 'aklivity:develop' into develop
akrambek Jan 5, 2024
c6a0882
Merge branch 'aklivity:develop' into develop
akrambek Jan 8, 2024
f99f009
Merge branch 'aklivity:develop' into develop
akrambek Jan 9, 2024
a110b68
Merge branch 'aklivity:develop' into develop
akrambek Jan 11, 2024
80c4625
Merge branch 'aklivity:develop' into develop
akrambek Jan 16, 2024
6617e20
Merge branch 'aklivity:develop' into develop
akrambek Jan 19, 2024
dea9f53
Merge branch 'aklivity:develop' into develop
akrambek Jan 20, 2024
b74db57
Merge branch 'aklivity:develop' into develop
akrambek Jan 23, 2024
4617b54
Merge branch 'aklivity:develop' into develop
akrambek Jan 30, 2024
b3b421d
Merge branch 'aklivity:develop' into develop
akrambek Jan 31, 2024
73d64b1
Merge branch 'aklivity:develop' into develop
akrambek Feb 1, 2024
7bb546e
Merge branch 'aklivity:develop' into develop
akrambek Feb 2, 2024
b1c7901
Merge branch 'aklivity:develop' into develop
akrambek Feb 8, 2024
949df2f
Merge branch 'aklivity:develop' into develop
akrambek Feb 13, 2024
ca946b8
Merge branch 'aklivity:develop' into develop
akrambek Feb 14, 2024
f9dcd75
Merge branch 'aklivity:develop' into develop
akrambek Feb 21, 2024
e1e5e75
Merge branch 'aklivity:develop' into develop
akrambek Feb 22, 2024
5f50549
Merge branch 'aklivity:develop' into develop
akrambek Feb 23, 2024
32725be
Merge branch 'aklivity:develop' into develop
akrambek Feb 28, 2024
83b145a
Merge branch 'aklivity:develop' into develop
akrambek Feb 28, 2024
75e7709
Merge branch 'aklivity:develop' into develop
akrambek Feb 29, 2024
d4c3117
Merge branch 'aklivity:develop' into develop
akrambek Feb 29, 2024
a438cfe
Merge branch 'aklivity:develop' into develop
akrambek Mar 1, 2024
36f8f1e
Merge branch 'aklivity:develop' into develop
akrambek Mar 4, 2024
cf8a5c7
Merge branch 'aklivity:develop' into develop
akrambek Mar 10, 2024
7b46270
Merge branch 'aklivity:develop' into develop
akrambek Mar 12, 2024
6461ebf
Merge branch 'aklivity:develop' into develop
akrambek Mar 13, 2024
a97cfb3
Merge branch 'aklivity:develop' into develop
akrambek Mar 14, 2024
fc97dc2
Merge branch 'aklivity:develop' into develop
akrambek Mar 16, 2024
a5edb3f
Merge branch 'aklivity:develop' into develop
akrambek Mar 17, 2024
7a79fd6
Merge branch 'aklivity:develop' into develop
akrambek Mar 19, 2024
82c24c3
Merge branch 'aklivity:develop' into develop
akrambek Mar 19, 2024
1f3f305
Merge branch 'aklivity:develop' into develop
akrambek Mar 19, 2024
3a12b8b
Merge branch 'aklivity:develop' into develop
akrambek Mar 25, 2024
7684d2a
Merge branch 'aklivity:develop' into develop
akrambek Apr 9, 2024
d40e2cd
Merge branch 'aklivity:develop' into develop
akrambek Apr 9, 2024
fcd6cd2
Merge branch 'aklivity:develop' into develop
akrambek Apr 9, 2024
141a871
Merge branch 'aklivity:develop' into develop
akrambek Apr 9, 2024
782b711
Merge branch 'aklivity:develop' into develop
akrambek Apr 10, 2024
0c93c5f
Merge branch 'aklivity:develop' into develop
akrambek Apr 10, 2024
70f2543
Merge branch 'aklivity:develop' into develop
akrambek Apr 11, 2024
c451d70
Merge branch 'aklivity:develop' into develop
akrambek Apr 12, 2024
45f6a83
Merge branch 'aklivity:develop' into develop
akrambek Apr 12, 2024
32ba7e9
Merge branch 'aklivity:develop' into develop
akrambek Apr 16, 2024
95ff994
Merge branch 'aklivity:develop' into develop
akrambek Apr 22, 2024
49787e7
Merge branch 'aklivity:develop' into develop
akrambek May 1, 2024
bb3b121
Merge branch 'aklivity:develop' into develop
akrambek May 1, 2024
3d6cf14
Merge branch 'aklivity:develop' into develop
akrambek May 1, 2024
9bc44b5
Merge branch 'aklivity:develop' into develop
akrambek May 2, 2024
4e82b78
Merge branch 'aklivity:develop' into develop
akrambek May 8, 2024
dd0ae1b
Merge branch 'aklivity:develop' into develop
akrambek May 10, 2024
688c928
Merge branch 'aklivity:develop' into develop
akrambek May 15, 2024
027a6dc
Merge branch 'aklivity:develop' into develop
akrambek May 16, 2024
7fb8aea
Merge branch 'aklivity:develop' into develop
akrambek May 20, 2024
57b40a8
filtering by structured value field(s)
ankitk-me Jun 10, 2024
e340fb1
model-json extract support
ankitk-me Jun 11, 2024
0ccc615
Test Converter update to support extracted header for ITs
ankitk-me Jun 12, 2024
fa1916e
incorrect index fix
ankitk-me Jun 12, 2024
6b3d555
incorrect index fix for number
ankitk-me Jun 12, 2024
0d15ed9
Merge remote-tracking branch 'origin' into payloadFilter
ankitk-me Jun 12, 2024
bff1a15
Merge branch 'aklivity:develop' into develop
akrambek Jun 12, 2024
11f8b1e
Merge branch 'aklivity:develop' into develop
akrambek Jun 12, 2024
94c5656
Merge remote-tracking branch 'origin' into payloadFilter
ankitk-me Jun 12, 2024
e66f27c
addressing review comments
ankitk-me Jun 14, 2024
d421999
update to reuse matcher object
ankitk-me Jun 14, 2024
6832657
Merge branch 'aklivity:develop' into develop
akrambek Jun 19, 2024
6ffc602
support for Avro model and header format change
ankitk-me Jun 19, 2024
864ea63
support for Protobuf model
ankitk-me Jun 21, 2024
0caa9f5
WIP
akrambek Jun 24, 2024
f35a45a
Merge branch 'aklivity:develop' into develop
akrambek Jun 24, 2024
ff652e5
Merge branch 'develop' into feature/asyncapi-topics
akrambek Jun 24, 2024
2ebdc1c
WIP
akrambek Jun 27, 2024
7de1be5
Support other format data types in catalog
akrambek Jun 28, 2024
8fa527d
Remove comment
akrambek Jun 28, 2024
86ea213
Support kafka key validation
akrambek Jun 28, 2024
1f13ca9
WIP
akrambek Jun 28, 2024
ec2f8a9
Merge branch 'aklivity:develop' into develop
akrambek Jun 29, 2024
b8fd378
Merge branch 'develop' into feature/key-validation
akrambek Jun 29, 2024
bc7af09
WIP
akrambek Jun 29, 2024
769fa52
WIP
akrambek Jun 29, 2024
8f874d1
Merge branch 'aklivity:develop' into develop
akrambek Jun 29, 2024
8a84564
Merge branch 'develop' into feature/key-validation
akrambek Jun 29, 2024
2141e7e
Merge branch 'feature/key-validation' into feature/asyncapi-schema-re…
akrambek Jun 29, 2024
eef2dd6
Fix conflict
akrambek Jun 29, 2024
e866c11
WIP
akrambek Jun 30, 2024
21e57ff
WIP
akrambek Jun 30, 2024
fe30d17
WIP
akrambek Jun 30, 2024
85678f4
WIP
akrambek Jun 30, 2024
5cbdb14
WIP
akrambek Jul 1, 2024
7f2842c
Merge branch 'aklivity:develop' into develop
akrambek Jul 1, 2024
f73451d
Merge branch 'develop' into feature/asyncapi-schema-registry
akrambek Jul 1, 2024
88bc237
Apply feedback from PR
akrambek Jul 2, 2024
67ee5f0
Check if the binding exists
akrambek Jul 4, 2024
d9c8edb
Merge branch 'aklivity:develop' into develop
akrambek Jul 4, 2024
e2174c8
Merge branch 'develop' into feature/asyncapi-schema-registry
akrambek Jul 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Adjust padding to accommodate good enough headers and don't include p…
…artial data frame while computing crc32c value
  • Loading branch information
akrambek committed Oct 25, 2023
commit 2cef977324c9b78508de72b0eb49c6eff88346c0
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public final class KafkaClientProduceFactory extends KafkaClientSaslHandshaker i
{
private static final int PRODUCE_REQUEST_RECORDS_OFFSET_MAX = 512;

private static final int KAFKA_RECORD_FRAMING = 100; // TODO
private static final int KAFKA_RECORD_FRAMING = 512; // TODO

private static final int FLAGS_CON = 0x00;
private static final int FLAGS_FIN = 0x01;
Expand Down Expand Up @@ -539,6 +539,7 @@ private int flushRecordInit(
final int valueSize = payload != null ? payload.sizeof() : 0;
client.valueCompleteSize = valueSize + client.encodeableRecordBytesDeferred;


final int maxEncodeableBytes = client.encodeSlotLimit + client.valueCompleteSize + KAFKA_RECORD_FRAMING;
if (client.encodeSlot != NO_SLOT &&
maxEncodeableBytes > encodePool.slotCapacity())
Expand Down Expand Up @@ -1191,6 +1192,7 @@ private final class KafkaProduceClient extends KafkaSaslClient
private int encodeableRecordCount;
private int encodeableRecordBytes;
private int encodeableRecordBytesDeferred;
private int encodeableRecordValueBytes;
private int flushableRequestBytes;

private int decodeSlot = NO_SLOT;
Expand Down Expand Up @@ -1652,6 +1654,7 @@ private void doEncodeRecordInit(

encodeSlotBuffer.putBytes(encodeSlotLimit, encodeBuffer, 0, encodeProgress);
encodeSlotLimit += encodeProgress;
encodeableRecordValueBytes = 0;

if (headersCount > 0)
{
Expand Down Expand Up @@ -1689,6 +1692,7 @@ private void doEncodeRecordCont(

encodeSlotBuffer.putBytes(encodeSlotLimit, value.buffer(), value.offset(), length);
encodeSlotLimit += length;
encodeableRecordValueBytes += length;

if ((flags & FLAGS_FIN) == 0)
{
Expand Down Expand Up @@ -1893,7 +1897,8 @@ private void doEncodeProduceRequest(

final ByteBuffer encodeSlotByteBuffer = encodePool.byteBuffer(encodeSlot);
final int encodeSlotBytePosition = encodeSlotByteBuffer.position();
encodeSlotByteBuffer.limit(encodeSlotBytePosition + encodeSlotLimit);
final int partialValueSize = flushFlags != FLAGS_FIN ? encodeableRecordValueBytes : 0;
encodeSlotByteBuffer.limit(encodeSlotBytePosition + encodeSlotLimit - partialValueSize);
encodeSlotByteBuffer.position(encodeSlotBytePosition + encodeSlotOffset + crcLimit);

final CRC32C crc = crc32c;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class ClientMergedIT
.countersBufferCapacity(8192)
.configure(ENGINE_BUFFER_SLOT_CAPACITY, 8192)
.configure(KAFKA_CLIENT_META_MAX_AGE_MILLIS, 1000)
.configure(KAFKA_CLIENT_PRODUCE_MAX_BYTES, 116)
.configure(KAFKA_CLIENT_PRODUCE_MAX_BYTES, 528)
.configurationRoot("io/aklivity/zilla/specs/binding/kafka/config")
.external("net0")
.clean();
Expand Down Expand Up @@ -234,7 +234,7 @@ public void shouldProduceMergedMessageValues() throws Exception
@Configure(
name = "zilla.binding.kafka.client.produce.max.bytes",
value = "200000")
@ScriptProperty("padding ${512 + 100}")
@ScriptProperty("padding ${512 + 512}")
public void shouldProduceMergedMessageValue10k() throws Exception
{
k3po.finish();
Expand All @@ -248,7 +248,7 @@ public void shouldProduceMergedMessageValue10k() throws Exception
@Configure(
name = "zilla.binding.kafka.client.produce.max.bytes",
value = "200000")
@ScriptProperty("padding ${512 + 100}")
@ScriptProperty("padding ${512 + 512}")
public void shouldProduceMergedMessageValue100k() throws Exception
{
k3po.finish();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ read zilla:begin.ext ${kafka:beginEx()
write zilla:data.ext ${kafka:dataEx()
.typeId(zilla:id("kafka"))
.produce()
.deferred(102400 - 8192 + 512 + 100)
.deferred(102400 - 8192 + 512 + 512)
.timestamp(newTimestamp)
.build()
.build()}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ property serverAddress "zilla://streams/app0"

accept ${serverAddress}
option zilla:window 8192
option zilla:padding 612
option zilla:padding 1024
option zilla:transmission "half-duplex"

accepted
Expand Down Expand Up @@ -71,7 +71,7 @@ write zilla:begin.ext ${kafka:beginEx()
read zilla:data.ext ${kafka:matchDataEx()
.typeId(zilla:id("kafka"))
.produce()
.deferred(102400 - 8192 + 512 + 100)
.deferred(102400 - 8192 + 512 + 512)
.build()
.build()}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ read zilla:begin.ext ${kafka:beginEx()
write zilla:data.ext ${kafka:dataEx()
.typeId(zilla:id("kafka"))
.produce()
.deferred(10240 - 8192 + 512 + 100)
.deferred(10240 - 8192 + 512 + 512)
.timestamp(newTimestamp)
.build()
.build()}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ property serverAddress "zilla://streams/app0"

accept ${serverAddress}
option zilla:window 8192
option zilla:padding 612
option zilla:padding 1024
option zilla:transmission "half-duplex"

accepted
Expand Down Expand Up @@ -71,7 +71,7 @@ write zilla:begin.ext ${kafka:beginEx()
read zilla:data.ext ${kafka:matchDataEx()
.typeId(zilla:id("kafka"))
.produce()
.deferred(10240 - 8192 + 512 + 100)
.deferred(10240 - 8192 + 512 + 512)
.build()
.build()}
read zilla:data.ext ${kafka:matchDataEx()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ write zilla:data.ext ${kafka:dataEx()
.produce()
.build()
.build()}
write ${kafka:randomBytes(7580)}
write ${kafka:randomBytes(8192-(512+512))}
write flush

write zilla:data.ext ${kafka:dataEx()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ read zilla:data.ext ${kafka:matchDataEx()
.produce()
.build()
.build()}
read [0..7580]
read [0..7168]

read zilla:data.ext ${kafka:matchDataEx()
.typeId(zilla:id("kafka"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ write zilla:begin.ext ${proxy:beginEx()

connected

write 7690
write 7278
0s
3s
${newRequestId}
Expand All @@ -90,9 +90,9 @@ write 7690
4s "test"
1
0
7650 # record set size
7238 # record set size
0L # first offset
7638 # length
7226 # length
-1
[0x02]
0x4e8723aa
Expand All @@ -104,13 +104,13 @@ write 7690
-1s
-1
1 # records
${kafka:varint(7587)}
${kafka:varint(7175)}
[0x00]
${kafka:varint(0)}
${kafka:varint(0)}
${kafka:varint(-1)} # key
${kafka:varint(7580)} # value
${kafka:randomBytes(7580)}
${kafka:varint(7168)} # value
${kafka:randomBytes(7168)}
${kafka:varint(0)} # headers

read 44
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ read zilla:begin.ext ${proxy:beginEx()

connected

read 7690
read 7278
0s
3s
(int:requestId)
Expand All @@ -86,9 +86,9 @@ read 7690
4s "test"
1
0
7650 # record set size
7238 # record set size
0L # first offset
7638 # length
7226 # length
-1
[0x02]
[0..4]
Expand All @@ -100,13 +100,13 @@ read 7690
-1s
-1
1 # records
${kafka:varint(7587)}
${kafka:varint(7175)}
[0x00]
${kafka:varint(0)}
${kafka:varint(0)}
${kafka:varint(-1)} # key
${kafka:varint(7580)} # value
[0..7580]
${kafka:varint(7168)} # value
[0..7168]
${kafka:varint(0)} # headers

write 44
Expand Down