Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.agrona.collections.Int2ObjectHashMap;
import org.agrona.collections.Long2LongHashMap;
import org.agrona.collections.MutableInteger;
import org.agrona.collections.MutableLong;
import org.agrona.collections.MutableReference;
import org.agrona.concurrent.UnsafeBuffer;

Expand Down Expand Up @@ -158,6 +159,8 @@ public final class KafkaMergedFactory implements BindingHandler
private final MutableInteger initialNoAckRW = new MutableInteger();
private final MutableInteger initialPadRW = new MutableInteger();
private final MutableInteger initialMaxRW = new MutableInteger();
private final MutableLong partitionOffsetRW = new MutableLong();
private final StringBuilder metadataRW = new StringBuilder();

private final int kafkaTypeId;
private final MutableDirectBuffer writeBuffer;
Expand Down Expand Up @@ -1614,12 +1617,25 @@ private Consumer<KafkaMergedBeginExFW.Builder> beginExToKafkaMerged()
return builder ->
{
builder.capabilities(c -> c.set(FETCH_ONLY)).topic(topic);
latestOffsetByPartitionId.longForEach((k, v) -> builder
.partitionsItem(i -> i
.partitionId((int) k)
.partitionOffset(0L)
.stableOffset(stableOffsetByPartitionId.get(k))
.latestOffset(v)));
latestOffsetByPartitionId.longForEach((k, v) ->
{
partitionOffsetRW.value = 0;
metadataRW.setLength(0);
if (!offsetsByPartitionId.isEmpty())
{
final KafkaPartitionOffset kafkaPartitionOffset = offsetsByPartitionId.get((int) k);
partitionOffsetRW.value = kafkaPartitionOffset.partitionOffset;
metadataRW.append(kafkaPartitionOffset.metadata);
}

builder
.partitionsItem(i -> i
.partitionId((int) k)
.partitionOffset(partitionOffsetRW.value)
.stableOffset(stableOffsetByPartitionId.get(k))
.latestOffset(v)
.metadata(metadataRW.length() > 0 ? metadataRW.toString() : null));
});
};
}

Expand Down Expand Up @@ -1948,7 +1964,7 @@ private void onTopicOffsetFetchDataChanged(
p.partitionOffset(),
0,
p.leaderEpoch(),
null)));
p.metadata().asString())));

doFetchPartitionsIfNecessary(traceId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1093,11 +1093,23 @@ public KafkaMergedBeginExBuilder partition(
long offset,
long stableOffset,
long latestOffset)
{
partition(partitionId, offset, stableOffset, latestOffset, null);
return this;
}

public KafkaMergedBeginExBuilder partition(
int partitionId,
long offset,
long stableOffset,
long latestOffset,
String metadata)
{
mergedBeginExRW.partitionsItem(p -> p.partitionId(partitionId)
.partitionOffset(offset)
.stableOffset(stableOffset)
.latestOffset(latestOffset));
.partitionOffset(offset)
.stableOffset(stableOffset)
.latestOffset(latestOffset)
.metadata(metadata));
return this;
}

Expand Down Expand Up @@ -5539,6 +5551,17 @@ public KafkaMergedBeginExMatcherBuilder partition(
long offset,
long stableOffset,
long latestOffset)
{
partition(partitionId, offset, stableOffset, latestOffset, null);
return this;
}

public KafkaMergedBeginExMatcherBuilder partition(
int partitionId,
long offset,
long stableOffset,
long latestOffset,
String metadata)
{
if (partitionsRW == null)
{
Expand All @@ -5549,7 +5572,8 @@ public KafkaMergedBeginExMatcherBuilder partition(
.partitionId(partitionId)
.partitionOffset(offset)
.stableOffset(stableOffset)
.latestOffset(latestOffset));
.latestOffset(latestOffset)
.metadata(metadata));
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,15 @@ write zilla:begin.ext ${kafka:beginEx()

connected

read zilla:begin.ext ${kafka:matchBeginEx()
.typeId(zilla:id("kafka"))
.merged()
.capabilities("FETCH_ONLY")
.topic("test")
.partition(0, 2, 2, 2, "test-meta")
.build()
.build()}

read zilla:data.ext ${kafka:matchDataEx()
.typeId(zilla:id("kafka"))
.merged()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ read zilla:begin.ext ${kafka:beginEx()

connected

write zilla:begin.ext ${kafka:beginEx()
.typeId(zilla:id("kafka"))
.merged()
.capabilities("FETCH_ONLY")
.topic("test")
.partition(0, 2, 2, 2, "test-meta")
.build()
.build()}
write flush

write zilla:data.ext ${kafka:dataEx()
.typeId(zilla:id("kafka"))
.merged()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ read zilla:begin.ext ${kafka:beginEx()
read zilla:data.ext ${kafka:dataEx()
.typeId(zilla:id("kafka"))
.meta()
.partition(0, 1)
.partition(0, 2)
.partition(1, 2)
.build()
.build()}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ write flush
write zilla:data.ext ${kafka:dataEx()
.typeId(zilla:id("kafka"))
.meta()
.partition(0, 1)
.partition(0, 2)
.partition(1, 2)
.build()
.build()}
Expand Down