Skip to content
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
2cef977
Adjust padding to accommodate good enough headers and don't include …
akrambek Oct 25, 2023
d201582
Merge branch 'develop' into feature/consumer-group-cont
akrambek Oct 25, 2023
76bf9de
Merge branch 'feature/consumer-group-cont' into develop
akrambek Oct 26, 2023
29ae79c
Merge branch 'aklivity:develop' into develop
akrambek Oct 30, 2023
ec1b39e
Merge branch 'aklivity:develop' into develop
akrambek Oct 30, 2023
51a9f0e
Merge branch 'aklivity:develop' into develop
akrambek Oct 31, 2023
4394783
Merge branch 'aklivity:develop' into develop
akrambek Oct 31, 2023
e8696ce
Merge branch 'aklivity:develop' into develop
akrambek Nov 2, 2023
51c37b1
Merge branch 'aklivity:develop' into develop
akrambek Nov 2, 2023
5da5f04
Merge branch 'aklivity:develop' into develop
akrambek Nov 2, 2023
db1e17c
Merge branch 'aklivity:develop' into develop
akrambek Nov 4, 2023
40f73dc
Merge branch 'aklivity:develop' into develop
akrambek Nov 6, 2023
d1a0492
Merge branch 'aklivity:develop' into develop
akrambek Nov 23, 2023
45799ce
Merge branch 'aklivity:develop' into develop
akrambek Nov 29, 2023
b6f7884
WIP
akrambek Nov 29, 2023
3660eec
Fix offset commit
akrambek Nov 29, 2023
b26ea7f
Merge branch 'bug/commit' into feature/kafka-grpc-group
akrambek Nov 29, 2023
d98fb23
Adjust the scripts
akrambek Nov 29, 2023
cf2cbb9
Refactor
akrambek Nov 30, 2023
ef40877
Merge branch 'bug/commit' into feature/kafka-grpc-group
akrambek Nov 30, 2023
a613924
WIP
akrambek Nov 30, 2023
1e55162
Merge branch 'aklivity:develop' into develop
akrambek Nov 30, 2023
946f01d
WIP
akrambek Nov 30, 2023
45dc9e8
Merge branch 'develop' into feature/kafka-grpc-group
akrambek Nov 30, 2023
211d7c7
WIP
akrambek Nov 30, 2023
7281108
Fix typo
akrambek Nov 30, 2023
2a07377
WIP
akrambek Dec 2, 2023
fedc41f
Merge branch 'aklivity:develop' into develop
akrambek Dec 4, 2023
18a8d74
Merge branch 'aklivity:develop' into develop
akrambek Dec 4, 2023
f160aad
Merge branch 'aklivity:develop' into develop
akrambek Dec 4, 2023
e0e7d5a
Merge branch 'aklivity:develop' into develop
akrambek Dec 6, 2023
9f4a8a6
Merge branch 'aklivity:develop' into develop
akrambek Dec 8, 2023
c0d096f
Merge branch 'develop' into feature/kafka-grpc-group
akrambek Dec 8, 2023
512bedc
Apply feedback from PR
akrambek Dec 8, 2023
456f111
Merge branch 'aklivity:develop' into develop
akrambek Dec 8, 2023
d466096
Merge branch 'develop' into feature/kafka-grpc-group
akrambek Dec 8, 2023
8c2be5c
Apply feedback from PR
akrambek Dec 9, 2023
0d27262
Merge branch 'aklivity:develop' into develop
akrambek Dec 9, 2023
4436e2d
Merge branch 'develop' into feature/kafka-grpc-group
akrambek Dec 9, 2023
0457c75
Apply feedback from PR
akrambek Dec 9, 2023
1c0a33c
Fix formatting
akrambek Dec 9, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.aklivity.zilla.runtime.binding.kafka.grpc.config.KafkaGrpcCorrelationConfig;
import io.aklivity.zilla.runtime.binding.kafka.grpc.internal.types.Array32FW;
import io.aklivity.zilla.runtime.binding.kafka.grpc.internal.types.KafkaHeaderFW;
import io.aklivity.zilla.runtime.binding.kafka.grpc.internal.types.KafkaOffsetFW;
import io.aklivity.zilla.runtime.binding.kafka.grpc.internal.types.OctetsFW;
import io.aklivity.zilla.runtime.binding.kafka.grpc.internal.types.stream.KafkaDataExFW;
import io.aklivity.zilla.runtime.binding.kafka.grpc.internal.types.stream.KafkaMergedFetchDataExFW;
Expand All @@ -32,6 +33,8 @@ public final class KafkaGrpcFetchHeaderHelper
private final OctetsFW methodRO = new OctetsFW();
private final OctetsFW replyToRO = new OctetsFW();
private final OctetsFW correlatedIdRO = new OctetsFW();
public int partitionId;
public long partitionOffset;

public OctetsFW service;
public OctetsFW method;
Expand Down Expand Up @@ -65,6 +68,11 @@ public void visit(
{
final KafkaMergedFetchDataExFW kafkaMergedFetchDataEx = dataEx.merged().fetch();
final Array32FW<KafkaHeaderFW> headers = kafkaMergedFetchDataEx.headers();
final KafkaOffsetFW partition = kafkaMergedFetchDataEx.partition();

partitionId = partition.partitionId();
partitionOffset = partition.partitionOffset();

headers.forEach(this::dispatch);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,15 @@
import io.aklivity.zilla.runtime.binding.kafka.grpc.internal.types.stream.DataFW;
import io.aklivity.zilla.runtime.binding.kafka.grpc.internal.types.stream.EndFW;
import io.aklivity.zilla.runtime.binding.kafka.grpc.internal.types.stream.ExtensionFW;
import io.aklivity.zilla.runtime.binding.kafka.grpc.internal.types.stream.FlushFW;
import io.aklivity.zilla.runtime.binding.kafka.grpc.internal.types.stream.GrpcAbortExFW;
import io.aklivity.zilla.runtime.binding.kafka.grpc.internal.types.stream.GrpcBeginExFW;
import io.aklivity.zilla.runtime.binding.kafka.grpc.internal.types.stream.GrpcDataExFW;
import io.aklivity.zilla.runtime.binding.kafka.grpc.internal.types.stream.GrpcResetExFW;
import io.aklivity.zilla.runtime.binding.kafka.grpc.internal.types.stream.KafkaBeginExFW;
import io.aklivity.zilla.runtime.binding.kafka.grpc.internal.types.stream.KafkaDataExFW;
import io.aklivity.zilla.runtime.binding.kafka.grpc.internal.types.stream.KafkaFlushExFW;
import io.aklivity.zilla.runtime.binding.kafka.grpc.internal.types.stream.ResetFW;
import io.aklivity.zilla.runtime.binding.kafka.grpc.internal.types.stream.SignalFW;
import io.aklivity.zilla.runtime.binding.kafka.grpc.internal.types.stream.WindowFW;
import io.aklivity.zilla.runtime.engine.EngineContext;
import io.aklivity.zilla.runtime.engine.binding.BindingHandler;
Expand All @@ -78,7 +79,6 @@ public final class KafkaGrpcRemoteServerFactory implements KafkaGrpcStreamFactor

private static final String16FW HEADER_VALUE_GRPC_OK = new String16FW("0");
private static final String16FW HEADER_VALUE_GRPC_ABORTED = new String16FW("10");
private static final String16FW HEADER_VALUE_GRPC_UNIMPLEMENTED = new String16FW("12");
private static final String16FW HEADER_VALUE_GRPC_INTERNAL_ERROR = new String16FW("13");

private final OctetsFW emptyRO = new OctetsFW().wrap(new UnsafeBuffer(0L, 0), 0, 0);
Expand All @@ -92,26 +92,23 @@ public final class KafkaGrpcRemoteServerFactory implements KafkaGrpcStreamFactor
private final DataFW.Builder dataRW = new DataFW.Builder();
private final EndFW.Builder endRW = new EndFW.Builder();
private final AbortFW.Builder abortRW = new AbortFW.Builder();
private final FlushFW.Builder flushRW = new FlushFW.Builder();

private final WindowFW windowRO = new WindowFW();
private final ResetFW resetRO = new ResetFW();
private final SignalFW signalRO = new SignalFW();

private final WindowFW.Builder windowRW = new WindowFW.Builder();
private final ResetFW.Builder resetRW = new ResetFW.Builder();

private final GrpcBeginExFW grpcBeginExRO = new GrpcBeginExFW();
private final GrpcDataExFW grpcDataExRO = new GrpcDataExFW();
private final GrpcResetExFW resetExRO = new GrpcResetExFW();
private final GrpcAbortExFW abortExRO = new GrpcAbortExFW();

private final ExtensionFW extensionRO = new ExtensionFW();
private final KafkaBeginExFW kafkaBeginExRO = new KafkaBeginExFW();
private final KafkaDataExFW kafkaDataExRO = new KafkaDataExFW();
private final KafkaFlushExFW.Builder kafkaFlushExRW = new KafkaFlushExFW.Builder();
private final GrpcQueueMessageFW queueMessageRO = new GrpcQueueMessageFW();

private final GrpcBeginExFW.Builder grpcBeginExRW = new GrpcBeginExFW.Builder();
private final GrpcDataExFW.Builder grpcDataExRW = new GrpcDataExFW.Builder();
private final GrpcResetExFW.Builder grpcResetExRW = new GrpcResetExFW.Builder();
private final GrpcAbortExFW.Builder grpcAbortExRW = new GrpcAbortExFW.Builder();

Expand Down Expand Up @@ -284,16 +281,6 @@ private void removeIfClosed(
}
}

private int replyPendingAck()
{
return (int)(replySeq - replyAck);
}

private int replyWindow()
{
return replyMax - replyPendingAck();
}

private void doKafkaBegin(
long traceId,
long authorization,
Expand All @@ -308,19 +295,6 @@ private void doKafkaBegin(
}
}

private void doKafkaEnd(
long traceId,
long authorization)
{
if (!KafkaGrpcState.initialClosed(state))
{
state = KafkaGrpcState.closeInitial(state);

doEnd(kafka, originId, routedId, initialId, initialSeq, initialAck, initialMax,
traceId, authorization);
}
}

private void doKafkaAbort(
long traceId,
long authorization)
Expand Down Expand Up @@ -412,14 +386,17 @@ private void onKafkaData(

assert replyAck <= replySeq;

if ((flags & DATA_FLAG_INIT) != 0x00 &&
payload != null)
if ((flags & DATA_FLAG_INIT) != 0x00)
{
final ExtensionFW dataEx = extension.get(extensionRO::tryWrap);
final KafkaDataExFW kafkaDataEx =
dataEx != null && dataEx.typeId() == kafkaTypeId ? extension.get(kafkaDataExRO::tryWrap) : null;
final KafkaDataExFW kafkaDataEx = dataEx != null && dataEx.typeId() == kafkaTypeId ?
extension.get(kafkaDataExRO::tryWrap) : null;

helper.visit(kafkaDataEx);
}

if ((flags & DATA_FLAG_INIT) != 0x00 && payload != null)
{
if (helper.resolved())
{
GrpcClient grpcClient = grpcClients.get(helper.correlationId);
Expand All @@ -435,8 +412,8 @@ private void onKafkaData(
helper.replyTo, newCorrelationId);
}

flushGrpcClientData(grpcClient, traceId, authorization, helper.service, helper.method, flags,
reserved, payload);
flushGrpcClientData(grpcClient, traceId, authorization, helper.service, helper.method,
helper.partitionId, helper.partitionOffset, flags, reserved, payload);
}
else if (helper.correlationId != null)
{
Expand All @@ -448,8 +425,8 @@ else if (helper.correlationId != null)
GrpcClient grpcClient = grpcClients.get(lastCorrelationId);
if (grpcClient != null)
{
flushGrpcClientData(grpcClient, traceId, authorization, null, null, flags,
reserved, payload);
flushGrpcClientData(grpcClient, traceId, authorization, null, null,
helper.partitionId, helper.partitionOffset, flags, reserved, payload);
}
}

Expand Down Expand Up @@ -491,6 +468,8 @@ private void flushGrpcMessagesIfBuffered(
final OctetsFW method = queueMessage.method();
final long messageTraceId = queueMessage.traceId();
final long messageAuthorization = queueMessage.authorization();
final int partitionId = queueMessage.partitionId();
final long partitionOffset = queueMessage.partitionOffset();
final int flags = queueMessage.flags();
final int reserved = queueMessage.reserved();
final int valueLength = queueMessage.valueLength();
Expand All @@ -507,7 +486,7 @@ private void flushGrpcMessagesIfBuffered(
newGrpcClient(traceId, authorization, service, method, helper.replyTo, messageCorrelationId);

final int progress = grpcClient.onKafkaData(messageTraceId, messageAuthorization,
flags, payload);
partitionId, partitionOffset, flags, payload);

if (progress == valueLength)
{
Expand All @@ -521,8 +500,8 @@ private void flushGrpcMessagesIfBuffered(
else if (progress > 0)
{
final int remainingPayload = queuedMessageSize - progress;
queueGrpcMessage(traceId, authorization, lastCorrelationId, service, method, flags, reserved,
payload, remainingPayload);
queueGrpcMessage(traceId, authorization, partitionId, partitionOffset, lastCorrelationId,
service, method, flags, reserved, payload, remainingPayload);
final int remainingMessageOffset = grpcQueueSlotOffset - progressOffset;
grpcQueueBuffer.putBytes(oldProgressOffset, grpcQueueBuffer, progressOffset, remainingMessageOffset);
grpcQueueSlotOffset -= queuedMessageSize;
Expand All @@ -546,11 +525,14 @@ private void flushGrpcClientData(
long authorization,
OctetsFW service,
OctetsFW method,
int partitionId,
long partitionOffset,
int flags,
int reserved,
OctetsFW payload)
{
final int progress = grpcClient.onKafkaData(traceId, authorization, flags, payload);
final int progress = grpcClient.onKafkaData(traceId, authorization, partitionId, partitionOffset,
flags, payload);
int length = payload != null ? payload.sizeof() : 0;
final int remaining = length - progress;

Expand All @@ -565,14 +547,16 @@ private void flushGrpcClientData(
{
flags = progress == 0 ? flags : DATA_FLAG_CON;
payload = payload == null ? emptyRO : payload;
queueGrpcMessage(traceId, authorization, grpcClient.correlationId, service, method,
flags, reserved, payload, remaining);
queueGrpcMessage(traceId, authorization, partitionId, partitionOffset,
grpcClient.correlationId, service, method, flags, reserved, payload, remaining);
}
}

private void queueGrpcMessage(
long traceId,
long authorization,
int partitionId,
long partitionOffset,
OctetsFW correlationId,
OctetsFW service,
OctetsFW method,
Expand All @@ -590,6 +574,8 @@ private void queueGrpcMessage(
.method(method)
.traceId(traceId)
.authorization(authorization)
.partitionId(partitionId)
.partitionOffset(partitionOffset)
.flags(flags)
.reserved(reserved)
.value(payload.buffer(), payload.offset(), length)
Expand All @@ -598,6 +584,25 @@ private void queueGrpcMessage(
grpcQueueSlotOffset = queueMessage.limit();
}

private void doKafkaCommitOffset(
long traceId,
long authorization,
int partitionId,
long partitionOffset)
{

Flyweight commitFlushEx = kafkaFlushExRW
.wrap(extBuffer, 0, extBuffer.capacity())
.typeId(kafkaTypeId)
.merged(m -> m.consumer(mc -> mc
.progress(p -> p
.partitionId(partitionId)
.partitionOffset(partitionOffset + 1L))))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please hoist this as a local variable called nextPartitionOffset to better document the code.

.build();

doKafkaFlush(traceId, authorization, 0, 0, commitFlushEx);
}

private void cleanupQueueSlotIfNecessary()
{
if (grpcQueueSlot != NO_SLOT && grpcQueueSlotOffset == 0)
Expand Down Expand Up @@ -706,6 +711,21 @@ private void doKafkaWindow(
doWindow(kafka, originId, routedId, replyId, replySeq, replyAck, replyMax,
traceId, authorization, replyBud, replyPad, replyCap);
}

private void doKafkaFlush(
long traceId,
long authorization,
long budgetId,
int reserved,
Flyweight extension)
{
doFlush(kafka, originId, routedId, initialId, initialSeq, initialAck, initialMax,
traceId, authorization, budgetId, reserved, extension);

initialSeq += reserved;

assert initialSeq <= initialAck + initialMax;
}
}

private final class KafkaCorrelateProxy
Expand Down Expand Up @@ -1525,6 +1545,8 @@ private void onKafkaAbort(
private int onKafkaData(
long traceId,
long authorization,
int partitionId,
long partitionOffset,
int flags,
OctetsFW payload)
{
Expand All @@ -1537,6 +1559,8 @@ private int onKafkaData(
doGrpcData(traceId, authorization, initialBud, length + initialPad,
newFlags, payload.value(), 0, length);

server.doKafkaCommitOffset(traceId, authorization, partitionId, partitionOffset);

if ((newFlags & DATA_FLAG_FIN) != 0x00) // FIN
{
state = KafkaGrpcState.closingInitial(state);
Expand All @@ -1546,6 +1570,8 @@ private int onKafkaData(
if ((payload == null || payload.equals(emptyRO)) &&
KafkaGrpcState.initialClosing(state))
{
server.doKafkaCommitOffset(traceId, authorization, partitionId, partitionOffset);

doGrpcEnd(traceId, authorization);
}

Expand Down Expand Up @@ -1683,34 +1709,6 @@ private void doGrpcReset(
}
}

private void doBegin(
MessageConsumer receiver,
long originId,
long routedId,
long streamId,
long sequence,
long acknowledge,
int maximum,
long traceId,
long authorization,
long affinity,
Flyweight extension)
{
final BeginFW begin = beginRW.wrap(writeBuffer, 0, writeBuffer.capacity())
.originId(originId)
.routedId(routedId)
.streamId(streamId)
.sequence(sequence)
.acknowledge(acknowledge)
.maximum(maximum)
.traceId(traceId)
.authorization(authorization)
.affinity(affinity)
.extension(extension.buffer(), extension.offset(), extension.sizeof())
.build();

receiver.accept(begin.typeId(), begin.buffer(), begin.offset(), begin.sizeof());
}

private void doData(
MessageConsumer receiver,
Expand Down Expand Up @@ -1836,6 +1834,37 @@ private void doAbort(
receiver.accept(abort.typeId(), abort.buffer(), abort.offset(), abort.sizeof());
}

private void doFlush(
MessageConsumer receiver,
long originId,
long routedId,
long streamId,
long sequence,
long acknowledge,
int maximum,
long traceId,
long authorization,
long budgetId,
int reserved,
Flyweight extension)
{
final FlushFW flush = flushRW.wrap(writeBuffer, 0, writeBuffer.capacity())
.originId(originId)
.routedId(routedId)
.streamId(streamId)
.sequence(sequence)
.acknowledge(acknowledge)
.maximum(maximum)
.traceId(traceId)
.authorization(authorization)
.budgetId(budgetId)
.reserved(reserved)
.extension(extension.buffer(), extension.offset(), extension.sizeof())
.build();

receiver.accept(flush.typeId(), flush.buffer(), flush.offset(), flush.sizeof());
}

private MessageConsumer newKafkaProducer(
MessageConsumer sender,
long originId,
Expand Down Expand Up @@ -1956,6 +1985,7 @@ private MessageConsumer newKafkaFetch(
.typeId(kafkaTypeId)
.merged(m -> m.capabilities(c -> c.set(FETCH_ONLY))
.topic(condition.topic())
.groupId("zilla-kafka-grpc")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be configurable, I'm thinking as part of KafkaConfiguration, agree?

.partitions(condition::partitions)
.filters(condition::filters))
.build();
Expand Down
Loading