Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
script update & extract and pass response metadata implementation update
  • Loading branch information
ankitk-me committed Jun 25, 2024
commit cf715fc3287a8ba5ce11e8e06227495791caa05a
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import java.util.function.Supplier;

import org.agrona.DirectBuffer;
import org.agrona.ExpandableDirectByteBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;

import io.aklivity.zilla.runtime.binding.grpc.kafka.config.GrpcKafkaCorrelationConfig;
Expand All @@ -31,9 +33,12 @@
import io.aklivity.zilla.runtime.binding.grpc.kafka.internal.types.KafkaOffsetType;
import io.aklivity.zilla.runtime.binding.grpc.kafka.internal.types.OctetsFW;
import io.aklivity.zilla.runtime.binding.grpc.kafka.internal.types.String16FW;
import io.aklivity.zilla.runtime.binding.grpc.kafka.internal.types.stream.GrpcMetadataFW;

public class GrpcKafkaWithProduceResult
{
public static final String META_PREFIX = "meta:";

private static final KafkaOffsetFW KAFKA_OFFSET_HISTORICAL =
new KafkaOffsetFW.Builder()
.wrap(new UnsafeBuffer(new byte[32]), 0, 32)
Expand All @@ -50,10 +55,13 @@ public class GrpcKafkaWithProduceResult
private final GrpcKafkaWithProduceHash hash;
private final String16FW service;
private final String16FW method;
private final Array32FW<GrpcMetadataFW> metadata;
private final ExpandableDirectByteBuffer buffer;
Comment thread
ankitk-me marked this conversation as resolved.
Outdated

GrpcKafkaWithProduceResult(
String16FW service,
String16FW method,
Array32FW<GrpcMetadataFW> metadata,
String16FW topic,
KafkaAckMode acks,
Supplier<DirectBuffer> keyRef,
Expand All @@ -64,13 +72,16 @@ public class GrpcKafkaWithProduceResult
{
this.service = service;
this.method = method;
this.metadata = metadata;
this.overrides = overrides;
this.replyTo = replyTo;
this.correlation = correlation;
this.topic = topic;
this.acks = acks;
this.keyRef = keyRef;
this.hash = hash;
this.buffer = new ExpandableDirectByteBuffer();
this.buffer.putStringWithoutLengthAscii(0, META_PREFIX);
Comment thread
ankitk-me marked this conversation as resolved.
Outdated

hash.updateHash(correlation.service.value());
hash.updateHash(service.value());
Expand Down Expand Up @@ -140,6 +151,22 @@ public void headers(
builder.item(this::method);
builder.item(this::replyTo);
builder.item(this::correlationId);
metadata.forEach(m -> builder.item(i -> metadata(i, m)));
}

private void metadata(
KafkaHeaderFW.Builder builder,
GrpcMetadataFW metadata)
{
int nameLen = metadata.nameLen();
int nameLenWithPrefix = nameLen + 5;
buffer.putBytes(5, metadata.name().value(), 0, nameLen);
Comment thread
ankitk-me marked this conversation as resolved.
Outdated

builder
.nameLen(nameLenWithPrefix)
.name(buffer, 0, nameLenWithPrefix)
.valueLen(metadata.valueLen())
.value(metadata.value().value(), 0, metadata.valueLen());
}

private void service(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public GrpcKafkaWithProduceResult resolveProduce(

String16FW replyTo = new String16FW(produce.replyTo);

return new GrpcKafkaWithProduceResult(service, method, topic, acks, keyRef, overrides, replyTo,
return new GrpcKafkaWithProduceResult(service, method, metadata, topic, acks, keyRef, overrides, replyTo,
options.correlation, hash);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/
package io.aklivity.zilla.runtime.binding.grpc.kafka.internal.stream;

import static io.aklivity.zilla.runtime.binding.grpc.kafka.internal.config.GrpcKafkaWithProduceResult.META_PREFIX;
import static io.aklivity.zilla.runtime.binding.grpc.kafka.internal.types.KafkaCapabilities.FETCH_ONLY;
import static io.aklivity.zilla.runtime.binding.grpc.kafka.internal.types.KafkaCapabilities.PRODUCE_ONLY;
import static java.time.Instant.now;
Expand Down Expand Up @@ -46,7 +47,9 @@
import io.aklivity.zilla.runtime.binding.grpc.kafka.internal.types.stream.GrpcAbortExFW;
import io.aklivity.zilla.runtime.binding.grpc.kafka.internal.types.stream.GrpcBeginExFW;
import io.aklivity.zilla.runtime.binding.grpc.kafka.internal.types.stream.GrpcDataExFW;
import io.aklivity.zilla.runtime.binding.grpc.kafka.internal.types.stream.GrpcMetadataFW;
import io.aklivity.zilla.runtime.binding.grpc.kafka.internal.types.stream.GrpcResetExFW;
import io.aklivity.zilla.runtime.binding.grpc.kafka.internal.types.stream.GrpcType;
import io.aklivity.zilla.runtime.binding.grpc.kafka.internal.types.stream.KafkaBeginExFW;
import io.aklivity.zilla.runtime.binding.grpc.kafka.internal.types.stream.KafkaDataExFW;
import io.aklivity.zilla.runtime.binding.grpc.kafka.internal.types.stream.KafkaMergedBeginExFW;
Expand Down Expand Up @@ -106,16 +109,20 @@ public final class GrpcKafkaProxyFactory implements GrpcKafkaStreamFactory
private final KafkaBeginExFW kafkaBeginExRO = new KafkaBeginExFW();
private final KafkaDataExFW kafkaDataExRO = new KafkaDataExFW();

private final GrpcBeginExFW.Builder grpcBeginExRW = new GrpcBeginExFW.Builder();
private final GrpcDataExFW.Builder grpcDataExRW = new GrpcDataExFW.Builder();
private final GrpcResetExFW.Builder grpcResetExRW = new GrpcResetExFW.Builder();
private final GrpcAbortExFW.Builder grpcAbortExRW = new GrpcAbortExFW.Builder();
private final Array32FW.Builder<GrpcMetadataFW.Builder, GrpcMetadataFW> grpcMetadataRW =
new Array32FW.Builder<>(new GrpcMetadataFW.Builder(), new GrpcMetadataFW());

private final KafkaBeginExFW.Builder kafkaBeginExRW = new KafkaBeginExFW.Builder();
private final KafkaDataExFW.Builder kafkaDataExRW = new KafkaDataExFW.Builder();
private final GrpcKafkaIdHelper messageFieldHelper = new GrpcKafkaIdHelper();

private final MutableDirectBuffer writeBuffer;
private final MutableDirectBuffer extBuffer;
private final MutableDirectBuffer metaBuffer;
private final BindingHandler streamFactory;
private final LongUnaryOperator supplyInitialId;
private final LongUnaryOperator supplyReplyId;
Expand All @@ -131,6 +138,7 @@ public GrpcKafkaProxyFactory(
{
this.writeBuffer = context.writeBuffer();
this.extBuffer = new UnsafeBuffer(new byte[context.writeBuffer().capacity()]);
this.metaBuffer = new UnsafeBuffer(new byte[context.writeBuffer().capacity()]);
this.streamFactory = context.streamFactory();
this.supplyInitialId = context::supplyInitialId;
this.supplyReplyId = context::supplyReplyId;
Expand Down Expand Up @@ -847,8 +855,6 @@ private void onKafkaBegin(
int lenSize = len.sizeof();
replyPad = result.fieldId().sizeof() + lenSize + partitions.sizeof();
}

delegate.onKafkaBegin(traceId, authorization, extension);
}

private void onKafkaData(
Expand Down Expand Up @@ -879,6 +885,8 @@ private void onKafkaData(
final OctetsFW payload = data.payload();
final OctetsFW extension = data.extension();

delegate.onKafkaBegin(traceId, authorization, extension);

final MutableDirectBuffer encodeBuffer = writeBuffer;
final int encodeOffset = DataFW.FIELD_OFFSET_PAYLOAD;
final int payloadSize = payload.sizeof();
Expand Down Expand Up @@ -1278,11 +1286,12 @@ protected void onKafkaAbort(
@Override
protected void onKafkaBegin(
long traceId,
long authorization, OctetsFW extension)
long authorization,
OctetsFW extension)
{
if (!GrpcKafkaState.replyOpening(state))
{
doGrpcBegin(traceId, authorization, 0L, emptyRO);
doGrpcBegin(traceId, authorization, 0L, extension);
}
}

Expand All @@ -1296,6 +1305,11 @@ protected void onKafkaData(
OctetsFW payload,
KafkaDataExFW kafkaDataEx)
{
if (!GrpcKafkaState.replyOpening(state))
{
doGrpcBegin(traceId, authorization, 0L, kafkaDataEx.merged().fetch().headers());
}

if (GrpcKafkaState.replyClosing(state))
{
replySeq += reserved;
Expand Down Expand Up @@ -1388,6 +1402,39 @@ private void doGrpcBegin(
traceId, authorization, affinity, extension);
}

private void doGrpcBegin(
long traceId,
long authorization,
long affinity,
Array32FW<KafkaHeaderFW> headers)
{
state = GrpcKafkaState.openingReply(state);

Array32FW.Builder<GrpcMetadataFW.Builder, GrpcMetadataFW> builder =
grpcMetadataRW.wrap(metaBuffer, 0, metaBuffer.capacity());

headers.forEach(h ->
{
if (META_PREFIX.equals(h.name().value().getStringWithoutLengthUtf8(0, 5)))
Comment thread
ankitk-me marked this conversation as resolved.
Outdated
{
builder.item(m -> m.type(t -> t.set(GrpcType.TEXT))
.nameLen(h.nameLen() - 5)
.name(h.name().value(), 5, h.nameLen() - 5)
.valueLen(h.valueLen())
.value(h.value()));
}
});
Comment thread
ankitk-me marked this conversation as resolved.

GrpcBeginExFW beginEx = grpcBeginExRW
.wrap(extBuffer, 0, extBuffer.capacity())
.typeId(grpcTypeId)
.metadata(builder.build())
.build();

doBegin(grpc, originId, routedId, replyId, replySeq, replyAck, replyMax,
traceId, authorization, affinity, beginEx);
}

private void doGrpcData(
long traceId,
long authorization,
Expand Down Expand Up @@ -1637,7 +1684,7 @@ private void onKafkaBegin(
assert replyAck <= replySeq;

doKafkaWindow(traceId);
delegate.onKafkaBegin(traceId, authorization, extension);
//delegate.onKafkaBegin(traceId, authorization, extension);
}

private void onKafkaEnd(
Expand Down Expand Up @@ -1923,7 +1970,7 @@ private void onKafkaBegin(

assert replyAck <= replySeq;

delegate.onKafkaBegin(traceId, authorization, extension);
//delegate.onKafkaBegin(traceId, authorization, extension);
doKafkaWindow(traceId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
connect "zilla://streams/grpc0"
option zilla:window 8192
option zilla:transmission "half-duplex"
option zilla:update "proactive"

write zilla:begin.ext ${grpc:beginEx()
.typeId(zilla:id("grpc"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
connect "zilla://streams/grpc0"
option zilla:window 8192
option zilla:transmission "half-duplex"
option zilla:update "proactive"

write zilla:begin.ext ${grpc:beginEx()
.typeId(zilla:id("grpc"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
connect "zilla://streams/grpc0"
option zilla:window 8192
option zilla:transmission "half-duplex"
option zilla:update "proactive"

write zilla:begin.ext ${grpc:beginEx()
.typeId(zilla:id("grpc"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
connect "zilla://streams/grpc0"
option zilla:window 8192
option zilla:transmission "half-duplex"
option zilla:update "proactive"

write zilla:begin.ext ${grpc:beginEx()
.typeId(zilla:id("grpc"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
connect "zilla://streams/grpc0"
option zilla:window 8192
option zilla:transmission "half-duplex"
option zilla:update "proactive"

write zilla:begin.ext ${grpc:beginEx()
.typeId(zilla:id("grpc"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
connect "zilla://streams/grpc0"
option zilla:window 8192
option zilla:transmission "half-duplex"
option zilla:update "proactive"

write zilla:begin.ext ${grpc:beginEx()
.typeId(zilla:id("grpc"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
connect "zilla://streams/grpc0"
option zilla:window 8192
option zilla:transmission "half-duplex"
option zilla:update "proactive"

write zilla:begin.ext ${grpc:beginEx()
.typeId(zilla:id("grpc"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ property string100k ${core:randomString(102400)}
connect "zilla://streams/grpc0"
option zilla:window 8192
option zilla:transmission "half-duplex"
option zilla:update "proactive"

write zilla:begin.ext ${grpc:beginEx()
.typeId(zilla:id("grpc"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
connect "zilla://streams/grpc0"
option zilla:window 8192
option zilla:transmission "half-duplex"
option zilla:update "proactive"

write zilla:begin.ext ${grpc:beginEx()
.typeId(zilla:id("grpc"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
connect "zilla://streams/grpc0"
option zilla:window 8192
option zilla:transmission "half-duplex"
option zilla:update "proactive"

write zilla:begin.ext ${grpc:beginEx()
.typeId(zilla:id("grpc"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
connect "zilla://streams/grpc0"
option zilla:window 8192
option zilla:transmission "half-duplex"
option zilla:update "proactive"

write zilla:begin.ext ${grpc:beginEx()
.typeId(zilla:id("grpc"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ write zilla:data.ext ${kafka:dataEx()
.header("zilla:method", "EchoBidiStream")
.header("zilla:reply-to", "responses")
.header("zilla:correlation-id", "59410e57-3e0f-4b61-9328-f645a7968ac8-14f43bae85c2ce394c9bd81ff6fa9d77")
.header("meta:custom", "test")
.header("meta:idempotency-key", "59410e57-3e0f-4b61-9328-f645a7968ac8")
.build()
.build()}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ read zilla:data.ext ${kafka:matchDataEx()
.header("zilla:method", "EchoBidiStream")
.header("zilla:reply-to", "responses")
.header("zilla:correlation-id", "59410e57-3e0f-4b61-9328-f645a7968ac8-14f43bae85c2ce394c9bd81ff6fa9d77")
.header("meta:custom", "test")
.header("meta:idempotency-key", "59410e57-3e0f-4b61-9328-f645a7968ac8")
.build()
.build()}
read zilla:data.null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ write zilla:data.ext ${kafka:dataEx()
.header("zilla:method", "EchoBidiStream")
.header("zilla:reply-to", "responses")
.header("zilla:correlation-id", "59410e57-3e0f-4b61-9328-f645a7968ac8-14f43bae85c2ce394c9bd81ff6fa9d77")
.header("meta:custom", "test")
.header("meta:idempotency-key", "59410e57-3e0f-4b61-9328-f645a7968ac8")
.build()
.build()}
write ${grpc:protobuf()
Expand All @@ -83,6 +85,8 @@ write zilla:data.ext ${kafka:dataEx()
.header("zilla:method", "EchoBidiStream")
.header("zilla:reply-to", "responses")
.header("zilla:correlation-id", "59410e57-3e0f-4b61-9328-f645a7968ac8-14f43bae85c2ce394c9bd81ff6fa9d77")
.header("meta:custom", "test")
.header("meta:idempotency-key", "59410e57-3e0f-4b61-9328-f645a7968ac8")
.build()
.build()}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ read zilla:data.ext ${kafka:matchDataEx()
.header("zilla:method", "EchoBidiStream")
.header("zilla:reply-to", "responses")
.header("zilla:correlation-id", "59410e57-3e0f-4b61-9328-f645a7968ac8-14f43bae85c2ce394c9bd81ff6fa9d77")
.header("meta:custom", "test")
.header("meta:idempotency-key", "59410e57-3e0f-4b61-9328-f645a7968ac8")
.build()
.build()}
read ${grpc:protobuf()
Expand All @@ -81,6 +83,8 @@ read zilla:data.ext ${kafka:matchDataEx()
.header("zilla:method", "EchoBidiStream")
.header("zilla:reply-to", "responses")
.header("zilla:correlation-id", "59410e57-3e0f-4b61-9328-f645a7968ac8-14f43bae85c2ce394c9bd81ff6fa9d77")
.header("meta:custom", "test")
.header("meta:idempotency-key", "59410e57-3e0f-4b61-9328-f645a7968ac8")
.build()
.build()}
read zilla:data.null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ write zilla:data.ext ${kafka:dataEx()
.header("zilla:method", "EchoClientStream")
.header("zilla:reply-to", "responses")
.header("zilla:correlation-id", "59410e57-3e0f-4b61-9328-f645a7968ac8-97e816c79fb6624b7d221026cc36107c")
.header("meta:custom", "test")
.header("meta:idempotency-key", "59410e57-3e0f-4b61-9328-f645a7968ac8")
.build()
.build()}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ read zilla:data.ext ${kafka:matchDataEx()
.header("zilla:method", "EchoClientStream")
.header("zilla:reply-to", "responses")
.header("zilla:correlation-id", "59410e57-3e0f-4b61-9328-f645a7968ac8-97e816c79fb6624b7d221026cc36107c")
.header("meta:custom", "test")
.header("meta:idempotency-key", "59410e57-3e0f-4b61-9328-f645a7968ac8")
.build()
.build()}
read zilla:data.null
Expand Down
Loading