Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Return list of partitions as reply merge begin ex for produce_only as…
… well
  • Loading branch information
akrambek committed Jan 13, 2024
commit f165cd170eb7add2304e33c31de6a2eb5f92fcbb
Original file line number Diff line number Diff line change
Expand Up @@ -1592,28 +1592,34 @@ private void doMergedReplyBegin(
if (capabilities == FETCH_ONLY)
{
doBegin(sender, originId, routedId, replyId, replySeq, replyAck, replyMax,
traceId, authorization, affinity, beginExToKafka());
traceId, authorization, affinity, beginExToKafka(beginExToKafkaMergedFetchOnly()));
}
else if (capabilities == PRODUCE_ONLY)
{
doBegin(sender, originId, routedId, replyId, replySeq, replyAck, replyMax,
traceId, authorization, affinity, beginExToKafka(beginExToKafkaMergedProduceOnly()));
}
else
{
doBegin(sender, originId, routedId, replyId, replySeq, replyAck, replyMax,
traceId, authorization, affinity, EMPTY_EXTENSION);
traceId, authorization, affinity, EMPTY_EXTENSION);
}

doUnmergedFetchReplyWindowsIfNecessary(traceId);
}

private Flyweight.Builder.Visitor beginExToKafka()
private Flyweight.Builder.Visitor beginExToKafka(
Consumer<KafkaMergedBeginExFW.Builder> beginExToKafkaMerged)
{
return (buffer, offset, maxLimit) ->
kafkaBeginExRW.wrap(buffer, offset, maxLimit)
.typeId(kafkaTypeId)
.merged(beginExToKafkaMerged())
.merged(beginExToKafkaMerged)
.build()
.limit() - offset;
}

private Consumer<KafkaMergedBeginExFW.Builder> beginExToKafkaMerged()
private Consumer<KafkaMergedBeginExFW.Builder> beginExToKafkaMergedFetchOnly()
{
return builder ->
{
Expand All @@ -1640,6 +1646,15 @@ private Consumer<KafkaMergedBeginExFW.Builder> beginExToKafkaMerged()
};
}

private Consumer<KafkaMergedBeginExFW.Builder> beginExToKafkaMergedProduceOnly()
{
return builder ->
{
builder.capabilities(c -> c.set(PRODUCE_ONLY)).topic(topic);
leadersByPartitionId.intForEach((k, v) -> builder.partitionsItem(i -> i.partitionId(k)));
};
}

private void doMergedReplyData(
long traceId,
int flags,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,16 @@ write zilla:begin.ext ${kafka:beginEx()

connected

read zilla:begin.ext ${kafka:matchBeginEx()
.typeId(zilla:id("kafka"))
.merged()
.capabilities("PRODUCE_ONLY")
.topic("test")
.partition(0, -1)
.partition(1, -1)
.build()
.build()}

write zilla:data.ext ${kafka:dataEx()
.typeId(zilla:id("kafka"))
.merged()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,17 @@ read zilla:begin.ext ${kafka:beginEx()

connected

write zilla:begin.ext ${kafka:beginEx()
.typeId(zilla:id("kafka"))
.merged()
.capabilities("PRODUCE_ONLY")
.topic("test")
.partition(0, -1)
.partition(1, -1)
.build()
.build()}
write flush

read zilla:data.ext ${kafka:dataEx()
.typeId(zilla:id("kafka"))
.merged()
Expand Down