MQTT是基于TCP/IP协议栈构建的异步通信消息协议,是一种轻量级的发布、订阅信息传输协议。可以在不可靠的网络环境中进行扩展,适用于设备硬件存储空间或网络带宽有限的场景。使用MQTT协议,消息发送者与接收者不受时间和空间的限制。
IOT中继宝盒嵌入了MQTT消息服务器为接入设备接入物联网服务。
此设备采用Mqtt协议接入本IOT中继系统平台,实现从接入嵌入设备读、写、上报物模型数据。 设备读写报文数据采用字符串BASE64加密传输。
(本例程以stm32开发版+W5500以太网模块设备接入为例)
MQTT服务器地址: tcp://:192.168.0.105:1883( IOT中继宝盒IOT设备接口–配置说明中显示的MQTT服务器连接地址和端口)
用户名:txbxxxxx(IOT中继宝盒IOT设备接口–配置说明中显示的MQTT服务器用户名)
密码:dHhiMDIxMjE4 (IOT中继宝盒IOT设备接口–配置说明中显示的MQTT服务器客户端口令,BASE64解密使用)
客户端ID: 44080000001111000019(连接IOT中继宝盒的设备ID)
设备侧MQTT协议使用前提为设备启用、连接协议配置生效。设备侧Mqtt发布TOPIC为配置的物模型属性、物模型事件对应的上报设备属性数据和事件发生上报的数据。 数据流向为上行
对应的发布的主题TOPIC 、服务质量格式分别为:
类别 | 主题TOPIC | 描述 | 服务质量 |
---|---|---|---|
属性数据 | /iotboxProperties/44080000001111000019/P_1696583483495 | 此topic对应设备物模型属性数据,由类别标识、设备ID、物模型属性ID组成。 主题中 “44080000001111000019” 为设备ID; "P_1696583483495"为设备物模型定义的属性ID标识 | QS1 |
事件数据 | /iotboxEvent/44080000001111000019/E_1696584389929 | 此topic对应设备物模型事件数据,由类别标识、设备ID、物模型事件ID组成。 主题中 “44080000001111000019” 为设备ID; "E_1696584389929"为设备物模型定义的事件ID标识 | QS1 |
设备侧Mqtt订阅TOPIC为配置的物模型功能,通常用于设备接收处理功能指令的操作。 数据流向为下行
对应的订阅的主题TOPIC 、服务质量格式分别为:
类别 | 主题TOPIC | 描述 | 服务质量 |
---|---|---|---|
功能数据 | /iotboxFunction/44080000001111000019/F_1696584326733 | 此topic对应设备物模型功能数据,由类别标识、设备ID、物模型功能ID组成。 主题中 “44080000001111000019” 为设备ID; "F_1696584326733"为设备物模型定义的功能ID标识 | QS1 |
设备侧向MQTT服务器发布消息体格式采用字符串BASE64加密传输。 参数属性说明:
类别 | 主题TOPIC | 加密前消息体内容 | 加密后消息体内容 |
属性数据 | /iotboxProperties/44080000001111000019/P_1696583483495 | 如:温度 "36" 度 | MzY= |
事件数据 | /iotboxEvent/44080000001111000019/E_1696584389929 | 事件数据为json数值格式,由物模型事件属性中的输出参数组成。加密前格式如: [ {"paramsId": "aaaaaa", "dataValue": "36"}, {"paramsId": "bbbbbb", "dataValue": "45"}, ...... ]json字符串内容说明paramsId上报设备事件输出参数ID标识,为设备物模型配置的事件输出参数ID标识 如上例 "aaaaaa" 、"bbbbbb" 为温度和湿度参数标识dataValue上报设备事件输出参数数据值,dataValue 为上报设备事件输出参数值标识 上例"36"、"45" 上报设备事件输出参数数据值,为输出参数温度和湿度数据值 | WyB7InBhcmFtc0lkIjogImFhYWFhYSIsICJkYXRhVmFsdWUiOiAiMzYiIH0seyJwYXJhbXNJZCI6ICJiYmJiYmIiLCAgImRhdGFWYWx1ZSI6ICI0NSJ9LF0= |
设备侧向MQTT服务器订阅消息体格式采用字符串BASE64加密传输。 参数属性说明:
主题TOPIC: /iotboxFunction/44080000001111000019/F_1696584326733
订阅返回消息体内容:
ewogICAgICJkZXZpY2VJZCI6ICI0NDA4MDAwMDAwMTExMTAwMDAxNiIsCiAgICAgImZ1bmN0aW9uSWQiOiAiRl8xNjk2NTg0MzI2NzMzIiwKICAgICAiZnVuY3Rpb25OYW1lIjogIui/nOeoi+WFs+acuiIsCiAgICAgWwogICAgIHsicGFyYW1zSWQiOiAiYWFhYWFhIiwgInBhcmFtc05hbWUiOiAi5bCP6L2m5b2S5L2NIiwgImRhdGFWYWx1ZSI6ICJ0cnVlIiB9LAogICAgIF0KIH0=
订阅返回消息体内容:
消息体数据为json数值格式,由物模型功能定义中的输入参数组成。解密后的格式如:
{ "deviceId": "44080000001111000016", "functionId": "F_1696584326733", "functionName": "远程关机", "inputParamsList": [ {"paramsId": "aaaaaa", "paramsName": "小车归位", "dataValue": "true" }, ...... ] }
json字符串内容说明 | |
deviceId | 定义的设备ID标识 如上例黄色 "44080000001111000016" 为需要处理下发的功能指令设备标识 |
functionId | 设备物模型功能定义中的功能ID标识 如上例黄色 "F_1696584326733" 为设备物模型功能定义中的功能ID标识 |
functionName | 设备物模型功能定义中的功能名称 如上例黄色 "远程关机" 为设备物模型功能定义中的功能名称 |
paramsId | 输入参数ID标识,为设备物模型功能定义中的输入参数ID标识 如上例黄色 "aaaaaa" 为输入参数标识 |
paramsName | 输入参数名称,为设备物模型功能定义中的输入参数名称 如上例黄色 "远程关机" 为设备物模型功能定义中的输入参数名称 |
dataValue | 输入参数数据值,dataValue 为设备物模型功能定义中的输入参数数值标识 上例黄色 "true" 设备物模型功能定义中的输入参数数值,为输入参数布尔值true |
ts_mqtt.c文件中定义:
char *mqtt_clientID="44080000001111000019"; //Mqtt 客户端ID 填写iotrelay中继平台接入设备的ID char *mqtt_account ="txb0727"; //Mqtt 服务器连接账号 填写iotrelay中继平台接入设备接口生成的账号 char *mqtt_password ="xxxxxx"; //Mqtt 服务器连接密码 填写iotrelay中继平台接入设备接口生成的口令 uint16 local_port = 5000; //设置开发板本地端口,根据需要设置 uint8 remote_ip[4] = {192.168.0.102}; //连接mqtt服务器IP uint16 remote_port = 1883; //连接mqtt服务器端口
参数说明:
参数 | 说明 |
---|---|
mqtt_clientID | 连接本IOT中继平台Mqtt服务器的客户端ID, 填写iot中继平台接入设备的ID。 |
mqtt_account | 连接本IOT中继平台Mqtt服务器的连接账号, 填写iot中继平台接入设备接口生成的账号 |
mqtt_password | 连接本IOT中继平台Mqtt服务器的口令, 填写iot中继平台接入设备接口生成的口令,嵌入设备中填写base64解密后的口令 |
remote_ip[4] | 连接本IOT中继平台Mqtt服务器的IP, 填写iot中继平台的IP |
remote_port | 连接本IOT中继平台Mqtt服务器的端口, 填写iot中继平台Mqtt服务器的端口 |
发布上报物模型属性数据,在本样例中ts_mqtt.c文件publishIotrelayModelProperties方法实现,部分代码段:
/**** *此处为构建上报的报文,获取设备属性数据在此填入,例如在此调用gpio获取设备的温度,在此范例处先直接填入数字 *上报设备物模型属性数据,例如 温度 36 度, *****/ char *indata="36" ; //上报设备属性数据 温度 ,在此范例处先直接填入数字,实际应用调用gpio获取设备的物模型属性数据 char outdata[20]; //加密后数据 int outlen; memset(outdata,0,sizeof(outdata)); /* 对上报属性数据Base64加密处理 */ int jm= base64_encode(indata, strlen(indata), outdata, &outlen);
参数说明:
“36” 为要上报的属性实际数值,例如温度36 度(设备的属性值为您程序开发中获取,获取后替换); int jm= base64_encode(indata, strlen(indata), outdata, &outlen); 为对上报属性数据Base64加密处理
发布上报物模型事件数据,在本样例中ts_mqtt.c文件publishIotrelayModelEvent方法实现,部分代码段:
/**** * 此处为构建上报的报文,设备属性数据在此填入,例如在此调用gpio获取设备的光照、温度 *上报设备物模型事件数据,事件数据为json数值格式,由物模型事件属性中的输出参数组成, * 例如:设备关机事件上报数据 * 数据为json格式字符串 "[{\"paramsId\":\"suning\", \"dataValue\": \"80\" },{\"paramsId\": \"temperature\", \"dataValue\": \"45\"}]" *****/ char *indata="[{\"paramsId\":\"suning\", \"dataValue\": \"80\" },{\"paramsId\": \"temperature\", \"dataValue\": \"45\"}]"; //上报设备事件数据 printf("加密前报文: %s \r\n",indata); char outdata[512]; //加密后数据 int outlen; memset(outdata,0,sizeof(outdata)); /* 对上报数据Base64加密处理 */ int jm= base64_encode(indata, strlen(indata), outdata, &outlen);
char *indata=“[{"paramsId":"suning", "dataValue": "80" },{"paramsId": "temperature", "dataValue": "45"}]”; 为上报设备事件json数据 。 int jm= base64_encode(indata, strlen(indata), outdata, &outlen); 为对上报数据Base64加密处理
设备事件json数据参数说明:
参数 | 说明 |
---|---|
paramsId | 上报设备事件输出参数ID标识,为设备物模型配置的事件输出参数ID标识 如上例 “suning” 、 “temperature” 为光照和温度参数标识 |
dataValue | 上报设备事件输出参数数据值,dataValue 为上报设备事件输出参数值标识 上例 “80” 、 “45” 上报设备事件输出参数数据值,为输出参数光照温和度数据值;设备的属性值为您程序开发中获取,获取后替换) |
订阅物模型功能,在本样例中ts_mqtt.c文件 **subscribIotrelayModelFunction **方法实现,部分代码段:
//Base64解密接收到的报文 char outdata[600]; //接收到的报文数据 int outlen; memset(outdata,0,sizeof(outdata)); int jm= base64_decode((const char *)payload_in, strlen((const char *)payload_in), outdata, &outlen); if(jm<0){ printf("解密数据错误:\r\n"); }else{ printf("解密后数据: %s \r\n",outdata); //解析JSON格式字符串 cJSON *root = cJSON_Parse(outdata); if(root != NULL) { char *json_str = cJSON_Print(root); printf("解析解密后的JSON数据 %s", json_str); //此处根据获取的物模型功能指令,填写设备需要操作的代码,例如设备关机
int jm= base64_decode((const char *)payload_in, strlen((const char *)payload_in),outdata, &outlen); 为base64解密订阅功能接收的json字符串数据 。 cJSON *root = cJSON_Parse(outdata); 为解析JSON格式字符串
//此处根据获取的物模型功能指令,填写设备需要操作的代码,例如设备关机
/****************************************************************************** * @file mqqt协议发布上报设备属性或设备事件数据,订阅设备功能 Project Template ../main.c * @author txb0727 * @version V1.0.0 * @date 2023-12-10 * @brief Main program body ****************************************************************************** * @attention * 本范例用于嵌入式开发版采用mqqt协议发布上报设备属性或设备事件数据,订阅设备功能,仅供参考 * 本范例硬件为stm32f103开发板,通讯模块为W5500以太网通讯模块 * * <h2><center>© COPYRIGHT 2023 txb0727.</center></h2> ******************************************************************************/ /* Includes ------------------------------------------------------------------*/ #include <stm32f10x.h> #include "FreeRTOS.H" #include "task.h" #include "mcu_init.h" #include "config.h" #include "device.h" #include "spi2.h" #include "socket.h" #include "w5500.h" #include "at24c16.h" #include "util.h" #include "dhcp.h" #include "string.h" #include <stdio.h> #include "ts_mqtt.h" static TaskHandle_t AppTaskCreate_Handle = NULL;/* 创建任务句柄 */ static TaskHandle_t subscribeFunctionTopicTask_Handle = NULL;/* KEY任务句柄 */ static TaskHandle_t publishPropertiesTopicTask_Handle = NULL;/* KEY任务句柄 */ static TaskHandle_t publishEventTopicTask_Handle = NULL;/* LED任务句柄 */ static TaskHandle_t dhcpTask_Handle = NULL;/* KEY任务句柄 */ static void subscribeFunctionTopicTask(void* pvParameters);/* subscribeFunctionTopicTask任务实现 */ static void publishPropertiesTopicTask(void* pvParameters);/* publishPropertiesTopicTask任务实现 */ static void publishEventTopicTask(void* pvParameters);/* publishEventTopicTask任务实现 */ static void dhcpTask(void* pvParameters);/* dhcpTask任务实现 */ static char publishPropertiesMeassage[200]; //发布物模型属性消息体 static char publishEventMeassage[200]; //发布物模型事件消息体 /****************** * 此topic对应iotrelay中继平台中定义的设备物模型属性/功能/事件,由类别标识、设备ID、物模型属性ID/功能ID/事件ID组成 *************/ //订阅物模型功能Topic char *subscribeFunctionTopic="/iotboxFunction/44080000001111000019/F_1696584326733"; //发布物模型属性Topic char *publishPropertiesTopic="/iotboxProperties/44080000001111000019/P_1696583483495"; //发布物模型事件Topic char *publishEventTopic="/iotboxEvent/44080000001111000019/E_1696584389929"; static void subscribeFunctionTopicTask(void *pvParameters){ while (1) { /********* * 订阅iotrelay平台设备功能 * subscribeFunctionTopic 为订阅物模型功能主题 ********/ subscribIotrelayModelFunction(subscribeFunctionTopic); printf("subscribeFunctionTopicTask min free stack size is %d \r\n",(int32_t)uxTaskGetStackHighWaterMark(NULL)); vTaskDelay(5*1000); // 延时5000毫秒 } } static void publishPropertiesTopicTask(void *pvParameters) { while (1) { /********* * 发布iotrelay平台设备属性 * publishPropertiesTopic 为发布物模型属性主题 ********/ memset(publishPropertiesMeassage,0,sizeof(publishPropertiesMeassage)); publishIotrelayModelProperties(publishPropertiesTopic,publishPropertiesMeassage); printf("publishPropertiesTopicTask min free stack size is %d \r\n",(int32_t)uxTaskGetStackHighWaterMark(NULL)); vTaskDelay(60*1000); // 延时1秒 } } static void publishEventTopicTask(void *pvParameters) { while (1) { /********* * 发布iotrelay平台设备事件 根据业务需要调用此发布物模型事件,如设备关机、设备预警等 * publishEventTopic 为发布物模型事件主题 ********/ memset(publishEventMeassage,0,sizeof(publishEventMeassage)); publishIotrelayModelEvent(publishEventTopic,publishEventMeassage); printf("publishEventTopicTask min free stack size is %d \r\n",(int32_t)uxTaskGetStackHighWaterMark(NULL)); vTaskDelay(60*1000); // 延时1秒 } } static void dhcpTask(void *pvParameters) { while (1) { DHCP_run(); printf("dhcpTask min free stack size is %d \r\n",(int32_t)uxTaskGetStackHighWaterMark(NULL)); vTaskDelay(1000); // 延时100毫秒 } } static void AppTaskCreate(void){ BaseType_t xReturn = pdPASS; taskENTER_CRITICAL(); xReturn = xTaskCreate( (TaskFunction_t) subscribeFunctionTopicTask, (const char*) "subscribeFunctionTopicTask", (uint32_t) 1024, (void*) NULL, (UBaseType_t) 1, (TaskHandle_t*) &subscribeFunctionTopicTask_Handle ); if (pdPASS == xReturn) printf("subscribeFunctionTopicTask created successfully\r\n"); else printf("subscribeFunctionTopicTask created failed\r\n"); xReturn = xTaskCreate( (TaskFunction_t) publishPropertiesTopicTask, (const char*) "publishPropertiesTopicTask", (uint32_t) 350, (void*) NULL, (UBaseType_t) 1, (TaskHandle_t*) &publishPropertiesTopicTask_Handle ); if (pdPASS == xReturn) printf("publishPropertiesTopicTask created successfully\r\n"); else printf("publishPropertiesTopicTask created failed\r\n"); xReturn = xTaskCreate( (TaskFunction_t) publishEventTopicTask, (const char*) "publishEventTopicTask", (uint32_t) 350, (void*) NULL, (UBaseType_t) 1, (TaskHandle_t*) &publishEventTopicTask_Handle ); if (pdPASS == xReturn) printf("publishEventTopicTask created successfully\r\n"); else printf("publishEventTopicTask created failed\r\n"); xReturn = xTaskCreate( (TaskFunction_t) dhcpTask, (const char*) "dhcpTask", (uint32_t) 100, (void*) NULL, (UBaseType_t) 1, (TaskHandle_t*) &dhcpTask_Handle ); if (pdPASS == xReturn) printf("dhcpTask created successfully\r\n"); else printf("dhcpTask created failed\r\n"); vTaskDelete(AppTaskCreate_Handle); taskEXIT_CRITICAL(); } int main() { BaseType_t xReturn = pdPASS; RCC_Configuration(); /* 配置单片机系统时钟*/ NVIC_Configuration();/* 配置嵌套中断向量*/ Systick_Init(72);/* 初始化Systick工作时钟*/ GPIO_Configuration();/* 配置GPIO*/ Timer_Configuration();/*定时器初始化*/ USART1_Init(); /*初始化串口通信:115200@8-n-1*/ at24c16_init();/*初始化eeprom*/ printf("W5500 EVB initialization over.\r\n"); Reset_W5500();/*硬重启W5500*/ WIZ_SPI_Init();/*初始化SPI接口*/ printf("W5500 initialized!\r\n"); set_default(); init_dhcp_client(); // 创建任务 xReturn = xTaskCreate( (TaskFunction_t) AppTaskCreate, (const char*) "AppTaskCreate", (uint32_t) 128, (void*) NULL, (UBaseType_t) 3, (TaskHandle_t*) &AppTaskCreate_Handle ); if (xReturn == pdPASS) vTaskStartScheduler(); // 启动调度器 while(1); } /******************* (C) COPYRIGHT 2023 txb0727 ****** END OF FILE ****/
#include "ts_mqtt.h" #include "MQTTPacket.h" #include "transport.h" #include "util.h" #include "w5500.h" #include "socket.h" #include <string.h> #include <stdlib.h> #include "stm32f10x.h" // Device header #include "base64.h" #include "cJSON.h" #include "utf8_gb2312_switch.h" char *mqtt_clientID="44080000001111000019"; //Mqtt 客户端ID 填写iotrelay中继平台接入设备的ID char *mqtt_account = "iotrelayAdmin"; //Mqtt 服务器连接账号 填写iotrelay中继平台接入设备接口生成的账号 char *mqtt_password = "txb021218"; //Mqtt 服务器连接密码 填写iotrelay中继平台接入设备接口生成的口令 uint16 local_port = 5000; //设置开发板本地端口 uint8 remote_ip[4] = {192, 168, 0, 103}; //连接mqtt服务器端口 uint16 remote_port = 1883; //连接mqtt服务器端口 //MQTT发布消息函数 /********* * 发布iotrelay平台设备属性 消息体内容Base64加密传输 * @params pTopic 为发布物模型属性主题 ,此topic对应iotrelay中继平台中定义的设备物模型属性数据,由类别标识、设备ID、物模型属性ID组成,例如 "/iotboxProperties/44080000001111000019/P_1696583483495" * @params publishPropertiesMeassage 为发布消息体内容 ********/ void publishIotrelayModelProperties(char *pTopic,char *publishPropertiesMeassage) { MQTTPacket_connectData data = MQTTPacket_connectData_initializer; int rc = 1; int mysock = SOCK_PROPERTIES; unsigned char buf[200]; int buflen = sizeof(buf); MQTTString topicString = MQTTString_initializer; int len = 0; switch (getSn_SR(SOCK_PROPERTIES)) { case SOCK_CLOSED: socket(SOCK_PROPERTIES, Sn_MR_TCP, local_port++, Sn_MR_ND); printf("SOCK_CLOSED \r\n"); break; case SOCK_INIT: connect(SOCK_PROPERTIES, remote_ip, remote_port); printf("SOCK_INIT \r\n"); break; case SOCK_ESTABLISHED: if (getSn_IR(SOCK_PROPERTIES)&Sn_IR_CON) { printf("TCP established \r\n"); setSn_IR(SOCK_PROPERTIES, Sn_IR_CON); } data.clientID.cstring = mqtt_clientID; //client ID data.keepAliveInterval = 60; //keep alive time data.cleansession = 1; // clean the last info after re-link (1: clean; 0: not clean) data.username.cstring = mqtt_account; // the username if MQTT server need data.password.cstring = mqtt_password; // the password if MQTT server need len = MQTTSerialize_connect(buf, buflen, &data); // Serialize the MQTT Packet according the connect data printf("Serialize the MQTT Packet according the connect data %d \r\n",len); rc = transport_sendPacketBuffer(mysock, buf, len); // sned packet to Server by driver printf("sned packet to Server by driver %d \r\n",rc); /*wait for connack*/ if (MQTTPacket_read(buf, buflen, transport_getdata3)==CONNACK) // call the driver to get conn ack packet from Server { unsigned char sessionPresent, connack_rc; if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, buf, buflen) != 1 || connack_rc != 0) //deserialize packet to confirm the MQTT connect success { printf("Unable to connect, return code %d \n", connack_rc); transport_close(mysock); // return 0; } else { printf("Connect succeed!\r\n"); } } else { transport_close(mysock); // return 0; } /**** *此处为构建上报的报文,获取设备属性数据在此填入,例如在此调用gpio获取设备的温度,在此范例处先直接填入数字 *上报设备物模型属性数据,例如 温度 36 度, *****/ char *indata="36"; //上报设备属性数据 温度 ,在此范例处先直接填入数字,实际应用调用gpio获取设备的物模型属性数据 char outdata[20]; //加密后数据 int outlen; memset(outdata,0,sizeof(outdata)); /* 对上报属性数据Base64加密处理 */ int jm= base64_encode(indata, strlen(indata), outdata, &outlen); printf("加密后数据: %s \r\n",outdata); strcpy(publishPropertiesMeassage,outdata);//填充Ba64加密后的消息体内容 int pubStringLen= strlen(publishPropertiesMeassage); printf(" char *pubString. %s \r\n",publishPropertiesMeassage); memset(buf,0,buflen); topicString.cstring = pTopic; // publish topic name to Server len = MQTTSerialize_publish(buf, buflen, 0, 0, 0, 0, topicString, (unsigned char*)publishPropertiesMeassage, pubStringLen); rc = transport_sendPacketBuffer(mysock, buf, len); printf("disconnecting. \r\n"); len = MQTTSerialize_disconnect(buf, buflen); rc = transport_sendPacketBuffer(mysock, buf, len); free(outdata); free(buf); break; case SOCK_CLOSE_WAIT: close(SOCK_PROPERTIES); printf("SOCK_CLOSE_WAIT. \r\n"); break; } } /********* * 发布iotrelay平台设备事件 消息体内容Base64加密传输 * @params pTopic 为发布物模型事件主题 ,此topic对应iotrelay中继平台中定义的设备物模型事件数据,由类别标识、设备ID、物模型事件ID组成,例如 "/iotboxEvent/44080000001111000019/E_1696584389929" * @params publishEventMeassage 为发布消息体内容 ********/ void publishIotrelayModelEvent(char *pTopic,char *publishEventMeassage) { MQTTPacket_connectData data = MQTTPacket_connectData_initializer; int rc = 1; int mysock = SOCK_EVENT; unsigned char buf[200]; int buflen = sizeof(buf); MQTTString topicString = MQTTString_initializer; int len = 0; switch (getSn_SR(SOCK_EVENT)) { case SOCK_CLOSED: socket(SOCK_EVENT, Sn_MR_TCP, local_port++, Sn_MR_ND); printf("SOCK_CLOSED \r\n"); break; case SOCK_INIT: connect(SOCK_EVENT, remote_ip, remote_port); printf("SOCK_INIT \r\n"); break; case SOCK_ESTABLISHED: if (getSn_IR(SOCK_EVENT)&Sn_IR_CON) { printf("TCP established \r\n"); setSn_IR(SOCK_EVENT, Sn_IR_CON); } data.clientID.cstring = mqtt_clientID; //client ID data.keepAliveInterval = 60; //keep alive time data.cleansession = 1; // clean the last info after re-link (1: clean; 0: not clean) data.username.cstring = mqtt_account; // the username if MQTT server need data.password.cstring = mqtt_password; // the password if MQTT server need len = MQTTSerialize_connect(buf, buflen, &data); // Serialize the MQTT Packet according the connect data printf("Serialize the MQTT Packet according the connect data %d \r\n",len); rc = transport_sendPacketBuffer(mysock, buf, len); // sned packet to Server by driver printf("sned packet to Server by driver %d \r\n",rc); /*wait for connack*/ if (MQTTPacket_read(buf, buflen, transport_getdata4)==CONNACK) // call the driver to get conn ack packet from Server { unsigned char sessionPresent, connack_rc; if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, buf, buflen) != 1 || connack_rc != 0) //deserialize packet to confirm the MQTT connect success { printf("Unable to connect, return code %d \r\n", connack_rc); transport_close(mysock); // return 0; } else { printf("Connect succeed!\r\n"); } } else { transport_close(mysock); // return 0; } /**** * 此处为构建上报的报文,设备属性数据在此填入,例如在此调用gpio获取设备的湿度、温度 *上报设备物模型事件数据,事件数据为json数值格式,由物模型事件属性中的输出参数组成, * 例如:设备关机事件上报数据 * 数据为json格式字符串 "[{\"paramsId\":\"suning\", \"dataValue\": \"80\" },{\"paramsId\": \"temperature\", \"dataValue\": \"45\"}]" *****/ char *indata="[{\"paramsId\":\"suning\", \"dataValue\": \"80\" },{\"paramsId\": \"temperature\", \"dataValue\": \"45\"}]"; //上报设备事件数据 printf("加密前报文: %s \r\n",indata); char outdata[512]; //加密后数据 int outlen; memset(outdata,0,sizeof(outdata)); /* 对上报数据Base64加密处理 */ int jm= base64_encode(indata, strlen(indata), outdata, &outlen); printf("加密后数据: %s \r\n",outdata); strcpy(publishEventMeassage,outdata);//填充Ba64加密后的消息体内容 int pubStringLen= strlen(publishEventMeassage); printf(" char *pubString. %s \r\n",publishEventMeassage); memset(buf,0,buflen); topicString.cstring = pTopic; // publish topic name to Server len = MQTTSerialize_publish(buf, buflen, 0, 0, 0, 0, topicString, (unsigned char*)publishEventMeassage, pubStringLen); rc = transport_sendPacketBuffer(mysock, buf, len); printf("disconnecting. \r\n"); len = MQTTSerialize_disconnect(buf, buflen); rc = transport_sendPacketBuffer(mysock, buf, len); free(outdata); free(buf); break; case SOCK_CLOSE_WAIT: close(SOCK_EVENT); printf("SOCK_CLOSE_WAIT. \r\n"); break; } } /********* * MQTT订阅消息函数 * 订阅iotrelay平台设备物模型功能 * @params pTopic 为订阅物模型功能主题 ,此topic对应iotrelay中继平台中定义的设备物模型功能数据,由类别标识、设备ID、物模型功能ID组成,例如 "/iotboxFunction/44080000001111000019/F_1696584326733" * ********/ void subscribIotrelayModelFunction(char *pTopic) { MQTTPacket_connectData data = MQTTPacket_connectData_initializer; int rc = 1; int mysock = SOCK_SUBSCRIBED; unsigned char buf[256]; int buflen = sizeof(buf); int msgid = 1; MQTTString topicString = MQTTString_initializer; int req_qos = 0; int len = 0; switch (getSn_SR(SOCK_SUBSCRIBED)) { case SOCK_CLOSED: socket(SOCK_SUBSCRIBED, Sn_MR_TCP, local_port++, Sn_MR_ND); printf("SOCK_CLOSED \r\n"); break; case SOCK_INIT: connect(SOCK_SUBSCRIBED, remote_ip, remote_port); printf("SOCK_INIT \r\n"); break; case SOCK_ESTABLISHED: if (getSn_IR(SOCK_SUBSCRIBED)&Sn_IR_CON) { printf("TCP established \r\n"); setSn_IR(SOCK_SUBSCRIBED, Sn_IR_CON); } data.clientID.cstring = mqtt_clientID; //client ID data.keepAliveInterval = 20; //keep alive time data.cleansession = 1; // clean the last info after re-link (1: clean; 0: not clean) data.username.cstring = mqtt_account; // the username if MQTT server need data.password.cstring = mqtt_password; // the password if MQTT server need len = MQTTSerialize_connect(buf, buflen, &data); // Serialize the MQTT Packet according the connect data printf("Serialize the MQTT Packet according the connect data %d buf: %s \r\n",len,buf); rc = transport_sendPacketBuffer(mysock, buf, len); // sned packet to Server by driver printf("sned packet to Server by driver %d \r\n",rc); vTaskDelay(5*1000); /*wait for connack*/ if (MQTTPacket_read(buf, buflen, transport_getdata2)==CONNACK) // call the driver to get conn ack packet from Server { unsigned char sessionPresent, connack_rc; if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, buf, buflen) != 1 || connack_rc != 0) //deserialize packet to confirm the MQTT connect success { printf("Unable to connect, return code %d \r\n", connack_rc); transport_close(mysock); // return 0; } else { printf("Connect succeed!\r\n"); } } else { transport_close(mysock); // return -1; } printf("into subscribe topic name from Server \r\n"); /*subscribe*/ topicString.cstring =pTopic; // subscribe topic name from Server len = MQTTSerialize_subscribe(buf, buflen, 0, msgid, 1, &topicString, &req_qos); //serialize subscribe topic packet rc = transport_sendPacketBuffer(mysock, buf, len); // sned packet to Server by driver /* wait for suback */ if (MQTTPacket_read(buf, buflen, transport_getdata2) == SUBACK) // call the driver to get subscribe ack packet from Server { unsigned short submsgid; int subcount; int granted_qos; rc = MQTTDeserialize_suback(&submsgid, 1, &subcount, &granted_qos, buf, buflen); // deserialize packet to confirm the MQTT subscribe success if (granted_qos != 0) { printf("granted qos != 0, %d \r\n", granted_qos); transport_close(mysock); // return -1; } else { printf("Received suback \r\n"); } } else { transport_close(mysock); // return -1; } vTaskDelay(5*1000); printf("loop get msgs on 'subscribed' topic. %s \r\n",pTopic); printf("loop get msgs on 'subscribed' topic. \r\n"); /*loop get msgs on 'subscribed' topic and send msgs on 'pubtopic' topic*/ memset(buf,0,buflen); /*接收数据会阻塞,除非服务器断开连接后才返回*/ while(getSn_SR(SOCK_SUBSCRIBED)==SOCK_ESTABLISHED) { printf("接收数据会阻塞. buf: %s buflen= %d\r\n",buf,buflen); /* transport_getdata() has a built-in 1 second timeout, your mileage will vary */ if (MQTTPacket_read(buf, buflen, transport_getdata2) == PUBLISH) { unsigned char dup; //re-send flag int qos=0; // Service quality level unsigned char retained; //keep flag unsigned short msgid; int payloadlen_in; unsigned char* payload_in; MQTTString receivedTopic; printf("接收数据会阻塞. buf: %s buflen= %d\r\n",buf,buflen); rc = MQTTDeserialize_publish(&dup, &qos, &retained, &msgid, &receivedTopic, &payload_in, &payloadlen_in, buf, buflen); printf("message arrived %d: %s\n\r", payloadlen_in, payload_in); if(payload_in!=NULL){ if (strlen((const char *)payload_in) > 0) { //Base64解密接收到的报文 char outdata[600]; //接收到的报文数据 int outlen; memset(outdata,0,sizeof(outdata)); int jm= base64_decode((const char *)payload_in, strlen((const char *)payload_in), outdata, &outlen); if(jm<0){ printf("解密数据错误:\r\n"); }else{ printf("解密后数据: %s \r\n",outdata); //解析JSON格式字符串 cJSON *root = cJSON_Parse(outdata); if(root != NULL) { char *json_str = cJSON_Print(root); printf("解析解密后的JSON数据 %s", json_str); //此处根据获取的物模型功能指令,填写设备需要操作的代码,例如设备关机 char *deviceId = cJSON_GetObjectItem(root,"deviceId")->valuestring; printf("deviceId = %s\r\n",deviceId);//解析获取的设备ID char *functionId = cJSON_GetObjectItem(root,"functionId")->valuestring; printf("functionId = %s\r\n",functionId);//解析获取的物模型功能ID char *functionName = cJSON_GetObjectItem(root,"functionName")->valuestring; printf("functionName = %s\r\n",functionName);//解析获取的物模型功能命令字符串 //解析解密后的JSON格式字符串 根据解析的数据,提供设备的功能服务,例如设备关机 /* { "deviceId": "44080000001111000019", "functionId": "F_1696584326733", "functionName": "close system" } */ cJSON_Delete(root); // 释放内存空间 } else { printf("Error before: [%s]\r\n",cJSON_GetErrorPtr()); } } free(outdata); } } } else { printf("No data arrived.\r\n"); } } // Delay_s(1); vTaskDelay(5*1000); printf("disconnecting. \r\n"); len = MQTTSerialize_disconnect(buf, buflen); rc = transport_sendPacketBuffer(mysock, buf, len); free(buf); break; case SOCK_CLOSE_WAIT: close(SOCK_SUBSCRIBED); printf("SOCK_CLOSE_WAIT. \r\n"); rc=1; break; } // return rc; }
Mqtt协议接入STM例程打包源码进入IOT中继宝盒主操作界面打开“IOT设备接口”窗口,选择对应的设备–设备接入端接口中对应的协议接入样例中下载。