Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,23 @@

jobs:
unit-tests:
runs-on: ubuntu-20.04
runs-on: ubuntu-latest
strategy:
fail-fast: false

steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: '3.8'
- name: Unit tests
run: |
python3 setup.py install
pip install pytest
pip install mock
python3 -m pytest test

integration-tests:

Check warning

Code scanning / CodeQL

Workflow does not contain permissions Medium

Actions job or workflow does not limit the permissions of the GITHUB_TOKEN. Consider setting an explicit permissions block, using the following as a minimal starting point: {contents: read}
runs-on: ubuntu-latest
permissions:
id-token: write # This is required for requesting the JWT
Expand Down
25 changes: 20 additions & 5 deletions AWSIoTPythonSDK/core/protocol/mqtt_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from AWSIoTPythonSDK.core.protocol.internal.defaults import METRICS_PREFIX
from AWSIoTPythonSDK.core.protocol.internal.defaults import ALPN_PROTCOLS
from AWSIoTPythonSDK.core.protocol.internal.events import FixedEventMids
from AWSIoTPythonSDK.core.protocol.paho.client import MQTT_ERR_SUCCESS
from AWSIoTPythonSDK.core.protocol.paho.client import MQTT_ERR_SUCCESS, SUBACK_ERROR
from AWSIoTPythonSDK.exception.AWSIoTExceptions import connectError
from AWSIoTPythonSDK.exception.AWSIoTExceptions import connectTimeoutException
from AWSIoTPythonSDK.exception.AWSIoTExceptions import disconnectError
Expand All @@ -41,7 +41,7 @@
from AWSIoTPythonSDK.exception.AWSIoTExceptions import subscribeQueueDisabledException
from AWSIoTPythonSDK.exception.AWSIoTExceptions import unsubscribeQueueFullException
from AWSIoTPythonSDK.exception.AWSIoTExceptions import unsubscribeQueueDisabledException
from AWSIoTPythonSDK.exception.AWSIoTExceptions import subscribeError
from AWSIoTPythonSDK.exception.AWSIoTExceptions import subscribeError, subackError
from AWSIoTPythonSDK.exception.AWSIoTExceptions import subscribeTimeoutException
from AWSIoTPythonSDK.exception.AWSIoTExceptions import unsubscribeError
from AWSIoTPythonSDK.exception.AWSIoTExceptions import unsubscribeTimeoutException
Expand All @@ -58,6 +58,12 @@
from queue import Queue


class SubackPacket(object):
def __init__(self):
self.event = Event()
self.data = None


class MqttCore(object):

_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -298,12 +304,15 @@ def subscribe(self, topic, qos, message_callback=None):
if ClientStatus.STABLE != self._client_status.get_status():
self._handle_offline_request(RequestTypes.SUBSCRIBE, (topic, qos, message_callback, None))
else:
event = Event()
rc, mid = self._subscribe_async(topic, qos, self._create_blocking_ack_callback(event), message_callback)
if not event.wait(self._operation_timeout_sec):
suback = SubackPacket()
rc, mid = self._subscribe_async(topic, qos, self._create_blocking_suback_callback(suback), message_callback)
if not suback.event.wait(self._operation_timeout_sec):
self._internal_async_client.remove_event_callback(mid)
self._logger.error("Subscribe timed out")
raise subscribeTimeoutException()
if suback.data and suback.data[0] == SUBACK_ERROR:
self._logger.error(f"Suback error return code: {suback.data[0]}")
raise subackError(suback=suback.data)
ret = True
return ret

Expand Down Expand Up @@ -361,6 +370,12 @@ def ack_callback(mid, data=None):
event.set()
return ack_callback

def _create_blocking_suback_callback(self, ack: SubackPacket):
def ack_callback(mid, data=None):
ack.data = data
ack.event.set()
return ack_callback

def _handle_offline_request(self, type, data):
self._logger.info("Offline request detected!")
offline_request = QueueableRequest(type, data)
Expand Down
4 changes: 4 additions & 0 deletions AWSIoTPythonSDK/core/protocol/paho/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@
CONNACK_REFUSED_BAD_USERNAME_PASSWORD = 4
CONNACK_REFUSED_NOT_AUTHORIZED = 5

# SUBACK codes
SUBACK_ERROR = 0x80

# Connection state
mqtt_cs_new = 0
mqtt_cs_connected = 1
Expand Down Expand Up @@ -137,6 +140,7 @@
MSG_QUEUEING_DROP_OLDEST = 0
MSG_QUEUEING_DROP_NEWEST = 1


if sys.version_info[0] < 3:
sockpair_data = "0"
else:
Expand Down
5 changes: 5 additions & 0 deletions AWSIoTPythonSDK/exception/AWSIoTExceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ class subscribeError(operationError.operationError):
def __init__(self, errorCode):
self.message = "Subscribe Error: " + str(errorCode)

class subackError(operationError.operationError):
def __init__(self, suback=None):
self.message = "Received Error suback. Subscription failed."
self.suback = suback


class subscribeQueueFullException(operationError.operationError):
def __init__(self):
Expand Down
33 changes: 32 additions & 1 deletion test/core/protocol/test_mqtt_core.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import AWSIoTPythonSDK
from AWSIoTPythonSDK.core.protocol.mqtt_core import MqttCore
from AWSIoTPythonSDK.core.protocol.mqtt_core import SubackPacket
from AWSIoTPythonSDK.core.protocol.internal.clients import InternalAsyncMqttClient
from AWSIoTPythonSDK.core.protocol.internal.clients import ClientStatusContainer
from AWSIoTPythonSDK.core.protocol.internal.clients import ClientStatus
Expand All @@ -20,6 +21,7 @@
from AWSIoTPythonSDK.exception.AWSIoTExceptions import publishQueueFullException
from AWSIoTPythonSDK.exception.AWSIoTExceptions import publishQueueDisabledException
from AWSIoTPythonSDK.exception.AWSIoTExceptions import subscribeError
from AWSIoTPythonSDK.exception.AWSIoTExceptions import subackError
from AWSIoTPythonSDK.exception.AWSIoTExceptions import subscribeTimeoutException
from AWSIoTPythonSDK.exception.AWSIoTExceptions import subscribeQueueFullException
from AWSIoTPythonSDK.exception.AWSIoTExceptions import subscribeQueueDisabledException
Expand All @@ -29,6 +31,7 @@
from AWSIoTPythonSDK.exception.AWSIoTExceptions import unsubscribeQueueDisabledException
from AWSIoTPythonSDK.core.protocol.paho.client import MQTT_ERR_SUCCESS
from AWSIoTPythonSDK.core.protocol.paho.client import MQTT_ERR_ERRNO
from AWSIoTPythonSDK.core.protocol.paho.client import SUBACK_ERROR
from AWSIoTPythonSDK.core.protocol.paho.client import MQTTv311
from AWSIoTPythonSDK.core.protocol.internal.defaults import ALPN_PROTCOLS
try:
Expand Down Expand Up @@ -61,6 +64,7 @@
KEY_EXPECTED_QUEUE_APPEND_RESULT = "ExpectedQueueAppendResult"
KEY_EXPECTED_REQUEST_MID_OVERRIDE = "ExpectedRequestMidOverride"
KEY_EXPECTED_REQUEST_TIMEOUT = "ExpectedRequestTimeout"
KEY_EXPECTED_ACK_RESULT = "ExpectedAckPacketResult"
SUCCESS_RC_EXPECTED_VALUES = {
KEY_EXPECTED_REQUEST_RC : DUMMY_SUCCESS_RC
}
Expand All @@ -73,6 +77,10 @@
NO_TIMEOUT_EXPECTED_VALUES = {
KEY_EXPECTED_REQUEST_TIMEOUT : False
}
ERROR_SUBACK_EXPECTED_VALUES = {
KEY_EXPECTED_ACK_RESULT : (SUBACK_ERROR, None)
}

QUEUED_EXPECTED_VALUES = {
KEY_EXPECTED_QUEUE_APPEND_RESULT : AppendResults.APPEND_SUCCESS
}
Expand Down Expand Up @@ -121,6 +129,9 @@ def setup_class(cls):
RequestTypes.SUBSCRIBE: subscribeError,
RequestTypes.UNSUBSCRIBE: unsubscribeError
}
cls.ack_error = {
RequestTypes.SUBSCRIBE : subackError,
}
cls.request_queue_full = {
RequestTypes.PUBLISH : publishQueueFullException,
RequestTypes.SUBSCRIBE: subscribeQueueFullException,
Expand Down Expand Up @@ -518,6 +529,9 @@ def test_subscribe_success(self):

def test_subscribe_timeout(self):
self._internal_test_sync_api_with(RequestTypes.SUBSCRIBE, TIMEOUT_EXPECTED_VALUES)

def test_subscribe_error_suback(self):
self._internal_test_sync_api_with(RequestTypes.SUBSCRIBE, ERROR_SUBACK_EXPECTED_VALUES)

def test_subscribe_queued(self):
self._internal_test_sync_api_with(RequestTypes.SUBSCRIBE, QUEUED_EXPECTED_VALUES)
Expand Down Expand Up @@ -547,6 +561,7 @@ def _internal_test_sync_api_with(self, request_type, expected_values):
expected_request_mid = expected_values.get(KEY_EXPECTED_REQUEST_MID_OVERRIDE)
expected_timeout = expected_values.get(KEY_EXPECTED_REQUEST_TIMEOUT)
expected_append_result = expected_values.get(KEY_EXPECTED_QUEUE_APPEND_RESULT)
expected_suback_result = expected_values.get(KEY_EXPECTED_ACK_RESULT)

if expected_request_mid is None:
expected_request_mid = DUMMY_REQUEST_MID
Expand All @@ -562,7 +577,16 @@ def _internal_test_sync_api_with(self, request_type, expected_values):
self.invoke_mqtt_core_sync_api[request_type](self, message_callback)
else:
self.python_event_mock.wait.return_value = True
assert self.invoke_mqtt_core_sync_api[request_type](self, message_callback) is True
if expected_suback_result is not None:
self._use_mock_python_suback()
# mock the suback with expected suback result
self.python_suback_mock.data = expected_suback_result
if expected_suback_result[0] == SUBACK_ERROR:
with pytest.raises(self.ack_error[request_type]):
self.invoke_mqtt_core_sync_api[request_type](self, message_callback)
self.python_suback_patcher.stop()
else:
assert self.invoke_mqtt_core_sync_api[request_type](self, message_callback) is True

if expected_append_result is not None:
self.client_status_mock.get_status.return_value = ClientStatus.ABNORMAL_DISCONNECT
Expand All @@ -583,3 +607,10 @@ def _use_mock_python_event(self):
self.python_event_constructor = self.python_event_patcher.start()
self.python_event_mock = MagicMock()
self.python_event_constructor.return_value = self.python_event_mock

# Create a SubackPacket mock, which would mock the data in SubackPacket
def _use_mock_python_suback(self):
self.python_suback_patcher = patch(PATCH_MODULE_LOCATION + "SubackPacket", spec=SubackPacket)
self.python_suback_constructor = self.python_suback_patcher.start()
self.python_suback_mock = MagicMock()
self.python_suback_constructor.return_value = self.python_suback_mock