Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ public final class KafkaCachePartition
private static final long NO_ANCESTOR_OFFSET = -1L;
private static final long NO_DESCENDANT_OFFSET = -1L;
private static final int NO_SEQUENCE = -1;
private static final short NO_PRODUCER_ID = -1;
private static final short NO_PRODUCER_EPOCH = -1;
private static final int NO_ACKNOWLEDGE = 0;
private static final int NO_DELTA_POSITION = -1;

Expand All @@ -98,7 +100,8 @@ public final class KafkaCachePartition
private final KafkaCacheEntryFW logEntryRO = new KafkaCacheEntryFW();
private final KafkaCacheDeltaFW deltaEntryRO = new KafkaCacheDeltaFW();

private final MutableDirectBuffer entryInfo = new UnsafeBuffer(new byte[6 * Long.BYTES + 3 * Integer.BYTES + Short.BYTES]);
private final MutableDirectBuffer entryInfo =
new UnsafeBuffer(new byte[7 * Long.BYTES + 3 * Integer.BYTES + 2 * Short.BYTES]);
private final MutableDirectBuffer valueInfo = new UnsafeBuffer(new byte[Integer.BYTES]);

private final Array32FW<KafkaHeaderFW> headersRO = new Array32FW<KafkaHeaderFW>(new KafkaHeaderFW());
Expand Down Expand Up @@ -374,12 +377,14 @@ public void writeEntryStart(
entryInfo.putLong(Long.BYTES, timestamp);
entryInfo.putLong(2 * Long.BYTES, producerId);
entryInfo.putLong(3 * Long.BYTES, NO_ACKNOWLEDGE);
entryInfo.putInt(4 * Long.BYTES, NO_SEQUENCE);
entryInfo.putLong(4 * Long.BYTES + Integer.BYTES, ancestorOffset);
entryInfo.putLong(5 * Long.BYTES + Integer.BYTES, NO_DESCENDANT_OFFSET);
entryInfo.putInt(6 * Long.BYTES + Integer.BYTES, entryFlags);
entryInfo.putInt(6 * Long.BYTES + 2 * Integer.BYTES, deltaPosition);
entryInfo.putShort(6 * Long.BYTES + 3 * Integer.BYTES, KafkaAckMode.NONE.value());
entryInfo.putLong(4 * Long.BYTES, NO_PRODUCER_ID);
entryInfo.putShort(5 * Long.BYTES, NO_PRODUCER_EPOCH);
entryInfo.putInt(5 * Long.BYTES + Short.BYTES, NO_SEQUENCE);
entryInfo.putLong(5 * Long.BYTES + Integer.BYTES + Short.BYTES, ancestorOffset);
entryInfo.putLong(6 * Long.BYTES + Integer.BYTES + Short.BYTES, NO_DESCENDANT_OFFSET);
entryInfo.putInt(7 * Long.BYTES + Integer.BYTES + Short.BYTES, entryFlags);
entryInfo.putInt(7 * Long.BYTES + 2 * Integer.BYTES + Short.BYTES, deltaPosition);
entryInfo.putShort(7 * Long.BYTES + 3 * Integer.BYTES + Short.BYTES, KafkaAckMode.NONE.value());

logFile.appendBytes(entryInfo);
logFile.appendBytes(key);
Expand Down Expand Up @@ -554,13 +559,14 @@ public void writeProduceEntryStart(
entryInfo.putLong(Long.BYTES, timestamp);
entryInfo.putLong(2 * Long.BYTES, ownerId);
entryInfo.putLong(3 * Long.BYTES, NO_ACKNOWLEDGE);
entryInfo.putInt(4 * Long.BYTES, sequence);
entryInfo.putLong(4 * Long.BYTES + Integer.BYTES, NO_ANCESTOR_OFFSET);
entryInfo.putLong(5 * Long.BYTES + Integer.BYTES, NO_DESCENDANT_OFFSET);
entryInfo.putInt(6 * Long.BYTES + Integer.BYTES, 0x00);
entryInfo.putInt(6 * Long.BYTES + 2 * Integer.BYTES, NO_DELTA_POSITION);
entryInfo.putShort(6 * Long.BYTES + 3 * Integer.BYTES, ackMode.value());

entryInfo.putLong(4 * Long.BYTES, producerId);
entryInfo.putShort(5 * Long.BYTES, producerEpoch);
entryInfo.putInt(5 * Long.BYTES + Short.BYTES, sequence);
entryInfo.putLong(5 * Long.BYTES + Integer.BYTES + Short.BYTES, NO_ANCESTOR_OFFSET);
entryInfo.putLong(6 * Long.BYTES + Integer.BYTES + Short.BYTES, NO_DESCENDANT_OFFSET);
entryInfo.putInt(7 * Long.BYTES + Integer.BYTES + Short.BYTES, 0x00);
entryInfo.putInt(7 * Long.BYTES + 2 * Integer.BYTES + Short.BYTES, NO_DELTA_POSITION);
entryInfo.putShort(7 * Long.BYTES + 3 * Integer.BYTES + Short.BYTES, ackMode.value());

logFile.appendBytes(entryInfo);
logFile.appendBytes(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1171,6 +1171,8 @@ private void doProduceInitialData(
{
final long partitionOffset = nextEntry.offset$();
final long timestamp = nextEntry.timestamp();
final long producerId = nextEntry.producerId();
final short producerEpoch = nextEntry.producerEpoch();
final int sequence = nextEntry.sequence();
final KafkaAckMode ackMode = KafkaAckMode.valueOf(nextEntry.ackMode());
final KafkaKeyFW key = nextEntry.key();
Expand Down Expand Up @@ -1234,11 +1236,11 @@ private void doProduceInitialData(
switch (flags)
{
case FLAG_INIT | FLAG_FIN:
doServerInitialDataFull(traceId, timestamp, sequence, checksum,
doServerInitialDataFull(traceId, timestamp, producerId, producerEpoch, sequence, checksum,
ackMode, key, headers, trailers, fragment, reserved, flags);
break;
case FLAG_INIT:
doServerInitialDataInit(traceId, deferred, timestamp, sequence,
doServerInitialDataInit(traceId, deferred, timestamp, producerId, producerEpoch, sequence,
checksum, ackMode, key, headers, trailers, fragment, reserved, flags);
break;
case FLAG_NONE:
Expand Down Expand Up @@ -1277,6 +1279,8 @@ private void doProduceInitialData(
private void doServerInitialDataFull(
long traceId,
long timestamp,
long producerId,
short produceEpoch,
int sequence,
long checksum,
KafkaAckMode ackMode,
Expand All @@ -1291,6 +1295,8 @@ private void doServerInitialDataFull(
ex -> ex.set((b, o, l) -> kafkaDataExRW.wrap(b, o, l)
.typeId(kafkaTypeId)
.produce(f -> f.timestamp(timestamp)
.producerId(producerId)
.producerEpoch(produceEpoch)
.sequence(sequence)
.crc32c(checksum)
.ackMode(a -> a.set(ackMode))
Expand All @@ -1308,6 +1314,8 @@ private void doServerInitialDataInit(
long traceId,
int deferred,
long timestamp,
long producerId,
short produceEpoch,
int sequence,
long checksum,
KafkaAckMode ackMode,
Expand All @@ -1323,6 +1331,8 @@ private void doServerInitialDataInit(
.typeId(kafkaTypeId)
.produce(f -> f.deferred(deferred)
.timestamp(timestamp)
.producerId(producerId)
.producerEpoch(produceEpoch)
.sequence(sequence)
.crc32c(checksum)
.ackMode(a -> a.set(ackMode))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ public final class KafkaClientProduceFactory extends KafkaClientSaslHandshaker i
private static final byte RECORD_BATCH_MAGIC = 2;
private static final short RECORD_BATCH_ATTRIBUTES_NONE = 0;
private static final short RECORD_BATCH_ATTRIBUTES_NO_TIMESTAMP = 0x08;
private static final int RECORD_BATCH_PRODUCER_ID_NONE = -1;
private static final long RECORD_BATCH_PRODUCER_ID_NONE = -1;
private static final short RECORD_BATCH_PRODUCER_EPOCH_NONE = -1;
private static final short RECORD_BATCH_SEQUENCE_NONE = -1;
private static final int RECORD_BATCH_SEQUENCE_NONE = -1;
private static final byte RECORD_ATTRIBUTES_NONE = 0;

private static final String TRANSACTION_ID_NONE = null;
Expand Down Expand Up @@ -531,6 +531,9 @@ private int flushRecordInit(
assert kafkaDataEx.kind() == KafkaDataExFW.KIND_PRODUCE;
final KafkaProduceDataExFW kafkaProduceDataEx = kafkaDataEx.produce();
final long timestamp = kafkaProduceDataEx.timestamp();
final long producerId = kafkaProduceDataEx.producerId();
final short producerEpoch = kafkaProduceDataEx.producerEpoch();
final int sequence = kafkaProduceDataEx.sequence();
final KafkaAckMode ackMode = kafkaProduceDataEx.ackMode().get();
final KafkaKeyFW key = kafkaProduceDataEx.key();
final Array32FW<KafkaHeaderFW> headers = kafkaProduceDataEx.headers();
Expand All @@ -542,11 +545,21 @@ private int flushRecordInit(
final int maxEncodeableBytes = client.encodeSlotLimit + client.valueCompleteSize + produceRecordFramingSize;

if (client.encodeSlot != NO_SLOT &&
maxEncodeableBytes > encodePool.slotCapacity())
(maxEncodeableBytes > encodePool.slotCapacity() ||
client.producerId != producerId && client.producerEpoch != producerEpoch))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
client.producerId != producerId && client.producerEpoch != producerEpoch))
client.producerId != producerId || client.producerEpoch != producerEpoch))

only need one of these to differ to trigger request since both are expected to be consistent for entire batch, right?

{
client.doEncodeRequestIfNecessary(traceId, budgetId);
}

if (client.producerId == RECORD_BATCH_PRODUCER_ID_NONE ||
client.producerId != producerId)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be && instead?

Please remove extra whitespace around !=.

{
client.sequence = sequence;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest we rename to client.baseSequence.

}

client.producerId = producerId;
client.producerEpoch = producerEpoch;

client.doEncodeRecordInit(traceId, timestamp, ackMode, key, payload, headers);
if (client.encodeSlot != NO_SLOT)
{
Expand Down Expand Up @@ -1237,6 +1250,10 @@ private final class KafkaProduceClient extends KafkaSaslClient
private LongLongConsumer encoder;
private boolean flushable;

private long producerId = RECORD_BATCH_PRODUCER_ID_NONE;
private short producerEpoch = RECORD_BATCH_PRODUCER_EPOCH_NONE;
private int sequence = RECORD_BATCH_SEQUENCE_NONE;

KafkaProduceClient(
KafkaProduceStream stream,
long resolvedId,
Expand Down Expand Up @@ -1878,6 +1895,9 @@ private void doEncodeProduceRequest(
? RECORD_BATCH_ATTRIBUTES_NO_TIMESTAMP
: RECORD_BATCH_ATTRIBUTES_NONE;

final int sequence = client.producerId == RECORD_BATCH_PRODUCER_ID_NONE ? RECORD_BATCH_SEQUENCE_NONE :
client.sequence;

final RecordBatchFW recordBatch = recordBatchRW.wrap(encodeBuffer, encodeProgress, encodeLimit)
.baseOffset(0)
.length(recordBatchLength)
Expand All @@ -1888,9 +1908,9 @@ private void doEncodeProduceRequest(
.lastOffsetDelta(encodeableRecordCount - 1)
.firstTimestamp(encodeableRecordBatchTimestamp)
.maxTimestamp(encodeableRecordBatchTimestampMax)
.producerId(RECORD_BATCH_PRODUCER_ID_NONE)
.producerEpoch(RECORD_BATCH_PRODUCER_EPOCH_NONE)
.baseSequence(RECORD_BATCH_SEQUENCE_NONE)
.producerId(client.producerId)
.producerEpoch(client.producerEpoch)
.baseSequence(sequence)
.recordCount(encodeableRecordCount)
.build();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to reset producerId and producerEpoch after encoding request so that next request can batch as expected?


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1592,28 +1592,34 @@ private void doMergedReplyBegin(
if (capabilities == FETCH_ONLY)
{
doBegin(sender, originId, routedId, replyId, replySeq, replyAck, replyMax,
traceId, authorization, affinity, beginExToKafka());
traceId, authorization, affinity, beginExToKafka(beginExToKafkaMergedFetchOnly()));
}
else if (capabilities == PRODUCE_ONLY)
{
doBegin(sender, originId, routedId, replyId, replySeq, replyAck, replyMax,
traceId, authorization, affinity, beginExToKafka(beginExToKafkaMergedProduceOnly()));
}
else
{
doBegin(sender, originId, routedId, replyId, replySeq, replyAck, replyMax,
traceId, authorization, affinity, EMPTY_EXTENSION);
traceId, authorization, affinity, EMPTY_EXTENSION);
}

doUnmergedFetchReplyWindowsIfNecessary(traceId);
}

private Flyweight.Builder.Visitor beginExToKafka()
private Flyweight.Builder.Visitor beginExToKafka(
Consumer<KafkaMergedBeginExFW.Builder> beginExToKafkaMerged)
{
return (buffer, offset, maxLimit) ->
kafkaBeginExRW.wrap(buffer, offset, maxLimit)
.typeId(kafkaTypeId)
.merged(beginExToKafkaMerged())
.merged(beginExToKafkaMerged)
.build()
.limit() - offset;
}

private Consumer<KafkaMergedBeginExFW.Builder> beginExToKafkaMerged()
private Consumer<KafkaMergedBeginExFW.Builder> beginExToKafkaMergedFetchOnly()
{
return builder ->
{
Expand All @@ -1640,6 +1646,15 @@ private Consumer<KafkaMergedBeginExFW.Builder> beginExToKafkaMerged()
};
}

private Consumer<KafkaMergedBeginExFW.Builder> beginExToKafkaMergedProduceOnly()
{
return builder ->
{
builder.capabilities(c -> c.set(PRODUCE_ONLY)).topic(topic);
leadersByPartitionId.intForEach((k, v) -> builder.partitionsItem(i -> i.partitionId(k)));
};
}

private void doMergedReplyData(
long traceId,
int flags,
Expand Down
2 changes: 2 additions & 0 deletions runtime/binding-kafka/src/main/zilla/internal.idl
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ scope internal
int64 timestamp;
int64 ownerId;
int64 acknowledge = 0;
int64 producerId = -1;
int16 producerEpoch = -1;
int32 sequence = -1;
int64 ancestor;
int64 descendant;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright 2021-2023 Aklivity Inc.
*
* Aklivity licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.aklivity.zilla.runtime.binding.kafka.internal.stream;

import static io.aklivity.zilla.runtime.binding.kafka.internal.KafkaConfigurationTest.KAFKA_CLIENT_SASL_SCRAM_NONCE_NAME;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.rules.RuleChain.outerRule;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.DisableOnDebug;
import org.junit.rules.TestRule;
import org.junit.rules.Timeout;
import org.kaazing.k3po.junit.annotation.Specification;
import org.kaazing.k3po.junit.rules.K3poRule;

import io.aklivity.zilla.runtime.engine.test.EngineRule;
import io.aklivity.zilla.runtime.engine.test.annotation.Configuration;
import io.aklivity.zilla.runtime.engine.test.annotation.Configure;

public class ClientInitProducerIdSaslIT
{
private final K3poRule k3po = new K3poRule()
.addScriptRoot("net", "io/aklivity/zilla/specs/binding/kafka/streams/network/init.producer.id.v4.sasl.handshake.v1")
.addScriptRoot("app", "io/aklivity/zilla/specs/binding/kafka/streams/application/init.producer.id");

private final TestRule timeout = new DisableOnDebug(new Timeout(15, SECONDS));

private final EngineRule engine = new EngineRule()
.directory("target/zilla-itests")
.countersBufferCapacity(8192)
.configurationRoot("io/aklivity/zilla/specs/binding/kafka/config")
.external("net0")
.clean();

@Rule
public final TestRule chain = outerRule(engine).around(k3po).around(timeout);


@Test
@Configuration("client.options.sasl.plain.yaml")
@Specification({
"${app}/produce.new.id/client",
"${net}/produce.new.id.sasl.plain/server"})
public void shouldGenerateNewProducerIdWithSaslPlain() throws Exception
{
k3po.finish();
}

@Test
@Configuration("client.options.sasl.scram.yaml")
@Specification({
"${app}/produce.new.id/client",
"${net}/produce.new.id.sasl.scram/server"})
@Configure(name = KAFKA_CLIENT_SASL_SCRAM_NONCE_NAME,
value = "io.aklivity.zilla.runtime.binding.kafka.internal.stream.ClientInitProducerIdSaslIT::supplyNonce")
public void shouldGenerateNewProducerIdWithSaslScram() throws Exception
{
k3po.finish();
}

public static String supplyNonce()
{
return "fyko+d2lbbFgONRv9qkxdawL";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,26 @@ public void shouldSendMessageKeyDistinct() throws Exception
k3po.finish();
}

@Test
@Configuration("client.when.topic.yaml")
@Specification({
"${app}/message.producer.id/client",
"${net}/message.producer.id/server"})
public void shouldSendMessageValueWithProducerId() throws Exception
{
k3po.finish();
}

@Test
@Configuration("client.when.topic.yaml")
@Specification({
"${app}/message.values.producer.id/client",
"${net}/message.values.producer.id/server"})
public void shouldSendMessageValuesWithProducerId() throws Exception
{
k3po.finish();
}

@Test
@Configuration("client.when.topic.yaml")
@Specification({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,4 @@ write zilla:begin.ext ${kafka:beginEx()
.producerEpoch(2)
.build()
.build()}
write flush
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,16 @@ write zilla:begin.ext ${kafka:beginEx()

connected

read zilla:begin.ext ${kafka:matchBeginEx()
.typeId(zilla:id("kafka"))
.merged()
.capabilities("PRODUCE_ONLY")
.topic("test")
.partition(0, -1)
.partition(1, -1)
.build()
.build()}

write zilla:data.ext ${kafka:dataEx()
.typeId(zilla:id("kafka"))
.merged()
Expand Down
Loading