From 7e8dec5cf071f3e64a3d2a98a54216049019757c Mon Sep 17 00:00:00 2001 From: bmaidics Date: Wed, 13 Nov 2024 20:40:15 +0100 Subject: [PATCH] Fix MQTT binding selecting incorrect publish stream --- .../internal/stream/MqttServerFactory.java | 2 +- .../internal/stream/server/v4/PublishIT.java | 10 ++ .../internal/stream/server/v5/PublishIT.java | 10 ++ .../client.rpt | 105 ++++++++++++++++++ .../server.rpt | 101 +++++++++++++++++ .../client.rpt | 45 ++++++++ .../server.rpt | 46 ++++++++ .../client.rpt | 51 +++++++++ .../server.rpt | 52 +++++++++ .../mqtt/streams/application/PublishIT.java | 9 ++ .../mqtt/streams/network/v4/PublishIT.java | 9 ++ .../mqtt/streams/network/v5/PublishIT.java | 9 ++ 12 files changed, 448 insertions(+), 1 deletion(-) create mode 100644 specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.retained.multiple.topic/client.rpt create mode 100644 specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.retained.multiple.topic/server.rpt create mode 100644 specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v4/publish.retained.multiple.topic/client.rpt create mode 100644 specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v4/publish.retained.multiple.topic/server.rpt create mode 100644 specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/publish.retained.multiple.topic/client.rpt create mode 100644 specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/publish.retained.multiple.topic/server.rpt diff --git a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/MqttServerFactory.java b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/MqttServerFactory.java index 1d96c03d96..8c43c9781a 100644 --- a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/MqttServerFactory.java +++ b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/MqttServerFactory.java @@ -1311,8 +1311,8 @@ private int decodePublishV4( { break decode; } - server.decodePublisherKey = topicKey; } + server.decodePublisherKey = topicKey; server.decodedQos = mqttPublishHelper.qos; server.decodedRetained = mqttPublishHelper.retained; diff --git a/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/server/v4/PublishIT.java b/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/server/v4/PublishIT.java index 70e882cd0c..9a9fa38a7f 100644 --- a/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/server/v4/PublishIT.java +++ b/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/server/v4/PublishIT.java @@ -89,6 +89,16 @@ public void shouldPublishRetainedMessage() throws Exception k3po.finish(); } + @Test + @Configuration("server.yaml") + @Specification({ + "${net}/publish.retained.multiple.topic/client", + "${app}/publish.retained.multiple.topic/server"}) + public void shouldPublishRetainedMessageMultipleTopic() throws Exception + { + k3po.finish(); + } + @Test @Configuration("server.yaml") @Specification({ diff --git a/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/server/v5/PublishIT.java b/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/server/v5/PublishIT.java index ddd5776f9d..8b33e513f8 100644 --- a/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/server/v5/PublishIT.java +++ b/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/server/v5/PublishIT.java @@ -136,6 +136,16 @@ public void shouldPublishRetainedMessage() throws Exception k3po.finish(); } + @Test + @Configuration("server.yaml") + @Specification({ + "${net}/publish.retained.multiple.topic/client", + "${app}/publish.retained.multiple.topic/server"}) + public void shouldPublishRetainedMessageMultipleTopic() throws Exception + { + k3po.finish(); + } + @Test @Configuration("server.yaml") @Specification({ diff --git a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.retained.multiple.topic/client.rpt b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.retained.multiple.topic/client.rpt new file mode 100644 index 0000000000..ad4bc5146f --- /dev/null +++ b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.retained.multiple.topic/client.rpt @@ -0,0 +1,105 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +connect "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "duplex" + +write zilla:begin.ext ${mqtt:beginEx() + .typeId(zilla:id("mqtt")) + .session() + .flags("CLEAN_START") + .clientId("client") + .build() + .build()} + +read zilla:begin.ext ${mqtt:matchBeginEx() + .typeId(zilla:id("mqtt")) + .session() + .flags("CLEAN_START") + .subscribeQosMax(2) + .publishQosMax(2) + .packetSizeMax(66560) + .capabilities("RETAIN", "WILDCARD", "SUBSCRIPTION_IDS", "SHARED_SUBSCRIPTIONS") + .clientId("client") + .build() + .build()} + +connected + +read zilla:data.empty +read notify RECEIVED_SESSION_STATE + + +connect await RECEIVED_SESSION_STATE + "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "duplex" + +write zilla:begin.ext ${mqtt:beginEx() + .typeId(zilla:id("mqtt")) + .publish() + .clientId("client") + .topic("sensor/one") + .flags("RETAIN") + .build() + .build()} + +connected + +write zilla:data.ext ${mqtt:dataEx() + .typeId(zilla:id("mqtt")) + .publish() + .flags("RETAIN") + .build() + .build()} + +write "message" + +write zilla:data.ext ${mqtt:dataEx() + .typeId(zilla:id("mqtt")) + .publish() + .flags("RETAIN") + .build() + .build()} + +write "message2" + + +connect await RECEIVED_SESSION_STATE + "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "duplex" + +write zilla:begin.ext ${mqtt:beginEx() + .typeId(zilla:id("mqtt")) + .publish() + .clientId("client") + .topic("sensor/two") + .flags("RETAIN") + .build() + .build()} + +connected + +write zilla:data.ext ${mqtt:dataEx() + .typeId(zilla:id("mqtt")) + .publish() + .flags("RETAIN") + .build() + .build()} + +write "message" diff --git a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.retained.multiple.topic/server.rpt b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.retained.multiple.topic/server.rpt new file mode 100644 index 0000000000..dbb0a52519 --- /dev/null +++ b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.retained.multiple.topic/server.rpt @@ -0,0 +1,101 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +accept "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "duplex" + +accepted + +read zilla:begin.ext ${mqtt:matchBeginEx() + .typeId(zilla:id("mqtt")) + .session() + .flags("CLEAN_START") + .clientId("client") + .build() + .build()} + +write zilla:begin.ext ${mqtt:beginEx() + .typeId(zilla:id("mqtt")) + .session() + .flags("CLEAN_START") + .subscribeQosMax(2) + .publishQosMax(2) + .packetSizeMax(66560) + .capabilities("RETAIN", "WILDCARD", "SUBSCRIPTION_IDS", "SHARED_SUBSCRIPTIONS") + .clientId("client") + .build() + .build()} + +connected + +write zilla:data.empty +write flush + + +accepted + +read zilla:begin.ext ${mqtt:matchBeginEx() + .typeId(zilla:id("mqtt")) + .publish() + .clientId("client") + .topic("sensor/one") + .flags("RETAIN") + .build() + .build()} + +connected + +read zilla:data.ext ${mqtt:matchDataEx() + .typeId(zilla:id("mqtt")) + .publish() + .flags("RETAIN") + .build() + .build()} + +read "message" + +read zilla:data.ext ${mqtt:matchDataEx() + .typeId(zilla:id("mqtt")) + .publish() + .flags("RETAIN") + .build() + .build()} + +read "message2" + + +accepted + +read zilla:begin.ext ${mqtt:matchBeginEx() + .typeId(zilla:id("mqtt")) + .publish() + .clientId("client") + .topic("sensor/two") + .flags("RETAIN") + .build() + .build()} + +connected + +read zilla:data.ext ${mqtt:matchDataEx() + .typeId(zilla:id("mqtt")) + .publish() + .flags("RETAIN") + .build() + .build()} + +read "message" diff --git a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v4/publish.retained.multiple.topic/client.rpt b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v4/publish.retained.multiple.topic/client.rpt new file mode 100644 index 0000000000..e76f55d0a0 --- /dev/null +++ b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v4/publish.retained.multiple.topic/client.rpt @@ -0,0 +1,45 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +connect "zilla://streams/net0" + option zilla:window 8192 + option zilla:transmission "duplex" + option zilla:byteorder "network" + +connected + +write [0x10 0x12] # CONNECT + [0x00 0x04] "MQTT" # protocol name + [0x04] # protocol version + [0x02] # flags = clean start + [0x00 0x3c] # keep alive = 60s + [0x00 0x06] "client" # client id + +read [0x20 0x02] # CONNACK + [0x00] # flags = none + [0x00] # reason code + +write [0x31 0x13] # PUBLISH flags = at-most-once, retain + [0x00 0x0a] "sensor/one" # topic name + "message" # payload + +write [0x31 0x13] # PUBLISH flags = at-most-once, retain + [0x00 0x0a] "sensor/two" # topic name + "message" # payload + +write [0x31 0x14] # PUBLISH flags = at-most-once, retain + [0x00 0x0a] "sensor/one" # topic name + "message2" # payload diff --git a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v4/publish.retained.multiple.topic/server.rpt b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v4/publish.retained.multiple.topic/server.rpt new file mode 100644 index 0000000000..3ca6882723 --- /dev/null +++ b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v4/publish.retained.multiple.topic/server.rpt @@ -0,0 +1,46 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +accept "zilla://streams/net0" + option zilla:window 8192 + option zilla:transmission "duplex" + option zilla:byteorder "network" + +accepted +connected + +read [0x10 0x12] # CONNECT + [0x00 0x04] "MQTT" # protocol name + [0x04] # protocol version + [0x02] # flags = clean start + [0x00 0x3c] # keep alive = 60s + [0x00 0x06] "client" # client id + +write [0x20 0x02] # CONNACK + [0x00] # flags = none + [0x00] # reason code + +read [0x31 0x13] # PUBLISH flags = at-most-once, retain + [0x00 0x0a] "sensor/one" # topic name + "message" # payload + +read [0x31 0x13] # PUBLISH flags = at-most-once, retain + [0x00 0x0a] "sensor/two" # topic name + "message" # payload + +read [0x31 0x14] # PUBLISH flags = at-most-once, retain + [0x00 0x0a] "sensor/one" # topic name + "message2" # payload diff --git a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/publish.retained.multiple.topic/client.rpt b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/publish.retained.multiple.topic/client.rpt new file mode 100644 index 0000000000..70bd81326b --- /dev/null +++ b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/publish.retained.multiple.topic/client.rpt @@ -0,0 +1,51 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +connect "zilla://streams/net0" + option zilla:window 8192 + option zilla:transmission "duplex" + option zilla:byteorder "network" + +connected + +write [0x10 0x18] # CONNECT + [0x00 0x04] "MQTT" # protocol name + [0x05] # protocol version + [0x02] # flags = clean start + [0x00 0x3c] # keep alive = 60s + [0x05] # properties + [0x27] 66560 # maximum packet size = 66560 + [0x00 0x06] "client" # client id + +read [0x20 0x03] # CONNACK + [0x00] # flags = none + [0x00] # reason code + [0x00] # properties + +write [0x31 0x14] # PUBLISH flags = at-most-once, retain + [0x00 0x0a] "sensor/one" # topic name + [0x00] # properties + "message" # payload + +write [0x31 0x14] # PUBLISH flags = at-most-once, retain + [0x00 0x0a] "sensor/two" # topic name + [0x00] # properties + "message" # payload + +write [0x31 0x15] # PUBLISH flags = at-most-once, retain + [0x00 0x0a] "sensor/one" # topic name + [0x00] # properties + "message2" # payload diff --git a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/publish.retained.multiple.topic/server.rpt b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/publish.retained.multiple.topic/server.rpt new file mode 100644 index 0000000000..0cceee278e --- /dev/null +++ b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/publish.retained.multiple.topic/server.rpt @@ -0,0 +1,52 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +accept "zilla://streams/net0" + option zilla:window 8192 + option zilla:transmission "duplex" + option zilla:byteorder "network" + +accepted +connected + +read [0x10 0x18] # CONNECT + [0x00 0x04] "MQTT" # protocol name + [0x05] # protocol version + [0x02] # flags = clean start + [0x00 0x3c] # keep alive = 60s + [0x05] # properties + [0x27] 66560 # maximum packet size = 66560 + [0x00 0x06] "client" # client id + +write [0x20 0x03] # CONNACK + [0x00] # flags = none + [0x00] # reason code + [0x00] # properties + +read [0x31 0x14] # PUBLISH flags = at-most-once, retain + [0x00 0x0a] "sensor/one" # topic name + [0x00] # properties + "message" # payload + +read [0x31 0x14] # PUBLISH flags = at-most-once, retain + [0x00 0x0a] "sensor/two" # topic name + [0x00] # properties + "message" # payload + +read [0x31 0x15] # PUBLISH flags = at-most-once, retain + [0x00 0x0a] "sensor/one" # topic name + [0x00] # properties + "message2" # payload diff --git a/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/streams/application/PublishIT.java b/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/streams/application/PublishIT.java index 02e8498a9d..3b3a33ef55 100644 --- a/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/streams/application/PublishIT.java +++ b/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/streams/application/PublishIT.java @@ -173,6 +173,15 @@ public void shouldPublishRetainedMessage() throws Exception k3po.finish(); } + @Test + @Specification({ + "${app}/publish.retained.multiple.topic/client", + "${app}/publish.retained.multiple.topic/server"}) + public void shouldPublishRetainedMessageMultipleTopic() throws Exception + { + k3po.finish(); + } + @Test @Specification({ "${app}/publish.empty.retained.message/client", diff --git a/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/streams/network/v4/PublishIT.java b/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/streams/network/v4/PublishIT.java index f61f3453ac..9f3e53972f 100644 --- a/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/streams/network/v4/PublishIT.java +++ b/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/streams/network/v4/PublishIT.java @@ -124,6 +124,15 @@ public void shouldPublishRetainedMessage() throws Exception k3po.finish(); } + @Test + @Specification({ + "${net}/publish.retained.multiple.topic/client", + "${net}/publish.retained.multiple.topic/server"}) + public void shouldPublishRetainedMessageMultipleTopic() throws Exception + { + k3po.finish(); + } + @Test @Specification({ "${net}/publish.empty.retained.message/client", diff --git a/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/PublishIT.java b/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/PublishIT.java index 570e46ae19..cb47367fe3 100644 --- a/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/PublishIT.java +++ b/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/streams/network/v5/PublishIT.java @@ -293,6 +293,15 @@ public void shouldPublishRetainedMessage() throws Exception k3po.finish(); } + @Test + @Specification({ + "${net}/publish.retained.multiple.topic/client", + "${net}/publish.retained.multiple.topic/server"}) + public void shouldPublishRetainedMessageMultipleTopic() throws Exception + { + k3po.finish(); + } + @Test @Specification({ "${net}/publish.empty.retained.message/client",