Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
2cef977
Adjust padding to accommodate good enough headers and don't include …
akrambek Oct 25, 2023
d201582
Merge branch 'develop' into feature/consumer-group-cont
akrambek Oct 25, 2023
76bf9de
Merge branch 'feature/consumer-group-cont' into develop
akrambek Oct 26, 2023
29ae79c
Merge branch 'aklivity:develop' into develop
akrambek Oct 30, 2023
ec1b39e
Merge branch 'aklivity:develop' into develop
akrambek Oct 30, 2023
51a9f0e
Merge branch 'aklivity:develop' into develop
akrambek Oct 31, 2023
4394783
Merge branch 'aklivity:develop' into develop
akrambek Oct 31, 2023
e8696ce
Merge branch 'aklivity:develop' into develop
akrambek Nov 2, 2023
51c37b1
Merge branch 'aklivity:develop' into develop
akrambek Nov 2, 2023
5da5f04
Merge branch 'aklivity:develop' into develop
akrambek Nov 2, 2023
db1e17c
Merge branch 'aklivity:develop' into develop
akrambek Nov 4, 2023
40f73dc
Merge branch 'aklivity:develop' into develop
akrambek Nov 6, 2023
d1a0492
Merge branch 'aklivity:develop' into develop
akrambek Nov 23, 2023
45799ce
Merge branch 'aklivity:develop' into develop
akrambek Nov 29, 2023
b6f7884
WIP
akrambek Nov 29, 2023
3660eec
Fix offset commit
akrambek Nov 29, 2023
b26ea7f
Merge branch 'bug/commit' into feature/kafka-grpc-group
akrambek Nov 29, 2023
d98fb23
Adjust the scripts
akrambek Nov 29, 2023
cf2cbb9
Refactor
akrambek Nov 30, 2023
ef40877
Merge branch 'bug/commit' into feature/kafka-grpc-group
akrambek Nov 30, 2023
a613924
WIP
akrambek Nov 30, 2023
1e55162
Merge branch 'aklivity:develop' into develop
akrambek Nov 30, 2023
946f01d
WIP
akrambek Nov 30, 2023
45dc9e8
Merge branch 'develop' into feature/kafka-grpc-group
akrambek Nov 30, 2023
211d7c7
WIP
akrambek Nov 30, 2023
7281108
Fix typo
akrambek Nov 30, 2023
2a07377
WIP
akrambek Dec 2, 2023
fedc41f
Merge branch 'aklivity:develop' into develop
akrambek Dec 4, 2023
18a8d74
Merge branch 'aklivity:develop' into develop
akrambek Dec 4, 2023
f160aad
Merge branch 'aklivity:develop' into develop
akrambek Dec 4, 2023
e0e7d5a
Merge branch 'aklivity:develop' into develop
akrambek Dec 6, 2023
9f4a8a6
Merge branch 'aklivity:develop' into develop
akrambek Dec 8, 2023
c0d096f
Merge branch 'develop' into feature/kafka-grpc-group
akrambek Dec 8, 2023
512bedc
Apply feedback from PR
akrambek Dec 8, 2023
456f111
Merge branch 'aklivity:develop' into develop
akrambek Dec 8, 2023
d466096
Merge branch 'develop' into feature/kafka-grpc-group
akrambek Dec 8, 2023
8c2be5c
Apply feedback from PR
akrambek Dec 9, 2023
0d27262
Merge branch 'aklivity:develop' into develop
akrambek Dec 9, 2023
4436e2d
Merge branch 'develop' into feature/kafka-grpc-group
akrambek Dec 9, 2023
0457c75
Apply feedback from PR
akrambek Dec 9, 2023
1c0a33c
Fix formatting
akrambek Dec 9, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1294,10 +1294,10 @@ private void onConsumerInitialFlush(
kafkaFlushExRO.tryWrap(extension.buffer(), extension.offset(), extension.limit()) : null;

KafkaConsumerFlushExFW consumerFlushEx = kafkaFlushEx.consumer();
final KafkaOffsetFW partition = consumerFlushEx.partition();
final KafkaOffsetFW progress = consumerFlushEx.progress();
final int leaderEpoch = consumerFlushEx.leaderEpoch();

offsetCommit.onOffsetCommitRequest(traceId, authorization, partition, leaderEpoch);
offsetCommit.onOffsetCommitRequest(traceId, authorization, progress, leaderEpoch);
}

private void onConsumerInitialAbort(
Expand Down Expand Up @@ -1747,7 +1747,7 @@ private void onOffsetCommitResponse(
ex -> ex.set((b, o, l) -> kafkaFlushExRW.wrap(b, o, l)
.typeId(kafkaTypeId)
.consumer(c -> c
.partition(p -> p
.progress(p -> p
.partitionId(commit.partitionId)
.partitionOffset(commit.partitionOffset)
.metadata(commit.metadata)
Expand All @@ -1772,7 +1772,7 @@ private void doOffsetCommit(
.set((b, o, l) -> kafkaDataExRW.wrap(b, o, l)
.typeId(kafkaTypeId)
.offsetCommit(oc -> oc
.partition(p -> p.partitionId(commit.partitionId)
.progress(p -> p.partitionId(commit.partitionId)
.partitionOffset(commit.partitionOffset)
.metadata(commit.metadata))
.generationId(delegate.fanout.generationId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -731,12 +731,12 @@ private void onApplicationData(
kafkaDataExRO.tryWrap(extension.buffer(), extension.offset(), extension.limit()) : null;

final KafkaOffsetCommitDataExFW commitDataExFW = kafkaDataEx.offsetCommit();
final KafkaOffsetFW partition = commitDataExFW.partition();
final KafkaOffsetFW progress = commitDataExFW.progress();
final int generationId = commitDataExFW.generationId();
final int leaderEpoch = commitDataExFW.leaderEpoch();

client.onOffsetCommit(traceId, partition.partitionId(), partition.partitionOffset(),
generationId, leaderEpoch, partition.metadata().asString());
client.onOffsetCommit(traceId, progress.partitionId(), progress.partitionOffset(),
generationId, leaderEpoch, progress.metadata().asString());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1775,7 +1775,7 @@ private void doMergedConsumerReplyFlush(
{
final KafkaFlushExFW kafkaFlushExFW = kafkaFlushExRW.wrap(extBuffer, 0, extBuffer.capacity())
.typeId(kafkaTypeId)
.merged(mc -> mc.consumer(c -> c.partition(partition)))
.merged(mc -> mc.consumer(c -> c.progress(partition)))
.build();

doFlush(sender, originId, routedId, replyId, replySeq, replyAck, replyMax,
Expand Down Expand Up @@ -2100,7 +2100,7 @@ private long nextFetchPartitionOffset(
if (!offsetsByPartitionId.isEmpty())
{
KafkaPartitionOffset kafkaPartitionOffset = offsetsByPartitionId.get(partitionId);
partitionOffset = kafkaPartitionOffset.partitionOffset + 1;
partitionOffset = kafkaPartitionOffset.partitionOffset;
}
else
{
Expand Down Expand Up @@ -2807,15 +2807,15 @@ private void doConsumerInitialFlush(
{
if (!KafkaState.initialClosed(state))
{
final KafkaOffsetFW offsetAck = consumer.partition();
final KafkaOffsetFW offsetAck = consumer.progress();
final KafkaPartitionOffset partitionOffset = merged.offsetsByPartitionId.get(offsetAck.partitionId());

final KafkaFlushExFW kafkaFlushExFW = kafkaFlushExRW.wrap(extBuffer, 0, extBuffer.capacity())
.typeId(kafkaTypeId)
.consumer(c -> c
.partition(p -> p
.progress(p -> p
.partitionId(offsetAck.partitionId())
.partitionOffset(offsetAck.partitionOffset() + 1)
.partitionOffset(offsetAck.partitionOffset())
.metadata(offsetAck.metadata()))
.leaderEpoch(partitionOffset.leaderEpoch))
.build();
Expand Down Expand Up @@ -2934,9 +2934,9 @@ private void onConsumerFanReplyFlush(
kafkaFlushExRO.tryWrap(extension.buffer(), extension.offset(), extension.limit()) : null;

KafkaConsumerFlushExFW consumerFlushEx = kafkaFlushEx.consumer();
final KafkaOffsetFW partition = consumerFlushEx.partition();
final KafkaOffsetFW progress = consumerFlushEx.progress();

merged.doMergedConsumerReplyFlush(traceId, partition);
merged.doMergedConsumerReplyFlush(traceId, progress);
}

private void onConsumerReplyData(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2482,12 +2482,12 @@ private KafkaOffsetCommitDataExBuilder()
offsetCommitDataExRW.wrap(writeBuffer, KafkaDataExFW.FIELD_OFFSET_OFFSET_COMMIT, writeBuffer.capacity());
}

public KafkaOffsetCommitDataExBuilder partition(
public KafkaOffsetCommitDataExBuilder progress(
int partitionId,
long partitionOffset,
String metadata)
{
offsetCommitDataExRW.partition(p -> p
offsetCommitDataExRW.progress(p -> p
.partitionId(partitionId)
.partitionOffset(partitionOffset)
.metadata(metadata));
Expand Down Expand Up @@ -2737,30 +2737,30 @@ private KafkaMergedConsumerFlushExBuilder()
writeBuffer.capacity());
}

public KafkaMergedConsumerFlushExBuilder partition(
public KafkaMergedConsumerFlushExBuilder progress(
int partitionId,
long partitionOffset)
{
partition(partitionId, partitionOffset, DEFAULT_LATEST_OFFSET, null);
progress(partitionId, partitionOffset, DEFAULT_LATEST_OFFSET, null);
return this;
}

public KafkaMergedConsumerFlushExBuilder partition(
public KafkaMergedConsumerFlushExBuilder progress(
int partitionId,
long partitionOffset,
String metadata)
{
partition(partitionId, partitionOffset, DEFAULT_LATEST_OFFSET, metadata);
progress(partitionId, partitionOffset, DEFAULT_LATEST_OFFSET, metadata);
return this;
}

public KafkaMergedConsumerFlushExBuilder partition(
public KafkaMergedConsumerFlushExBuilder progress(
int partitionId,
long partitionOffset,
long latestOffset,
String metadata)
{
mergedConsumerFlushExRW.partition(p -> p
mergedConsumerFlushExRW.progress(p -> p
.partitionId(partitionId)
.partitionOffset(partitionOffset)
.latestOffset(latestOffset)
Expand Down Expand Up @@ -2972,22 +2972,22 @@ private KafkaConsumerFlushExBuilder()
flushConsumerExRW.wrap(writeBuffer, KafkaFlushExFW.FIELD_OFFSET_CONSUMER, writeBuffer.capacity());
}

public KafkaConsumerFlushExBuilder partition(
public KafkaConsumerFlushExBuilder progress(
int partitionId,
long partitionOffset)
{
flushConsumerExRW.partition(p -> p
flushConsumerExRW.progress(p -> p
.partitionId(partitionId)
.partitionOffset(partitionOffset));
return this;
}

public KafkaConsumerFlushExBuilder partition(
public KafkaConsumerFlushExBuilder progress(
int partitionId,
long partitionOffset,
String metadata)
{
flushConsumerExRW.partition(p -> p
flushConsumerExRW.progress(p -> p
.partitionId(partitionId)
.partitionOffset(partitionOffset)
.metadata(metadata));
Expand Down Expand Up @@ -4649,7 +4649,7 @@ private KafkaMergedConsumerFlushEx()
{
}

public KafkaMergedConsumerFlushEx partition(
public KafkaMergedConsumerFlushEx progress(
int partitionId,
long offset,
String metadata)
Expand All @@ -4662,7 +4662,7 @@ public KafkaMergedConsumerFlushEx partition(
return this;
}

public KafkaMergedConsumerFlushEx partition(
public KafkaMergedConsumerFlushEx progress(
int partitionId,
long offset)
{
Expand All @@ -4683,13 +4683,13 @@ private boolean match(
KafkaFlushExFW flushEx)
{
final KafkaMergedConsumerFlushExFW mergedFlushEx = flushEx.merged().consumer();
return matchPartition(mergedFlushEx);
return matchProgress(mergedFlushEx);
}

private boolean matchPartition(
private boolean matchProgress(
final KafkaMergedConsumerFlushExFW mergedFlush)
{
return partitionRW == null || partitionRW.build().equals(mergedFlush.partition());
return partitionRW == null || partitionRW.build().equals(mergedFlush.progress());
}
}
}
Expand Down Expand Up @@ -4878,15 +4878,15 @@ private KafkaConsumerFlushExMatchBuilder()
{
}

public KafkaConsumerFlushExMatchBuilder partition(
public KafkaConsumerFlushExMatchBuilder progress(
int partitionId,
long partitionOffset)
{
partition(partitionId, partitionOffset, null);
progress(partitionId, partitionOffset, null);
return this;
}

public KafkaConsumerFlushExMatchBuilder partition(
public KafkaConsumerFlushExMatchBuilder progress(
int partitionId,
long partitionOffset,
String metadata)
Expand Down Expand Up @@ -4919,14 +4919,14 @@ private boolean match(
KafkaFlushExFW flushEx)
{
KafkaConsumerFlushExFW consumerFlushEx = flushEx.consumer();
return matchPartition(consumerFlushEx) &&
return matchProgress(consumerFlushEx) &&
matchLeaderEpoch(consumerFlushEx);
}

private boolean matchPartition(
private boolean matchProgress(
final KafkaConsumerFlushExFW consumerFLushEx)
{
return partitionRW == null || partitionRW.build().equals(consumerFLushEx.partition());
return partitionRW == null || partitionRW.build().equals(consumerFLushEx.progress());
}

private boolean matchLeaderEpoch(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ scope kafka

struct KafkaMergedConsumerFlushEx
{
KafkaOffset partition;
KafkaOffset progress;
}

struct KafkaMergedFetchFlushEx
Expand Down Expand Up @@ -442,7 +442,7 @@ scope kafka

struct KafkaConsumerFlushEx
{
KafkaOffset partition;
KafkaOffset progress;
int32 leaderEpoch = -1;
}

Expand Down Expand Up @@ -474,7 +474,7 @@ scope kafka

struct KafkaOffsetCommitDataEx
{
KafkaOffset partition;
KafkaOffset progress;
int32 generationId;
int32 leaderEpoch;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,14 @@ read zilla:data.empty
write advise zilla:flush ${kafka:flushEx()
.typeId(zilla:id("kafka"))
.consumer()
.partition(0, 2, "test-meta")
.progress(0, 2, "test-meta")
.leaderEpoch(0)
.build()
.build()}

read advised zilla:flush ${kafka:matchFlushEx()
.typeId(zilla:id("kafka"))
.consumer()
.partition(0, 2, "test-meta")
.progress(0, 2, "test-meta")
.build()
.build()}
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,14 @@ write flush
read advised zilla:flush ${kafka:flushEx()
.typeId(zilla:id("kafka"))
.consumer()
.partition(0, 2, "test-meta")
.progress(0, 2, "test-meta")
.leaderEpoch(0)
.build()
.build()}

write advise zilla:flush ${kafka:flushEx()
.typeId(zilla:id("kafka"))
.consumer()
.partition(0, 2, "test-meta")
.progress(0, 2, "test-meta")
.build()
.build()}
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ connected
write zilla:data.ext ${kafka:dataEx()
.typeId(zilla:id("kafka"))
.offsetCommit()
.partition(0, 2, "test-meta")
.progress(0, 2, "test-meta")
.generationId(0)
.leaderEpoch(0)
.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ connected
read zilla:data.ext ${kafka:dataEx()
.typeId(zilla:id("kafka"))
.offsetCommit()
.partition(0, 2, "test-meta")
.progress(0, 2, "test-meta")
.generationId(0)
.leaderEpoch(0)
.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@ write advise zilla:flush ${kafka:flushEx()
.typeId(zilla:id("kafka"))
.merged()
.consumer()
.partition(0, 2, "test-meta")
.progress(0, 3, "test-meta")
.build()
.build()}

read advised zilla:flush ${kafka:matchFlushEx()
.typeId(zilla:id("kafka"))
.merged()
.consumer()
.partition(0, 2, "test-meta")
.progress(0, 3, "test-meta")
.build()
.build()}
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@ read advised zilla:flush ${kafka:flushEx()
.typeId(zilla:id("kafka"))
.merged()
.consumer()
.partition(0, 2, "test-meta")
.progress(0, 3, "test-meta")
.build()
.build()}

write advise zilla:flush ${kafka:flushEx()
.typeId(zilla:id("kafka"))
.merged()
.consumer()
.partition(0, 2, "test-meta")
.progress(0, 3, "test-meta")
.build()
.build()}
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ connected
read zilla:data.ext ${kafka:dataEx()
.typeId(zilla:id("kafka"))
.offsetFetch()
.partition(0, 1, 0, "test-meta")
.partition(0, 2, 0, "test-meta")
.build()
.build()}
read zilla:data.empty
Expand Down Expand Up @@ -263,7 +263,7 @@ connected
write zilla:data.ext ${kafka:dataEx()
.typeId(zilla:id("kafka"))
.offsetCommit()
.partition(0, 3, "test-meta")
.progress(0, 3, "test-meta")
.generationId(0)
.leaderEpoch(0)
.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ connected
write zilla:data.ext ${kafka:dataEx()
.typeId(zilla:id("kafka"))
.offsetFetch()
.partition(0, 1, 0, "test-meta")
.partition(0, 2, 0, "test-meta")
.build()
.build()}
write zilla:data.empty
Expand Down Expand Up @@ -252,7 +252,7 @@ connected
read zilla:data.ext ${kafka:dataEx()
.typeId(zilla:id("kafka"))
.offsetCommit()
.partition(0, 3, "test-meta")
.progress(0, 3, "test-meta")
.generationId(0)
.leaderEpoch(0)
.build()
Expand Down
Loading