Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Include metadata and partitionOffset into merge reply begin
  • Loading branch information
akrambek committed Dec 1, 2023
commit e2fcf2cefb97550a184c45e3f301660c1802fe11
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.agrona.collections.Int2ObjectHashMap;
import org.agrona.collections.Long2LongHashMap;
import org.agrona.collections.MutableInteger;
import org.agrona.collections.MutableLong;
import org.agrona.collections.MutableReference;
import org.agrona.concurrent.UnsafeBuffer;

Expand Down Expand Up @@ -158,6 +159,8 @@ public final class KafkaMergedFactory implements BindingHandler
private final MutableInteger initialNoAckRW = new MutableInteger();
private final MutableInteger initialPadRW = new MutableInteger();
private final MutableInteger initialMaxRW = new MutableInteger();
private final MutableLong partitionOffsetRW = new MutableLong();
private final StringBuilder metadataRW = new StringBuilder();

private final int kafkaTypeId;
private final MutableDirectBuffer writeBuffer;
Expand Down Expand Up @@ -1614,12 +1617,25 @@ private Consumer<KafkaMergedBeginExFW.Builder> beginExToKafkaMerged()
return builder ->
{
builder.capabilities(c -> c.set(FETCH_ONLY)).topic(topic);
latestOffsetByPartitionId.longForEach((k, v) -> builder
.partitionsItem(i -> i
.partitionId((int) k)
.partitionOffset(0L)
.stableOffset(stableOffsetByPartitionId.get(k))
.latestOffset(v)));
latestOffsetByPartitionId.longForEach((k, v) ->
{
partitionOffsetRW.value = 0;
metadataRW.setLength(0);
if (!offsetsByPartitionId.isEmpty())
{
final KafkaPartitionOffset kafkaPartitionOffset = offsetsByPartitionId.get(k);
partitionOffsetRW.value = kafkaPartitionOffset.partitionOffset;
metadataRW.append(kafkaPartitionOffset.metadata);
}

builder
.partitionsItem(i -> i
.partitionId((int) k)
.partitionOffset(partitionOffsetRW.value)
.stableOffset(stableOffsetByPartitionId.get(k))
.latestOffset(v)
.metadata(metadataRW.toString()));
});
};
}

Expand Down