Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
addressing review comments
  • Loading branch information
ankitk-me committed Jul 16, 2024
commit 6ff7bc0c913a7fa500f52c1c1eb4b55e378d1ef9
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public final class GrpcKafkaProxyFactory implements GrpcKafkaStreamFactory
new Varuint32FW.Builder().wrap(new UnsafeBuffer(new byte[1024 * 8]), 0, 1024 * 8);;

private final OctetsFW emptyRO = new OctetsFW().wrap(new UnsafeBuffer(0L, 0), 0, 0);
private final Array32FW<KafkaHeaderFW> emptyHeaderRO = new Array32FW<>(new KafkaHeaderFW());

private final BeginFW beginRO = new BeginFW();
private final DataFW dataRO = new DataFW();
Expand Down Expand Up @@ -582,7 +583,17 @@ protected void onKafkaData(
final KafkaDataExFW kafkaDataEx =
dataEx != null && dataEx.typeId() == kafkaTypeId ? extension.get(kafkaDataExRO::tryWrap) : null;

doGrpcBegin(traceId, authorization, 0L, kafkaDataEx.merged().fetch().headers());
if (kafkaDataEx != null &&
kafkaDataEx.merged() != null &&
kafkaDataEx.merged().fetch() != null &&
kafkaDataEx.merged().fetch().headers() != null)
{
doGrpcBegin(traceId, authorization, 0L, kafkaDataEx.merged().fetch().headers());
}
else
{
doGrpcBegin(traceId, authorization, 0L, emptyHeaderRO);
}
}

doGrpcData(traceId, authorization, budgetId, reserved, flags, buffer, offset, length);
Expand Down Expand Up @@ -1380,7 +1391,17 @@ protected void onKafkaData(
{
if (!GrpcKafkaState.replyOpening(state))
{
doGrpcBegin(traceId, authorization, 0L, kafkaDataEx.merged().fetch().headers());
if (kafkaDataEx != null &&
kafkaDataEx.merged() != null &&
kafkaDataEx.merged().fetch() != null &&
kafkaDataEx.merged().fetch().headers() != null)
{
doGrpcBegin(traceId, authorization, 0L, kafkaDataEx.merged().fetch().headers());
}
else
{
doGrpcBegin(traceId, authorization, 0L, emptyHeaderRO);
}
}

if (GrpcKafkaState.replyClosing(state))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ write close
read zilla:begin.ext ${grpc:matchBeginEx()
.typeId(zilla:id("grpc"))
.metadata("custom", "value")
.metadata("BASE64", "customProperty", "dGVzdA==")
.metadataBase64("customProperty", "dGVzdA==")
.build()}

read ${grpc:protobuf()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ read closed
write zilla:begin.ext ${grpc:beginEx()
.typeId(zilla:id("grpc"))
.metadata("custom", "value")
.metadata("BASE64", "customProperty", "dGVzdA==")
.metadataBase64("customProperty", "dGVzdA==")
.build()}

write ${grpc:protobuf()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ write close
read zilla:begin.ext ${grpc:matchBeginEx()
.typeId(zilla:id("grpc"))
.metadata("custom", "value")
.metadata("BASE64", "customProperty", "dGVzdA==")
.metadataBase64("customProperty", "dGVzdA==")
.build()}

read ${grpc:protobuf()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ read closed
write zilla:begin.ext ${grpc:beginEx()
.typeId(zilla:id("grpc"))
.metadata("custom", "value")
.metadata("BASE64", "customProperty", "dGVzdA==")
.metadataBase64("customProperty", "dGVzdA==")
.build()}

write ${grpc:protobuf()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ write flush
read zilla:begin.ext ${grpc:matchBeginEx()
.typeId(zilla:id("grpc"))
.metadata("custom", "value")
.metadata("BASE64", "customProperty", "dGVzdA==")
.metadataBase64("customProperty", "dGVzdA==")
.build()}

read ${grpc:protobuf()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ read ${grpc:protobuf()
write zilla:begin.ext ${grpc:beginEx()
.typeId(zilla:id("grpc"))
.metadata("custom", "value")
.metadata("BASE64", "customProperty", "dGVzdA==")
.metadataBase64("customProperty", "dGVzdA==")
.build()}

write ${grpc:protobuf()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ write close
read zilla:begin.ext ${grpc:matchBeginEx()
.typeId(zilla:id("grpc"))
.metadata("custom", "value")
.metadata("BASE64", "customProperty", "dGVzdA==")
.metadataBase64("customProperty", "dGVzdA==")
.build()}

read ${grpc:protobuf()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ read closed
write zilla:begin.ext ${grpc:beginEx()
.typeId(zilla:id("grpc"))
.metadata("custom", "value")
.metadata("BASE64", "customProperty", "dGVzdA==")
.metadataBase64("customProperty", "dGVzdA==")
.build()}

write ${grpc:protobuf()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ write close
read zilla:begin.ext ${grpc:matchBeginEx()
.typeId(zilla:id("grpc"))
.metadata("custom", "value")
.metadata("BASE64", "customProperty", "dGVzdA==")
.metadataBase64("customProperty", "dGVzdA==")
.build()}

read ${grpc:protobuf()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ read closed
write zilla:begin.ext ${grpc:beginEx()
.typeId(zilla:id("grpc"))
.metadata("custom", "value")
.metadata("BASE64", "customProperty", "dGVzdA==")
.metadataBase64("customProperty", "dGVzdA==")
.build()}

write ${grpc:protobuf()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ write close
read zilla:begin.ext ${grpc:matchBeginEx()
.typeId(zilla:id("grpc"))
.metadata("custom", "value")
.metadata("BASE64", "customProperty", "dGVzdA==")
.metadataBase64("customProperty", "dGVzdA==")
.build()}

read ${grpc:protobuf()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ read closed
write zilla:begin.ext ${grpc:beginEx()
.typeId(zilla:id("grpc"))
.metadata("custom", "value")
.metadata("BASE64", "customProperty", "dGVzdA==")
.metadataBase64("customProperty", "dGVzdA==")
.build()}

write ${grpc:protobuf()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public GrpcBeginExBuilder metadataBase64(
return metadata("BASE64", name, value);
}

public GrpcBeginExBuilder metadata(
private GrpcBeginExBuilder metadata(
String type,
String name,
String value)
Expand Down Expand Up @@ -238,7 +238,7 @@ public GrpcBeginExMatcherBuilder metadataBase64(
return metadata("BASE64", name, value);
}

public GrpcBeginExMatcherBuilder metadata(
private GrpcBeginExMatcherBuilder metadata(
String type,
String name,
String value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ write flush
read zilla:begin.ext ${grpc:matchBeginEx()
.typeId(zilla:id("grpc"))
.metadata("custom", "value")
.metadata("BASE64", "customProperty", "dGVzdA==")
.metadataBase64("customProperty", "dGVzdA==")
.build()}

read ${grpc:protobuf()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ read ${grpc:protobuf()
write zilla:begin.ext ${grpc:beginEx()
.typeId(zilla:id("grpc"))
.metadata("custom", "value")
.metadata("BASE64", "customProperty", "dGVzdA==")
.metadataBase64("customProperty", "dGVzdA==")
.build()}
write flush

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ write close
read zilla:begin.ext ${grpc:matchBeginEx()
.typeId(zilla:id("grpc"))
.metadata("custom", "value")
.metadata("BASE64", "customProperty", "dGVzdA==")
.metadataBase64("customProperty", "dGVzdA==")
.build()}

read ${grpc:protobuf()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ read closed
write zilla:begin.ext ${grpc:beginEx()
.typeId(zilla:id("grpc"))
.metadata("custom", "value")
.metadata("BASE64", "customProperty", "dGVzdA==")
.metadataBase64("customProperty", "dGVzdA==")
.build()}
write flush

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ write close
read zilla:begin.ext ${grpc:matchBeginEx()
.typeId(zilla:id("grpc"))
.metadata("custom", "value")
.metadata("BASE64", "customProperty", "dGVzdA==")
.metadataBase64("customProperty", "dGVzdA==")
.build()}

read ${grpc:protobuf()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ read closed
write zilla:begin.ext ${grpc:beginEx()
.typeId(zilla:id("grpc"))
.metadata("custom", "value")
.metadata("BASE64", "customProperty", "dGVzdA==")
.metadataBase64("customProperty", "dGVzdA==")
.build()}
write flush

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ write close
read zilla:begin.ext ${grpc:matchBeginEx()
.typeId(zilla:id("grpc"))
.metadata("custom", "value")
.metadata("BASE64", "customProperty", "dGVzdA==")
.metadataBase64("customProperty", "dGVzdA==")
.build()}

read ${grpc:protobuf()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ read closed
write zilla:begin.ext ${grpc:beginEx()
.typeId(zilla:id("grpc"))
.metadata("custom", "value")
.metadata("BASE64", "customProperty", "dGVzdA==")
.metadataBase64("customProperty", "dGVzdA==")
.build()}
write flush

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ write close
read zilla:begin.ext ${grpc:matchBeginEx()
.typeId(zilla:id("grpc"))
.metadata("custom", "value")
.metadata("BASE64", "customProperty", "dGVzdA==")
.metadataBase64("customProperty", "dGVzdA==")
.build()}

read ${grpc:protobuf()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ read closed
write zilla:begin.ext ${grpc:beginEx()
.typeId(zilla:id("grpc"))
.metadata("custom", "value")
.metadata("BASE64", "customProperty", "dGVzdA==")
.metadataBase64("customProperty", "dGVzdA==")
.build()}
write flush

Expand Down