Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@
*/
package io.aklivity.zilla.runtime.binding.grpc.kafka.internal.config;

import static io.aklivity.zilla.runtime.binding.grpc.kafka.internal.types.stream.GrpcType.BASE64;

import java.util.List;
import java.util.function.Supplier;

import org.agrona.DirectBuffer;
import org.agrona.ExpandableDirectByteBuffer;
import org.agrona.concurrent.UnsafeBuffer;

import io.aklivity.zilla.runtime.binding.grpc.kafka.config.GrpcKafkaCorrelationConfig;
Expand All @@ -31,9 +34,17 @@
import io.aklivity.zilla.runtime.binding.grpc.kafka.internal.types.KafkaOffsetType;
import io.aklivity.zilla.runtime.binding.grpc.kafka.internal.types.OctetsFW;
import io.aklivity.zilla.runtime.binding.grpc.kafka.internal.types.String16FW;
import io.aklivity.zilla.runtime.binding.grpc.kafka.internal.types.stream.GrpcMetadataFW;
import io.aklivity.zilla.runtime.binding.grpc.kafka.internal.types.stream.GrpcType;

public class GrpcKafkaWithProduceResult
{
public static final String META_PREFIX = "meta:";
public static final String BIN_SUFFIX = "-bin";

private static final int META_PREFIX_LENGTH = META_PREFIX.length();
private static final int BIN_SUFFIX_LENGTH = BIN_SUFFIX.length();

private static final KafkaOffsetFW KAFKA_OFFSET_HISTORICAL =
new KafkaOffsetFW.Builder()
.wrap(new UnsafeBuffer(new byte[32]), 0, 32)
Expand All @@ -50,10 +61,13 @@ public class GrpcKafkaWithProduceResult
private final GrpcKafkaWithProduceHash hash;
private final String16FW service;
private final String16FW method;
private final Array32FW<GrpcMetadataFW> metadata;
private final ExpandableDirectByteBuffer nameBuffer;

GrpcKafkaWithProduceResult(
String16FW service,
String16FW method,
Array32FW<GrpcMetadataFW> metadata,
String16FW topic,
KafkaAckMode acks,
Supplier<DirectBuffer> keyRef,
Expand All @@ -64,13 +78,16 @@ public class GrpcKafkaWithProduceResult
{
this.service = service;
this.method = method;
this.metadata = metadata;
this.overrides = overrides;
this.replyTo = replyTo;
this.correlation = correlation;
this.topic = topic;
this.acks = acks;
this.keyRef = keyRef;
this.hash = hash;
this.nameBuffer = new ExpandableDirectByteBuffer();
this.nameBuffer.putStringWithoutLengthAscii(0, META_PREFIX);

hash.updateHash(correlation.service.value());
hash.updateHash(service.value());
Expand Down Expand Up @@ -140,6 +157,27 @@ public void headers(
builder.item(this::method);
builder.item(this::replyTo);
builder.item(this::correlationId);
metadata.forEach(m -> builder.item(i -> metadata(i, m)));
}

private void metadata(
KafkaHeaderFW.Builder builder,
GrpcMetadataFW metadata)
{
GrpcType type = metadata.type().get();
DirectBuffer name = metadata.name().value();
int nameLen = META_PREFIX_LENGTH + metadata.nameLen();
nameBuffer.putBytes(META_PREFIX_LENGTH, name, 0, name.capacity());
if (type == BASE64)
{
nameBuffer.putStringWithoutLengthAscii(nameLen, BIN_SUFFIX);
nameLen += BIN_SUFFIX_LENGTH;
}
builder
.nameLen(nameLen)
.name(nameBuffer, 0, nameLen)
.valueLen(metadata.valueLen())
.value(metadata.value().value(), 0, metadata.valueLen());
}

private void service(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public GrpcKafkaWithProduceResult resolveProduce(

String16FW replyTo = new String16FW(produce.replyTo);

return new GrpcKafkaWithProduceResult(service, method, topic, acks, keyRef, overrides, replyTo,
return new GrpcKafkaWithProduceResult(service, method, metadata, topic, acks, keyRef, overrides, replyTo,
options.correlation, hash);
}

Expand Down
Loading