Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add 3.1.1 qos1 and 2 support
  • Loading branch information
bmaidics committed Dec 3, 2023
commit 3aa7b53a3da134d2d3e64f09d10d4e162e76a2dc
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.agrona.collections.IntArrayList;

import io.aklivity.zilla.runtime.engine.EngineContext;
import io.aklivity.zilla.runtime.engine.binding.Binding;

import org.agrona.collections.IntArrayList;

public final class MqttBinding implements Binding
{
public static final String NAME = "mqtt";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@

import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.agrona.collections.IntArrayList;

import io.aklivity.zilla.runtime.binding.mqtt.internal.stream.MqttClientFactory;
import io.aklivity.zilla.runtime.binding.mqtt.internal.stream.MqttServerFactory;
import io.aklivity.zilla.runtime.binding.mqtt.internal.stream.MqttStreamFactory;
Expand All @@ -32,8 +33,6 @@
import io.aklivity.zilla.runtime.engine.config.BindingConfig;
import io.aklivity.zilla.runtime.engine.config.KindConfig;

import org.agrona.collections.IntArrayList;

final class MqttBindingContext implements BindingContext
{
private final Map<KindConfig, MqttStreamFactory> factories;
Expand Down

Large diffs are not rendered by default.

9 changes: 8 additions & 1 deletion runtime/binding-mqtt/src/main/zilla/protocol.idl
Original file line number Diff line number Diff line change
Expand Up @@ -145,14 +145,21 @@
octets payload;
}

struct MqttPublishQosV4 extends MqttPacketHeader
{
string16 topicName;
uint16 packetId;
octets payload;
}

struct MqttPublishV5 extends MqttPacketHeader
{
string16 topicName;
MqttProperties properties;
octets payload;
}

struct MqttPublishQos extends MqttPacketHeader
struct MqttPublishQosV5 extends MqttPacketHeader
{
string16 topicName;
uint16 packetId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,26 @@ public void shouldRejectUnroutable() throws Exception
k3po.finish();
}

@Test
@Configuration("server.yaml")
@Specification({
"${net}/publish.qos1.dup.after.puback/client",
"${app}/publish.qos1.dup.after.puback/server"})
public void shouldPublishQoS1Message() throws Exception
{
k3po.finish();
}

@Test
@Configuration("server.yaml")
@Specification({
"${net}/publish.qos2.no.dupicate.before.pubrel/client",
"${app}/publish.qos2.no.dupicate.before.pubrel/server"})
public void shouldPublishQoS2Message() throws Exception
{
k3po.finish();
}

@Before
public void setSubscriptionId()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,127 @@ public void shouldRejectUnroutable() throws Exception
k3po.finish();
}

@Test
@Configuration("server.yaml")
@Specification({
"${net}/subscribe.receive.message.qos1/client",
"${app}/subscribe.receive.message.qos1/server"})
public void shouldReceiveMessageQoS1() throws Exception
{
k3po.finish();
}

@Test
@Configuration("server.yaml")
@Specification({
"${net}/subscribe.receive.message.qos2/client",
"${app}/subscribe.receive.message.qos2/server"})
public void shouldReceiveMessageQoS2() throws Exception
{
k3po.finish();
}

@Test
@Configuration("server.yaml")
@Specification({
"${net}/subscribe.receive.message.qos1/client",
"${app}/subscribe.receive.message.qos1.published.qos2/server"})
public void shouldReceiveMessageQoS1PublishedAsQoS2() throws Exception
{
k3po.finish();
}

@Test
@Configuration("server.yaml")
@Specification({
"${net}/subscribe.one.message/client",
"${app}/subscribe.receive.message.qos0.published.qos1/server"})
public void shouldReceiveMessageQoS0PublishedAsQoS1() throws Exception
{
k3po.finish();
}

@Test
@Configuration("server.yaml")
@Specification({
"${net}/subscribe.one.message/client",
"${app}/subscribe.receive.message.qos0.published.qos2/server"})
public void shouldReceiveMessageQoS0PublishedAsQoS2() throws Exception
{
k3po.finish();
}

@Test
@Configuration("server.yaml")
@Specification({
"${net}/subscribe.receive.messages.mixture.qos/client",
"${app}/subscribe.receive.messages.mixture.qos/server"})
public void shouldReceiveMessagesMixtureQos() throws Exception
{
k3po.finish();
}

@Test
@Configuration("server.yaml")
@Specification({
"${net}/subscribe.receive.message.overlapping.wildcard.mixed.qos/client",
"${app}/subscribe.receive.message.overlapping.wildcard.mixed.qos/server"})
public void shouldReceiveMessageOverlappingWildcardMixedQos() throws Exception
{
k3po.finish();
}

@Test
@Configuration("server.yaml")
@Specification({
"${net}/subscribe.replay.retained.message.qos1/client",
"${app}/subscribe.replay.retained.message.qos1.v4/server"})
public void shouldReplayRetainedQos1() throws Exception
{
k3po.finish();
}

@Test
@Configuration("server.yaml")
@Specification({
"${net}/subscribe.replay.retained.message.qos2/client",
"${app}/subscribe.replay.retained.message.qos2.v4/server"})
public void shouldReplayRetainedQos2() throws Exception
{
k3po.finish();
}


@Test
@Configuration("server.yaml")
@Specification({
"${net}/subscribe.reconnect.replay.qos1.unacked.message/client",
"${app}/subscribe.reconnect.replay.qos1.unacked.message/server"})
public void shouldReplayUnackedQoS1MessageAtReconnect() throws Exception
{
k3po.finish();
}

@Test
@Configuration("server.yaml")
@Specification({
"${net}/subscribe.reconnect.replay.qos2.unreceived.message/client",
"${app}/subscribe.reconnect.replay.qos2.unreceived.message/server"})
public void shouldReplayUnreceivedQoS2MessageAtReconnect() throws Exception
{
k3po.finish();
}

@Test
@Configuration("server.yaml")
@Specification({
"${net}/subscribe.reconnect.replay.qos2.incomplete.message/client",
"${app}/subscribe.reconnect.replay.qos2.incomplete.message/server"})
public void shouldReplayIncompleteQoS2MessageAtReconnect() throws Exception
{
k3po.finish();
}

@Before
public void setSubscriptionId()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
#
# Copyright 2021-2023 Aklivity Inc.
#
# Aklivity licenses this file to you under the Apache License,
# version 2.0 (the "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#

connect "zilla://streams/app0"
option zilla:window 8192
option zilla:transmission "duplex"

write zilla:begin.ext ${mqtt:beginEx()
.typeId(zilla:id("mqtt"))
.session()
.flags("CLEAN_START")
.clientId("client")
.build()
.build()}

read zilla:begin.ext ${mqtt:matchBeginEx()
.typeId(zilla:id("mqtt"))
.session()
.flags("CLEAN_START")
.qosMax(2)
.packetSizeMax(66560)
.capabilities("RETAIN", "WILDCARD", "SUBSCRIPTION_IDS", "SHARED_SUBSCRIPTIONS")
.clientId("client")
.build()
.build()}

connected

read zilla:data.empty

write zilla:data.ext ${mqtt:dataEx()
.typeId(zilla:id("mqtt"))
.session()
.kind("STATE")
.build()
.build()}

write ${mqtt:session()
.subscription("sensor/one", 1, "AT_LEAST_ONCE", "SEND_RETAINED")
.build()}
write flush

read ${mqtt:session()
.subscription("sensor/one", 1, "AT_LEAST_ONCE", "SEND_RETAINED")
.build()}
read notify RECEIVED_SESSION_STATE


connect await RECEIVED_SESSION_STATE
"zilla://streams/app0"
option zilla:window 8192
option zilla:transmission "duplex"

write zilla:begin.ext ${mqtt:beginEx()
.typeId(zilla:id("mqtt"))
.subscribe()
.clientId("client")
.filter("sensor/one", 1, "AT_LEAST_ONCE", "SEND_RETAINED")
.build()
.build()}

connected


connect await RECEIVED_SESSION_STATE
"zilla://streams/app0"
option zilla:window 8192
option zilla:transmission "duplex"

write zilla:begin.ext ${mqtt:beginEx()
.typeId(zilla:id("mqtt"))
.subscribe()
.clientId("client")
.qos("AT_LEAST_ONCE")
.filter("sensor/one", 1, "AT_LEAST_ONCE", "SEND_RETAINED")
.build()
.build()}

connected

read zilla:data.ext ${mqtt:matchDataEx()
.typeId(zilla:id("mqtt"))
.subscribe()
.topic("sensor/one")
.packetId(1)
.qos("AT_LEAST_ONCE")
.flags("RETAIN")
.subscriptionId(1)
.format("TEXT")
.build()
.build()}
read "message"

write advise zilla:flush ${mqtt:flushEx()
.typeId(zilla:id("mqtt"))
.subscribe()
.qos("AT_LEAST_ONCE")
.packetId(1)
.build()
.build()}
Loading