|
1 | | -import time |
| 1 | +''' |
| 2 | +Copyright (c) 2019, Pycom Limited. |
| 3 | +This software is licensed under the GNU GPL version 3 or any |
| 4 | +later version, with permitted additional terms. For more information |
| 5 | +see the Pycom Licence v1.0 document supplied with this file, or |
| 6 | +available at https://www.pycom.io/opensource/licensing |
| 7 | +''' |
| 8 | + |
| 9 | +try: |
| 10 | + from mqtt_core import MQTTCore as mqtt_core |
| 11 | +except: |
| 12 | + from _mqtt_core import MQTTCore as mqtt_core |
| 13 | + |
| 14 | +try: |
| 15 | + from pybytes_constants import MQTTConstants as mqttConst |
| 16 | +except: |
| 17 | + from _pybytes_constants import MQTTConstants as mqttConst |
| 18 | + |
2 | 19 | try: |
3 | | - import mqtt_core |
| 20 | + from pybytes_debug import print_debug |
4 | 21 | except: |
5 | | - import _mqtt_core as mqtt_core |
| 22 | + from _pybytes_debug import print_debug |
| 23 | + |
| 24 | +import time |
6 | 25 |
|
7 | 26 |
|
8 | 27 | class MQTTClient: |
9 | 28 |
|
10 | | - def __init__(self, client_id, server, mqtt_download_topic, port=0, user=None, password=None, keepalive=0, ssl=False, |
11 | | - ssl_params={}, reconnect=True): |
| 29 | + def __init__( |
| 30 | + self, client_id, server, mqtt_download_topic, |
| 31 | + pybytes_protocol, port=0, user=None, password=None, |
| 32 | + keepalive=0, ssl=False, |
| 33 | + ssl_params={}, reconnect=True |
| 34 | + ): |
| 35 | + if port == 0: |
| 36 | + self.__port = 8883 if ssl else 1883 |
| 37 | + else: |
| 38 | + self.__port = port |
12 | 39 | self.__reconnect = reconnect |
13 | 40 | self.__reconnect_count = 0 |
14 | 41 | self.__reconnecting = False |
15 | 42 | self.__server = server |
16 | 43 | self.__mqtt_download_topic = mqtt_download_topic |
17 | | - self.__mqtt = mqtt_core.MQTTClient(client_id, server, port, user, password, keepalive, |
18 | | - ssl, ssl_params) |
| 44 | + self.__pybytes_protocol = pybytes_protocol |
| 45 | + self.__clientId = client_id |
| 46 | + self.__user = user |
| 47 | + self.__password = password |
| 48 | + self.init_mqtt_core() |
| 49 | + |
| 50 | + def init_mqtt_core(self): |
| 51 | + self.__mqtt = mqtt_core( |
| 52 | + self.__clientId, |
| 53 | + True, |
| 54 | + mqttConst.MQTTv3_1_1, |
| 55 | + receive_timeout=500, |
| 56 | + reconnectMethod=self.reconnect |
| 57 | + ) |
| 58 | + self.__mqtt.configEndpoint(self.__server, self.__port) |
| 59 | + self.__mqtt._user = self.__user |
| 60 | + self.__mqtt._password = self.__password |
19 | 61 |
|
20 | 62 | def getError(self, x): |
21 | 63 | """Return a human readable error instead of its code number""" |
22 | 64 |
|
23 | | - # This errors are thrown by connect function, I wouldn't be able to find |
| 65 | + # This errors are thrown by connect function, |
| 66 | + # I wouldn't be able to find |
24 | 67 | # anywhere a complete list of these error codes |
25 | 68 | ERRORS = { |
26 | | - '-1': 'MQTTClient: Can\'t connect to MQTT server: "{}"'.format(self.__server), |
27 | | - '-4': 'MQTTClient: Bad credentials when connecting to MQTT server: "{}"'.format(self.__server), |
28 | | - '-9984': 'MQTTClient: Invalid certificate validation when connecting to MQTT server: "{}"'.format(self.__server) |
| 69 | + '-1': 'MQTTClient: Can\'t connect to MQTT server: "{}"'.format(self.__server), # noqa |
| 70 | + '-4': 'MQTTClient: Bad credentials when connecting to MQTT server: "{}"'.format(self.__server), # noqa |
| 71 | + '-9984': 'MQTTClient: Invalid certificate validation when connecting to MQTT server: "{}"'.format(self.__server) # noqa |
29 | 72 | } |
30 | 73 | message = str(x) |
31 | | - return ERRORS.get(str(x), 'Unknown error while connecting to MQTT server ' + message) |
| 74 | + return ERRORS.get( |
| 75 | + str(x), |
| 76 | + 'Unknown error while connecting to MQTT server {}'.format(message) |
| 77 | + ) |
32 | 78 |
|
33 | 79 | def connect(self, clean_session=True): |
34 | 80 | i = 0 |
35 | 81 | while 1: |
36 | 82 | try: |
37 | | - return self.__mqtt.connect(clean_session) |
38 | | - except OSError as e: |
39 | | - print(self.getError(e)) |
| 83 | + return self.__mqtt.connect() |
| 84 | + except OSError: |
40 | 85 | if (not self.__reconnect): |
41 | 86 | raise Exception('Reconnection Disabled.') |
42 | 87 | i += 1 |
43 | 88 | time.sleep(i) |
44 | 89 |
|
45 | | - def set_callback(self, f): |
46 | | - self.__mqtt.set_callback(f) |
| 90 | + def set_callback(self, mqtt_client, message): |
| 91 | + self.__pybytes_protocol.__process_recv_message(message) |
47 | 92 |
|
48 | 93 | def subscribe(self, topic, qos=0): |
49 | | - self.__mqtt.subscribe(topic, qos) |
50 | | - |
51 | | - def check_msg(self): |
52 | | - while 1: |
53 | | - time_before_retry = 10 |
54 | | - if self.__reconnecting == False: |
55 | | - try: |
56 | | - return self.__mqtt.check_msg() |
57 | | - except OSError as e: |
58 | | - print("Error check_msg", e) |
59 | | - |
60 | | - if (not self.__reconnect): |
61 | | - raise Exception('Reconnection Disabled.') |
62 | | - self.reconnect() |
63 | | - break |
64 | | - else: |
65 | | - time.sleep(time_before_retry) |
| 94 | + self.__mqtt.subscribe(topic, qos, self.set_callback) |
66 | 95 |
|
| 96 | + def unsubscribe(self, topic): |
| 97 | + return self.__mqtt.unsubscribe(topic) |
67 | 98 |
|
68 | 99 | def reconnect(self): |
69 | 100 | if self.__reconnecting: |
70 | 101 | return |
| 102 | + |
| 103 | + self.init_mqtt_core() |
| 104 | + |
71 | 105 | while True: |
72 | | - self.__reconnect_count += 1 |
| 106 | + self.__reconnect_count += 1 |
73 | 107 | self.__reconnecting = True |
74 | 108 | try: |
75 | | - self.__mqtt.connect() |
| 109 | + if not self.__mqtt.connect(): |
| 110 | + time.sleep(self.__reconnect_count) |
| 111 | + continue |
76 | 112 | self.subscribe(self.__mqtt_download_topic) |
77 | | - self.__reconnect_count=0 |
78 | | - print('Reconnected to MQTT server: "{}"'.format(self.__server)) |
| 113 | + self.__reconnect_count = 0 |
79 | 114 | self.__reconnecting = False |
80 | 115 | break |
81 | 116 | except OSError: |
82 | | - print("Reconnecting failed, will retry in {} seconds".format(self.__reconnect_count)) |
83 | 117 | time.sleep(self.__reconnect_count) |
84 | 118 |
|
85 | | - def publish(self, topic, msg, retain=False, qos=0): |
| 119 | + def publish(self, topic, msg, retain=False, qos=0, priority=False): |
86 | 120 | while 1: |
87 | | - if self.__reconnecting == False: |
| 121 | + if not self.__reconnecting: |
88 | 122 | try: |
89 | | - return self.__mqtt.publish(topic, msg, retain, qos) |
| 123 | + # Disable retain for publish by now |
| 124 | + return self.__mqtt.publish( |
| 125 | + topic, |
| 126 | + msg, |
| 127 | + qos, |
| 128 | + False, |
| 129 | + priority=priority |
| 130 | + ) |
90 | 131 | except OSError as e: |
91 | | - print("Error publish", e) |
| 132 | + print_debug(2, "Error publish", e) |
92 | 133 |
|
93 | 134 | if (not self.__reconnect): |
94 | 135 | raise Exception('Reconnection Disabled.') |
95 | 136 | self.reconnect() |
96 | | - break |
| 137 | + raise Exception('Error publish.') |
97 | 138 | else: |
98 | 139 | time.sleep(10) |
99 | 140 |
|
100 | | - def wait_msg(self): |
101 | | - while 1: |
102 | | - try: |
103 | | - return self.__mqtt.wait_msg() |
104 | | - except OSError as e: |
105 | | - print("Error wait_msg {}".format(e)) |
106 | | - |
107 | | - if (not self.__reconnect): |
108 | | - raise Exception('Reconnection Disabled.') |
109 | | - self.reconnect() |
110 | | - |
111 | 141 | def disconnect(self): |
112 | 142 | self.__mqtt.disconnect() |
0 commit comments