Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
901f0b2
separating publish streams based on qos
bmaidics Jan 10, 2024
79ddafd
Fix test
bmaidics Jan 11, 2024
1265671
Checkpoint
bmaidics Jan 9, 2024
7847580
Checkpoint
bmaidics Jan 9, 2024
7895cd5
check
bmaidics Jan 9, 2024
f831ff8
start of idempotent work
bmaidics Jan 13, 2024
412019a
Merge remote-tracking branch 'upstream/feature/mqtt-kafka' into featu…
bmaidics Jan 15, 2024
167a33a
Optimize memory allocation for mqtt-kafka offset tracking (#694)
bmaidics Jan 15, 2024
cf08e0f
checkpoint
bmaidics Jan 15, 2024
80c0e09
checkpoint
bmaidics Jan 18, 2024
62db2cb
Merge remote-tracking branch 'upstream/feature/mqtt-kafka' into qos2_…
bmaidics Jan 18, 2024
49a7b35
Checkpoint with retained offsetCommit stream
bmaidics Jan 18, 2024
92b262a
checkpoint
bmaidics Jan 19, 2024
29d24f7
checkpoint
bmaidics Jan 22, 2024
f9fe624
mqtt-kafka checkpoint
bmaidics Jan 23, 2024
5195271
checkpoint
bmaidics Jan 24, 2024
99f1c1f
checkpoint
bmaidics Jan 24, 2024
8db664d
Fixes
bmaidics Jan 24, 2024
7038c04
Fix flaky test
bmaidics Jan 24, 2024
e00dd8a
Merge remote-tracking branch 'upstream/feature/mqtt-kafka' into qos2_…
bmaidics Jan 25, 2024
e3d92ee
fixes
bmaidics Jan 25, 2024
31e26ed
fix dump
attilakreiner Jan 25, 2024
f4b27c1
Fix init produce id request
akrambek Jan 25, 2024
9c5fd24
fix
bmaidics Jan 26, 2024
35c6976
Fix bug
bmaidics Jan 26, 2024
a3c7e09
Fix
bmaidics Jan 29, 2024
5cf9068
Don't flush early if the sequence number is not set
akrambek Jan 29, 2024
bc38ac7
Draft
bmaidics Jan 31, 2024
724bd70
checkpoint
bmaidics Feb 1, 2024
7df5123
Fixes
bmaidics Feb 1, 2024
4204bc1
Adrress review comments
bmaidics Feb 1, 2024
2a0204f
Merge remote-tracking branch 'upstream/feature/mqtt-kafka' into qos2_…
bmaidics Feb 1, 2024
ebd9cee
Merge fixes
bmaidics Feb 1, 2024
8988e6b
Include producerId and producerEpoch into cache entry
akrambek Feb 2, 2024
739f79d
fix dump
attilakreiner Feb 2, 2024
878ec78
Fix qos2 large message
bmaidics Feb 3, 2024
0563e13
Fix typo
akrambek Feb 5, 2024
7f2ca2a
Merge branch 'qos2_idempontent' of github.com:bmaidics/zilla into qos…
akrambek Feb 5, 2024
7d9afee
reviews
bmaidics Feb 5, 2024
c274636
more feedback
bmaidics Feb 5, 2024
215791e
checkpoint
bmaidics Feb 6, 2024
d3635b3
Refactor
bmaidics Feb 6, 2024
007e8d1
Adjust code coverage ratio
jfallows Feb 6, 2024
ccae27c
Ignore IT that fails only on GitHub Actions, see issue #786
jfallows Feb 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
checkpoint
  • Loading branch information
bmaidics committed Jan 24, 2024
commit 519527169574e5cc057c2561c331e4e2e3e06afe
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,8 @@
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.KafkaResetExFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.MqttBeginExFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.MqttDataExFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.MqttFlushExFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.MqttPublishBeginExFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.MqttPublishDataExFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.MqttPublishFlushExFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.MqttResetExFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.ResetFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.WindowFW;
Expand Down Expand Up @@ -122,7 +120,6 @@ public class MqttKafkaPublishFactory implements MqttKafkaStreamFactory

private final ExtensionFW extensionRO = new ExtensionFW();
private final MqttBeginExFW mqttBeginExRO = new MqttBeginExFW();
private final MqttFlushExFW mqttFlushExRO = new MqttFlushExFW();
private final MqttDataExFW mqttDataExRO = new MqttDataExFW();
private final KafkaResetExFW kafkaResetExRO = new KafkaResetExFW();
private final KafkaBeginExFW kafkaBeginExRO = new KafkaBeginExFW();
Expand All @@ -132,7 +129,6 @@ public class MqttKafkaPublishFactory implements MqttKafkaStreamFactory
private final KafkaFlushExFW.Builder kafkaFlushExRW = new KafkaFlushExFW.Builder();
private final KafkaDataExFW.Builder kafkaDataExRW = new KafkaDataExFW.Builder();
private final MqttResetExFW.Builder mqttResetExRW = new MqttResetExFW.Builder();
private final MqttFlushExFW.Builder mqttFlushExRW = new MqttFlushExFW.Builder();
private final Array32FW.Builder<KafkaHeaderFW.Builder, KafkaHeaderFW> kafkaHeadersRW =
new Array32FW.Builder<>(new KafkaHeaderFW.Builder(), new KafkaHeaderFW());

Expand Down Expand Up @@ -338,10 +334,6 @@ private void onMqttMessage(
final BeginFW begin = beginRO.wrap(buffer, index, index + length);
onMqttBegin(begin);
break;
case FlushFW.TYPE_ID:
final FlushFW flush = flushRO.wrap(buffer, index, index + length);
onMqttFlush(flush);
break;
case DataFW.TYPE_ID:
final DataFW data = dataRO.wrap(buffer, index, index + length);
onMqttData(data);
Expand Down Expand Up @@ -483,43 +475,6 @@ private String clientHashKey(
return clientHashKey;
}

private void onMqttFlush(
FlushFW flush)
{
final long sequence = flush.sequence();
final long acknowledge = flush.acknowledge();
final long traceId = flush.traceId();
final long authorization = flush.authorization();

assert acknowledge <= sequence;
assert sequence >= initialSeq;
assert acknowledge >= initialAck;

initialSeq = sequence;

assert initialAck <= initialSeq;

final OctetsFW extension = flush.extension();
final MqttFlushExFW mqttFlushEx = extension.get(mqttFlushExRO::tryWrap);

assert mqttFlushEx.kind() == MqttFlushExFW.KIND_PUBLISH;
final MqttPublishFlushExFW mqttPublishFlushEx = mqttFlushEx.publish();

final int packetId = mqttPublishFlushEx.packetId();

final MqttKafkaBindingConfig binding = supplyBinding.apply(routedId);
MqttKafkaSessionFactory.MqttSessionProxy session = binding.sessions.get(affinity);

session.commitOffsetComplete(traceId, authorization, messages.topicString,
messages.qos2PartitionId, packetId, this);

if (hasPublishFlagRetained(publishFlags))
{
session.commitOffsetComplete(traceId, authorization, retained.topicString,
retained.qos2PartitionId, packetId, this);
}
}

private void onMqttData(
DataFW data)
{
Expand Down Expand Up @@ -653,9 +608,8 @@ private void onMqttData(

if ((flags & DATA_FLAG_FIN) != 0x00 && qos == MqttQoS.EXACTLY_ONCE.value())
{
//TODO: when the ack comes for this we need to send the FIN bit, not the
session.commitOffsetIncomplete(traceId, authorization, messages.topicString,
messages.qos2PartitionId, packetId, messages);
messages.qos2PartitionId, packetId, messages, false);
}

if (retainAvailable)
Expand Down Expand Up @@ -695,7 +649,7 @@ private void onMqttData(
if ((flags & DATA_FLAG_FIN) != 0x00 && qos == MqttQoS.EXACTLY_ONCE.value())
{
session.commitOffsetIncomplete(traceId, authorization, retained.topicString,
retained.qos2PartitionId, packetId, retained);
retained.qos2PartitionId, packetId, retained, true);
}
}
else
Expand Down Expand Up @@ -864,30 +818,6 @@ private void doMqttBegin(
traceId, authorization, affinity);
}

public void doMqttFlush(
long traceId,
long authorization,
long budgetId,
int reserved,
int packetId)
{
if (!hasPublishFlagRetained(publishFlags))
{
final MqttFlushExFW mqttFlushEx =
mqttFlushExRW.wrap(extBuffer, FlushFW.FIELD_OFFSET_EXTENSION, extBuffer.capacity())
.typeId(mqttTypeId)
.publish(p -> p.packetId(packetId))
.build();

doFlush(mqtt, originId, routedId, replyId, replySeq, replyAck, replyMax, traceId, authorization,
budgetId, reserved, mqttFlushEx);
}
else
{
publishFlags = 0;
}
}

private void doMqttAbort(
long traceId,
long authorization)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,14 @@
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.KafkaTopicPartitionOffsetFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.MqttBeginExFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.MqttDataExFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.MqttFlushExFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.MqttPublishOffsetMetadataFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.MqttResetExFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.MqttServerCapabilities;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.MqttSessionBeginExFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.MqttSessionDataExFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.MqttSessionDataKind;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.MqttSessionFlushExFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.ResetFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.SignalFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.WindowFW;
Expand Down Expand Up @@ -208,6 +210,7 @@ public class MqttKafkaSessionFactory implements MqttKafkaStreamFactory

private final ExtensionFW extensionRO = new ExtensionFW();
private final MqttBeginExFW mqttBeginExRO = new MqttBeginExFW();
private final MqttFlushExFW mqttFlushExRO = new MqttFlushExFW();
private final MqttSessionStateFW mqttSessionStateRO = new MqttSessionStateFW();
private final MqttSessionSignalFW mqttSessionSignalRO = new MqttSessionSignalFW();
private final MqttWillMessageFW mqttWillRO = new MqttWillMessageFW();
Expand All @@ -216,6 +219,7 @@ public class MqttKafkaSessionFactory implements MqttKafkaStreamFactory
private final MqttPublishOffsetMetadataFW mqttOffsetMetadataRO = new MqttPublishOffsetMetadataFW();

private final MqttResetExFW.Builder mqttResetExRW = new MqttResetExFW.Builder();
private final MqttFlushExFW.Builder mqttFlushExRW = new MqttFlushExFW.Builder();
private final KafkaBeginExFW kafkaBeginExRO = new KafkaBeginExFW();
private final KafkaDataExFW kafkaDataExRO = new KafkaDataExFW();
private final KafkaResetExFW kafkaResetExRO = new KafkaResetExFW();
Expand Down Expand Up @@ -402,6 +406,8 @@ public final class MqttSessionProxy
private final String16FW sessionsTopic;
private final Object2ObjectHashMap<String16FW, MqttKafkaPublishFactory.KafkaMessagesProxy> qos2Publishes;
private final Long2ObjectHashMap<PublishOffsetMetadata> offsets;
private final Int2ObjectHashMap<KafkaTopicPartition> partitions;
private final Int2ObjectHashMap<KafkaTopicPartition> retainedPartitions;
private final List<KafkaPartition> initialisablePartitions;
Comment thread
bmaidics marked this conversation as resolved.
Outdated
private final Long2LongHashMap leaderEpochs;

Expand Down Expand Up @@ -461,6 +467,8 @@ private MqttSessionProxy(
this.qos2Publishes = new Object2ObjectHashMap<>();
this.offsetFetches = new ArrayList<>();
this.offsets = new Long2ObjectHashMap<>();
this.partitions = new Int2ObjectHashMap<>();
this.retainedPartitions = new Int2ObjectHashMap<>();
this.initialisablePartitions = new ArrayList<>();
}

Expand All @@ -476,6 +484,10 @@ private void onMqttMessage(
final BeginFW begin = beginRO.wrap(buffer, index, index + length);
onMqttBegin(begin);
break;
case FlushFW.TYPE_ID:
final FlushFW flush = flushRO.wrap(buffer, index, index + length);
onMqttFlush(flush);
break;
case DataFW.TYPE_ID:
final DataFW data = dataRO.wrap(buffer, index, index + length);
onMqttData(data);
Expand Down Expand Up @@ -552,6 +564,40 @@ private void onMqttBegin(
}
}

private void onMqttFlush(
FlushFW flush)
{
final long sequence = flush.sequence();
final long acknowledge = flush.acknowledge();
final long traceId = flush.traceId();
final long authorization = flush.authorization();

assert acknowledge <= sequence;
assert sequence >= initialSeq;
assert acknowledge >= initialAck;

initialSeq = sequence;

assert initialAck <= initialSeq;

final OctetsFW extension = flush.extension();
final MqttFlushExFW mqttFlushEx = extension.get(mqttFlushExRO::tryWrap);

assert mqttFlushEx.kind() == MqttFlushExFW.KIND_SESSION;
final MqttSessionFlushExFW mqttPublishFlushEx = mqttFlushEx.session();

final int packetId = mqttPublishFlushEx.packetId();

final KafkaTopicPartition partition = partitions.remove(packetId);
commitOffsetComplete(traceId, authorization, partition.topic, partition.partitionId, packetId);
Comment thread
bmaidics marked this conversation as resolved.
Outdated

if (retainedPartitions.containsKey(packetId))
{
final KafkaTopicPartition retainedPartition = retainedPartitions.remove(packetId);
commitOffsetComplete(traceId, authorization, retainedPartition.topic, retainedPartition.partitionId, packetId);
}
}

private void onMqttData(
DataFW data)
{
Expand Down Expand Up @@ -762,7 +808,8 @@ public void commitOffsetIncomplete(
String topic,
int partitionId,
int packetId,
MqttKafkaPublishFactory.KafkaProxy kafka)
MqttKafkaPublishFactory.KafkaProxy kafka,
boolean retained)
{
final long offsetKey = offsetKey(topic, partitionId);
final PublishOffsetMetadata metadata = offsets.get(offsetKey);
Expand All @@ -782,6 +829,14 @@ public void commitOffsetIncomplete(

offsetCommit.unfinishedKafkas.add(kafka);
offsetCommit.unackedPacketIds.add(INCOMPLETE_PACKET_ID);
if (retained)
{
retainedPartitions.put(packetId, new KafkaTopicPartition(topic, partitionId));
}
else
{
partitions.put(packetId, new KafkaTopicPartition(topic, partitionId));
}
offsetCommit.doKafkaData(traceId, authorization, 0, DATA_FLAG_COMPLETE, offsetCommitEx);
}

Expand All @@ -790,8 +845,7 @@ public void commitOffsetComplete(
long authorization,
String topic,
int partitionId,
int packetId,
MqttKafkaPublishFactory.MqttPublishProxy publish)
int packetId)
{
final long offsetKey = offsetKey(topic, partitionId);
final PublishOffsetMetadata metadata = offsets.get(offsetKey);
Expand All @@ -809,7 +863,6 @@ public void commitOffsetComplete(
.leaderEpoch((int) leaderEpochs.get(offsetKey)))
.build();

offsetCommit.unackedPublishes.add(publish);
offsetCommit.unackedPacketIds.add(packetId);
offsetCommit.doKafkaData(traceId, authorization, 0, DATA_FLAG_COMPLETE, offsetCommitEx);
metadata.sequence++;
Expand Down Expand Up @@ -1127,6 +1180,26 @@ private void doMqttData(
assert replySeq <= replyAck + replyMax;
}

public void doMqttFlush(
long traceId,
long authorization,
long budgetId,
int reserved,
int packetId)
{
if (!partitions.containsKey(packetId) && !retainedPartitions.containsKey(packetId))
{
final MqttFlushExFW mqttFlushEx =
mqttFlushExRW.wrap(extBuffer, FlushFW.FIELD_OFFSET_EXTENSION, extBuffer.capacity())
.typeId(mqttTypeId)
.session(p -> p.packetId(packetId))
.build();

doFlush(mqtt, originId, routedId, replyId, replySeq, replyAck, replyMax, traceId, authorization,
budgetId, reserved, mqttFlushEx);
}
}

private void doMqttAbort(
long traceId,
long authorization)
Expand Down Expand Up @@ -3960,7 +4033,9 @@ private void onKafkaData(
{
final long offset = partition.partitionOffset();
final String16FW metadata = partition.metadata();
final long offsetKey = offsetKey(topic, partition.partitionId());
final int partitionId = partition.partitionId();
final long offsetKey = offsetKey(topic, partitionId);

delegate.leaderEpochs.put(offsetKey, partition.leaderEpoch());

PublishOffsetMetadata offsetMetadata;
Expand All @@ -3977,6 +4052,17 @@ private void onKafkaData(
delegate.offsetCommit.doKafkaBegin(traceId, authorization, 0);
}
delegate.offsets.put(offsetKey, offsetMetadata);
if (!retained)
{
offsetMetadata.packetIds.forEach(p ->
delegate.partitions.put(p, new KafkaTopicPartition(topic, partitionId)));
}
else
{
offsetMetadata.packetIds.forEach(p ->
delegate.retainedPartitions.put(p, new KafkaTopicPartition(topic, partitionId)));
}

}
else
{
Expand Down Expand Up @@ -4515,8 +4601,19 @@ else if (wasOpen)
}
else
{
final MqttKafkaPublishFactory.MqttPublishProxy publish = unackedPublishes.remove();
publish.doMqttFlush(traceId, authorization, 0, 0, packetId);
if (delegate.partitions.containsKey(packetId))
{
delegate.partitions.remove(packetId);
}
else if (delegate.retainedPartitions.containsKey(packetId))
{
delegate.retainedPartitions.remove(packetId);
}

if (!delegate.partitions.containsKey(packetId) && !delegate.retainedPartitions.containsKey(packetId))
{
delegate.doMqttFlush(traceId, authorization, 0, 0, packetId);
}
}
}
else
Expand Down Expand Up @@ -4676,6 +4773,20 @@ private KafkaPartition(
}
}

public static final class KafkaTopicPartition
{
private final String topic;
private final int partitionId;

KafkaTopicPartition(
String topic,
int partitionId)
{
this.topic = topic;
this.partitionId = partitionId;
}
}

public static final class PublishOffsetMetadata
{
public final long producerId;
Expand Down
Loading