三、开发版MQTT协议接入开发指南

三、开发版MQTT协议接入开发指南


1、mqtt协议接入开发示例

1)、简介

  MQTT是基于TCP/IP协议栈构建的异步通信消息协议,是一种轻量级的发布、订阅信息传输协议。可以在不可靠的网络环境中进行扩展,适用于设备硬件存储空间或网络带宽有限的场景。使用MQTT协议,消息发送者与接收者不受时间和空间的限制。

  IOT中继宝盒嵌入了MQTT消息服务器为接入设备接入物联网服务。

  此设备采用Mqtt协议接入本IOT中继系统平台,实现从接入嵌入设备读、写、上报物模型数据。 设备读写报文数据采用字符串BASE64加密传输。
(
本例程以stm32开发版+W5500以太网模块设备接入为例

2)、MQTT服务器连接配置

MQTT服务器地址: tcp://:192.168.0.105:1883( IOT中继宝盒IOT设备接口–配置说明中显示的MQTT服务器连接地址和端口

用户名:txbxxxxxIOT中继宝盒IOT设备接口–配置说明中显示的MQTT服务器用户名

密码:dHhiMDIxMjE4 (IOT中继宝盒IOT设备接口–配置说明中显示的MQTT服务器客户端口令,BASE64解密使用

客户端ID: 44080000001111000019连接IOT中继宝盒的设备ID

3)、发布TOPIC

设备侧MQTT协议使用前提为设备启用、连接协议配置生效。设备侧Mqtt发布TOPIC为配置的物模型属性、物模型事件对应的上报设备属性数据和事件发生上报的数据。 数据流向为上行
对应的发布的主题TOPIC 、服务质量格式分别为:

类别主题TOPIC描述服务质量
属性数据/iotboxProperties/44080000001111000019/P_1696583483495此topic对应设备物模型属性数据,由类别标识、设备ID、物模型属性ID组成。
    主题中 “44080000001111000019” 为设备ID; "P_1696583483495"为设备物模型定义的属性ID标识
QS1
事件数据/iotboxEvent/44080000001111000019/E_1696584389929此topic对应设备物模型事件数据,由类别标识、设备ID、物模型事件ID组成。
    主题中 “44080000001111000019” 为设备ID; "E_1696584389929"为设备物模型定义的事件ID标识
QS1
4)、订阅TOPIC

设备侧Mqtt订阅TOPIC为配置的物模型功能通常用于设备接收处理功能指令的操作。 数据流向为下行
对应的订阅的主题TOPIC 、服务质量格式分别为:

类别主题TOPIC描述服务质量
功能数据/iotboxFunction/44080000001111000019/F_1696584326733此topic对应设备物模型功能数据,由类别标识、设备ID、物模型功能ID组成。
    主题中 “44080000001111000019” 为设备ID; "F_1696584326733"为设备物模型定义的功能ID标识
QS1
5)、发布消息体格式

设备侧向MQTT服务器发布消息体格式采用字符串BASE64加密传输。 参数属性说明:

类别主题TOPIC加密前消息体内容加密后消息体内容
属性数据/iotboxProperties/44080000001111000019/P_1696583483495如:温度 "36" 度MzY=
事件数据/iotboxEvent/44080000001111000019/E_1696584389929事件数据为json数值格式,由物模型事件属性中的输出参数组成。加密前格式如:
[ {"paramsId": "aaaaaa", "dataValue": "36"}, {"paramsId": "bbbbbb", "dataValue": "45"}, ...... ]json字符串内容说明paramsId上报设备事件输出参数ID标识,为设备物模型配置的事件输出参数ID标识 如上例 "aaaaaa" 、"bbbbbb" 为温度和湿度参数标识dataValue上报设备事件输出参数数据值,dataValue 为上报设备事件输出参数值标识 上例"36"、"45" 上报设备事件输出参数数据值,为输出参数温度和湿度数据值
WyB7InBhcmFtc0lkIjogImFhYWFhYSIsICJkYXRhVmFsdWUiOiAiMzYiIH0seyJwYXJhbXNJZCI6ICJiYmJiYmIiLCAgImRhdGFWYWx1ZSI6ICI0NSJ9LF0=
6)、 订阅消息体格式

设备侧向MQTT服务器订阅消息体格式采用字符串BASE64加密传输。 参数属性说明:
主题TOPIC/iotboxFunction/44080000001111000019/F_1696584326733
订阅返回消息体内容:

ewogICAgICJkZXZpY2VJZCI6ICI0NDA4MDAwMDAwMTExMTAwMDAxNiIsCiAgICAgImZ1bmN0aW9uSWQiOiAiRl8xNjk2NTg0MzI2NzMzIiwKICAgICAiZnVuY3Rpb25OYW1lIjogIui/nOeoi+WFs+acuiIsCiAgICAgWwogICAgIHsicGFyYW1zSWQiOiAiYWFhYWFhIiwgInBhcmFtc05hbWUiOiAi5bCP6L2m5b2S5L2NIiwgImRhdGFWYWx1ZSI6ICJ0cnVlIiB9LAogICAgIF0KIH0=

订阅返回消息体内容:

消息体数据为json数值格式,由物模型功能定义中的输入参数组成。解密后的格式如:

 {
 "deviceId": "44080000001111000016",
 "functionId": "F_1696584326733",
 "functionName": "远程关机",
 "inputParamsList": [
 {"paramsId": "aaaaaa", "paramsName": "小车归位", "dataValue": "true" },
      ......
 ]
 }
json字符串内容说明
deviceId定义的设备ID标识 如上例黄色 "44080000001111000016" 为需要处理下发的功能指令设备标识
functionId设备物模型功能定义中的功能ID标识 如上例黄色 "F_1696584326733" 为设备物模型功能定义中的功能ID标识
functionName设备物模型功能定义中的功能名称 如上例黄色 "远程关机" 为设备物模型功能定义中的功能名称
paramsId输入参数ID标识,为设备物模型功能定义中的输入参数ID标识 如上例黄色 "aaaaaa" 为输入参数标识
paramsName输入参数名称,为设备物模型功能定义中的输入参数名称 如上例黄色 "远程关机" 为设备物模型功能定义中的输入参数名称
dataValue输入参数数据值,dataValue 为设备物模型功能定义中的输入参数数值标识 上例黄色 "true" 设备物模型功能定义中的输入参数数值,为输入参数布尔值true
7)、开发板接入例程部分C语言代码:
a、C语言代码中接入参数配置

ts_mqtt.c文件中定义:

   char *mqtt_clientID="44080000001111000019"; //Mqtt 客户端ID 填写iotrelay中继平台接入设备的ID
   char *mqtt_account  ="txb0727"; //Mqtt 服务器连接账号 填写iotrelay中继平台接入设备接口生成的账号
   char *mqtt_password ="xxxxxx";  //Mqtt 服务器连接密码 填写iotrelay中继平台接入设备接口生成的口令
   uint16 local_port = 5000;  //设置开发板本地端口,根据需要设置
   uint8 remote_ip[4] = {192.168.0.102}; //连接mqtt服务器IP
   uint16 remote_port = 1883;  //连接mqtt服务器端口

参数说明:

参数说明
mqtt_clientID连接本IOT中继平台Mqtt服务器的客户端ID, 填写iot中继平台接入设备的ID。
mqtt_account连接本IOT中继平台Mqtt服务器的连接账号, 填写iot中继平台接入设备接口生成的账号
mqtt_password连接本IOT中继平台Mqtt服务器的口令, 填写iot中继平台接入设备接口生成的口令,嵌入设备中填写base64解密后的口令
remote_ip[4]连接本IOT中继平台Mqtt服务器的IP, 填写iot中继平台的IP
remote_port连接本IOT中继平台Mqtt服务器的端口, 填写iot中继平台Mqtt服务器的端口
b、发布物模型属性主题(上报属性数据)

发布上报物模型属性数据,在本样例中ts_mqtt.c文件publishIotrelayModelProperties方法实现,部分代码段:

  /****
  *此处为构建上报的报文,获取设备属性数据在此填入,例如在此调用gpio获取设备的温度,在此范例处先直接填入数字
  *上报设备物模型属性数据,例如 温度 36 度,
  *****/
  char *indata="36" ; //上报设备属性数据 温度 ,在此范例处先直接填入数字,实际应用调用gpio获取设备的物模型属性数据
  char outdata[20]; //加密后数据
  int outlen;
  memset(outdata,0,sizeof(outdata));
  /* 对上报属性数据Base64加密处理  */
  int    jm= base64_encode(indata, strlen(indata), outdata, &outlen);

参数说明:
“36” 为要上报的属性实际数值,例如温度36 度(设备的属性值为您程序开发中获取,获取后替换); int jm= base64_encode(indata, strlen(indata), outdata, &outlen); 为对上报属性数据Base64加密处理

c、发布物模型事件主题(上报事件数据)

发布上报物模型事件数据,在本样例中ts_mqtt.c文件publishIotrelayModelEvent方法实现,部分代码段:

/****
* 此处为构建上报的报文,设备属性数据在此填入,例如在此调用gpio获取设备的光照、温度
*上报设备物模型事件数据,事件数据为json数值格式,由物模型事件属性中的输出参数组成,
* 例如:设备关机事件上报数据
* 数据为json格式字符串 "[{\"paramsId\":\"suning\", \"dataValue\": \"80\" },{\"paramsId\": \"temperature\",  \"dataValue\": \"45\"}]"
*****/
char *indata="[{\"paramsId\":\"suning\", \"dataValue\": \"80\" },{\"paramsId\": \"temperature\",  \"dataValue\": \"45\"}]"; //上报设备事件数据
printf("加密前报文: %s \r\n",indata);
char outdata[512]; //加密后数据
int outlen;
memset(outdata,0,sizeof(outdata));
/* 对上报数据Base64加密处理  */
int    jm= base64_encode(indata, strlen(indata), outdata, &outlen);

char *indata=“[{"paramsId":"suning", "dataValue": "80" },{"paramsId": "temperature", "dataValue": "45"}]”; 为上报设备事件json数据 。 int jm= base64_encode(indata, strlen(indata), outdata, &outlen); 为对上报数据Base64加密处理
设备事件json数据参数说明:

参数说明
paramsId上报设备事件输出参数ID标识,为设备物模型配置的事件输出参数ID标识         如上例 “suning” 、 “temperature” 为光照和温度参数标识
dataValue上报设备事件输出参数数据值,dataValue 为上报设备事件输出参数值标识         上例 “80” 、 “45” 上报设备事件输出参数数据值,为输出参数光照温和度数据值;设备的属性值为您程序开发中获取,获取后替换
d、订阅物模型功能主题(接收下发指令)

订阅物模型功能,在本样例中ts_mqtt.c文件 **subscribIotrelayModelFunction **方法实现,部分代码段:

  //Base64解密接收到的报文
  char outdata[600]; //接收到的报文数据
  int outlen;
  memset(outdata,0,sizeof(outdata));
  int jm= base64_decode((const char *)payload_in, strlen((const char *)payload_in), outdata, &outlen);

   if(jm<0){
      printf("解密数据错误:\r\n");
   }else{
     printf("解密后数据: %s \r\n",outdata);
     //解析JSON格式字符串
     cJSON *root = cJSON_Parse(outdata);
     if(root != NULL)
     {
       char *json_str = cJSON_Print(root);
       printf("解析解密后的JSON数据 %s", json_str);

       //此处根据获取的物模型功能指令,填写设备需要操作的代码,例如设备关机

int jm= base64_decode((const char *)payload_in, strlen((const char *)payload_in),outdata, &outlen); 为base64解密订阅功能接收的json字符串数据 。 cJSON *root = cJSON_Parse(outdata); 为解析JSON格式字符串
//此处根据获取的物模型功能指令,填写设备需要操作的代码,例如设备关机

e、核心代码
main.c文件
  /******************************************************************************
  * @file   mqqt协议发布上报设备属性或设备事件数据,订阅设备功能  Project Template  ../main.c
  * @author  txb0727
  * @version V1.0.0
  * @date    2023-12-10
  * @brief   Main program body
  ******************************************************************************
  * @attention
  * 本范例用于嵌入式开发版采用mqqt协议发布上报设备属性或设备事件数据,订阅设备功能,仅供参考
  *  本范例硬件为stm32f103开发板,通讯模块为W5500以太网通讯模块
  *
  * <h2><center>© COPYRIGHT 2023 txb0727.</center></h2>
  ******************************************************************************/
/* Includes ------------------------------------------------------------------*/
#include <stm32f10x.h>
#include "FreeRTOS.H"
#include "task.h"
#include "mcu_init.h"
#include "config.h"
#include "device.h"
#include "spi2.h"
#include "socket.h"
#include "w5500.h"
#include "at24c16.h"
#include "util.h"
#include "dhcp.h"
#include "string.h"
#include <stdio.h>
#include "ts_mqtt.h"

static TaskHandle_t AppTaskCreate_Handle = NULL;/* 创建任务句柄 */
static TaskHandle_t subscribeFunctionTopicTask_Handle = NULL;/* KEY任务句柄 */
static TaskHandle_t publishPropertiesTopicTask_Handle = NULL;/* KEY任务句柄 */
static TaskHandle_t publishEventTopicTask_Handle = NULL;/* LED任务句柄 */
static TaskHandle_t dhcpTask_Handle = NULL;/* KEY任务句柄 */

static void subscribeFunctionTopicTask(void* pvParameters);/* subscribeFunctionTopicTask任务实现 */
static void publishPropertiesTopicTask(void* pvParameters);/* publishPropertiesTopicTask任务实现 */
static void publishEventTopicTask(void* pvParameters);/* publishEventTopicTask任务实现 */
static void dhcpTask(void* pvParameters);/* dhcpTask任务实现 */

static char publishPropertiesMeassage[200]; //发布物模型属性消息体
static char publishEventMeassage[200];  //发布物模型事件消息体
    /******************
    * 此topic对应iotrelay中继平台中定义的设备物模型属性/功能/事件,由类别标识、设备ID、物模型属性ID/功能ID/事件ID组成
    *************/
    //订阅物模型功能Topic
    char *subscribeFunctionTopic="/iotboxFunction/44080000001111000019/F_1696584326733";
     //发布物模型属性Topic
    char *publishPropertiesTopic="/iotboxProperties/44080000001111000019/P_1696583483495";
     //发布物模型事件Topic
    char *publishEventTopic="/iotboxEvent/44080000001111000019/E_1696584389929";

    static void subscribeFunctionTopicTask(void *pvParameters){

    while (1) {
            /*********
            * 订阅iotrelay平台设备功能
            * subscribeFunctionTopic 为订阅物模型功能主题
            ********/
            subscribIotrelayModelFunction(subscribeFunctionTopic);
            printf("subscribeFunctionTopicTask min free stack size is %d \r\n",(int32_t)uxTaskGetStackHighWaterMark(NULL));
            vTaskDelay(5*1000);  // 延时5000毫秒
    }
}

static void publishPropertiesTopicTask(void *pvParameters) {
    while (1) {
            /*********
            * 发布iotrelay平台设备属性
            * publishPropertiesTopic 为发布物模型属性主题
            ********/
            memset(publishPropertiesMeassage,0,sizeof(publishPropertiesMeassage));
            publishIotrelayModelProperties(publishPropertiesTopic,publishPropertiesMeassage);
            printf("publishPropertiesTopicTask min free stack size is %d \r\n",(int32_t)uxTaskGetStackHighWaterMark(NULL));
            vTaskDelay(60*1000);  // 延时1秒
    }
}
static void publishEventTopicTask(void *pvParameters) {
    while (1) {
            /*********
            * 发布iotrelay平台设备事件   根据业务需要调用此发布物模型事件,如设备关机、设备预警等
            * publishEventTopic 为发布物模型事件主题
            ********/
            memset(publishEventMeassage,0,sizeof(publishEventMeassage));
            publishIotrelayModelEvent(publishEventTopic,publishEventMeassage);
            printf("publishEventTopicTask min free stack size is %d \r\n",(int32_t)uxTaskGetStackHighWaterMark(NULL));
            vTaskDelay(60*1000);  // 延时1秒
    }
}
static void dhcpTask(void *pvParameters) {
    while (1) {
      DHCP_run();
      printf("dhcpTask min free stack size is %d \r\n",(int32_t)uxTaskGetStackHighWaterMark(NULL));
      vTaskDelay(1000);  // 延时100毫秒
    }
}

static void AppTaskCreate(void){
    BaseType_t xReturn = pdPASS;
    taskENTER_CRITICAL();
    xReturn = xTaskCreate(
                        (TaskFunction_t) subscribeFunctionTopicTask,
                        (const char*)    "subscribeFunctionTopicTask",
                        (uint32_t)       1024,
                        (void*)                     NULL,
                        (UBaseType_t)    1,
                        (TaskHandle_t*)  &subscribeFunctionTopicTask_Handle
                    );
    if (pdPASS == xReturn)
        printf("subscribeFunctionTopicTask created successfully\r\n");
    else
        printf("subscribeFunctionTopicTask created failed\r\n");

    xReturn = xTaskCreate(
                        (TaskFunction_t) publishPropertiesTopicTask,
                        (const char*)    "publishPropertiesTopicTask",
                        (uint32_t)       350,
                        (void*)                     NULL,
                        (UBaseType_t)    1,
                        (TaskHandle_t*)  &publishPropertiesTopicTask_Handle
                    );
    if (pdPASS == xReturn)
        printf("publishPropertiesTopicTask created successfully\r\n");
    else
        printf("publishPropertiesTopicTask created failed\r\n");
    xReturn = xTaskCreate(
                    (TaskFunction_t) publishEventTopicTask,
                    (const char*)    "publishEventTopicTask",
                    (uint32_t)       350,
                    (void*)                     NULL,
                    (UBaseType_t)    1,
                    (TaskHandle_t*)  &publishEventTopicTask_Handle
                );
    if (pdPASS == xReturn)
        printf("publishEventTopicTask created successfully\r\n");
    else
        printf("publishEventTopicTask created failed\r\n");
    xReturn = xTaskCreate(
                        (TaskFunction_t) dhcpTask,
                        (const char*)    "dhcpTask",
                        (uint32_t)       100,
                        (void*)                     NULL,
                        (UBaseType_t)    1,
                        (TaskHandle_t*)  &dhcpTask_Handle
                    );
    if (pdPASS == xReturn)
        printf("dhcpTask created successfully\r\n");
    else
        printf("dhcpTask created failed\r\n");
    vTaskDelete(AppTaskCreate_Handle);
    taskEXIT_CRITICAL();
}


int main()
{
    BaseType_t xReturn = pdPASS;
    RCC_Configuration(); /* 配置单片机系统时钟*/
    NVIC_Configuration();/* 配置嵌套中断向量*/
    Systick_Init(72);/* 初始化Systick工作时钟*/
    GPIO_Configuration();/* 配置GPIO*/
    Timer_Configuration();/*定时器初始化*/
    USART1_Init(); /*初始化串口通信:115200@8-n-1*/
    at24c16_init();/*初始化eeprom*/
    printf("W5500 EVB initialization over.\r\n");

    Reset_W5500();/*硬重启W5500*/
    WIZ_SPI_Init();/*初始化SPI接口*/
    printf("W5500 initialized!\r\n");

    set_default();
    init_dhcp_client();

   // 创建任务
     xReturn = xTaskCreate(
                            (TaskFunction_t) AppTaskCreate,
                            (const char*)    "AppTaskCreate",
                            (uint32_t)       128,
                            (void*)                     NULL,
                            (UBaseType_t)    3,
                            (TaskHandle_t*)  &AppTaskCreate_Handle
                        );
    if (xReturn == pdPASS)
        vTaskStartScheduler(); // 启动调度器
    while(1);
}

/******************* (C)  COPYRIGHT 2023 txb0727  ****** END OF FILE ****/
ts_mqtt.c文件
#include "ts_mqtt.h"
#include "MQTTPacket.h"
#include "transport.h"
#include "util.h"
#include "w5500.h"
#include "socket.h"
#include <string.h>
#include <stdlib.h>
#include "stm32f10x.h"                  // Device header
#include "base64.h"
#include "cJSON.h"
#include "utf8_gb2312_switch.h"
char *mqtt_clientID="44080000001111000019"; //Mqtt 客户端ID 填写iotrelay中继平台接入设备的ID
char *mqtt_account  = "iotrelayAdmin"; //Mqtt 服务器连接账号 填写iotrelay中继平台接入设备接口生成的账号
char *mqtt_password = "txb021218";  //Mqtt 服务器连接密码 填写iotrelay中继平台接入设备接口生成的口令
uint16 local_port = 5000;  //设置开发板本地端口
uint8 remote_ip[4] = {192, 168, 0, 103}; //连接mqtt服务器端口
uint16 remote_port = 1883;  //连接mqtt服务器端口

//MQTT发布消息函数
/*********
* 发布iotrelay平台设备属性 消息体内容Base64加密传输
* @params  pTopic 为发布物模型属性主题 ,此topic对应iotrelay中继平台中定义的设备物模型属性数据,由类别标识、设备ID、物模型属性ID组成,例如  "/iotboxProperties/44080000001111000019/P_1696583483495"
* @params publishPropertiesMeassage 为发布消息体内容
********/
void publishIotrelayModelProperties(char *pTopic,char *publishPropertiesMeassage)
{
    MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
    int rc = 1;
    int mysock = SOCK_PROPERTIES;
    unsigned char buf[200];
    int buflen = sizeof(buf);

    MQTTString topicString = MQTTString_initializer;

    int len = 0;

    switch (getSn_SR(SOCK_PROPERTIES))
    {
        case SOCK_CLOSED:
            socket(SOCK_PROPERTIES, Sn_MR_TCP, local_port++, Sn_MR_ND);
            printf("SOCK_CLOSED \r\n");

            break;
        case SOCK_INIT:
            connect(SOCK_PROPERTIES, remote_ip, remote_port);
            printf("SOCK_INIT \r\n");

            break;
        case SOCK_ESTABLISHED:
            if (getSn_IR(SOCK_PROPERTIES)&Sn_IR_CON)
            {
                printf("TCP established \r\n");
                setSn_IR(SOCK_PROPERTIES, Sn_IR_CON);
            }

            data.clientID.cstring = mqtt_clientID;    //client ID
            data.keepAliveInterval = 60;    //keep alive time
            data.cleansession = 1;    // clean the last info after re-link (1: clean; 0: not clean)
            data.username.cstring = mqtt_account;    // the username if MQTT server need
            data.password.cstring = mqtt_password;    // the password if MQTT server need

            len = MQTTSerialize_connect(buf, buflen, &data);  // Serialize the MQTT Packet according the connect data
              printf("Serialize the MQTT Packet according the connect data  %d \r\n",len);

            rc = transport_sendPacketBuffer(mysock, buf, len);    // sned packet to Server by driver
            printf("sned packet to Server by driver %d \r\n",rc);


            /*wait for connack*/
            if (MQTTPacket_read(buf, buflen, transport_getdata3)==CONNACK)    // call the driver to get conn ack packet from Server
            {
                unsigned char sessionPresent, connack_rc;
                if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, buf, buflen) != 1 || connack_rc != 0)    //deserialize packet to confirm the MQTT connect success
                {
                    printf("Unable to connect, return code %d \n", connack_rc);
                    transport_close(mysock);
//                    return 0;
                }
                else
                {
                    printf("Connect succeed!\r\n");
                }
            }
            else
            {
                transport_close(mysock);
//                return 0;
            }


            /****
            *此处为构建上报的报文,获取设备属性数据在此填入,例如在此调用gpio获取设备的温度,在此范例处先直接填入数字
            *上报设备物模型属性数据,例如 温度 36 度,
            *****/
            char *indata="36"; //上报设备属性数据 温度 ,在此范例处先直接填入数字,实际应用调用gpio获取设备的物模型属性数据
            char outdata[20]; //加密后数据
            int outlen;
            memset(outdata,0,sizeof(outdata));
                /* 对上报属性数据Base64加密处理  */
            int    jm= base64_encode(indata, strlen(indata), outdata, &outlen);
            printf("加密后数据: %s \r\n",outdata);

            strcpy(publishPropertiesMeassage,outdata);//填充Ba64加密后的消息体内容
            int pubStringLen= strlen(publishPropertiesMeassage);
            printf(" char *pubString. %s \r\n",publishPropertiesMeassage);

            memset(buf,0,buflen);
            topicString.cstring = pTopic; // publish topic name to Server
            len = MQTTSerialize_publish(buf, buflen, 0, 0, 0, 0, topicString, (unsigned char*)publishPropertiesMeassage, pubStringLen);
            rc = transport_sendPacketBuffer(mysock, buf, len);

            printf("disconnecting. \r\n");
            len = MQTTSerialize_disconnect(buf, buflen);
            rc = transport_sendPacketBuffer(mysock, buf, len);

            free(outdata);
            free(buf);
            break;
        case SOCK_CLOSE_WAIT:
            close(SOCK_PROPERTIES);
            printf("SOCK_CLOSE_WAIT. \r\n");
            break;
    }


}
/*********
* 发布iotrelay平台设备事件  消息体内容Base64加密传输
* @params  pTopic 为发布物模型事件主题 ,此topic对应iotrelay中继平台中定义的设备物模型事件数据,由类别标识、设备ID、物模型事件ID组成,例如  "/iotboxEvent/44080000001111000019/E_1696584389929"
* @params publishEventMeassage 为发布消息体内容
********/
void publishIotrelayModelEvent(char *pTopic,char *publishEventMeassage)
{
    MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
    int rc = 1;
    int mysock = SOCK_EVENT;
    unsigned char buf[200];
    int buflen = sizeof(buf);

    MQTTString topicString = MQTTString_initializer;

    int len = 0;

    switch (getSn_SR(SOCK_EVENT))
    {
        case SOCK_CLOSED:
            socket(SOCK_EVENT, Sn_MR_TCP, local_port++, Sn_MR_ND);
            printf("SOCK_CLOSED \r\n");

            break;
        case SOCK_INIT:
            connect(SOCK_EVENT, remote_ip, remote_port);
            printf("SOCK_INIT \r\n");

            break;
        case SOCK_ESTABLISHED:
            if (getSn_IR(SOCK_EVENT)&Sn_IR_CON)
            {
                printf("TCP established \r\n");
                setSn_IR(SOCK_EVENT, Sn_IR_CON);
            }

            data.clientID.cstring = mqtt_clientID;    //client ID
            data.keepAliveInterval = 60;    //keep alive time
            data.cleansession = 1;    // clean the last info after re-link (1: clean; 0: not clean)
            data.username.cstring = mqtt_account;    // the username if MQTT server need
            data.password.cstring = mqtt_password;    // the password if MQTT server need

            len = MQTTSerialize_connect(buf, buflen, &data);  // Serialize the MQTT Packet according the connect data
            printf("Serialize the MQTT Packet according the connect data  %d \r\n",len);

            rc = transport_sendPacketBuffer(mysock, buf, len);    // sned packet to Server by driver
            printf("sned packet to Server by driver %d \r\n",rc);


            /*wait for connack*/
            if (MQTTPacket_read(buf, buflen, transport_getdata4)==CONNACK)    // call the driver to get conn ack packet from Server
            {
                unsigned char sessionPresent, connack_rc;
                if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, buf, buflen) != 1 || connack_rc != 0)    //deserialize packet to confirm the MQTT connect success
                {
                    printf("Unable to connect, return code %d \r\n", connack_rc);
                    transport_close(mysock);
//                    return 0;
                }
                else
                {
                    printf("Connect succeed!\r\n");
                }
            }
            else
            {
                transport_close(mysock);
//                return 0;
            }


             /****
             * 此处为构建上报的报文,设备属性数据在此填入,例如在此调用gpio获取设备的湿度、温度
             *上报设备物模型事件数据,事件数据为json数值格式,由物模型事件属性中的输出参数组成,
             * 例如:设备关机事件上报数据
             * 数据为json格式字符串 "[{\"paramsId\":\"suning\", \"dataValue\": \"80\" },{\"paramsId\": \"temperature\",  \"dataValue\": \"45\"}]"
             *****/
            char *indata="[{\"paramsId\":\"suning\", \"dataValue\": \"80\" },{\"paramsId\": \"temperature\",  \"dataValue\": \"45\"}]"; //上报设备事件数据
            printf("加密前报文: %s \r\n",indata);

            char outdata[512]; //加密后数据
            int outlen;
            memset(outdata,0,sizeof(outdata));
            /* 对上报数据Base64加密处理  */
            int    jm= base64_encode(indata, strlen(indata), outdata, &outlen);
            printf("加密后数据: %s \r\n",outdata);

            strcpy(publishEventMeassage,outdata);//填充Ba64加密后的消息体内容
            int pubStringLen= strlen(publishEventMeassage);
            printf(" char *pubString. %s \r\n",publishEventMeassage);

            memset(buf,0,buflen);
            topicString.cstring = pTopic; // publish topic name to Server
            len = MQTTSerialize_publish(buf, buflen, 0, 0, 0, 0, topicString, (unsigned char*)publishEventMeassage, pubStringLen);
            rc = transport_sendPacketBuffer(mysock, buf, len);

            printf("disconnecting. \r\n");
            len = MQTTSerialize_disconnect(buf, buflen);
            rc = transport_sendPacketBuffer(mysock, buf, len);

            free(outdata);
            free(buf);

            break;
        case SOCK_CLOSE_WAIT:
            close(SOCK_EVENT);
            printf("SOCK_CLOSE_WAIT. \r\n");
            break;
    }

}





/*********
*  MQTT订阅消息函数
* 订阅iotrelay平台设备物模型功能
* @params  pTopic 为订阅物模型功能主题 ,此topic对应iotrelay中继平台中定义的设备物模型功能数据,由类别标识、设备ID、物模型功能ID组成,例如  "/iotboxFunction/44080000001111000019/F_1696584326733"
*
********/
void subscribIotrelayModelFunction(char *pTopic)
{
    MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
    int rc = 1;
    int mysock = SOCK_SUBSCRIBED;
    unsigned char buf[256];
    int buflen = sizeof(buf);
    int msgid = 1;
    MQTTString topicString = MQTTString_initializer;
    int req_qos = 0;
    int len = 0;

    switch (getSn_SR(SOCK_SUBSCRIBED))
    {
        case SOCK_CLOSED:
            socket(SOCK_SUBSCRIBED, Sn_MR_TCP, local_port++, Sn_MR_ND);
            printf("SOCK_CLOSED \r\n");

            break;
        case SOCK_INIT:
            connect(SOCK_SUBSCRIBED, remote_ip, remote_port);
            printf("SOCK_INIT \r\n");

            break;
        case SOCK_ESTABLISHED:
            if (getSn_IR(SOCK_SUBSCRIBED)&Sn_IR_CON)
            {
                printf("TCP established \r\n");
                setSn_IR(SOCK_SUBSCRIBED, Sn_IR_CON);
            }

      data.clientID.cstring = mqtt_clientID;    //client ID
            data.keepAliveInterval = 20;    //keep alive time
            data.cleansession = 1;    // clean the last info after re-link (1: clean; 0: not clean)
            data.username.cstring = mqtt_account;    // the username if MQTT server need
            data.password.cstring = mqtt_password;    // the password if MQTT server need


            len = MQTTSerialize_connect(buf, buflen, &data);  // Serialize the MQTT Packet according the connect data
            printf("Serialize the MQTT Packet according the connect data  %d buf: %s \r\n",len,buf);

            rc = transport_sendPacketBuffer(mysock, buf, len);    // sned packet to Server by driver
            printf("sned packet to Server by driver %d \r\n",rc);
            vTaskDelay(5*1000);
            /*wait for connack*/
            if (MQTTPacket_read(buf, buflen, transport_getdata2)==CONNACK)    // call the driver to get conn ack packet from Server
            {
                unsigned char sessionPresent, connack_rc;
                if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, buf, buflen) != 1 || connack_rc != 0)    //deserialize packet to confirm the MQTT connect success
                {
                    printf("Unable to connect, return code %d \r\n", connack_rc);
                    transport_close(mysock);
                 // return 0;
                }
                else
                {
                    printf("Connect succeed!\r\n");
                }
            }
            else
            {
                transport_close(mysock);
               // return -1;
            }

            printf("into subscribe topic name from Server \r\n");
            /*subscribe*/
            topicString.cstring =pTopic;    // subscribe topic name from Server
            len = MQTTSerialize_subscribe(buf, buflen, 0, msgid, 1, &topicString, &req_qos);    //serialize subscribe topic packet
            rc = transport_sendPacketBuffer(mysock, buf, len);    // sned packet to Server by driver

            /* wait for suback */
            if (MQTTPacket_read(buf, buflen, transport_getdata2) == SUBACK)    // call the driver to get subscribe ack packet from Server
            {
                unsigned short submsgid;
                int subcount;
                int granted_qos;

                rc = MQTTDeserialize_suback(&submsgid, 1, &subcount, &granted_qos, buf, buflen); // deserialize packet to confirm the MQTT subscribe success

                if (granted_qos != 0)
                {
                    printf("granted qos != 0, %d \r\n", granted_qos);
                    transport_close(mysock);
                      // return -1;
                }
                else
                {
                    printf("Received suback \r\n");
                }
            }
            else
            {
                transport_close(mysock);
             // return -1;
            }

            vTaskDelay(5*1000);
            printf("loop get msgs on 'subscribed' topic. %s \r\n",pTopic);
            printf("loop get msgs on 'subscribed' topic. \r\n");
            /*loop get msgs on 'subscribed' topic and send msgs on 'pubtopic' topic*/
            memset(buf,0,buflen);
        /*接收数据会阻塞,除非服务器断开连接后才返回*/
        while(getSn_SR(SOCK_SUBSCRIBED)==SOCK_ESTABLISHED)
            {

                 printf("接收数据会阻塞. buf: %s buflen= %d\r\n",buf,buflen);
                /* transport_getdata() has a built-in 1 second timeout,
                your mileage will vary */
                if (MQTTPacket_read(buf, buflen, transport_getdata2) == PUBLISH)
                {
                    unsigned char dup;    //re-send flag
                    int qos=0;    // Service quality level
                    unsigned char retained;    //keep flag
                    unsigned short msgid;
                    int payloadlen_in;
                    unsigned char* payload_in;

                    MQTTString receivedTopic;
                    printf("接收数据会阻塞. buf: %s buflen= %d\r\n",buf,buflen);
                    rc = MQTTDeserialize_publish(&dup, &qos, &retained, &msgid, &receivedTopic,
                                                 &payload_in, &payloadlen_in, buf, buflen);

                    printf("message arrived %d: %s\n\r", payloadlen_in, payload_in);
                    if(payload_in!=NULL){
                        if (strlen((const char *)payload_in) > 0) {
                                //Base64解密接收到的报文
                                char outdata[600]; //接收到的报文数据
                                int outlen;
                                memset(outdata,0,sizeof(outdata));
                                int jm= base64_decode((const char *)payload_in, strlen((const char *)payload_in), outdata, &outlen);

                                if(jm<0){
                                    printf("解密数据错误:\r\n");
                                }else{
                                    printf("解密后数据: %s \r\n",outdata);
                                            //解析JSON格式字符串
                                            cJSON *root = cJSON_Parse(outdata);
                                            if(root != NULL)
                                            {
                                                    char *json_str = cJSON_Print(root);
                                                    printf("解析解密后的JSON数据 %s", json_str);

                                                //此处根据获取的物模型功能指令,填写设备需要操作的代码,例如设备关机

                                                    char *deviceId = cJSON_GetObjectItem(root,"deviceId")->valuestring;
                                                    printf("deviceId = %s\r\n",deviceId);//解析获取的设备ID

                                                    char  *functionId = cJSON_GetObjectItem(root,"functionId")->valuestring;
                                                    printf("functionId = %s\r\n",functionId);//解析获取的物模型功能ID

                                                    char  *functionName = cJSON_GetObjectItem(root,"functionName")->valuestring;
                                                    printf("functionName = %s\r\n",functionName);//解析获取的物模型功能命令字符串

                                                //解析解密后的JSON格式字符串 根据解析的数据,提供设备的功能服务,例如设备关机
                                                /*
                                                    {
                                                    "deviceId": "44080000001111000019",
                                                    "functionId": "F_1696584326733",
                                                    "functionName": "close system"
                                                    }
                                                */
                                            cJSON_Delete(root); // 释放内存空间
                                            }
                                            else
                                            {
                                                printf("Error before: [%s]\r\n",cJSON_GetErrorPtr());
                                            }

                                }

                                free(outdata);
                        }
                   }

                }
                else
                {
                    printf("No data arrived.\r\n");
                }



            }

    //   Delay_s(1);
        vTaskDelay(5*1000);
              printf("disconnecting. \r\n");
            len = MQTTSerialize_disconnect(buf, buflen);
            rc = transport_sendPacketBuffer(mysock, buf, len);
              free(buf);
            break;
        case SOCK_CLOSE_WAIT:
            close(SOCK_SUBSCRIBED);
            printf("SOCK_CLOSE_WAIT. \r\n");
             rc=1;
            break;
    }

    // return rc;
}


f、Mqtt协议接入STM例程下载

     Mqtt协议接入STM例程打包源码进入IOT中继宝盒主操作界面打开“IOT设备接口”窗口,选择对应的设备–设备接入端接口中对应的协议接入样例中下载。


IOT中继宝盒 长按关注宜联科技公众号

QQ在线咨询

点击这里给我发消息 咨询客服专员

QQ咨询

专业咨询

199-4502-1328

电话咨询

微信扫一扫

IOT中继宝盒

微信咨询
返回顶部