@@ -57,13 +57,19 @@ esp_rmaker_mqtt_data_t *mqtt_data;
57
57
const int MQTT_CONNECTED_EVENT = BIT1 ;
58
58
static EventGroupHandle_t mqtt_event_group ;
59
59
60
+ typedef struct {
61
+ char * data ;
62
+ char * topic ;
63
+ } esp_rmaker_mqtt_long_data_t ;
64
+
60
65
static void esp_rmaker_mqtt_subscribe_callback (const char * topic , int topic_len , const char * data , int data_len )
61
66
{
62
67
esp_rmaker_mqtt_subscription_t * * subscriptions = mqtt_data -> subscriptions ;
63
68
int i ;
64
69
for (i = 0 ; i < MAX_MQTT_SUBSCRIPTIONS ; i ++ ) {
65
70
if (subscriptions [i ]) {
66
- if (strncmp (topic , subscriptions [i ]-> topic , topic_len ) == 0 ) {
71
+ if ((strncmp (topic , subscriptions [i ]-> topic , topic_len ) == 0 )
72
+ && (topic_len == strlen (subscriptions [i ]-> topic ))) {
67
73
subscriptions [i ]-> cb (subscriptions [i ]-> topic , (void * )data , data_len , subscriptions [i ]-> priv );
68
74
}
69
75
}
@@ -141,6 +147,53 @@ esp_err_t esp_rmaker_mqtt_publish(const char *topic, void *data, size_t data_len
141
147
return ESP_OK ;
142
148
}
143
149
150
+ static esp_rmaker_mqtt_long_data_t * esp_rmaker_mqtt_free_long_data (esp_rmaker_mqtt_long_data_t * long_data )
151
+ {
152
+ if (long_data ) {
153
+ if (long_data -> topic ) {
154
+ free (long_data -> topic );
155
+ }
156
+ if (long_data -> data ) {
157
+ free (long_data -> data );
158
+ }
159
+ free (long_data );
160
+ }
161
+ return NULL ;
162
+ }
163
+
164
+ static esp_rmaker_mqtt_long_data_t * esp_rmaker_mqtt_manage_long_data (esp_rmaker_mqtt_long_data_t * long_data ,
165
+ esp_mqtt_event_handle_t event )
166
+ {
167
+ if (event -> topic ) {
168
+ /* This is new data. Free any earlier data, if present. */
169
+ esp_rmaker_mqtt_free_long_data (long_data );
170
+ long_data = calloc (1 , sizeof (esp_rmaker_mqtt_long_data_t ));
171
+ if (!long_data ) {
172
+ ESP_LOGE (TAG , "Could not allocate memory for esp_rmaker_mqtt_long_data_t" );
173
+ return NULL ;
174
+ }
175
+ long_data -> data = calloc (1 , event -> total_data_len );
176
+ if (!long_data -> data ) {
177
+ ESP_LOGE (TAG , "Could not allocate %d bytes for received data." , event -> total_data_len );
178
+ return esp_rmaker_mqtt_free_long_data (long_data );
179
+ }
180
+ long_data -> topic = strndup (event -> topic , event -> topic_len );
181
+ if (!long_data -> topic ) {
182
+ ESP_LOGE (TAG , "Could not allocate %d bytes for received topic." , event -> topic_len );
183
+ return esp_rmaker_mqtt_free_long_data (long_data );
184
+ }
185
+ }
186
+ if (long_data ) {
187
+ memcpy (long_data -> data + event -> current_data_offset , event -> data , event -> data_len );
188
+
189
+ if ((event -> current_data_offset + event -> data_len ) == event -> total_data_len ) {
190
+ esp_rmaker_mqtt_subscribe_callback (long_data -> topic , strlen (long_data -> topic ),
191
+ long_data -> data , event -> total_data_len );
192
+ return esp_rmaker_mqtt_free_long_data (long_data );
193
+ }
194
+ }
195
+ return long_data ;
196
+ }
144
197
145
198
static esp_err_t mqtt_event_handler (esp_mqtt_event_handle_t event )
146
199
{
@@ -168,12 +221,27 @@ static esp_err_t mqtt_event_handler(esp_mqtt_event_handle_t event)
168
221
case MQTT_EVENT_PUBLISHED :
169
222
ESP_LOGD (TAG , "MQTT_EVENT_PUBLISHED, msg_id=%d" , event -> msg_id );
170
223
break ;
171
- case MQTT_EVENT_DATA :
224
+ case MQTT_EVENT_DATA : {
172
225
ESP_LOGD (TAG , "MQTT_EVENT_DATA" );
173
- ESP_LOGD (TAG , "TOPIC=%.*s\r\n" , event -> topic_len , event -> topic );
226
+ static esp_rmaker_mqtt_long_data_t * long_data ;
227
+ /* Topic can be NULL, for data longer than the MQTT buffer */
228
+ if (event -> topic ) {
229
+ ESP_LOGD (TAG , "TOPIC=%.*s\r\n" , event -> topic_len , event -> topic );
230
+ }
174
231
ESP_LOGD (TAG , "DATA=%.*s\r\n" , event -> data_len , event -> data );
175
- esp_rmaker_mqtt_subscribe_callback (event -> topic , event -> topic_len , event -> data , event -> data_len );
232
+ if (event -> data_len == event -> total_data_len ) {
233
+ /* If long_data still exists, it means there was some issue getting the
234
+ * long data, and so, it needs to be freed up.
235
+ */
236
+ if (long_data ) {
237
+ long_data = esp_rmaker_mqtt_free_long_data (long_data );
238
+ }
239
+ esp_rmaker_mqtt_subscribe_callback (event -> topic , event -> topic_len , event -> data , event -> data_len );
240
+ } else {
241
+ long_data = esp_rmaker_mqtt_manage_long_data (long_data , event );
242
+ }
176
243
break ;
244
+ }
177
245
case MQTT_EVENT_ERROR :
178
246
ESP_LOGE (TAG , "MQTT_EVENT_ERROR" );
179
247
break ;
0 commit comments