Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Fix NegativeArraySizeException when receiving mqttFlush
  • Loading branch information
bmaidics committed Jun 19, 2024
commit 83d26e1d198b94ba4e12e21df71b2cfd0b8434d1
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@
import io.aklivity.zilla.runtime.binding.mqtt.kafka.config.MqttKafkaPublishConfig;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.config.MqttKafkaTopicsConfig;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.MqttKafkaBinding;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.MqttQoS;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.String16FW;
import io.aklivity.zilla.runtime.engine.config.OptionsConfig;
import io.aklivity.zilla.runtime.engine.config.OptionsConfigAdapterSpi;
import io.aklivity.zilla.specs.binding.mqtt.internal.types.MqttQoS;

public class MqttKafkaOptionsConfigAdapter implements OptionsConfigAdapterSpi, JsonbAdapter<OptionsConfig, JsonObject>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.KafkaSkip;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.MqttPayloadFormat;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.MqttQoS;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.MqttSubscribeOffsetMetadataFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.MqttSubscribeOffsetMetadataV1FW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.MqttSubscribeOffsetMetadataV2FW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.MqttTopicFilterFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.OctetsFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.String16FW;
Expand Down Expand Up @@ -109,7 +110,7 @@ public class MqttKafkaSubscribeFactory implements MqttKafkaStreamFactory
private static final int DATA_FLAG_FIN = 0x01;
private static final OctetsFW EMPTY_OCTETS = new OctetsFW().wrap(new UnsafeBuffer(new byte[0]), 0, 0);
private static final String16FW EMPTY_STRING = new String16FW("");
private static final int OFFSET_METADATA_VERSION = 1;
private static final int OFFSET_METADATA_VERSION = 2;

private final OctetsFW emptyRO = new OctetsFW().wrap(new UnsafeBuffer(0L, 0), 0, 0);
private final BeginFW beginRO = new BeginFW();
Expand All @@ -130,7 +131,7 @@ public class MqttKafkaSubscribeFactory implements MqttKafkaStreamFactory
private final WindowFW.Builder windowRW = new WindowFW.Builder();
private final ResetFW.Builder resetRW = new ResetFW.Builder();
private final MqttSubscribeMessageFW.Builder mqttSubscribeMessageRW = new MqttSubscribeMessageFW.Builder();
private final MqttSubscribeOffsetMetadataFW.Builder mqttOffsetMetadataRW = new MqttSubscribeOffsetMetadataFW.Builder();
private final MqttSubscribeOffsetMetadataV2FW.Builder mqttOffsetMetadataV2RW = new MqttSubscribeOffsetMetadataV2FW.Builder();

private final ExtensionFW extensionRO = new ExtensionFW();
private final MqttBeginExFW mqttBeginExRO = new MqttBeginExFW();
Expand All @@ -140,7 +141,8 @@ public class MqttKafkaSubscribeFactory implements MqttKafkaStreamFactory
private final KafkaFlushExFW kafkaFlushExRO = new KafkaFlushExFW();
private final KafkaHeaderFW kafkaHeaderRO = new KafkaHeaderFW();
private final MqttSubscribeMessageFW mqttSubscribeMessageRO = new MqttSubscribeMessageFW();
private final MqttSubscribeOffsetMetadataFW mqttOffsetMetadataRO = new MqttSubscribeOffsetMetadataFW();
private final MqttSubscribeOffsetMetadataV1FW mqttOffsetMetadataV1RO = new MqttSubscribeOffsetMetadataV1FW();
private final MqttSubscribeOffsetMetadataV2FW mqttOffsetMetadataV2RO = new MqttSubscribeOffsetMetadataV2FW();

private final MqttDataExFW.Builder mqttDataExRW = new MqttDataExFW.Builder();
private final MqttFlushExFW.Builder mqttFlushExRW = new MqttFlushExFW.Builder();
Expand Down Expand Up @@ -467,93 +469,118 @@ private void onMqttFlush(
binding.resolveAll(authorization, filters) : null;
final int packetId = mqttSubscribeFlushEx.packetId();

if (!filters.isEmpty())
if (!MqttKafkaState.replyClosed(state))
{
if (routes != null)
if (!filters.isEmpty())
{
routes.forEach(r ->
{
final long routeOrder = r.order;
if (!messages.containsKey(routeOrder))
{
KafkaMessagesProxy messagesProxy = new KafkaMessagesProxy(originId, r, this);
messages.put(routeOrder, messagesProxy);
messagesPerTopicKey.put(messagesProxy.topicKey, r.order);
messagesProxy.doKafkaBegin(traceId, authorization, 0, filters);
}
else
{
messages.get(routeOrder).doKafkaFlush(traceId, authorization, budgetId, reserved, qos, filters);
}
});
onFiltersChanged(traceId, authorization, budgetId, reserved, filters, routes);
}

if (retainAvailable)
else if (packetId > 0)
{
final List<Subscription> retainedFilters = new ArrayList<>();
filters.forEach(filter ->
{
final boolean sendRetained = (filter.flags() & SEND_RETAIN_FLAG) != 0;
if (sendRetained)
{
retainedFilters.add(new Subscription(
(int) filter.subscriptionId(), newString16FW(filter.pattern()), filter.qos(), filter.flags()));
final boolean rap = (filter.flags() & RETAIN_AS_PUBLISHED_FLAG) != 0;
retainAsPublished.put((int) filter.subscriptionId(), rap);
}
});

retainedSubscriptions.removeIf(rf -> !filters.anyMatch(f -> f.pattern().equals(rf.filter)));
if (!retainedFilters.isEmpty())
{
if (MqttKafkaState.initialOpened(retained.state) && !MqttKafkaState.initialClosed(retained.state))
{
retained.doKafkaFlush(traceId, authorization, budgetId, reserved, qos, retainedFilters);
}
else
{
final List<Subscription> newRetainedFilters = new ArrayList<>();
retainedFilters.forEach(subscription ->
{
if (!retainedSubscriptions.contains(subscription))
{
newRetainedFilters.add(subscription);
}
});
retained.doKafkaBegin(traceId, authorization, 0, newRetainedFilters);
}
}
onMessageAcked(traceId, authorization, budgetId, reserved, packetId, mqttSubscribeFlushEx);
}
}
else if (packetId > 0)
{
final int qos = mqttSubscribeFlushEx.qos();
final MqttOffsetStateFlags state = MqttOffsetStateFlags.valueOf(mqttSubscribeFlushEx.state());
final PartitionOffset offset = state == MqttOffsetStateFlags.INCOMPLETE ?
offsetsPerPacketId.get(packetId) : offsetsPerPacketId.remove(packetId);
}

final long topicPartitionKey = topicPartitionKey(offset.topicKey, offset.partitionId);
final long messagesId = messagesPerTopicKey.get(offset.topicKey);
private void onMessageAcked(
long traceId,
long authorization,
long budgetId,
int reserved,
int packetId,
MqttSubscribeFlushExFW mqttSubscribeFlushEx)
{
final int qos = mqttSubscribeFlushEx.qos();
final MqttOffsetStateFlags state = MqttOffsetStateFlags.valueOf(mqttSubscribeFlushEx.state());
final PartitionOffset offset = state == MqttOffsetStateFlags.INCOMPLETE ?
offsetsPerPacketId.get(packetId) : offsetsPerPacketId.remove(packetId);

OffsetCommit offsetCommit = new OffsetCommit(offset, qos, state, packetId);
final long topicPartitionKey = topicPartitionKey(offset.topicKey, offset.partitionId);
final long messagesId = messagesPerTopicKey.get(offset.topicKey);

final OffsetHighWaterMark highWaterMark = highWaterMarks.get(topicPartitionKey);
OffsetCommit offsetCommit = new OffsetCommit(offset, qos, state, packetId);

final KafkaProxy proxy = messagesId != -1 ? messages.get(messagesId) : retained;
final OffsetHighWaterMark highWaterMark = highWaterMarks.get(topicPartitionKey);

final KafkaProxy proxy = messagesId != -1 ? messages.get(messagesId) : retained;

if (highWaterMark.offset >= offset.offset)
{
highWaterMark.increase();
commitOffset(traceId, authorization, budgetId, reserved, proxy, offsetCommit);
commitDeferredOffsets(traceId, authorization, budgetId, reserved, highWaterMark);
}
else if (qos == MqttQoS.EXACTLY_ONCE.value() && state != MqttOffsetStateFlags.INCOMPLETE)
{
commitOffset(traceId, authorization, budgetId, reserved, proxy, offsetCommit);
}
else
{
highWaterMark.deferOffsetCommit(offsetCommit, proxy);
}
}

if (highWaterMark.offset >= offset.offset)
private void onFiltersChanged(
long traceId,
long authorization,
long budgetId,
int reserved,
Array32FW<MqttTopicFilterFW> filters,
List<MqttKafkaRouteConfig> routes)
{
if (routes != null)
{
routes.forEach(r ->
{
highWaterMark.increase();
commitOffset(traceId, authorization, budgetId, reserved, proxy, offsetCommit);
commitDeferredOffsets(traceId, authorization, budgetId, reserved, highWaterMark);
}
else if (qos == MqttQoS.EXACTLY_ONCE.value() && state != MqttOffsetStateFlags.INCOMPLETE)
final long routeOrder = r.order;
if (!messages.containsKey(routeOrder))
{
KafkaMessagesProxy messagesProxy = new KafkaMessagesProxy(originId, r, this);
messages.put(routeOrder, messagesProxy);
messagesPerTopicKey.put(messagesProxy.topicKey, r.order);
messagesProxy.doKafkaBegin(traceId, authorization, 0, filters);
}
else
{
messages.get(routeOrder).doKafkaFlush(traceId, authorization, budgetId, reserved, qos, filters);
}
});
}

if (retainAvailable)
{
final List<Subscription> retainedFilters = new ArrayList<>();
filters.forEach(filter ->
{
commitOffset(traceId, authorization, budgetId, reserved, proxy, offsetCommit);
}
else
final boolean sendRetained = (filter.flags() & SEND_RETAIN_FLAG) != 0;
if (sendRetained)
{
retainedFilters.add(new Subscription((int) filter.subscriptionId(),
newString16FW(filter.pattern()), filter.qos(), filter.flags()));
final boolean rap = (filter.flags() & RETAIN_AS_PUBLISHED_FLAG) != 0;
retainAsPublished.put((int) filter.subscriptionId(), rap);
}
});

retainedSubscriptions.removeIf(rf -> !filters.anyMatch(f -> f.pattern().equals(rf.filter)));
if (!retainedFilters.isEmpty())
{
highWaterMark.deferOffsetCommit(offsetCommit, proxy);
if (MqttKafkaState.initialOpened(retained.state) && !MqttKafkaState.initialClosed(retained.state))
{
retained.doKafkaFlush(traceId, authorization, budgetId, reserved, qos, retainedFilters);
}
else
{
final List<Subscription> newRetainedFilters = new ArrayList<>();
retainedFilters.forEach(subscription ->
{
if (!retainedSubscriptions.contains(subscription))
{
newRetainedFilters.add(subscription);
}
});
retained.doKafkaBegin(traceId, authorization, 0, newRetainedFilters);
}
}
}
}
Expand Down Expand Up @@ -1845,18 +1872,28 @@ private IntArrayList stringToOffsetMetadataList(
{
final IntArrayList metadataList = new IntArrayList();
UnsafeBuffer buffer = new UnsafeBuffer(BitUtil.fromHex(metadata.asString()));
final MqttSubscribeOffsetMetadataFW offsetMetadata = mqttOffsetMetadataRO.wrap(buffer, 0, buffer.capacity());
offsetMetadata.packetIds().forEachRemaining((IntConsumer) metadataList::add);
final byte version = buffer.getByte(0);
switch (version)
{
case 1:
final MqttSubscribeOffsetMetadataV1FW offsetMetadataV1 = mqttOffsetMetadataV1RO.wrap(buffer, 0, buffer.capacity());
offsetMetadataV1.packetIds().forEachRemaining((IntConsumer) metadataList::add);
break;
case 2:
final MqttSubscribeOffsetMetadataV2FW offsetMetadataV2 = mqttOffsetMetadataV2RO.wrap(buffer, 0, buffer.capacity());
offsetMetadataV2.packetIds().forEachRemaining((IntConsumer) metadataList::add);
break;
}
Comment thread
bmaidics marked this conversation as resolved.
return metadataList;
}

private String16FW offsetMetadataListToString(
IntArrayList metadataList)
{
mqttOffsetMetadataRW.wrap(offsetBuffer, 0, offsetBuffer.capacity());
mqttOffsetMetadataRW.version(OFFSET_METADATA_VERSION);
metadataList.forEach(p -> mqttOffsetMetadataRW.appendPacketIds(p.shortValue()));
final MqttSubscribeOffsetMetadataFW offsetMetadata = mqttOffsetMetadataRW.build();
mqttOffsetMetadataV2RW.wrap(offsetBuffer, 0, offsetBuffer.capacity());
mqttOffsetMetadataV2RW.version(OFFSET_METADATA_VERSION);
metadataList.forEach(p -> mqttOffsetMetadataV2RW.appendPacketIds(p.shortValue()));
final MqttSubscribeOffsetMetadataV2FW offsetMetadata = mqttOffsetMetadataV2RW.build();
return new String16FW(BitUtil.toHex(offsetMetadata.buffer().byteArray(),
offsetMetadata.offset(), offsetMetadata.limit()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,17 @@ public void shouldReceiveMessageQoS2() throws Exception
k3po.finish();
}

@Test
@Configuration("proxy.yaml")
@Configure(name = WILL_AVAILABLE_NAME, value = "false")
@Specification({
"${mqtt}/subscribe.qos2.version1.offset.metadata/client",
"${kafka}/subscribe.qos2.version1.offset.metadata/server"})
public void shouldReceiveMessageQoS2WithVersion1OffsetMetadata() throws Exception
{
k3po.finish();
}

@Test
@Configuration("proxy.yaml")
@Configure(name = WILL_AVAILABLE_NAME, value = "false")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import io.aklivity.k3po.runtime.lang.el.Function;
import io.aklivity.k3po.runtime.lang.el.spi.FunctionMapperSpi;
import io.aklivity.zilla.specs.binding.mqtt.kafka.internal.types.MqttPublishOffsetMetadataFW;
import io.aklivity.zilla.specs.binding.mqtt.kafka.internal.types.MqttSubscribeOffsetMetadataFW;
import io.aklivity.zilla.specs.binding.mqtt.kafka.internal.types.MqttSubscribeOffsetMetadataV2FW;

public final class MqttKafkaFunctions
{
Expand All @@ -40,9 +40,9 @@ public static MqttPublishOffsetMetadataBuilder publishMetadata()

public static final class MqttSubscribeOffsetMetadataBuilder
{
private final MqttSubscribeOffsetMetadataFW.Builder offsetMetadataRW = new MqttSubscribeOffsetMetadataFW.Builder();
private final MqttSubscribeOffsetMetadataV2FW.Builder offsetMetadataRW = new MqttSubscribeOffsetMetadataV2FW.Builder();

byte version = 1;
byte version = 2;


private MqttSubscribeOffsetMetadataBuilder()
Expand All @@ -61,7 +61,7 @@ public MqttSubscribeOffsetMetadataBuilder metadata(

public String build()
{
final MqttSubscribeOffsetMetadataFW offsetMetadata = offsetMetadataRW.build();
final MqttSubscribeOffsetMetadataV2FW offsetMetadata = offsetMetadataRW.build();
return BitUtil.toHex(offsetMetadata.buffer().byteArray(), offsetMetadata.offset(), offsetMetadata.limit());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,20 @@

scope mqtt_kafka
{
struct MqttSubscribeOffsetMetadata
struct MqttSubscribeOffsetMetadataV1
{
uint8 version = 1;
int8 length;
int16[length] packetIds = null;
}

struct MqttSubscribeOffsetMetadataV2
{
uint8 version = 2;
int16 length;
int16[length] packetIds = null;
}

struct MqttPublishOffsetMetadata
{
uint8 version = 1;
Expand Down
Loading