@@ -57,13 +57,19 @@ esp_rmaker_mqtt_data_t *mqtt_data;
5757const int MQTT_CONNECTED_EVENT = BIT1 ;
5858static EventGroupHandle_t mqtt_event_group ;
5959
60+ typedef struct {
61+ char * data ;
62+ char * topic ;
63+ } esp_rmaker_mqtt_long_data_t ;
64+
6065static void esp_rmaker_mqtt_subscribe_callback (const char * topic , int topic_len , const char * data , int data_len )
6166{
6267 esp_rmaker_mqtt_subscription_t * * subscriptions = mqtt_data -> subscriptions ;
6368 int i ;
6469 for (i = 0 ; i < MAX_MQTT_SUBSCRIPTIONS ; i ++ ) {
6570 if (subscriptions [i ]) {
66- if (strncmp (topic , subscriptions [i ]-> topic , topic_len ) == 0 ) {
71+ if ((strncmp (topic , subscriptions [i ]-> topic , topic_len ) == 0 )
72+ && (topic_len == strlen (subscriptions [i ]-> topic ))) {
6773 subscriptions [i ]-> cb (subscriptions [i ]-> topic , (void * )data , data_len , subscriptions [i ]-> priv );
6874 }
6975 }
@@ -141,6 +147,53 @@ esp_err_t esp_rmaker_mqtt_publish(const char *topic, void *data, size_t data_len
141147 return ESP_OK ;
142148}
143149
150+ static esp_rmaker_mqtt_long_data_t * esp_rmaker_mqtt_free_long_data (esp_rmaker_mqtt_long_data_t * long_data )
151+ {
152+ if (long_data ) {
153+ if (long_data -> topic ) {
154+ free (long_data -> topic );
155+ }
156+ if (long_data -> data ) {
157+ free (long_data -> data );
158+ }
159+ free (long_data );
160+ }
161+ return NULL ;
162+ }
163+
164+ static esp_rmaker_mqtt_long_data_t * esp_rmaker_mqtt_manage_long_data (esp_rmaker_mqtt_long_data_t * long_data ,
165+ esp_mqtt_event_handle_t event )
166+ {
167+ if (event -> topic ) {
168+ /* This is new data. Free any earlier data, if present. */
169+ esp_rmaker_mqtt_free_long_data (long_data );
170+ long_data = calloc (1 , sizeof (esp_rmaker_mqtt_long_data_t ));
171+ if (!long_data ) {
172+ ESP_LOGE (TAG , "Could not allocate memory for esp_rmaker_mqtt_long_data_t" );
173+ return NULL ;
174+ }
175+ long_data -> data = calloc (1 , event -> total_data_len );
176+ if (!long_data -> data ) {
177+ ESP_LOGE (TAG , "Could not allocate %d bytes for received data." , event -> total_data_len );
178+ return esp_rmaker_mqtt_free_long_data (long_data );
179+ }
180+ long_data -> topic = strndup (event -> topic , event -> topic_len );
181+ if (!long_data -> topic ) {
182+ ESP_LOGE (TAG , "Could not allocate %d bytes for received topic." , event -> topic_len );
183+ return esp_rmaker_mqtt_free_long_data (long_data );
184+ }
185+ }
186+ if (long_data ) {
187+ memcpy (long_data -> data + event -> current_data_offset , event -> data , event -> data_len );
188+
189+ if ((event -> current_data_offset + event -> data_len ) == event -> total_data_len ) {
190+ esp_rmaker_mqtt_subscribe_callback (long_data -> topic , strlen (long_data -> topic ),
191+ long_data -> data , event -> total_data_len );
192+ return esp_rmaker_mqtt_free_long_data (long_data );
193+ }
194+ }
195+ return long_data ;
196+ }
144197
145198static esp_err_t mqtt_event_handler (esp_mqtt_event_handle_t event )
146199{
@@ -168,12 +221,27 @@ static esp_err_t mqtt_event_handler(esp_mqtt_event_handle_t event)
168221 case MQTT_EVENT_PUBLISHED :
169222 ESP_LOGD (TAG , "MQTT_EVENT_PUBLISHED, msg_id=%d" , event -> msg_id );
170223 break ;
171- case MQTT_EVENT_DATA :
224+ case MQTT_EVENT_DATA : {
172225 ESP_LOGD (TAG , "MQTT_EVENT_DATA" );
173- ESP_LOGD (TAG , "TOPIC=%.*s\r\n" , event -> topic_len , event -> topic );
226+ static esp_rmaker_mqtt_long_data_t * long_data ;
227+ /* Topic can be NULL, for data longer than the MQTT buffer */
228+ if (event -> topic ) {
229+ ESP_LOGD (TAG , "TOPIC=%.*s\r\n" , event -> topic_len , event -> topic );
230+ }
174231 ESP_LOGD (TAG , "DATA=%.*s\r\n" , event -> data_len , event -> data );
175- esp_rmaker_mqtt_subscribe_callback (event -> topic , event -> topic_len , event -> data , event -> data_len );
232+ if (event -> data_len == event -> total_data_len ) {
233+ /* If long_data still exists, it means there was some issue getting the
234+ * long data, and so, it needs to be freed up.
235+ */
236+ if (long_data ) {
237+ long_data = esp_rmaker_mqtt_free_long_data (long_data );
238+ }
239+ esp_rmaker_mqtt_subscribe_callback (event -> topic , event -> topic_len , event -> data , event -> data_len );
240+ } else {
241+ long_data = esp_rmaker_mqtt_manage_long_data (long_data , event );
242+ }
176243 break ;
244+ }
177245 case MQTT_EVENT_ERROR :
178246 ESP_LOGE (TAG , "MQTT_EVENT_ERROR" );
179247 break ;
0 commit comments