Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Address feedback
  • Loading branch information
bmaidics committed Jan 14, 2024
commit babb66933b0492da5d79f3adf3d3d45cf74e1ea5
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.IntConsumer;
import java.util.function.LongFunction;
import java.util.function.LongUnaryOperator;
import java.util.function.Supplier;
Expand Down Expand Up @@ -82,6 +83,7 @@
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.MqttBeginExFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.MqttDataExFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.MqttFlushExFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.MqttOffsetMetadataFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.MqttOffsetStateFlags;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.MqttSubscribeBeginExFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.stream.MqttSubscribeFlushExFW;
Expand All @@ -108,6 +110,7 @@ public class MqttKafkaSubscribeFactory implements MqttKafkaStreamFactory
private static final int DATA_FLAG_FIN = 0x01;
private static final OctetsFW EMPTY_OCTETS = new OctetsFW().wrap(new UnsafeBuffer(new byte[0]), 0, 0);
private static final String16FW EMPTY_STRING = new String16FW("");
private static final int OFFSET_METADATA_VERSION = 1;

private final OctetsFW emptyRO = new OctetsFW().wrap(new UnsafeBuffer(0L, 0), 0, 0);
private final BeginFW beginRO = new BeginFW();
Expand All @@ -128,6 +131,7 @@ public class MqttKafkaSubscribeFactory implements MqttKafkaStreamFactory
private final WindowFW.Builder windowRW = new WindowFW.Builder();
private final ResetFW.Builder resetRW = new ResetFW.Builder();
private final MqttSubscribeMessageFW.Builder mqttSubscribeMessageRW = new MqttSubscribeMessageFW.Builder();
private final MqttOffsetMetadataFW.Builder mqttOffsetMetadataRW = new MqttOffsetMetadataFW.Builder();

private final ExtensionFW extensionRO = new ExtensionFW();
private final MqttBeginExFW mqttBeginExRO = new MqttBeginExFW();
Expand All @@ -137,6 +141,7 @@ public class MqttKafkaSubscribeFactory implements MqttKafkaStreamFactory
private final KafkaFlushExFW kafkaFlushExRO = new KafkaFlushExFW();
private final KafkaHeaderFW kafkaHeaderRO = new KafkaHeaderFW();
private final MqttSubscribeMessageFW mqttSubscribeMessageRO = new MqttSubscribeMessageFW();
private final MqttOffsetMetadataFW mqttOffsetMetadataRO = new MqttOffsetMetadataFW();

private final MqttDataExFW.Builder mqttDataExRW = new MqttDataExFW.Builder();
private final MqttFlushExFW.Builder mqttFlushExRW = new MqttFlushExFW.Builder();
Expand Down Expand Up @@ -1828,35 +1833,21 @@ private IntArrayList stringToOffsetMetadataList(
String16FW metadata)
{
final IntArrayList metadataList = new IntArrayList();
int offset = 0;
final DirectBuffer buffer = metadata.value();
byte version = buffer.getByte(offset++);
for (; offset < buffer.capacity(); offset += BitUtil.SIZE_OF_SHORT)
{
metadataList.add((int) buffer.getShort(offset));
}

UnsafeBuffer buffer = new UnsafeBuffer(BitUtil.fromHex(metadata.asString()));
final MqttOffsetMetadataFW offsetMetadata = mqttOffsetMetadataRO.wrap(buffer, 0, buffer.capacity());
offsetMetadata.packetIds().forEachRemaining((IntConsumer) metadataList::add);
return metadataList;
}

private String16FW offsetMetadataListToString(
IntArrayList metadataList)
{
final int length = metadataList.size() * BitUtil.SIZE_OF_SHORT + 1;
final int capacity = BitUtil.SIZE_OF_SHORT + length;
int offset = 0;

offsetBuffer.putShort(offset, (short) length);
offset += BitUtil.SIZE_OF_SHORT;
offsetBuffer.putByte(offset++, (byte) 1);

for (int value : metadataList)
{
offsetBuffer.putShort(offset, (short) value);
offset += BitUtil.SIZE_OF_SHORT;
}

return new String16FW().wrap(offsetBuffer, 0, capacity);
mqttOffsetMetadataRW.wrap(offsetBuffer, 0, offsetBuffer.capacity());
mqttOffsetMetadataRW.version(OFFSET_METADATA_VERSION);
metadataList.forEach(p -> mqttOffsetMetadataRW.appendPacketIds(p.shortValue()));
final MqttOffsetMetadataFW offsetMetadata = mqttOffsetMetadataRW.build();
return new String16FW(BitUtil.toHex(offsetMetadata.buffer().byteArray(),
offsetMetadata.offset(), offsetMetadata.limit()));
}

final class KafkaRetainedProxy extends KafkaProxy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.agrona.BitUtil;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.IntArrayList;
import org.agrona.concurrent.UnsafeBuffer;
import org.kaazing.k3po.lang.el.BytesMatcher;
import org.kaazing.k3po.lang.el.Function;
Expand Down Expand Up @@ -54,6 +53,7 @@
import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttDataExFW;
import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttExtensionKind;
import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttFlushExFW;
import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttOffsetMetadataFW;
import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttOffsetStateFlags;
import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttPublishBeginExFW;
import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttPublishDataExFW;
Expand Down Expand Up @@ -848,42 +848,29 @@ public byte[] build()

public static final class MqttOffsetMetadataBuilder
{
final MutableDirectBuffer writeBuffer;
final IntArrayList packetIds;
private final MqttOffsetMetadataFW.Builder offsetMetadataRW = new MqttOffsetMetadataFW.Builder();

byte version = 1;


private MqttOffsetMetadataBuilder()
{
writeBuffer = new UnsafeBuffer(new byte[1024 * 8]);
packetIds = new IntArrayList();
MutableDirectBuffer writeBuffer = new UnsafeBuffer(new byte[1024 * 8]);
offsetMetadataRW.wrap(writeBuffer, 0, writeBuffer.capacity());
offsetMetadataRW.version(version);
}

public MqttOffsetMetadataBuilder metadata(
int packetId)
{
packetIds.add(packetId);
offsetMetadataRW.appendPacketIds((short) packetId);
return this;
}

public String build()
{
final int length = packetIds.size() * BitUtil.SIZE_OF_SHORT + 1;
final int capacity = BitUtil.SIZE_OF_SHORT + length;
int offset = 0;

writeBuffer.putShort(offset, (short) length);
offset += BitUtil.SIZE_OF_SHORT;
writeBuffer.putByte(offset++, (byte) 1);

for (int value : packetIds)
{
writeBuffer.putShort(offset, (short) value);
offset += BitUtil.SIZE_OF_SHORT;
}

return new String16FW().wrap(writeBuffer, 0, capacity).asString();
final MqttOffsetMetadataFW offsetMetadata = offsetMetadataRW.build();
return BitUtil.toHex(offsetMetadata.buffer().byteArray(), offsetMetadata.offset(), offsetMetadata.limit());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,5 +248,12 @@ scope mqtt
COMPLETE(0),
INCOMPLETE(1)
}

struct MqttOffsetMetadata
{
uint8 version = 1;
uint8 length;
int16[length] packetIds;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.function.IntConsumer;

import org.agrona.BitUtil;
import org.agrona.DirectBuffer;
Expand All @@ -36,10 +37,10 @@
import io.aklivity.zilla.specs.binding.mqtt.internal.types.MqttSessionSignalType;
import io.aklivity.zilla.specs.binding.mqtt.internal.types.MqttSessionStateFW;
import io.aklivity.zilla.specs.binding.mqtt.internal.types.MqttWillMessageFW;
import io.aklivity.zilla.specs.binding.mqtt.internal.types.String16FW;
import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttBeginExFW;
import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttDataExFW;
import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttFlushExFW;
import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttOffsetMetadataFW;
import io.aklivity.zilla.specs.binding.mqtt.internal.types.stream.MqttResetExFW;

public class MqttFunctionsTest
Expand Down Expand Up @@ -1274,15 +1275,11 @@ public void shouldEncodeMqttOffsetMetadata()
.build();

final IntArrayList metadataList = new IntArrayList();
int offset = 0;
final DirectBuffer buffer = new String16FW(state).value();
byte version = buffer.getByte(offset++);
for (; offset < buffer.capacity(); offset += BitUtil.SIZE_OF_SHORT)
{
metadataList.add((int) buffer.getShort(offset));
}

assertEquals(1, version);
UnsafeBuffer buffer = new UnsafeBuffer(BitUtil.fromHex(state));
MqttOffsetMetadataFW offsetMetadata = new MqttOffsetMetadataFW().wrap(buffer, 0, buffer.capacity());
offsetMetadata.packetIds().forEachRemaining((IntConsumer) metadataList::add);

assertEquals(1, offsetMetadata.version());
assertEquals(1, (int) metadataList.get(0));
assertEquals(2, (int) metadataList.get(1));
}
Expand Down