Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
GrpcFunctions matcher bug fix & binary metadata IT
  • Loading branch information
ankitk-me committed Jul 4, 2024
commit fbad7fa2fdbad02b6c93f8019c34524efc89399a
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
*/
package io.aklivity.zilla.runtime.binding.grpc.kafka.internal.config;

import static io.aklivity.zilla.runtime.binding.grpc.kafka.internal.types.stream.GrpcType.BASE64;

import java.util.List;
import java.util.function.Supplier;

Expand All @@ -37,6 +39,9 @@
public class GrpcKafkaWithProduceResult
{
public static final String META_PREFIX = "meta:";
public static final String BIN_SUFFIX = "-bin";
private static final int META_PREFIX_LENGTH = 5;
private static final int BIN_SUFFIX_LENGTH = 4;

private static final KafkaOffsetFW KAFKA_OFFSET_HISTORICAL =
new KafkaOffsetFW.Builder()
Expand Down Expand Up @@ -158,8 +163,14 @@ private void metadata(
GrpcMetadataFW metadata)
{
int nameLen = metadata.nameLen();
int nameLenWithPrefix = nameLen + 5;
buffer.putBytes(5, metadata.name().value(), 0, nameLen);
int nameLenWithPrefix = nameLen + META_PREFIX_LENGTH;
buffer.putBytes(META_PREFIX_LENGTH, metadata.name().value(), 0, nameLen);

if (metadata.type().get() == BASE64)
{
buffer.putStringWithoutLengthAscii(nameLenWithPrefix, BIN_SUFFIX);
nameLenWithPrefix += BIN_SUFFIX_LENGTH;
}

builder
.nameLen(nameLenWithPrefix)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
*/
package io.aklivity.zilla.runtime.binding.grpc.kafka.internal.stream;

import static io.aklivity.zilla.runtime.binding.grpc.kafka.internal.config.GrpcKafkaWithProduceResult.META_PREFIX;
import static io.aklivity.zilla.runtime.binding.grpc.kafka.internal.types.KafkaCapabilities.FETCH_ONLY;
import static io.aklivity.zilla.runtime.binding.grpc.kafka.internal.types.KafkaCapabilities.PRODUCE_ONLY;
import static io.aklivity.zilla.runtime.binding.grpc.kafka.internal.types.stream.GrpcType.BASE64;
Expand Down Expand Up @@ -71,9 +70,7 @@ public final class GrpcKafkaProxyFactory implements GrpcKafkaStreamFactory
private static final String GRPC_TYPE_NAME = "grpc";
private static final String KAFKA_TYPE_NAME = "kafka";
private static final int META_PREFIX_LENGTH = 5;
private static final byte[] HEADER_META_PREFIX = new byte[5];
private static final byte[] META_PREFIX = "meta:".getBytes();
private static final byte[] HEADER_BIN_SUFFIX = new byte[4];
private static final byte[] BIN_SUFFIX = "-bin".getBytes();

private static final int DATA_FLAG_INIT = 0x02;
Expand All @@ -85,6 +82,9 @@ public final class GrpcKafkaProxyFactory implements GrpcKafkaStreamFactory
private static final String16FW HEADER_VALUE_GRPC_ABORTED = new String16FW("10");
private static final String16FW HEADER_VALUE_GRPC_INTERNAL_ERROR = new String16FW("13");

private final byte[] headerPrefix = new byte[5];
private final byte[] headerSuffix = new byte[4];
Comment thread
ankitk-me marked this conversation as resolved.
Outdated

private final Varuint32FW.Builder lenRW =
new Varuint32FW.Builder().wrap(new UnsafeBuffer(new byte[1024 * 8]), 0, 1024 * 8);;

Expand Down Expand Up @@ -1433,16 +1433,18 @@ private void doGrpcBegin(
final int offset = name.offset();
final int limit = name.limit();

name.buffer().getBytes(offset, HEADER_META_PREFIX);
name.buffer().getBytes(limit - BIN_SUFFIX.length, HEADER_BIN_SUFFIX);
name.buffer().getBytes(offset, headerPrefix);
name.buffer().getBytes(limit - BIN_SUFFIX.length, headerSuffix);

if (Arrays.equals(META_PREFIX, HEADER_META_PREFIX))
if (Arrays.equals(META_PREFIX, headerPrefix))
{
final GrpcType type = Arrays.equals(BIN_SUFFIX, HEADER_BIN_SUFFIX) ? BASE64 : TEXT;
final GrpcType type = Arrays.equals(BIN_SUFFIX, headerSuffix) ? BASE64 : TEXT;
int length = h.nameLen() - META_PREFIX_LENGTH;
final int metadataNameLength = type == BASE64 ? length - BIN_SUFFIX.length : length;

builder.item(m -> m.type(t -> t.set(type))
.nameLen(h.nameLen() - META_PREFIX_LENGTH)
.name(h.name().value(), META_PREFIX_LENGTH, h.nameLen() - META_PREFIX_LENGTH)
.nameLen(metadataNameLength)
.name(h.name().value(), META_PREFIX_LENGTH, metadataNameLength)
.valueLen(h.valueLen())
.value(h.value()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@
public final class KafkaGrpcFetchHeaderHelper
{
private static final int META_PREFIX_LENGTH = 5;
private static final byte[] HEADER_META_PREFIX = new byte[5];
private static final byte[] META_PREFIX = "meta:".getBytes();
private static final byte[] HEADER_BIN_SUFFIX = new byte[4];
private static final byte[] BIN_SUFFIX = "-bin".getBytes();

private final Map<OctetsFW, Consumer<OctetsFW>> visitors;
Expand All @@ -50,6 +48,8 @@ public final class KafkaGrpcFetchHeaderHelper
private final Array32FW.Builder<GrpcMetadataFW.Builder, GrpcMetadataFW> grpcMetadataRW =
new Array32FW.Builder<>(new GrpcMetadataFW.Builder(), new GrpcMetadataFW());
private final MutableDirectBuffer metaBuffer;
private final byte[] headerPrefix = new byte[5];
private final byte[] headerSuffix = new byte[4];

public int partitionId;
public long partitionOffset;
Expand Down Expand Up @@ -117,20 +117,22 @@ private boolean dispatch(
final int offset = name.offset();
final int limit = name.limit();

name.buffer().getBytes(offset, HEADER_META_PREFIX);
name.buffer().getBytes(limit - BIN_SUFFIX.length, HEADER_BIN_SUFFIX);
name.buffer().getBytes(offset, headerPrefix);
name.buffer().getBytes(limit - BIN_SUFFIX.length, headerSuffix);

Comment thread
ankitk-me marked this conversation as resolved.
final Consumer<OctetsFW> visitor = visitors.get(name);
if (visitor != null)
{
visitor.accept(value);
}
else if (Arrays.equals(META_PREFIX, HEADER_META_PREFIX))
else if (Arrays.equals(META_PREFIX, headerPrefix))
{
final GrpcType type = Arrays.equals(BIN_SUFFIX, HEADER_BIN_SUFFIX) ? BASE64 : TEXT;
final GrpcType type = Arrays.equals(BIN_SUFFIX, headerSuffix) ? BASE64 : TEXT;
int length = header.nameLen() - META_PREFIX_LENGTH;
final int metadataNameLength = type == BASE64 ? length - BIN_SUFFIX.length : length;
grpcMetadataRW.item(m -> m.type(t -> t.set(type))
.nameLen(header.nameLen() - META_PREFIX_LENGTH)
.name(name.value(), META_PREFIX_LENGTH, header.nameLen() - META_PREFIX_LENGTH)
.nameLen(metadataNameLength)
.name(name.value(), META_PREFIX_LENGTH, metadataNameLength)
.valueLen(header.valueLen())
.value(value));
}
Comment thread
ankitk-me marked this conversation as resolved.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import static io.aklivity.zilla.runtime.binding.kafka.grpc.internal.types.KafkaCapabilities.FETCH_ONLY;
import static io.aklivity.zilla.runtime.binding.kafka.grpc.internal.types.KafkaCapabilities.PRODUCE_ONLY;
import static io.aklivity.zilla.runtime.binding.kafka.grpc.internal.types.stream.GrpcType.BASE64;
import static io.aklivity.zilla.runtime.engine.budget.BudgetDebitor.NO_DEBITOR_INDEX;
import static io.aklivity.zilla.runtime.engine.buffer.BufferPool.NO_SLOT;
import static io.aklivity.zilla.runtime.engine.concurrent.Signaler.NO_CANCEL_ID;
Expand Down Expand Up @@ -77,6 +78,9 @@ public final class KafkaGrpcRemoteServerFactory implements KafkaGrpcStreamFactor
private static final String GRPC_TYPE_NAME = "grpc";
private static final String KAFKA_TYPE_NAME = "kafka";
private static final String META_PREFIX = "meta:";
private static final String BIN_SUFFIX = "-bin";
private static final int META_PREFIX_LENGTH = 5;
private static final int BIN_SUFFIX_LENGTH = 4;

private static final int SIGNAL_INITIATE_KAFKA_STREAM = 1;
private static final int GRPC_QUEUE_MESSAGE_PADDING = 3 * 256 + 33;
Expand Down Expand Up @@ -1714,8 +1718,14 @@ private void metadata(
GrpcMetadataFW metadata)
{
int nameLen = metadata.nameLen();
int nameLenWithPrefix = nameLen + 5;
buffer.putBytes(5, metadata.name().value(), 0, nameLen);
int nameLenWithPrefix = nameLen + META_PREFIX_LENGTH;
buffer.putBytes(META_PREFIX_LENGTH, metadata.name().value(), 0, nameLen);

if (metadata.type().get() == BASE64)
{
buffer.putStringWithoutLengthAscii(nameLenWithPrefix, BIN_SUFFIX);
nameLenWithPrefix += BIN_SUFFIX_LENGTH;
}

builder
.nameLen(nameLenWithPrefix)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ write zilla:begin.ext ${grpc:beginEx()
.authority("localhost:8080")
.service("example.EchoService")
.method("EchoBidiStream")
.metadata("custom", "test")
.metadata("BASE64", "custom", "dGVzdA==")
Comment thread
ankitk-me marked this conversation as resolved.
Outdated
.metadata("idempotency-key", "59410e57-3e0f-4b61-9328-f645a7968ac8")
.build()}
connected
Expand All @@ -42,6 +42,7 @@ write flush
read zilla:begin.ext ${grpc:matchBeginEx()
.typeId(zilla:id("grpc"))
.metadata("custom", "value")
.metadata("BASE64", "customProperty", "dGVzdA==")
.build()}

read ${grpc:protobuf()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ read zilla:begin.ext ${grpc:matchBeginEx()
.authority("localhost:8080")
.service("example.EchoService")
.method("EchoBidiStream")
.metadata("custom", "test")
.metadata("BASE64", "custom", "dGVzdA==")
.metadata("idempotency-key", "59410e57-3e0f-4b61-9328-f645a7968ac8")
.build()}
connected
Expand All @@ -40,6 +40,7 @@ read ${grpc:protobuf()
write zilla:begin.ext ${grpc:beginEx()
.typeId(zilla:id("grpc"))
.metadata("custom", "value")
.metadata("BASE64", "customProperty", "dGVzdA==")
.build()}

write ${grpc:protobuf()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ write zilla:begin.ext ${grpc:beginEx()
.authority("localhost:8080")
.service("example.EchoService")
.method("EchoClientStream")
.metadata("custom", "test")
.metadata("BASE64", "custom", "dGVzdA==")
.metadata("idempotency-key", "59410e57-3e0f-4b61-9328-f645a7968ac8")
.build()}
connected
Expand All @@ -44,6 +44,7 @@ write close
read zilla:begin.ext ${grpc:matchBeginEx()
.typeId(zilla:id("grpc"))
.metadata("custom", "value")
.metadata("BASE64", "customProperty", "dGVzdA==")
.build()}

read ${grpc:protobuf()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ read zilla:begin.ext ${grpc:matchBeginEx()
.authority("localhost:8080")
.service("example.EchoService")
.method("EchoClientStream")
.metadata("custom", "test")
.metadata("BASE64", "custom", "dGVzdA==")
.metadata("idempotency-key", "59410e57-3e0f-4b61-9328-f645a7968ac8")
.build()}
connected
Expand All @@ -42,6 +42,7 @@ read closed
write zilla:begin.ext ${grpc:beginEx()
.typeId(zilla:id("grpc"))
.metadata("custom", "value")
.metadata("BASE64", "customProperty", "dGVzdA==")
.build()}

write ${grpc:protobuf()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ write zilla:begin.ext ${grpc:beginEx()
.authority("localhost:8080")
.service("example.EchoService")
.method("EchoServerStream")
.metadata("custom", "test")
.metadata("BASE64", "custom", "dGVzdA==")
.metadata("idempotency-key", "59410e57-3e0f-4b61-9328-f645a7968ac8")
.build()}

Expand All @@ -40,6 +40,7 @@ write close
read zilla:begin.ext ${grpc:matchBeginEx()
.typeId(zilla:id("grpc"))
.metadata("custom", "value")
.metadata("BASE64", "customProperty", "dGVzdA==")
.build()}

read ${grpc:protobuf()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ read zilla:begin.ext ${grpc:matchBeginEx()
.authority("localhost:8080")
.service("example.EchoService")
.method("EchoServerStream")
.metadata("custom", "test")
.metadata("BASE64", "custom", "dGVzdA==")
.metadata("idempotency-key", "59410e57-3e0f-4b61-9328-f645a7968ac8")
.build()}
connected
Expand All @@ -38,6 +38,7 @@ read closed
write zilla:begin.ext ${grpc:beginEx()
.typeId(zilla:id("grpc"))
.metadata("custom", "value")
.metadata("BASE64", "customProperty", "dGVzdA==")
.build()}

write ${grpc:protobuf()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ write zilla:begin.ext ${grpc:beginEx()
.authority("localhost:8080")
.service("example.EchoService")
.method("EchoUnary")
.metadata("custom", "test")
.metadata("BASE64", "custom", "dGVzdA==")
.metadata("idempotency-key", "59410e57-3e0f-4b61-9328-f645a7968ac8")
.build()}
connected
Expand All @@ -39,6 +39,7 @@ write close
read zilla:begin.ext ${grpc:matchBeginEx()
.typeId(zilla:id("grpc"))
.metadata("custom", "value")
.metadata("BASE64", "customProperty", "dGVzdA==")
.build()}

read ${grpc:protobuf()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ read zilla:begin.ext ${grpc:matchBeginEx()
.authority("localhost:8080")
.service("example.EchoService")
.method("EchoUnary")
.metadata("custom", "test")
.metadata("BASE64", "custom", "dGVzdA==")
.metadata("idempotency-key", "59410e57-3e0f-4b61-9328-f645a7968ac8")
.build()}
connected
Expand All @@ -38,6 +38,7 @@ read closed
write zilla:begin.ext ${grpc:beginEx()
.typeId(zilla:id("grpc"))
.metadata("custom", "value")
.metadata("BASE64", "customProperty", "dGVzdA==")
.build()}

write ${grpc:protobuf()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ write zilla:data.ext ${kafka:dataEx()
.header("zilla:method", "EchoBidiStream")
.header("zilla:reply-to", "responses")
.header("zilla:correlation-id", "59410e57-3e0f-4b61-9328-f645a7968ac8-14f43bae85c2ce394c9bd81ff6fa9d77")
.header("meta:custom", "test")
.header("meta:custom-bin", "dGVzdA==")
.header("meta:idempotency-key", "59410e57-3e0f-4b61-9328-f645a7968ac8")
.build()
.build()}
Expand All @@ -62,7 +62,7 @@ write zilla:data.ext ${kafka:dataEx()
.header("zilla:method", "EchoBidiStream")
.header("zilla:reply-to", "responses")
.header("zilla:correlation-id", "59410e57-3e0f-4b61-9328-f645a7968ac8-14f43bae85c2ce394c9bd81ff6fa9d77")
.header("meta:custom", "test")
.header("meta:custom-bin", "dGVzdA==")
.header("meta:idempotency-key", "59410e57-3e0f-4b61-9328-f645a7968ac8")
.build()
.build()}
Expand All @@ -85,7 +85,7 @@ write zilla:data.ext ${kafka:dataEx()
.header("zilla:method", "EchoBidiStream")
.header("zilla:reply-to", "responses")
.header("zilla:correlation-id", "59410e57-3e0f-4b61-9328-f645a7968ac8-14f43bae85c2ce394c9bd81ff6fa9d77")
.header("meta:custom", "test")
.header("meta:custom-bin", "dGVzdA==")
.header("meta:idempotency-key", "59410e57-3e0f-4b61-9328-f645a7968ac8")
.build()
.build()}
Expand Down Expand Up @@ -123,6 +123,7 @@ read zilla:data.ext ${kafka:matchDataEx()
.progress(1, 1)
.key("test")
.header("meta:custom", "value")
.header("meta:customProperty-bin", "dGVzdA==")
.build()
.build()}
read ${grpc:protobuf()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ read zilla:data.ext ${kafka:matchDataEx()
.header("zilla:method", "EchoBidiStream")
.header("zilla:reply-to", "responses")
.header("zilla:correlation-id", "59410e57-3e0f-4b61-9328-f645a7968ac8-14f43bae85c2ce394c9bd81ff6fa9d77")
.header("meta:custom", "test")
.header("meta:custom-bin", "dGVzdA==")
.header("meta:idempotency-key", "59410e57-3e0f-4b61-9328-f645a7968ac8")
.build()
.build()}
Expand All @@ -63,7 +63,7 @@ read zilla:data.ext ${kafka:matchDataEx()
.header("zilla:method", "EchoBidiStream")
.header("zilla:reply-to", "responses")
.header("zilla:correlation-id", "59410e57-3e0f-4b61-9328-f645a7968ac8-14f43bae85c2ce394c9bd81ff6fa9d77")
.header("meta:custom", "test")
.header("meta:custom-bin", "dGVzdA==")
.header("meta:idempotency-key", "59410e57-3e0f-4b61-9328-f645a7968ac8")
.build()
.build()}
Expand All @@ -83,7 +83,7 @@ read zilla:data.ext ${kafka:matchDataEx()
.header("zilla:method", "EchoBidiStream")
.header("zilla:reply-to", "responses")
.header("zilla:correlation-id", "59410e57-3e0f-4b61-9328-f645a7968ac8-14f43bae85c2ce394c9bd81ff6fa9d77")
.header("meta:custom", "test")
.header("meta:custom-bin", "dGVzdA==")
.header("meta:idempotency-key", "59410e57-3e0f-4b61-9328-f645a7968ac8")
.build()
.build()}
Expand Down Expand Up @@ -118,6 +118,7 @@ write zilla:data.ext ${kafka:dataEx()
.progress(1, 1)
.key("test")
.header("meta:custom", "value")
.header("meta:customProperty-bin", "dGVzdA==")
.build()
.build()}
write ${grpc:protobuf()
Expand Down
Loading