From 7277872d642aec717f13f3d50f08db465a14a12c Mon Sep 17 00:00:00 2001 From: Vera Xia Date: Wed, 23 Jul 2025 17:04:35 -0700 Subject: [PATCH 1/5] handle ack packet --- AWSIoTPythonSDK/core/protocol/mqtt_core.py | 22 +++++++++++++++++--- AWSIoTPythonSDK/core/protocol/paho/client.py | 3 +++ 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/AWSIoTPythonSDK/core/protocol/mqtt_core.py b/AWSIoTPythonSDK/core/protocol/mqtt_core.py index fbdd6bf..6fe686d 100644 --- a/AWSIoTPythonSDK/core/protocol/mqtt_core.py +++ b/AWSIoTPythonSDK/core/protocol/mqtt_core.py @@ -29,6 +29,7 @@ from AWSIoTPythonSDK.core.protocol.internal.defaults import ALPN_PROTCOLS from AWSIoTPythonSDK.core.protocol.internal.events import FixedEventMids from AWSIoTPythonSDK.core.protocol.paho.client import MQTT_ERR_SUCCESS +from AWSIoTPythonSDK.core.protocol.paho.client import MQTT_ERR_SUBACK_ERROR from AWSIoTPythonSDK.exception.AWSIoTExceptions import connectError from AWSIoTPythonSDK.exception.AWSIoTExceptions import connectTimeoutException from AWSIoTPythonSDK.exception.AWSIoTExceptions import disconnectError @@ -58,6 +59,12 @@ from queue import Queue +class AckPacket(object): + def __init__(self): + self.event = Event() + self.data = None + + class MqttCore(object): _logger = logging.getLogger(__name__) @@ -298,12 +305,15 @@ def subscribe(self, topic, qos, message_callback=None): if ClientStatus.STABLE != self._client_status.get_status(): self._handle_offline_request(RequestTypes.SUBSCRIBE, (topic, qos, message_callback, None)) else: - event = Event() - rc, mid = self._subscribe_async(topic, qos, self._create_blocking_ack_callback(event), message_callback) - if not event.wait(self._operation_timeout_sec): + ack = AckPacket() + rc, mid = self._subscribe_async(topic, qos, self._create_blocking_ack_callback_ret(ack), message_callback) + if not ack.event.wait(self._operation_timeout_sec): self._internal_async_client.remove_event_callback(mid) self._logger.error("Subscribe timed out") raise subscribeTimeoutException() + if ack.data[0] == MQTT_ERR_SUBACK_ERROR: + self._logger.error(f"Subscribe error: {ack.data}") + raise subscribeError(ack.data) ret = True return ret @@ -361,6 +371,12 @@ def ack_callback(mid, data=None): event.set() return ack_callback + def _create_blocking_ack_callback_ret(self, ack: AckPacket): + def ack_callback(mid, data=None): + ack.data = data + ack.event.set() + return ack_callback + def _handle_offline_request(self, type, data): self._logger.info("Offline request detected!") offline_request = QueueableRequest(type, data) diff --git a/AWSIoTPythonSDK/core/protocol/paho/client.py b/AWSIoTPythonSDK/core/protocol/paho/client.py index 0b637c5..7676d83 100755 --- a/AWSIoTPythonSDK/core/protocol/paho/client.py +++ b/AWSIoTPythonSDK/core/protocol/paho/client.py @@ -137,6 +137,9 @@ MSG_QUEUEING_DROP_OLDEST = 0 MSG_QUEUEING_DROP_NEWEST = 1 +# Packet Error Codes +MQTT_ERR_SUBACK_ERROR = 0x80 + if sys.version_info[0] < 3: sockpair_data = "0" else: From deb7e1527da5e50e694f125758457f4e89a6bb06 Mon Sep 17 00:00:00 2001 From: Vera Xia Date: Thu, 24 Jul 2025 09:42:20 -0700 Subject: [PATCH 2/5] update suback packet --- AWSIoTPythonSDK/core/protocol/mqtt_core.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/AWSIoTPythonSDK/core/protocol/mqtt_core.py b/AWSIoTPythonSDK/core/protocol/mqtt_core.py index 6fe686d..007b4e8 100644 --- a/AWSIoTPythonSDK/core/protocol/mqtt_core.py +++ b/AWSIoTPythonSDK/core/protocol/mqtt_core.py @@ -59,7 +59,7 @@ from queue import Queue -class AckPacket(object): +class SubackPacket(object): def __init__(self): self.event = Event() self.data = None @@ -305,15 +305,15 @@ def subscribe(self, topic, qos, message_callback=None): if ClientStatus.STABLE != self._client_status.get_status(): self._handle_offline_request(RequestTypes.SUBSCRIBE, (topic, qos, message_callback, None)) else: - ack = AckPacket() - rc, mid = self._subscribe_async(topic, qos, self._create_blocking_ack_callback_ret(ack), message_callback) - if not ack.event.wait(self._operation_timeout_sec): + suback = SubackPacket() + rc, mid = self._subscribe_async(topic, qos, self._create_blocking_suback_callback(suback), message_callback) + if not suback.event.wait(self._operation_timeout_sec): self._internal_async_client.remove_event_callback(mid) self._logger.error("Subscribe timed out") raise subscribeTimeoutException() - if ack.data[0] == MQTT_ERR_SUBACK_ERROR: - self._logger.error(f"Subscribe error: {ack.data}") - raise subscribeError(ack.data) + if suback.data and suback.data[0] == MQTT_ERR_SUBACK_ERROR: + self._logger.error(f"Subscribe error: {suback.data}") + raise subscribeError(suback.data) ret = True return ret @@ -371,7 +371,7 @@ def ack_callback(mid, data=None): event.set() return ack_callback - def _create_blocking_ack_callback_ret(self, ack: AckPacket): + def _create_blocking_suback_callback(self, ack: SubackPacket): def ack_callback(mid, data=None): ack.data = data ack.event.set() From 179dd155849b3fb1176d79797ec796b3f1439b6a Mon Sep 17 00:00:00 2001 From: Vera Xia Date: Thu, 24 Jul 2025 09:44:05 -0700 Subject: [PATCH 3/5] update runner image --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 195bf2d..8776272 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -15,7 +15,7 @@ env: jobs: unit-tests: - runs-on: ubuntu-20.04 + runs-on: ubuntu-latest strategy: fail-fast: false From e939ae81e0e8b5b3233aa31037d276d8ec474d65 Mon Sep 17 00:00:00 2001 From: Vera Xia Date: Tue, 29 Jul 2025 15:07:29 -0700 Subject: [PATCH 4/5] improve error structure --- AWSIoTPythonSDK/core/protocol/mqtt_core.py | 11 +++++------ AWSIoTPythonSDK/core/protocol/paho/client.py | 5 +++-- AWSIoTPythonSDK/exception/AWSIoTExceptions.py | 5 +++++ 3 files changed, 13 insertions(+), 8 deletions(-) diff --git a/AWSIoTPythonSDK/core/protocol/mqtt_core.py b/AWSIoTPythonSDK/core/protocol/mqtt_core.py index 007b4e8..bff6456 100644 --- a/AWSIoTPythonSDK/core/protocol/mqtt_core.py +++ b/AWSIoTPythonSDK/core/protocol/mqtt_core.py @@ -28,8 +28,7 @@ from AWSIoTPythonSDK.core.protocol.internal.defaults import METRICS_PREFIX from AWSIoTPythonSDK.core.protocol.internal.defaults import ALPN_PROTCOLS from AWSIoTPythonSDK.core.protocol.internal.events import FixedEventMids -from AWSIoTPythonSDK.core.protocol.paho.client import MQTT_ERR_SUCCESS -from AWSIoTPythonSDK.core.protocol.paho.client import MQTT_ERR_SUBACK_ERROR +from AWSIoTPythonSDK.core.protocol.paho.client import MQTT_ERR_SUCCESS, SUBACK_ERROR from AWSIoTPythonSDK.exception.AWSIoTExceptions import connectError from AWSIoTPythonSDK.exception.AWSIoTExceptions import connectTimeoutException from AWSIoTPythonSDK.exception.AWSIoTExceptions import disconnectError @@ -42,7 +41,7 @@ from AWSIoTPythonSDK.exception.AWSIoTExceptions import subscribeQueueDisabledException from AWSIoTPythonSDK.exception.AWSIoTExceptions import unsubscribeQueueFullException from AWSIoTPythonSDK.exception.AWSIoTExceptions import unsubscribeQueueDisabledException -from AWSIoTPythonSDK.exception.AWSIoTExceptions import subscribeError +from AWSIoTPythonSDK.exception.AWSIoTExceptions import subscribeError, subackError from AWSIoTPythonSDK.exception.AWSIoTExceptions import subscribeTimeoutException from AWSIoTPythonSDK.exception.AWSIoTExceptions import unsubscribeError from AWSIoTPythonSDK.exception.AWSIoTExceptions import unsubscribeTimeoutException @@ -311,9 +310,9 @@ def subscribe(self, topic, qos, message_callback=None): self._internal_async_client.remove_event_callback(mid) self._logger.error("Subscribe timed out") raise subscribeTimeoutException() - if suback.data and suback.data[0] == MQTT_ERR_SUBACK_ERROR: - self._logger.error(f"Subscribe error: {suback.data}") - raise subscribeError(suback.data) + if suback.data and suback.data[0] == SUBACK_ERROR: + self._logger.error(f"Suback error return code: {suback.data[0]}") + raise subackError(suback=suback.data) ret = True return ret diff --git a/AWSIoTPythonSDK/core/protocol/paho/client.py b/AWSIoTPythonSDK/core/protocol/paho/client.py index 7676d83..1c60c81 100755 --- a/AWSIoTPythonSDK/core/protocol/paho/client.py +++ b/AWSIoTPythonSDK/core/protocol/paho/client.py @@ -97,6 +97,9 @@ CONNACK_REFUSED_BAD_USERNAME_PASSWORD = 4 CONNACK_REFUSED_NOT_AUTHORIZED = 5 +# SUBACK codes +SUBACK_ERROR = 0x80 + # Connection state mqtt_cs_new = 0 mqtt_cs_connected = 1 @@ -137,8 +140,6 @@ MSG_QUEUEING_DROP_OLDEST = 0 MSG_QUEUEING_DROP_NEWEST = 1 -# Packet Error Codes -MQTT_ERR_SUBACK_ERROR = 0x80 if sys.version_info[0] < 3: sockpair_data = "0" diff --git a/AWSIoTPythonSDK/exception/AWSIoTExceptions.py b/AWSIoTPythonSDK/exception/AWSIoTExceptions.py index 0de5401..d5b6d63 100755 --- a/AWSIoTPythonSDK/exception/AWSIoTExceptions.py +++ b/AWSIoTPythonSDK/exception/AWSIoTExceptions.py @@ -79,6 +79,11 @@ class subscribeError(operationError.operationError): def __init__(self, errorCode): self.message = "Subscribe Error: " + str(errorCode) +class subackError(operationError.operationError): + def __init__(self, suback=None): + self.message = "Received Error suback. Subscription failed." + self.suback = suback + class subscribeQueueFullException(operationError.operationError): def __init__(self): From 40053c2bc0a12be5706e130893a723d2bb04a6fe Mon Sep 17 00:00:00 2001 From: Vera Xia Date: Wed, 30 Jul 2025 14:08:10 -0700 Subject: [PATCH 5/5] add suback test --- test/core/protocol/test_mqtt_core.py | 33 +++++++++++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/test/core/protocol/test_mqtt_core.py b/test/core/protocol/test_mqtt_core.py index 8469ea6..3cb0cb0 100644 --- a/test/core/protocol/test_mqtt_core.py +++ b/test/core/protocol/test_mqtt_core.py @@ -1,5 +1,6 @@ import AWSIoTPythonSDK from AWSIoTPythonSDK.core.protocol.mqtt_core import MqttCore +from AWSIoTPythonSDK.core.protocol.mqtt_core import SubackPacket from AWSIoTPythonSDK.core.protocol.internal.clients import InternalAsyncMqttClient from AWSIoTPythonSDK.core.protocol.internal.clients import ClientStatusContainer from AWSIoTPythonSDK.core.protocol.internal.clients import ClientStatus @@ -20,6 +21,7 @@ from AWSIoTPythonSDK.exception.AWSIoTExceptions import publishQueueFullException from AWSIoTPythonSDK.exception.AWSIoTExceptions import publishQueueDisabledException from AWSIoTPythonSDK.exception.AWSIoTExceptions import subscribeError +from AWSIoTPythonSDK.exception.AWSIoTExceptions import subackError from AWSIoTPythonSDK.exception.AWSIoTExceptions import subscribeTimeoutException from AWSIoTPythonSDK.exception.AWSIoTExceptions import subscribeQueueFullException from AWSIoTPythonSDK.exception.AWSIoTExceptions import subscribeQueueDisabledException @@ -29,6 +31,7 @@ from AWSIoTPythonSDK.exception.AWSIoTExceptions import unsubscribeQueueDisabledException from AWSIoTPythonSDK.core.protocol.paho.client import MQTT_ERR_SUCCESS from AWSIoTPythonSDK.core.protocol.paho.client import MQTT_ERR_ERRNO +from AWSIoTPythonSDK.core.protocol.paho.client import SUBACK_ERROR from AWSIoTPythonSDK.core.protocol.paho.client import MQTTv311 from AWSIoTPythonSDK.core.protocol.internal.defaults import ALPN_PROTCOLS try: @@ -61,6 +64,7 @@ KEY_EXPECTED_QUEUE_APPEND_RESULT = "ExpectedQueueAppendResult" KEY_EXPECTED_REQUEST_MID_OVERRIDE = "ExpectedRequestMidOverride" KEY_EXPECTED_REQUEST_TIMEOUT = "ExpectedRequestTimeout" +KEY_EXPECTED_ACK_RESULT = "ExpectedAckPacketResult" SUCCESS_RC_EXPECTED_VALUES = { KEY_EXPECTED_REQUEST_RC : DUMMY_SUCCESS_RC } @@ -73,6 +77,10 @@ NO_TIMEOUT_EXPECTED_VALUES = { KEY_EXPECTED_REQUEST_TIMEOUT : False } +ERROR_SUBACK_EXPECTED_VALUES = { + KEY_EXPECTED_ACK_RESULT : (SUBACK_ERROR, None) +} + QUEUED_EXPECTED_VALUES = { KEY_EXPECTED_QUEUE_APPEND_RESULT : AppendResults.APPEND_SUCCESS } @@ -121,6 +129,9 @@ def setup_class(cls): RequestTypes.SUBSCRIBE: subscribeError, RequestTypes.UNSUBSCRIBE: unsubscribeError } + cls.ack_error = { + RequestTypes.SUBSCRIBE : subackError, + } cls.request_queue_full = { RequestTypes.PUBLISH : publishQueueFullException, RequestTypes.SUBSCRIBE: subscribeQueueFullException, @@ -518,6 +529,9 @@ def test_subscribe_success(self): def test_subscribe_timeout(self): self._internal_test_sync_api_with(RequestTypes.SUBSCRIBE, TIMEOUT_EXPECTED_VALUES) + + def test_subscribe_error_suback(self): + self._internal_test_sync_api_with(RequestTypes.SUBSCRIBE, ERROR_SUBACK_EXPECTED_VALUES) def test_subscribe_queued(self): self._internal_test_sync_api_with(RequestTypes.SUBSCRIBE, QUEUED_EXPECTED_VALUES) @@ -547,6 +561,7 @@ def _internal_test_sync_api_with(self, request_type, expected_values): expected_request_mid = expected_values.get(KEY_EXPECTED_REQUEST_MID_OVERRIDE) expected_timeout = expected_values.get(KEY_EXPECTED_REQUEST_TIMEOUT) expected_append_result = expected_values.get(KEY_EXPECTED_QUEUE_APPEND_RESULT) + expected_suback_result = expected_values.get(KEY_EXPECTED_ACK_RESULT) if expected_request_mid is None: expected_request_mid = DUMMY_REQUEST_MID @@ -562,7 +577,16 @@ def _internal_test_sync_api_with(self, request_type, expected_values): self.invoke_mqtt_core_sync_api[request_type](self, message_callback) else: self.python_event_mock.wait.return_value = True - assert self.invoke_mqtt_core_sync_api[request_type](self, message_callback) is True + if expected_suback_result is not None: + self._use_mock_python_suback() + # mock the suback with expected suback result + self.python_suback_mock.data = expected_suback_result + if expected_suback_result[0] == SUBACK_ERROR: + with pytest.raises(self.ack_error[request_type]): + self.invoke_mqtt_core_sync_api[request_type](self, message_callback) + self.python_suback_patcher.stop() + else: + assert self.invoke_mqtt_core_sync_api[request_type](self, message_callback) is True if expected_append_result is not None: self.client_status_mock.get_status.return_value = ClientStatus.ABNORMAL_DISCONNECT @@ -583,3 +607,10 @@ def _use_mock_python_event(self): self.python_event_constructor = self.python_event_patcher.start() self.python_event_mock = MagicMock() self.python_event_constructor.return_value = self.python_event_mock + + # Create a SubackPacket mock, which would mock the data in SubackPacket + def _use_mock_python_suback(self): + self.python_suback_patcher = patch(PATCH_MODULE_LOCATION + "SubackPacket", spec=SubackPacket) + self.python_suback_constructor = self.python_suback_patcher.start() + self.python_suback_mock = MagicMock() + self.python_suback_constructor.return_value = self.python_suback_mock