Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.function.Supplier;

import org.agrona.DirectBuffer;
import org.agrona.ExpandableDirectByteBuffer;
import org.agrona.concurrent.UnsafeBuffer;

import io.aklivity.zilla.runtime.binding.grpc.kafka.config.GrpcKafkaCorrelationConfig;
Expand All @@ -31,9 +32,12 @@
import io.aklivity.zilla.runtime.binding.grpc.kafka.internal.types.KafkaOffsetType;
import io.aklivity.zilla.runtime.binding.grpc.kafka.internal.types.OctetsFW;
import io.aklivity.zilla.runtime.binding.grpc.kafka.internal.types.String16FW;
import io.aklivity.zilla.runtime.binding.grpc.kafka.internal.types.stream.GrpcMetadataFW;

public class GrpcKafkaWithProduceResult
{
public static final String META_PREFIX = "meta:";

private static final KafkaOffsetFW KAFKA_OFFSET_HISTORICAL =
new KafkaOffsetFW.Builder()
.wrap(new UnsafeBuffer(new byte[32]), 0, 32)
Expand All @@ -50,10 +54,13 @@ public class GrpcKafkaWithProduceResult
private final GrpcKafkaWithProduceHash hash;
private final String16FW service;
private final String16FW method;
private final Array32FW<GrpcMetadataFW> metadata;
private final ExpandableDirectByteBuffer buffer;
Comment thread
ankitk-me marked this conversation as resolved.
Outdated

GrpcKafkaWithProduceResult(
String16FW service,
String16FW method,
Array32FW<GrpcMetadataFW> metadata,
String16FW topic,
KafkaAckMode acks,
Supplier<DirectBuffer> keyRef,
Expand All @@ -64,13 +71,16 @@ public class GrpcKafkaWithProduceResult
{
this.service = service;
this.method = method;
this.metadata = metadata;
this.overrides = overrides;
this.replyTo = replyTo;
this.correlation = correlation;
this.topic = topic;
this.acks = acks;
this.keyRef = keyRef;
this.hash = hash;
this.buffer = new ExpandableDirectByteBuffer();
this.buffer.putStringWithoutLengthAscii(0, META_PREFIX);
Comment thread
ankitk-me marked this conversation as resolved.
Outdated

hash.updateHash(correlation.service.value());
hash.updateHash(service.value());
Expand Down Expand Up @@ -140,6 +150,22 @@ public void headers(
builder.item(this::method);
builder.item(this::replyTo);
builder.item(this::correlationId);
metadata.forEach(m -> builder.item(i -> metadata(i, m)));
}

private void metadata(
KafkaHeaderFW.Builder builder,
GrpcMetadataFW metadata)
{
int nameLen = metadata.nameLen();
int nameLenWithPrefix = nameLen + 5;
buffer.putBytes(5, metadata.name().value(), 0, nameLen);
Comment thread
ankitk-me marked this conversation as resolved.
Outdated

builder
.nameLen(nameLenWithPrefix)
.name(buffer, 0, nameLenWithPrefix)
.valueLen(metadata.valueLen())
.value(metadata.value().value(), 0, metadata.valueLen());
}

private void service(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public GrpcKafkaWithProduceResult resolveProduce(

String16FW replyTo = new String16FW(produce.replyTo);

return new GrpcKafkaWithProduceResult(service, method, topic, acks, keyRef, overrides, replyTo,
return new GrpcKafkaWithProduceResult(service, method, metadata, topic, acks, keyRef, overrides, replyTo,
options.correlation, hash);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/
package io.aklivity.zilla.runtime.binding.grpc.kafka.internal.stream;

import static io.aklivity.zilla.runtime.binding.grpc.kafka.internal.config.GrpcKafkaWithProduceResult.META_PREFIX;
import static io.aklivity.zilla.runtime.binding.grpc.kafka.internal.types.KafkaCapabilities.FETCH_ONLY;
import static io.aklivity.zilla.runtime.binding.grpc.kafka.internal.types.KafkaCapabilities.PRODUCE_ONLY;
import static java.time.Instant.now;
Expand Down Expand Up @@ -46,7 +47,9 @@
import io.aklivity.zilla.runtime.binding.grpc.kafka.internal.types.stream.GrpcAbortExFW;
import io.aklivity.zilla.runtime.binding.grpc.kafka.internal.types.stream.GrpcBeginExFW;
import io.aklivity.zilla.runtime.binding.grpc.kafka.internal.types.stream.GrpcDataExFW;
import io.aklivity.zilla.runtime.binding.grpc.kafka.internal.types.stream.GrpcMetadataFW;
import io.aklivity.zilla.runtime.binding.grpc.kafka.internal.types.stream.GrpcResetExFW;
import io.aklivity.zilla.runtime.binding.grpc.kafka.internal.types.stream.GrpcType;
import io.aklivity.zilla.runtime.binding.grpc.kafka.internal.types.stream.KafkaBeginExFW;
import io.aklivity.zilla.runtime.binding.grpc.kafka.internal.types.stream.KafkaDataExFW;
import io.aklivity.zilla.runtime.binding.grpc.kafka.internal.types.stream.KafkaMergedBeginExFW;
Expand Down Expand Up @@ -106,16 +109,20 @@ public final class GrpcKafkaProxyFactory implements GrpcKafkaStreamFactory
private final KafkaBeginExFW kafkaBeginExRO = new KafkaBeginExFW();
private final KafkaDataExFW kafkaDataExRO = new KafkaDataExFW();

private final GrpcBeginExFW.Builder grpcBeginExRW = new GrpcBeginExFW.Builder();
private final GrpcDataExFW.Builder grpcDataExRW = new GrpcDataExFW.Builder();
private final GrpcResetExFW.Builder grpcResetExRW = new GrpcResetExFW.Builder();
private final GrpcAbortExFW.Builder grpcAbortExRW = new GrpcAbortExFW.Builder();
private final Array32FW.Builder<GrpcMetadataFW.Builder, GrpcMetadataFW> grpcMetadataRW =
new Array32FW.Builder<>(new GrpcMetadataFW.Builder(), new GrpcMetadataFW());

private final KafkaBeginExFW.Builder kafkaBeginExRW = new KafkaBeginExFW.Builder();
private final KafkaDataExFW.Builder kafkaDataExRW = new KafkaDataExFW.Builder();
private final GrpcKafkaIdHelper messageFieldHelper = new GrpcKafkaIdHelper();

private final MutableDirectBuffer writeBuffer;
private final MutableDirectBuffer extBuffer;
private final MutableDirectBuffer metaBuffer;
private final BindingHandler streamFactory;
private final LongUnaryOperator supplyInitialId;
private final LongUnaryOperator supplyReplyId;
Expand All @@ -131,6 +138,7 @@ public GrpcKafkaProxyFactory(
{
this.writeBuffer = context.writeBuffer();
this.extBuffer = new UnsafeBuffer(new byte[context.writeBuffer().capacity()]);
this.metaBuffer = new UnsafeBuffer(new byte[context.writeBuffer().capacity()]);
this.streamFactory = context.streamFactory();
this.supplyInitialId = context::supplyInitialId;
this.supplyReplyId = context::supplyReplyId;
Expand Down Expand Up @@ -531,7 +539,8 @@ protected void onKafkaAbort(
@Override
protected void onKafkaBegin(
long traceId,
long authorization, OctetsFW extension)
long authorization,
OctetsFW extension)
{
if (!GrpcKafkaState.replyOpening(state))
{
Expand Down Expand Up @@ -1139,7 +1148,7 @@ private void onGrpcData(
assert acknowledge <= sequence;
assert sequence >= initialSeq;

initialSeq = sequence;
initialSeq = sequence + reserved;

assert initialAck <= initialSeq;

Expand Down Expand Up @@ -1272,17 +1281,23 @@ protected void onKafkaAbort(
long traceId,
long authorization)
{
if (!GrpcKafkaState.replyOpening(state))
{
doGrpcBegin(traceId, authorization, 0L, emptyRO);
}

cleanup(traceId, authorization);
}

@Override
protected void onKafkaBegin(
long traceId,
long authorization, OctetsFW extension)
long authorization,
OctetsFW extension)
{
if (!GrpcKafkaState.replyOpening(state))
{
doGrpcBegin(traceId, authorization, 0L, emptyRO);
doGrpcBegin(traceId, authorization, 0L, extension);
}
}

Expand All @@ -1296,6 +1311,11 @@ protected void onKafkaData(
OctetsFW payload,
KafkaDataExFW kafkaDataEx)
{
if (!GrpcKafkaState.replyOpening(state))
{
doGrpcBegin(traceId, authorization, 0L, kafkaDataEx.merged().fetch().headers());
}

if (GrpcKafkaState.replyClosing(state))
{
replySeq += reserved;
Expand Down Expand Up @@ -1388,6 +1408,39 @@ private void doGrpcBegin(
traceId, authorization, affinity, extension);
}

private void doGrpcBegin(
long traceId,
long authorization,
long affinity,
Array32FW<KafkaHeaderFW> headers)
{
state = GrpcKafkaState.openingReply(state);

Array32FW.Builder<GrpcMetadataFW.Builder, GrpcMetadataFW> builder =
grpcMetadataRW.wrap(metaBuffer, 0, metaBuffer.capacity());

headers.forEach(h ->
{
if (META_PREFIX.equals(h.name().value().getStringWithoutLengthUtf8(0, 5)))
Comment thread
ankitk-me marked this conversation as resolved.
Outdated
{
builder.item(m -> m.type(t -> t.set(GrpcType.TEXT))
.nameLen(h.nameLen() - 5)
.name(h.name().value(), 5, h.nameLen() - 5)
.valueLen(h.valueLen())
.value(h.value()));
}
});
Comment thread
ankitk-me marked this conversation as resolved.

GrpcBeginExFW beginEx = grpcBeginExRW
.wrap(extBuffer, 0, extBuffer.capacity())
.typeId(grpcTypeId)
.metadata(builder.build())
.build();

doBegin(grpc, originId, routedId, replyId, replySeq, replyAck, replyMax,
traceId, authorization, affinity, beginEx);
}

private void doGrpcData(
long traceId,
long authorization,
Expand Down Expand Up @@ -1623,8 +1676,6 @@ private void onKafkaBegin(
final long sequence = begin.sequence();
final long acknowledge = begin.acknowledge();
final long traceId = begin.traceId();
final long authorization = begin.authorization();
final OctetsFW extension = begin.extension();

assert acknowledge <= sequence;
assert sequence >= replySeq;
Expand All @@ -1637,7 +1688,6 @@ private void onKafkaBegin(
assert replyAck <= replySeq;

doKafkaWindow(traceId);
delegate.onKafkaBegin(traceId, authorization, extension);
}

private void onKafkaEnd(
Expand Down Expand Up @@ -1910,8 +1960,6 @@ private void onKafkaBegin(
final long sequence = begin.sequence();
final long acknowledge = begin.acknowledge();
final long traceId = begin.traceId();
final long authorization = begin.authorization();
final OctetsFW extension = begin.extension();

assert acknowledge <= sequence;
assert sequence >= replySeq;
Expand All @@ -1923,7 +1971,6 @@ private void onKafkaBegin(

assert replyAck <= replySeq;

delegate.onKafkaBegin(traceId, authorization, extension);
doKafkaWindow(traceId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
connect "zilla://streams/grpc0"
option zilla:window 8192
option zilla:transmission "half-duplex"
option zilla:update "proactive"

write zilla:begin.ext ${grpc:beginEx()
.typeId(zilla:id("grpc"))
Expand All @@ -33,11 +34,17 @@ write ${grpc:protobuf()
.build()}
write flush

read zilla:begin.ext ${grpc:matchBeginEx()
.typeId(zilla:id("grpc"))
.metadata("custom", "value")
.build()}

read ${grpc:protobuf()
.string(1, "Hello World1")
.build()}

read notify DATA_RECEIVED

read zilla:reset.ext ${grpc:resetEx()
.typeId(zilla:id("grpc"))
.status("13")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ read ${grpc:protobuf()
.string(1, "Hello World1")
.build()}

write zilla:begin.ext ${grpc:beginEx()
.typeId(zilla:id("grpc"))
.metadata("custom", "value")
.build()}

write ${grpc:protobuf()
.string(1, "Hello World1")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
connect "zilla://streams/grpc0"
option zilla:window 8192
option zilla:transmission "half-duplex"
option zilla:update "proactive"

write zilla:begin.ext ${grpc:beginEx()
.typeId(zilla:id("grpc"))
Expand All @@ -38,6 +39,11 @@ write ${grpc:protobuf()
.build()}
write flush

read zilla:begin.ext ${grpc:matchBeginEx()
.typeId(zilla:id("grpc"))
.metadata("custom", "value")
.build()}

read ${grpc:protobuf()
.string(1, "Hello World1")
.build()}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ read ${grpc:protobuf()
.string(1, "Hello World2")
.build()}

write zilla:begin.ext ${grpc:beginEx()
.typeId(zilla:id("grpc"))
.metadata("custom", "value")
.build()}

write ${grpc:protobuf()
.string(1, "Hello World1")
.build()}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
connect "zilla://streams/grpc0"
option zilla:window 8192
option zilla:transmission "half-duplex"
option zilla:update "proactive"

write zilla:begin.ext ${grpc:beginEx()
.typeId(zilla:id("grpc"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
connect "zilla://streams/grpc0"
option zilla:window 8192
option zilla:transmission "half-duplex"
option zilla:update "proactive"

write zilla:begin.ext ${grpc:beginEx()
.typeId(zilla:id("grpc"))
Expand All @@ -40,6 +41,11 @@ write flush

write close

read zilla:begin.ext ${grpc:matchBeginEx()
.typeId(zilla:id("grpc"))
.metadata("custom", "value")
.build()}

read ${grpc:protobuf()
.string(1, "Hello World")
.build()}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ read ${grpc:protobuf()

read closed

write zilla:begin.ext ${grpc:beginEx()
.typeId(zilla:id("grpc"))
.metadata("custom", "value")
.build()}

write ${grpc:protobuf()
.string(1, "Hello World")
.build()}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
connect "zilla://streams/grpc0"
option zilla:window 8192
option zilla:transmission "half-duplex"
option zilla:update "proactive"

write zilla:begin.ext ${grpc:beginEx()
.typeId(zilla:id("grpc"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
connect "zilla://streams/grpc0"
option zilla:window 8192
option zilla:transmission "half-duplex"
option zilla:update "proactive"

write zilla:begin.ext ${grpc:beginEx()
.typeId(zilla:id("grpc"))
Expand All @@ -36,6 +37,11 @@ write flush

write close

read zilla:begin.ext ${grpc:matchBeginEx()
.typeId(zilla:id("grpc"))
.metadata("custom", "value")
.build()}

read ${grpc:protobuf()
.string(1, "Hello World1")
.build()}
Expand Down
Loading