Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
addressing review comments
  • Loading branch information
ankitk-me committed Jul 9, 2024
commit 2cbff6c06ea56c3d48c88ec3dc6e1029b251d97a
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,15 @@
import io.aklivity.zilla.runtime.binding.grpc.kafka.internal.types.OctetsFW;
import io.aklivity.zilla.runtime.binding.grpc.kafka.internal.types.String16FW;
import io.aklivity.zilla.runtime.binding.grpc.kafka.internal.types.stream.GrpcMetadataFW;
import io.aklivity.zilla.runtime.binding.grpc.kafka.internal.types.stream.GrpcType;

public class GrpcKafkaWithProduceResult
{
public static final String META_PREFIX = "meta:";
public static final String BIN_SUFFIX = "-bin";
private static final int META_PREFIX_LENGTH = 5;
private static final int BIN_SUFFIX_LENGTH = 4;

private static final int META_PREFIX_LENGTH = META_PREFIX.length();
private static final int BIN_SUFFIX_LENGTH = BIN_SUFFIX.length();

private static final KafkaOffsetFW KAFKA_OFFSET_HISTORICAL =
new KafkaOffsetFW.Builder()
Expand All @@ -60,7 +62,7 @@ public class GrpcKafkaWithProduceResult
private final String16FW service;
private final String16FW method;
private final Array32FW<GrpcMetadataFW> metadata;
private final ExpandableDirectByteBuffer buffer;
private final ExpandableDirectByteBuffer nameBuffer;

GrpcKafkaWithProduceResult(
String16FW service,
Expand All @@ -84,8 +86,8 @@ public class GrpcKafkaWithProduceResult
this.acks = acks;
this.keyRef = keyRef;
this.hash = hash;
this.buffer = new ExpandableDirectByteBuffer();
this.buffer.putStringWithoutLengthAscii(0, META_PREFIX);
this.nameBuffer = new ExpandableDirectByteBuffer();
this.nameBuffer.putStringWithoutLengthAscii(0, META_PREFIX);

hash.updateHash(correlation.service.value());
hash.updateHash(service.value());
Expand Down Expand Up @@ -162,19 +164,18 @@ private void metadata(
KafkaHeaderFW.Builder builder,
GrpcMetadataFW metadata)
{
int nameLen = metadata.nameLen();
int nameLenWithPrefix = nameLen + META_PREFIX_LENGTH;
buffer.putBytes(META_PREFIX_LENGTH, metadata.name().value(), 0, nameLen);

if (metadata.type().get() == BASE64)
GrpcType type = metadata.type().get();
DirectBuffer name = metadata.name().value();
int nameLen = META_PREFIX_LENGTH + metadata.nameLen();
nameBuffer.putBytes(META_PREFIX_LENGTH, name, 0, name.capacity());
if (type == BASE64)
{
buffer.putStringWithoutLengthAscii(nameLenWithPrefix, BIN_SUFFIX);
nameLenWithPrefix += BIN_SUFFIX_LENGTH;
nameBuffer.putStringWithoutLengthAscii(nameLen, BIN_SUFFIX);
nameLen += BIN_SUFFIX_LENGTH;
}

builder
.nameLen(nameLenWithPrefix)
.name(buffer, 0, nameLenWithPrefix)
.nameLen(nameLen)
.name(nameBuffer, 0, nameLen)
.valueLen(metadata.valueLen())
.value(metadata.value().value(), 0, metadata.valueLen());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,10 @@ public final class GrpcKafkaProxyFactory implements GrpcKafkaStreamFactory
{
private static final String GRPC_TYPE_NAME = "grpc";
private static final String KAFKA_TYPE_NAME = "kafka";
private static final int META_PREFIX_LENGTH = 5;
private static final byte[] META_PREFIX = "meta:".getBytes();
private static final byte[] BIN_SUFFIX = "-bin".getBytes();
private static final int META_PREFIX_LENGTH = META_PREFIX.length;
private static final int BIN_SUFFIX_LENGTH = BIN_SUFFIX.length;

private static final int DATA_FLAG_INIT = 0x02;
private static final int DATA_FLAG_FIN = 0x01;
Expand All @@ -82,8 +83,8 @@ public final class GrpcKafkaProxyFactory implements GrpcKafkaStreamFactory
private static final String16FW HEADER_VALUE_GRPC_ABORTED = new String16FW("10");
private static final String16FW HEADER_VALUE_GRPC_INTERNAL_ERROR = new String16FW("13");

private final byte[] headerPrefix = new byte[5];
private final byte[] headerSuffix = new byte[4];
private final byte[] headerPrefix = new byte[META_PREFIX_LENGTH];
private final byte[] headerSuffix = new byte[BIN_SUFFIX_LENGTH];

private final Varuint32FW.Builder lenRW =
new Varuint32FW.Builder().wrap(new UnsafeBuffer(new byte[1024 * 8]), 0, 1024 * 8);;
Expand Down Expand Up @@ -1430,21 +1431,21 @@ private void doGrpcBegin(
headers.forEach(h ->
{
final OctetsFW name = h.name();
final DirectBuffer buffer = name.buffer();
final int offset = name.offset();
final int limit = name.limit();

name.buffer().getBytes(offset, headerPrefix);
name.buffer().getBytes(limit - BIN_SUFFIX.length, headerSuffix);

buffer.getBytes(offset, headerPrefix);
buffer.getBytes(limit - BIN_SUFFIX.length, headerSuffix);
if (Arrays.equals(META_PREFIX, headerPrefix))
{
final GrpcType type = Arrays.equals(BIN_SUFFIX, headerSuffix) ? BASE64 : TEXT;
int length = h.nameLen() - META_PREFIX_LENGTH;
final int metadataNameLength = type == BASE64 ? length - BIN_SUFFIX.length : length;

builder.item(m -> m.type(t -> t.set(type))
.nameLen(metadataNameLength)
.name(h.name().value(), META_PREFIX_LENGTH, metadataNameLength)
final int nameLen = type == BASE64
? h.nameLen() - META_PREFIX_LENGTH - BIN_SUFFIX_LENGTH
: h.nameLen() - META_PREFIX_LENGTH;
builder.item(m -> m
.type(t -> t.set(type))
.nameLen(nameLen)
.name(name.value(), META_PREFIX_LENGTH, nameLen)
.valueLen(h.valueLen())
.value(h.value()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Map;
import java.util.function.Consumer;

import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;

import io.aklivity.zilla.runtime.binding.kafka.grpc.config.KafkaGrpcCorrelationConfig;
Expand All @@ -36,9 +37,10 @@

public final class KafkaGrpcFetchHeaderHelper
{
private static final int META_PREFIX_LENGTH = 5;
private static final byte[] META_PREFIX = "meta:".getBytes();
private static final byte[] BIN_SUFFIX = "-bin".getBytes();
private static final int META_PREFIX_LENGTH = META_PREFIX.length;
private static final int BIN_SUFFIX_LENGTH = BIN_SUFFIX.length;

private final Map<OctetsFW, Consumer<OctetsFW>> visitors;
private final OctetsFW serviceRO = new OctetsFW();
Expand All @@ -48,8 +50,8 @@ public final class KafkaGrpcFetchHeaderHelper
private final Array32FW.Builder<GrpcMetadataFW.Builder, GrpcMetadataFW> grpcMetadataRW =
new Array32FW.Builder<>(new GrpcMetadataFW.Builder(), new GrpcMetadataFW());
private final MutableDirectBuffer metaBuffer;
private final byte[] headerPrefix = new byte[5];
private final byte[] headerSuffix = new byte[4];
private final byte[] headerPrefix = new byte[META_PREFIX_LENGTH];
private final byte[] headerSuffix = new byte[BIN_SUFFIX_LENGTH];

public int partitionId;
public long partitionOffset;
Expand All @@ -60,6 +62,8 @@ public final class KafkaGrpcFetchHeaderHelper
public OctetsFW correlationId;
public Array32FW<GrpcMetadataFW> metadata;

private Array32FW.Builder<GrpcMetadataFW.Builder, GrpcMetadataFW> builder;

public KafkaGrpcFetchHeaderHelper(
KafkaGrpcCorrelationConfig correlation,
MutableDirectBuffer metaBuffer)
Expand All @@ -85,7 +89,7 @@ public void visit(
replyTo = null;
correlationId = null;

grpcMetadataRW.wrap(metaBuffer, 0, metaBuffer.capacity());
builder = grpcMetadataRW.wrap(metaBuffer, 0, metaBuffer.capacity());

if (dataEx != null)
{
Expand Down Expand Up @@ -114,11 +118,11 @@ private boolean dispatch(
{
final OctetsFW name = header.name();
final OctetsFW value = header.value();
final DirectBuffer buffer = name.buffer();
final int offset = name.offset();
final int limit = name.limit();

name.buffer().getBytes(offset, headerPrefix);
name.buffer().getBytes(limit - BIN_SUFFIX.length, headerSuffix);
buffer.getBytes(offset, headerPrefix);
buffer.getBytes(limit - BIN_SUFFIX_LENGTH, headerSuffix);

Comment thread
ankitk-me marked this conversation as resolved.
final Consumer<OctetsFW> visitor = visitors.get(name);
if (visitor != null)
Expand All @@ -128,13 +132,15 @@ private boolean dispatch(
else if (Arrays.equals(META_PREFIX, headerPrefix))
{
final GrpcType type = Arrays.equals(BIN_SUFFIX, headerSuffix) ? BASE64 : TEXT;
int length = header.nameLen() - META_PREFIX_LENGTH;
final int metadataNameLength = type == BASE64 ? length - BIN_SUFFIX.length : length;
grpcMetadataRW.item(m -> m.type(t -> t.set(type))
.nameLen(metadataNameLength)
.name(name.value(), META_PREFIX_LENGTH, metadataNameLength)
final int nameLen = type == BASE64
? header.nameLen() - META_PREFIX_LENGTH - BIN_SUFFIX_LENGTH
: header.nameLen() - META_PREFIX_LENGTH;
builder.item(m -> m
.type(t -> t.set(type))
.nameLen(nameLen)
.name(name.value(), META_PREFIX_LENGTH, nameLen)
.valueLen(header.valueLen())
.value(value));
.value(header.value()));
}
Comment thread
ankitk-me marked this conversation as resolved.

return service != null &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import io.aklivity.zilla.runtime.binding.kafka.grpc.internal.types.stream.GrpcDataExFW;
import io.aklivity.zilla.runtime.binding.kafka.grpc.internal.types.stream.GrpcMetadataFW;
import io.aklivity.zilla.runtime.binding.kafka.grpc.internal.types.stream.GrpcResetExFW;
import io.aklivity.zilla.runtime.binding.kafka.grpc.internal.types.stream.GrpcType;
import io.aklivity.zilla.runtime.binding.kafka.grpc.internal.types.stream.KafkaBeginExFW;
import io.aklivity.zilla.runtime.binding.kafka.grpc.internal.types.stream.KafkaDataExFW;
import io.aklivity.zilla.runtime.binding.kafka.grpc.internal.types.stream.KafkaFlushExFW;
Expand All @@ -79,8 +80,8 @@ public final class KafkaGrpcRemoteServerFactory implements KafkaGrpcStreamFactor
private static final String KAFKA_TYPE_NAME = "kafka";
private static final String META_PREFIX = "meta:";
private static final String BIN_SUFFIX = "-bin";
private static final int META_PREFIX_LENGTH = 5;
private static final int BIN_SUFFIX_LENGTH = 4;
private static final int META_PREFIX_LENGTH = META_PREFIX.length();
private static final int BIN_SUFFIX_LENGTH = BIN_SUFFIX.length();

private static final int SIGNAL_INITIATE_KAFKA_STREAM = 1;
private static final int GRPC_QUEUE_MESSAGE_PADDING = 3 * 256 + 33;
Expand Down Expand Up @@ -138,6 +139,7 @@ public final class KafkaGrpcRemoteServerFactory implements KafkaGrpcStreamFactor
private final MutableDirectBuffer writeBuffer;
private final MutableDirectBuffer extBuffer;
private final MutableDirectBuffer metaBuffer;
private final ExpandableDirectByteBuffer nameBuffer;
private final BindingHandler streamFactory;
private final LongUnaryOperator supplyInitialId;
private final LongUnaryOperator supplyReplyId;
Expand Down Expand Up @@ -177,6 +179,8 @@ public KafkaGrpcRemoteServerFactory(
this.grpcTypeId = context.supplyTypeId(GRPC_TYPE_NAME);
this.kafkaTypeId = context.supplyTypeId(KAFKA_TYPE_NAME);
this.groupIdFormat = config.groupIdFormat();
this.nameBuffer = new ExpandableDirectByteBuffer();
this.nameBuffer.putStringWithoutLengthAscii(0, META_PREFIX);
}

@Override
Expand Down Expand Up @@ -1335,7 +1339,6 @@ private final class GrpcClient
private final KafkaRemoteServer server;
private final KafkaCorrelateProxy correlater;
private final OctetsFW correlationId;
private final ExpandableDirectByteBuffer buffer;
private final long originId;
private final long routedId;
private final long initialId;
Expand Down Expand Up @@ -1373,8 +1376,6 @@ private GrpcClient(
this.initialId = supplyInitialId.applyAsLong(routedId);
this.replyId = supplyReplyId.applyAsLong(initialId);
this.correlater = new KafkaCorrelateProxy(originId, resolveId, replyTo, server.condition, this);
this.buffer = new ExpandableDirectByteBuffer();
this.buffer.putStringWithoutLengthAscii(0, META_PREFIX);
}

private int initialPendingAck()
Expand Down Expand Up @@ -1717,19 +1718,18 @@ private void metadata(
KafkaHeaderFW.Builder builder,
GrpcMetadataFW metadata)
{
int nameLen = metadata.nameLen();
int nameLenWithPrefix = nameLen + META_PREFIX_LENGTH;
buffer.putBytes(META_PREFIX_LENGTH, metadata.name().value(), 0, nameLen);

if (metadata.type().get() == BASE64)
GrpcType type = metadata.type().get();
DirectBuffer name = metadata.name().value();
int nameLen = META_PREFIX_LENGTH + metadata.nameLen();
nameBuffer.putBytes(META_PREFIX_LENGTH, name, 0, name.capacity());
if (type == BASE64)
{
buffer.putStringWithoutLengthAscii(nameLenWithPrefix, BIN_SUFFIX);
nameLenWithPrefix += BIN_SUFFIX_LENGTH;
nameBuffer.putStringWithoutLengthAscii(nameLen, BIN_SUFFIX);
nameLen += BIN_SUFFIX_LENGTH;
}

builder
.nameLen(nameLenWithPrefix)
.name(buffer, 0, nameLenWithPrefix)
.nameLen(nameLen)
.name(nameBuffer, 0, nameLen)
.valueLen(metadata.valueLen())
.value(metadata.value().value(), 0, metadata.valueLen());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ write zilla:begin.ext ${grpc:beginEx()
.authority("localhost:8080")
.service("example.EchoService")
.method("EchoBidiStream")
.metadata("BASE64", "custom", "dGVzdA==")
.metadataBase64("custom", "dGVzdA==")
.metadata("idempotency-key", "59410e57-3e0f-4b61-9328-f645a7968ac8")
.build()}
connected
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ read zilla:begin.ext ${grpc:matchBeginEx()
.authority("localhost:8080")
.service("example.EchoService")
.method("EchoBidiStream")
.metadata("BASE64", "custom", "dGVzdA==")
.metadataBase64("custom", "dGVzdA==")
.metadata("idempotency-key", "59410e57-3e0f-4b61-9328-f645a7968ac8")
.build()}
connected
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ write zilla:begin.ext ${grpc:beginEx()
.authority("localhost:8080")
.service("example.EchoService")
.method("EchoClientStream")
.metadata("BASE64", "custom", "dGVzdA==")
.metadataBase64("custom", "dGVzdA==")
.metadata("idempotency-key", "59410e57-3e0f-4b61-9328-f645a7968ac8")
.build()}
connected
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ read zilla:begin.ext ${grpc:matchBeginEx()
.authority("localhost:8080")
.service("example.EchoService")
.method("EchoClientStream")
.metadata("BASE64", "custom", "dGVzdA==")
.metadataBase64("custom", "dGVzdA==")
.metadata("idempotency-key", "59410e57-3e0f-4b61-9328-f645a7968ac8")
.build()}
connected
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ write zilla:begin.ext ${grpc:beginEx()
.authority("localhost:8080")
.service("example.EchoService")
.method("EchoServerStream")
.metadata("BASE64", "custom", "dGVzdA==")
.metadataBase64("custom", "dGVzdA==")
.metadata("idempotency-key", "59410e57-3e0f-4b61-9328-f645a7968ac8")
.build()}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ read zilla:begin.ext ${grpc:matchBeginEx()
.authority("localhost:8080")
.service("example.EchoService")
.method("EchoServerStream")
.metadata("BASE64", "custom", "dGVzdA==")
.metadataBase64("custom", "dGVzdA==")
.metadata("idempotency-key", "59410e57-3e0f-4b61-9328-f645a7968ac8")
.build()}
connected
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ write zilla:begin.ext ${grpc:beginEx()
.authority("localhost:8080")
.service("example.EchoService")
.method("EchoUnary")
.metadata("BASE64", "custom", "dGVzdA==")
.metadataBase64("custom", "dGVzdA==")
.metadata("idempotency-key", "59410e57-3e0f-4b61-9328-f645a7968ac8")
.build()}
connected
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ read zilla:begin.ext ${grpc:matchBeginEx()
.authority("localhost:8080")
.service("example.EchoService")
.method("EchoUnary")
.metadata("BASE64", "custom", "dGVzdA==")
.metadataBase64("custom", "dGVzdA==")
.metadata("idempotency-key", "59410e57-3e0f-4b61-9328-f645a7968ac8")
.build()}
connected
Expand Down
Loading