相关文件
参考教程:https://www.bilibili.com/video/BV1dJ411S723?p=53
29_MQTT_JX.zip
user_main.c
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// // //
// 工程: MQTT_JX // 注:在《esp_mqtt_proj》例程上修改 //
// // //
// 平台: 【技新电子】物联网开发板 ESP8266 V1.0 // ①:添加【详实的注释】唉,不说了,说多了都是泪!!! //
// // //
// 功能: ①:设置MQTT相关参数 // ②:修改【MQTT参数数组】config.h -> device_id/mqtt_host/mqtt_pass //
// // //
// ②:与MQTT服务端,建立网络连接(TCP) // ③:修改【MQTT_CLIENT_ID宏定义】mqtt_config.h -> MQTT_CLIENT_ID //
// // //
// ③:配置/发送【CONNECT】报文,连接MQTT服务端 // ④:修改【PROTOCOL_NAMEv31宏】mqtt_config.h -> PROTOCOL_NAMEv311 //
// // //
// ④:订阅主题"SW_LED" // ⑤:修改【心跳报文的发送间隔】mqtt.c -> [mqtt_timer]函数 //
// // //
// ⑤:向主题"SW_LED"发布"ESP8266_Online" // ⑥:修改【SNTP服务器设置】user_main.c -> [sntpfn]函数 //
// // //
// ⑥:根据接收到"SW_LED"主题的消息,控制LED亮灭 // ⑦:注释【遗嘱设置】user_main.c -> [user_init]函数 //
// // //
// ⑦:每隔一分钟,向MQTT服务端发送【心跳】 // ⑧:添加【MQTT消息控制LED亮/灭】user_main.c -> [mqttDataCb]函数 //
// // //
// 版本: V1.1 // //
// // //
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// 头文件
//==============================
#include "ets_sys.h"
#include "driver/uart.h"
#include "osapi.h"
#include "mqtt.h"
#include "wifi.h"
#include "config.h"
#include "debug.h"
#include "gpio.h"
#include "user_interface.h"
#include "mem.h"
#include "sntp.h"
//==============================
// 类型定义
//=================================
typedef unsigned long u32_t;
//=================================
// 全局变量
//============================================================================
MQTT_Client mqttClient; // MQTT客户端_结构体【此变量非常重要】
static ETSTimer sntp_timer; // SNTP定时器
//============================================================================
// SNTP定时函数:获取当前网络时间
//============================================================================
void sntpfn()
{
u32_t ts = 0;
ts = sntp_get_current_timestamp(); // 获取当前的偏移时间
os_printf("current time : %s\n", sntp_get_real_time(ts)); // 获取真实时间
if (ts == 0) // 网络时间获取失败
{
os_printf("did not get a valid time from sntp server\n");
}
else //(ts != 0) // 网络时间获取成功
{
os_timer_disarm(&sntp_timer); // 关闭SNTP定时器
MQTT_Connect(&mqttClient); // 开始MQTT连接
}
}
//============================================================================
// WIFI连接状态改变:参数 = wifiStatus
//============================================================================
void wifiConnectCb(uint8_t status)
{
// 成功获取到IP地址
//---------------------------------------------------------------------
if(status == STATION_GOT_IP)
{
ip_addr_t * addr = (ip_addr_t *)os_zalloc(sizeof(ip_addr_t));
// 在官方例程的基础上,增加2个备用服务器
//---------------------------------------------------------------
sntp_setservername(0, "us.pool.ntp.org"); // 服务器_0【域名】
sntp_setservername(1, "ntp.sjtu.edu.cn"); // 服务器_1【域名】
ipaddr_aton("210.72.145.44", addr); // 点分十进制 => 32位二进制
sntp_setserver(2, addr); // 服务器_2【IP地址】
os_free(addr); // 释放addr
sntp_init(); // SNTP初始化
// 设置SNTP定时器[sntp_timer]
//-----------------------------------------------------------
os_timer_disarm(&sntp_timer);
os_timer_setfn(&sntp_timer, (os_timer_func_t *)sntpfn, NULL);
os_timer_arm(&sntp_timer, 1000, 1); // 1s定时
}
// IP地址获取失败
//----------------------------------------------------------------
else
{
MQTT_Disconnect(&mqttClient); // WIFI连接出错,TCP断开连接
}
}
//============================================================================
// MQTT已成功连接:ESP8266发送【CONNECT】,并接收到【CONNACK】
//============================================================================
void mqttConnectedCb(uint32_t *args)
{
MQTT_Client* client = (MQTT_Client*)args; // 获取mqttClient指针
INFO("MQTT: Connected\r\n");
// 【参数2:主题过滤器 / 参数3:订阅Qos】
//-----------------------------------------------------------------
MQTT_Subscribe(client, "SW_LED", 0); // 订阅主题"SW_LED",QoS=0
// MQTT_Subscribe(client, "SW_LED", 1);
// MQTT_Subscribe(client, "SW_LED", 2);
// 【参数2:主题名 / 参数3:发布消息的有效载荷 / 参数4:有效载荷长度 / 参数5:发布Qos / 参数6:Retain】
//-----------------------------------------------------------------------------------------------------------------------------------------
MQTT_Publish(client, "SW_LED", "ESP8266_Online", strlen("ESP8266_Online"), 0, 0); // 向主题"SW_LED"发布"ESP8266_Online",Qos=0、retain=0
// MQTT_Publish(client, "SW_LED", "ESP8266_Online", strlen("ESP8266_Online"), 1, 0);
// MQTT_Publish(client, "SW_LED", "ESP8266_Online", strlen("ESP8266_Online"), 2, 0);
}
//============================================================================
// MQTT成功断开连接
//============================================================================
void mqttDisconnectedCb(uint32_t *args)
{
MQTT_Client* client = (MQTT_Client*)args;
INFO("MQTT: Disconnected\r\n");
}
//============================================================================
// MQTT成功发布消息
//============================================================================
void mqttPublishedCb(uint32_t *args)
{
MQTT_Client* client = (MQTT_Client*)args;
INFO("MQTT: Published\r\n");
}
//============================================================================
// 【接收MQTT的[PUBLISH]数据】函数 【参数1:主题 / 参数2:主题长度 / 参数3:有效载荷 / 参数4:有效载荷长度】
//===============================================================================================================
void mqttDataCb(uint32_t *args, const char* topic, uint32_t topic_len, const char *data, uint32_t data_len)
{
char *topicBuf = (char*)os_zalloc(topic_len+1); // 申请【主题】空间
char *dataBuf = (char*)os_zalloc(data_len+1); // 申请【有效载荷】空间
MQTT_Client* client = (MQTT_Client*)args; // 获取MQTT_Client指针
os_memcpy(topicBuf, topic, topic_len); // 缓存主题
topicBuf[topic_len] = 0; // 最后添'\0'
os_memcpy(dataBuf, data, data_len); // 缓存有效载荷
dataBuf[data_len] = 0; // 最后添'\0'
INFO("Receive topic: %s, data: %s \r\n", topicBuf, dataBuf); // 串口打印【主题】【有效载荷】
INFO("Topic_len = %d, Data_len = %d\r\n", topic_len, data_len); // 串口打印【主题长度】【有效载荷长度】
// 【技小新】添加
//########################################################################################
// 根据接收到的主题名/有效载荷,控制LED的亮/灭
//-----------------------------------------------------------------------------------
if( os_strcmp(topicBuf,"SW_LED") == 0 ) // 主题 == "SW_LED"
{
if( os_strcmp(dataBuf,"LED_ON") == 0 ) // 有效载荷 == "LED_ON"
{
GPIO_OUTPUT_SET(GPIO_ID_PIN(4),0); // LED亮
}
else if( os_strcmp(dataBuf,"LED_OFF") == 0 ) // 有效载荷 == "LED_OFF"
{
GPIO_OUTPUT_SET(GPIO_ID_PIN(4),1); // LED灭
}
}
//########################################################################################
os_free(topicBuf); // 释放【主题】空间
os_free(dataBuf); // 释放【有效载荷】空间
}
//===============================================================================================================
/******************************************************************************
* FunctionName : user_rf_cal_sector_set
* Description : SDK just reversed 4 sectors, used for rf init data and paramters.
* We add this function to force users to set rf cal sector, since
* we don't know which sector is free in user's application.
* sector map for last several sectors : ABCCC
* A : rf cal
* B : rf init data
* C : sdk parameters
* Parameters : none
* Returns : rf cal sector
*******************************************************************************/
uint32 ICACHE_FLASH_ATTR
user_rf_cal_sector_set(void)
{
enum flash_size_map size_map = system_get_flash_size_map();
uint32 rf_cal_sec = 0;
switch (size_map) {
case FLASH_SIZE_4M_MAP_256_256:
rf_cal_sec = 128 - 5;
break;
case FLASH_SIZE_8M_MAP_512_512:
rf_cal_sec = 256 - 5;
break;
case FLASH_SIZE_16M_MAP_512_512:
case FLASH_SIZE_16M_MAP_1024_1024:
rf_cal_sec = 512 - 5;
break;
case FLASH_SIZE_32M_MAP_512_512:
case FLASH_SIZE_32M_MAP_1024_1024:
rf_cal_sec = 1024 - 5;
break;
case FLASH_SIZE_64M_MAP_1024_1024:
rf_cal_sec = 2048 - 5;
break;
case FLASH_SIZE_128M_MAP_1024_1024:
rf_cal_sec = 4096 - 5;
break;
default:
rf_cal_sec = 0;
break;
}
return rf_cal_sec;
}
// user_init:entry of user application, init user function here
//===================================================================================================================
void user_init(void)
{
uart_init(BIT_RATE_115200, BIT_RATE_115200); // 串口波特率设为115200
os_delay_us(60000);
//【技小新】添加
//###########################################################################
PIN_FUNC_SELECT(PERIPHS_IO_MUX_GPIO4_U, FUNC_GPIO4); // GPIO4输出高 #
GPIO_OUTPUT_SET(GPIO_ID_PIN(4),1); // LED初始化 #
//###########################################################################
CFG_Load(); // 加载/更新系统参数【WIFI参数、MQTT参数】
// 网络连接参数赋值:服务端域名【mqtt_test_jx.mqtt.iot.gz.baidubce.com】、网络连接端口【1883】、安全类型【0:NO_TLS】
//-------------------------------------------------------------------------------------------------------------------
MQTT_InitConnection(&mqttClient, sysCfg.mqtt_host, sysCfg.mqtt_port, sysCfg.security);
// MQTT连接参数赋值:客户端标识符【..】、MQTT用户名【..】、MQTT密钥【..】、保持连接时长【120s】、清除会话【1:clean_session】
//----------------------------------------------------------------------------------------------------------------------------
MQTT_InitClient(&mqttClient, sysCfg.device_id, sysCfg.mqtt_user, sysCfg.mqtt_pass, sysCfg.mqtt_keepalive, 1);
// 设置遗嘱参数(如果云端没有对应的遗嘱主题,则MQTT连接会被拒绝)
//--------------------------------------------------------------
// MQTT_InitLWT(&mqttClient, "Will", "ESP8266_offline", 0, 0);
// 设置MQTT相关函数
//--------------------------------------------------------------------------------------------------
MQTT_OnConnected(&mqttClient, mqttConnectedCb); // 设置【MQTT成功连接】函数的另一种调用方式
MQTT_OnDisconnected(&mqttClient, mqttDisconnectedCb); // 设置【MQTT成功断开】函数的另一种调用方式
MQTT_OnPublished(&mqttClient, mqttPublishedCb); // 设置【MQTT成功发布】函数的另一种调用方式
MQTT_OnData(&mqttClient, mqttDataCb); // 设置【接收MQTT数据】函数的另一种调用方式
// 连接WIFI:SSID[..]、PASSWORD[..]、WIFI连接成功函数[wifiConnectCb]
//--------------------------------------------------------------------------
WIFI_Connect(sysCfg.sta_ssid, sysCfg.sta_pwd, wifiConnectCb);
INFO("\r\nSystem started ...\r\n");
}
//===================================================================================================================
mqtt_config.h
#ifndef __MQTT_CONFIG_H__
#define __MQTT_CONFIG_H__
typedef enum{
NO_TLS = 0, // 0: disable SSL/TLS, there must be no certificate verify between MQTT server and ESP8266
TLS_WITHOUT_AUTHENTICATION = 1, // 1: enable SSL/TLS, but there is no a certificate verify
ONE_WAY_ANTHENTICATION = 2, // 2: enable SSL/TLS, ESP8266 would verify the SSL server certificate at the same time
TWO_WAY_ANTHENTICATION = 3, // 3: enable SSL/TLS, ESP8266 would verify the SSL server certificate and SSL server would verify ESP8266 certificate
}TLS_LEVEL;
/*IMPORTANT: the following configuration maybe need modified*/
/***********************************************************************************************************************************************************************************************************************************************************/
#define CFG_HOLDER 0x66666663 // 持有人标识(只有更新此数值,系统参数才会更新) /* Change this value to load default configurations */
/*DEFAULT CONFIGURATIONS*/
// 注:【MQTT协议规定:连接服务端的每个客户端都必须有唯一的客户端标识符(ClientId)】。如果两相同ID的客户端不断重连,就会进入互踢死循环
//--------------------------------------------------------------------------------------------------------------------------------------
#define MQTT_HOST "iot_light_jx.mqtt.iot.gz.baidubce.com" // MQTT服务端域名/IP地址 // the IP address or domain name of your MQTT server or MQTT broker ,such as "mqtt.yourdomain.com"
#define MQTT_PORT 1883 // 网络连接端口号 // the listening port of your MQTT server or MQTT broker
#define MQTT_CLIENT_ID "ESP8266ID0x%x" // 官方例程中是"Device_ID" // 客户端标识符 // the ID of yourself, any string is OK,client would use this ID register itself to MQTT server
#define MQTT_USER "iot_light_jx/iot_light_esp8266_01_jx" // MQTT用户名 // your MQTT login name, if MQTT server allow anonymous login,any string is OK, otherwise, please input valid login name which you had registered
#define MQTT_PASS "WWYksu2nyuft+ycYYOw11MBo1fTQvz1eI8LpeC050nk=" // MQTT密码 // you MQTT login password, same as above
#define STA_SSID "JI-XIN-TEC" // WIFI名称 // your AP/router SSID to config your device networking
#define STA_PASS "JIXINDIANZI78963" // WIFI密码 // your AP/router password
#define STA_TYPE AUTH_WPA2_PSK
#define DEFAULT_SECURITY NO_TLS // 加密传输类型【默认不加密】 // very important: you must config DEFAULT_SECURITY for SSL/TLS
#define CA_CERT_FLASH_ADDRESS 0x77 // 【CA证书】烧录地址 // CA certificate address in flash to read, 0x77 means address 0x77000
#define CLIENT_CERT_FLASH_ADDRESS 0x78 // 【设备证书】烧录地址 // client certificate and private key address in flash to read, 0x78 means address 0x78000
/*********************************************************************************************************************************************************************************************************************************************************************************/
/*Please Keep the following configuration if you have no very deep understanding of ESP SSL/TLS*/
#define CFG_LOCATION 0x79 // 系统参数的起始扇区 /* Please don't change or if you know what you doing */
#define MQTT_BUF_SIZE 1024 // MQTT缓存大小
#define MQTT_KEEPALIVE 120 // 保持连接时长 /*second*/
#define MQTT_RECONNECT_TIMEOUT 5 // 重连超时时长 /*second*/
#define MQTT_SSL_ENABLE // SSL使能 //* Please don't change or if you know what you doing */
#define QUEUE_BUFFER_SIZE 2048 // 消息队列的缓存大小
//#define PROTOCOL_NAMEv31 // 使用MQTT协议【v31】版本 /*MQTT version 3.1 compatible with Mosquitto v0.15*/
#define PROTOCOL_NAMEv311 // 使用MQTT协议【v311】版本 /*MQTT version 3.11 compatible with https://eclipse.org/paho/clients/testing/*/
#endif // __MQTT_CONFIG_H__
mqtt_msg.h
#ifndef MQTT_MSG_H
#define MQTT_MSG_H
#include "mqtt_config.h"
#include "c_types.h"
#ifdef __cplusplus
extern "C" {
#endif
// MQTT协议:控制报文的类型
//-----------------------------------------------------------------------------
enum mqtt_message_type
{
MQTT_MSG_TYPE_CONNECT = 1, // 客户端请求连接服务端 【客->服】
MQTT_MSG_TYPE_CONNACK = 2, // 连接报文确认 【服->客】
MQTT_MSG_TYPE_PUBLISH = 3, // 发布消息
MQTT_MSG_TYPE_PUBACK = 4, // 发布消息确认
MQTT_MSG_TYPE_PUBREC = 5, //
MQTT_MSG_TYPE_PUBREL = 6, //
MQTT_MSG_TYPE_PUBCOMP = 7, //
MQTT_MSG_TYPE_SUBSCRIBE = 8, // 客户端订阅请求 【客->服】
MQTT_MSG_TYPE_SUBACK = 9, // 订阅请求报文确认 【服->客】
MQTT_MSG_TYPE_UNSUBSCRIBE = 10, // 客户端取消订阅请求 【客->服】
MQTT_MSG_TYPE_UNSUBACK = 11, // 取消订阅报文确认 【服->客】
MQTT_MSG_TYPE_PINGREQ = 12, // 心跳请求 【客->服】
MQTT_MSG_TYPE_PINGRESP = 13, // 心跳响应 【服->客】
MQTT_MSG_TYPE_DISCONNECT = 14 // 客户端断开连接 【客->服】
};
// MQTT控制报文[指针]、[长度]
//------------------------------------------------------------------------
typedef struct mqtt_message
{
uint8_t* data; // MQTT控制报文指针
uint16_t length; // MQTT控制报文长度(配置报文时,作为报文指针的偏移值)
} mqtt_message_t;
//------------------------------------------------------------------------
// MQTT报文
//------------------------------------------------------------------------
typedef struct mqtt_connection
{
mqtt_message_t message; // MQTT控制报文[指针]、[长度]
uint16_t message_id; // 报文标识符
uint8_t* buffer; // 出站报文缓存区指针 buffer = os_zalloc(1024)
uint16_t buffer_length; // 出站报文缓存区长度 1024
} mqtt_connection_t;
//------------------------------------------------------------------------
// MQTT【CONNECT】报文的连接参数
//---------------------------------------
typedef struct mqtt_connect_info
{
char* client_id; // 客户端标识符
char* username; // MQTT用户名
char* password; // MQTT密码
char* will_topic; // 遗嘱主题
char* will_message; // 遗嘱消息
int keepalive; // 保持连接时长
int will_qos; // 遗嘱消息质量
int will_retain; // 遗嘱保留
int clean_session; // 清除会话
} mqtt_connect_info_t;
//---------------------------------------
static inline int ICACHE_FLASH_ATTR mqtt_get_type(uint8_t* buffer) { return (buffer[0] & 0xf0) >> 4; }
static inline int ICACHE_FLASH_ATTR mqtt_get_dup(uint8_t* buffer) { return (buffer[0] & 0x08) >> 3; }
static inline int ICACHE_FLASH_ATTR mqtt_get_qos(uint8_t* buffer) { return (buffer[0] & 0x06) >> 1; }
static inline int ICACHE_FLASH_ATTR mqtt_get_retain(uint8_t* buffer) { return (buffer[0] & 0x01); }
void ICACHE_FLASH_ATTR mqtt_msg_init(mqtt_connection_t* connection, uint8_t* buffer, uint16_t buffer_length);
int ICACHE_FLASH_ATTR mqtt_get_total_length(uint8_t* buffer, uint16_t length);
const char* ICACHE_FLASH_ATTR mqtt_get_publish_topic(uint8_t* buffer, uint16_t* length);
const char* ICACHE_FLASH_ATTR mqtt_get_publish_data(uint8_t* buffer, uint16_t* length);
uint16_t ICACHE_FLASH_ATTR mqtt_get_id(uint8_t* buffer, uint16_t length);
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_connect(mqtt_connection_t* connection, mqtt_connect_info_t* info);
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_publish(mqtt_connection_t* connection, const char* topic, const char* data, int data_length, int qos, int retain, uint16_t* message_id);
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_puback(mqtt_connection_t* connection, uint16_t message_id);
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_pubrec(mqtt_connection_t* connection, uint16_t message_id);
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_pubrel(mqtt_connection_t* connection, uint16_t message_id);
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_pubcomp(mqtt_connection_t* connection, uint16_t message_id);
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_subscribe(mqtt_connection_t* connection, const char* topic, int qos, uint16_t* message_id);
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_unsubscribe(mqtt_connection_t* connection, const char* topic, uint16_t* message_id);
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_pingreq(mqtt_connection_t* connection);
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_pingresp(mqtt_connection_t* connection);
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_disconnect(mqtt_connection_t* connection);
#ifdef __cplusplus
}
#endif
#endif /* MQTT_MSG_H */
mqtt_msg.c
#include "user_interface.h"
#include <string.h>
#include "mqtt_msg.h"
#include "user_config.h"
#define MQTT_MAX_FIXED_HEADER_SIZE 3
#include "osapi.h"
// 【CONNECT】控制报文的Flag标志位
//---------------------------------------------------------------
enum mqtt_connect_flag
{
MQTT_CONNECT_FLAG_USERNAME = 1 << 7, // Username
MQTT_CONNECT_FLAG_PASSWORD = 1 << 6, // Password
MQTT_CONNECT_FLAG_WILL_RETAIN = 1 << 5, // Will Retain
MQTT_CONNECT_FLAG_WILL = 1 << 2, // Will Flag
MQTT_CONNECT_FLAG_CLEAN_SESSION = 1 << 1 // Clean Session
};
// 【CONNECT】控制报文的可变头长度 == 12Byte(v31)/10Byte(v311)
//-----------------------------------------------------------------
struct __attribute((__packed__)) mqtt_connect_variable_header
{
uint8_t lengthMsb; // 协议名[MQlsdp/MQTT]长度
uint8_t lengthLsb; // 6/4
#if defined(PROTOCOL_NAMEv31)
uint8_t magic[6]; // MQIsdp
#elif defined(PROTOCOL_NAMEv311)
uint8_t magic[4]; // "MQTT"
#else
#error "Please define protocol name"
#endif
uint8_t version; // 协议版本
uint8_t flags; // 连接标志
uint8_t keepaliveMsb; // keepalive
uint8_t keepaliveLsb;
};
// 将[参数字符串]字段添加到报文缓存区,报文长度+=[参数字符串]所占长度 【注:字符串前需添加两字节的前缀】
//========================================================================================================
static int ICACHE_FLASH_ATTR append_string(mqtt_connection_t* connection, const char* string, int len)
{
if(connection->message.length + len + 2 > connection->buffer_length) // 判断报文是否过长
return -1;
// 设置字符串前的两字节前缀,表示此字符串的长度
//--------------------------------------------------------------------------
connection->buffer[connection->message.length++] = len >> 8; // 高八位
connection->buffer[connection->message.length++] = len & 0xff; // 低八位
memcpy(connection->buffer+connection->message.length, string, len); // 将[参数字符串]添加到报文缓存区
connection->message.length += len; // 报文长度 += [参数字符串]所占长度
return len + 2; // 返回[参数字符串]在MQTT控制报文中所占长度
}
//========================================================================================================
// 添加【报文标识符】字段
//========================================================================================================
static uint16_t ICACHE_FLASH_ATTR append_message_id(mqtt_connection_t* connection, uint16_t message_id)
{
// If message_id is zero then we should assign one, otherwise
// we'll use the one supplied by the caller
while(message_id == 0)
message_id = ++connection->message_id; // 【报文标识符】++
if(connection->message.length + 2 > connection->buffer_length) // 报文过长
return 0;
connection->buffer[connection->message.length++] = message_id >> 8; // 添加【报文标识符】字段
connection->buffer[connection->message.length++] = message_id & 0xff; // 2字节
return message_id; // 返回【报文标识符】
}
//========================================================================================================
// 设置报文长度 = 3(暂时设为【固定报头】长度(3),之后添加【可变报头】、【有效载荷】)
//====================================================================================
static int ICACHE_FLASH_ATTR init_message(mqtt_connection_t* connection)
{
connection->message.length = MQTT_MAX_FIXED_HEADER_SIZE; // 报文长度 = 3(固定报头)
return MQTT_MAX_FIXED_HEADER_SIZE;
}
//====================================================================================
// 报文无效
//====================================================================================
static mqtt_message_t* ICACHE_FLASH_ATTR fail_message(mqtt_connection_t* connection)
{
connection->message.data = connection->buffer; // 报文指针指向报文缓存区首地址
connection->message.length = 0; // 报文长度 = 0
return &connection->message;
}
//====================================================================================
// 设置【MQTT控制报文】的固定报头
//------------------------------------------------------------------------------------------------------------------------------
// 注: 在【PUBLISH】报文中,报文类型标志位由重复分发标志[dup][Bit3]、服务质量[qos][Bit2~1]、报文保留标志[retain][Bit1=0]组成。
// 其余类型报文的报文类型标志位是固定的。
//==============================================================================================================================
static mqtt_message_t* ICACHE_FLASH_ATTR fini_message(mqtt_connection_t* connection, int type, int dup, int qos, int retain)
{
int remaining_length = connection->message.length - MQTT_MAX_FIXED_HEADER_SIZE; // 获取【可变报头】+【有效载荷】长度
// 设置固定报头(固定头中的剩余长度使用变长度编码方案,详情请参考MQTT协议手册)
//----------------------------------------------------------------------------------------------------------
if(remaining_length > 127) // 剩余长度占2字节
{
connection->buffer[0] = ((type&0x0f)<<4)|((dup&1)<<3)|((qos&3)<<1)|(retain&1); // 固定头的首字节赋值
connection->buffer[1] = 0x80 | (remaining_length % 128); // 剩余长度的第一个字节
connection->buffer[2] = remaining_length / 128; // 剩余长度的第二个字节
connection->message.length = remaining_length + 3; // 报文的整个长度
connection->message.data = connection->buffer; // MQTT报文指针 -> 出站报文缓存区首地址
}
else //if(remaining_length<= 127) // 剩余长度占1字节
{
// buffer[0] = 无
connection->buffer[1] = ((type&0x0f)<<4)|((dup&1)<<3)|((qos&3)<<1)|(retain&1); // 固定头的首字节赋值
connection->buffer[2] = remaining_length; // 固定头中的[剩余长度](可变报头+负载数据)
connection->message.length = remaining_length + 2; // 报文的整个长度
connection->message.data = connection->buffer + 1; // MQTT报文指针 -> 出站报文缓存区首地址+1
}
return &connection->message; // 返回报文首地址【报文数据、报文整体长度】
}
//==============================================================================================================================
// 初始化MQTT报文缓存区
// mqtt_msg_init(&client->mqtt_state.mqtt_connection, client->mqtt_state.out_buffer, client->mqtt_state.out_buffer_length);
//==========================================================================================================================
void ICACHE_FLASH_ATTR mqtt_msg_init(mqtt_connection_t* connection, uint8_t* buffer, uint16_t buffer_length)
{
memset(connection, 0, sizeof(mqtt_connection_t)); // mqttClient->mqtt_state.mqtt_connection = 0
connection->buffer = buffer; // buffer = (uint8_t *)os_zalloc(1024) 【缓存区指针】
connection->buffer_length = buffer_length; // buffer_length = 1024 【缓存区长度】
}
//==========================================================================================================================
// 计算接收到的网络数据中,报文的实际长度(通过【剩余长度】得到)
//===============================================================================================
int ICACHE_FLASH_ATTR mqtt_get_total_length(uint8_t* buffer, uint16_t length)
{
int i;
int totlen = 0; // 报文总长度(字节数)
// 计算剩余长度的值
//--------------------------------------------------------------------
for(i = 1; i <length; ++i) // 解析【剩余长度】字段(从buffer[1]开始)
{
totlen += (buffer[i]&0x7f)<<(7*(i-1));
// 如果剩余长度的当前字节的值<0x80,则表示此字节是最后的字节
//-----------------------------------------------------------------------
if((buffer[i]&0x80) == 0) // 当前字节的值<0x80
{
++i; // i == 固定报头长度 == 报文类型(1字节) + 剩余长度字段
break; // 跳出循环
}
}
totlen += i; // 报文总长度 = 固定报头长度 + 剩余长度的值(【可变报头】+【有效载荷】的长度)
return totlen; // 返回报文总长度
}
//===============================================================================================
// 获取【PUBLISH】报文中的主题名(指针)、主题名长度
//=========================================================================================
const char* ICACHE_FLASH_ATTR mqtt_get_publish_topic(uint8_t* buffer, uint16_t* length)
{
int i;
int totlen = 0;
int topiclen;
// 计算剩余长度的值
//--------------------------------------------------------------------
for(i = 1; i<*length; ++i) // 解析【剩余长度】字段(从buffer[1]开始)
{
totlen += (buffer[i]&0x7f)<<(7*(i-1));
// 如果剩余长度的当前字节的值<0x80,则表示此字节是最后的字节
//-----------------------------------------------------------------
if((buffer[i] & 0x80) == 0) // 当前字节的值<0x80
{
++i; // i == 固定报头长度 == 报文类型(1字节) + 剩余长度字段
break; // 跳出循环
}
}
totlen += i; // 报文总长度(这句可删,没有用)
if(i + 2 >= *length) // 如果没有载荷,则返回NULL
return NULL;
topiclen = buffer[i++] << 8; // 获取主题名长度(2字节)
topiclen |= buffer[i++]; // 前缀
if(i + topiclen > *length) // 报文出错(没有主题名),返回NULL
return NULL;
*length = topiclen; // 返回主题名长度
return (const char*)(buffer+i); // 返回主题名(指针)
}
//=========================================================================================
// 获取【PUBLISH】报文的载荷(指针)、载荷长度
//========================================================================================================
const char* ICACHE_FLASH_ATTR mqtt_get_publish_data(uint8_t* buffer, uint16_t* length)
{
int i;
int totlen = 0;
int topiclen;
int blength = *length;
*length = 0;
// 计算剩余长度的值
//--------------------------------------------------------------------
for(i = 1; i<blength; ++i) // 解析【剩余长度】字段(从buffer[1]开始)
{
totlen += (buffer[i]&0x7f)<<(7*(i-1));
// 如果剩余长度的当前字节的值<0x80,则表示此字节是最后的字节
//-----------------------------------------------------------------
if((buffer[i] & 0x80) == 0)
{
++i; // i == 固定报头长度 == 报文类型(1字节) + 剩余长度字段
break; // 跳出循环
}
}
totlen += i; // 报文总长度 = 剩余长度表示的值 + 固定头所占字节
if(i + 2 >= blength) // 如果没有载荷,则返回NULL
return NULL;
topiclen = buffer[i++] << 8; // 获取主题名长度(2字节)
topiclen |= buffer[i++]; // 前缀
if(i+topiclen >= blength) // 报文出错(没有主题名),返回NULL
return NULL;
i += topiclen; // i = 【固定报头】+【主题名(包括前缀)】
// Qos > 0
//----------------------------------------------------------------------------
if(mqtt_get_qos(buffer)>0) // 当Qos>0时,【主题名】字段后面是【报文标识符】
{
if(i + 2 >= blength) // 报文错误(无载荷)
return NULL;
i += 2; // i = 【固定报头】+【可变报头】
}
if(totlen < i) // 报文错误,返回NULL
return NULL;
if(totlen <= blength) // 报文总长度 <= 网络接收数据长度
*length = totlen - i; // 【有效载荷】长度 = 报文长度- (【固定报头】长度+【可变报头】长度)
else // 报文总长度 > 网络接收数据长度【丢失数据/未接收完毕】
*length = blength - i; // 有效载荷长度 = 网络接收数据长度 - (【固定报头】长度+【可变报头】长度)
return (const char*)(buffer + i); // 返回有效载荷首地址
}
//========================================================================================================
// 获取报文中的【报文标识符】
//=========================================================================================
uint16_t ICACHE_FLASH_ATTR mqtt_get_id(uint8_t* buffer, uint16_t length)
{
if(length < 1) // 报文无效
return 0;
// 判断目标报文的类型
//----------------------------
switch(mqtt_get_type(buffer))
{
// 【PUBLISH】报文
//…………………………………………………………………………………………………………
case MQTT_MSG_TYPE_PUBLISH:
{
int i;
int topiclen;
for(i = 1; i < length; ++i) // 查找【有效载荷】首地址
{
if((buffer[i] & 0x80) == 0) // 判断当前字节是否为【剩余长度】的尾字节
{
++i; // 指向有效载荷首地址
break;
}
}
if(i + 2 >= length) // 报文错误(无主题名)
return 0;
topiclen = buffer[i++] << 8; // 主题名长度
topiclen |= buffer[i++];
if(i + topiclen >= length) // 报文错误
return 0;
i += topiclen; // 指向【报文标识符】/【有效载荷】
// Qos > 0
//----------------------------------------------------------------------------
if(mqtt_get_qos(buffer) > 0) // 当Qos>0时,【主题名】字段后面是【报文标识符】
{
if(i + 2 >= length) // 报文错误(无载荷)
return 0;
//i += 2; // &buffer[i]指向【报文标识符】
}
else // Qos == 0
{
return 0; // 当Qos==0时,没有【报文标识符】
}
return (buffer[i] << 8) | buffer[i + 1]; //获取【报文标识符】
}
//…………………………………………………………………………………………………………
// 以下类型的报文,【报文标识符】固定是控制报文的【第2、第3字节】
//--------------------------------------------------------------------------------
case MQTT_MSG_TYPE_PUBACK: // 【PUBACK】报文
case MQTT_MSG_TYPE_PUBREC: // 【PUBREC】报文
case MQTT_MSG_TYPE_PUBREL: // 【PUBREL】报文
case MQTT_MSG_TYPE_PUBCOMP: // 【PUBCOMP】报文
case MQTT_MSG_TYPE_SUBACK: // 【SUBACK】报文
case MQTT_MSG_TYPE_UNSUBACK: // 【UNSUBACK】报文
case MQTT_MSG_TYPE_SUBSCRIBE: // 【SUBSCRIBE】报文
{
// This requires the remaining length to be encoded in 1 byte,
// which it should be.
if(length >= 4 && (buffer[1] & 0x80) == 0) // 判断【固定头、可变头】是否正确
return (buffer[2] << 8) | buffer[3]; // 返回【报文标识符】
else
return 0;
}
//--------------------------------------------------------------------------------
// 【CONNECT】【CONNACK】【UNSUBSCRIBE】【PINGREQ】【PINGRESP】【DISCONNECT】
//---------------------------------------------------------------------------
default:
return 0;
}
}
//=========================================================================================
// 配置【CONNECT】控制报文
// mqtt_msg_connect(&client->mqtt_state.mqtt_connection, client->mqtt_state.connect_info)
//================================================================================================================================
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_connect(mqtt_connection_t* connection, mqtt_connect_info_t* info)
{
struct mqtt_connect_variable_header* variable_header; // 【CONNECT】报文的【可变报头】指针
init_message(connection); // 设置报文长度 = 3(暂时设为【固定报头】长度(3),之后添加【可变报头】、【有效载荷】)
// 判断消息长度是否超过缓存区长度 // 【注:[message.length]是指TCP传输的整个MQTT报文长度】
//------------------------------------------------------------------------------------------------------------
if(connection->message.length + sizeof(*variable_header) > connection->buffer_length) // 判断MQTT报文长度
return fail_message(connection);
// 跳过了对【固定报头】的赋值,只为【固定报头】保留了3个字节的空间。 注:剩余长度最多占两字节。
// 获取【可变报头】指针,并更新报文长度
//----------------------------------------------------------------------------------------------------------------------------
variable_header = (void*)(connection->buffer + connection->message.length); // 【可变报头】指针 = 报文缓存区指针+3(固定报头)
connection->message.length += sizeof(*variable_header); // 报文长度 == 固定报头 + 可变报头
// 协议名、协议级别赋值
//-----------------------------------------------
variable_header->lengthMsb = 0; // lengthMsb
#if defined(PROTOCOL_NAMEv31)
variable_header->lengthLsb = 6; // lengthLsb
memcpy(variable_header->magic, "MQIsdp", 6);
variable_header->version = 3; // v31版本 = 3
#elif defined(PROTOCOL_NAMEv311)
variable_header->lengthLsb = 4; // lengthLsb
memcpy(variable_header->magic, "MQTT", 4);
variable_header->version = 4; // v311版本 = 4
#else
#error "Please define protocol name"
#endif
//----------------------------------------------------------------------
variable_header->flags = 0; // 连接标志字节 = 0(暂时清0,待会赋值)
// 保持连接时长赋值
//----------------------------------------------------------------------
variable_header->keepaliveMsb = info->keepalive >> 8; // 赋值高字节
variable_header->keepaliveLsb = info->keepalive & 0xff; // 赋值低字节
// clean_session = 1:客户端和服务端必须丢弃之前的任何会话并开始一个新的会话
//----------------------------------------------------------------------------
if(info->clean_session)
variable_header->flags |= MQTT_CONNECT_FLAG_CLEAN_SESSION; //clean_session=1
// 判断是否存在[client_id],存在则设置[client_id]字段
//----------------------------------------------------------------------------
if(info->client_id != NULL && info->client_id[0] != '\0')
{
// 将[client_id]字段添加到报文缓存区,报文长度+=[client_id]所占长度
//--------------------------------------------------------------------------
if(append_string(connection, info->client_id, strlen(info->client_id)) < 0)
return fail_message(connection);
}
else
return fail_message(connection); // 报文出错
// 判断是否存在[will_topic]
//---------------------------------------------------------------------------------
if(info->will_topic != NULL && info->will_topic[0] != '\0')
{
// 将[will_topic]字段添加到报文缓存区,报文长度+=[will_topic]所占长度
//--------------------------------------------------------------------------
if(append_string(connection, info->will_topic,strlen(info->will_topic))<0)
return fail_message(connection);
// 将[will_message]字段添加到报文缓存区,报文长度+=[will_message]所占长度
//----------------------------------------------------------------------------
if(append_string(connection,info->will_message,strlen(info->will_message))<0)
return fail_message(connection);
// 设置【CONNECT】报文中的Will标志位:[Will Flag]、[Will QoS]、[Will Retain]
//--------------------------------------------------------------------------
variable_header->flags |= MQTT_CONNECT_FLAG_WILL; // 遗嘱标志位 = 1
if(info->will_retain)
variable_header->flags |= MQTT_CONNECT_FLAG_WILL_RETAIN;// WILL_RETAIN = 1
variable_header->flags |= (info->will_qos & 3) << 3; // will质量赋值
}
// 判断是否存在[username]
//----------------------------------------------------------------------------
if(info->username != NULL && info->username[0] != '\0')
{
// 将[username]字段添加到报文缓存区,报文长度+=[username]所占长度
//--------------------------------------------------------------------------
if(append_string(connection, info->username, strlen(info->username)) < 0)
return fail_message(connection);
// 设置【CONNECT】报文中的[username]标志位
//--------------------------------------------------------------------------
variable_header->flags |= MQTT_CONNECT_FLAG_USERNAME; // username = 1
}
// 判断是否存在[password]
//----------------------------------------------------------------------------
if(info->password != NULL && info->password[0] != '\0')
{
// 将[password]字段添加到报文缓存区,报文长度+=[password]所占长度
//--------------------------------------------------------------------------
if(append_string(connection, info->password, strlen(info->password)) < 0)
return fail_message(connection);
// 设置【CONNECT】报文中的[password]标志位
//--------------------------------------------------------------------------
variable_header->flags |= MQTT_CONNECT_FLAG_PASSWORD; // password = 1
}
// 设置【CONNECT】报文固定头:类型[Bit7~4=1]、[Bit3=0]、[Bit2~1=0]、[Bit1=0]
//----------------------------------------------------------------------------
return fini_message(connection, MQTT_MSG_TYPE_CONNECT, 0, 0, 0);
}
//================================================================================================================================
// 配置【PUBLISH】报文,并获取【PUBLISH】报文[指针]、[长度]
// 【参数2:主题名 / 参数3:发布消息的有效载荷 / 参数4:有效载荷长度 / 参数5:发布Qos / 参数6:Retain / 参数7:报文标识符指针】
//===================================================================================================================================================
mqtt_message_t* ICACHE_FLASH_ATTR
mqtt_msg_publish(mqtt_connection_t* connection, const char* topic, const char* data, int data_length, int qos, int retain, uint16_t* message_id)
{
init_message(connection); // 设置报文长度 = 3
if(topic == NULL || topic[0] == '\0') // 没有"topic"则错误
return fail_message(connection);
if(append_string(connection, topic, strlen(topic)) < 0) // 添加【主题】字段
return fail_message(connection);
if(qos > 0) // 当【QoS>0】,需要【报文标识符】
{
if((*message_id = append_message_id(connection, 0)) == 0) // 添加【报文标识符】字段
return fail_message(connection);
}
else // if(qos = 0) // 当【QoS=0】,不需要【报文标识符】
*message_id = 0;
// 判断报文长度是否大于缓存区
//----------------------------------------------------------------------------
if(connection->message.length + data_length > connection->buffer_length)
return fail_message(connection);
memcpy(connection->buffer + connection->message.length, data, data_length); // 添加【有效载荷】字段
connection->message.length += data_length; // 设置报文长度
// 设置【PUBLISH】报文固定头:类型[Bit7~4=1]、[Bit3=0]、[Bit2~1=0]、[Bit1=0]
//----------------------------------------------------------------------------
return fini_message(connection, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain);
}
//===================================================================================================================================================
// 配置【PUBACK】报文
//=====================================================================================================
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_puback(mqtt_connection_t* connection, uint16_t message_id)
{
init_message(connection); // 设置报文长度 = 3
if(append_message_id(connection, message_id) == 0) // 添加【报文标识符】字段
return fail_message(connection);
return fini_message(connection, MQTT_MSG_TYPE_PUBACK, 0, 0, 0); // 配置固定报头
}
//=====================================================================================================
// 配置【PUBREC】报文
//=====================================================================================================
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_pubrec(mqtt_connection_t* connection, uint16_t message_id)
{
init_message(connection); // 设置报文长度 = 3
if(append_message_id(connection, message_id) == 0) // 添加【报文标识符】字段
return fail_message(connection);
return fini_message(connection, MQTT_MSG_TYPE_PUBREC, 0, 0, 0); // 配置固定报头
}
//=====================================================================================================
// 配置【PUBREL】报文
//=====================================================================================================
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_pubrel(mqtt_connection_t* connection, uint16_t message_id)
{
init_message(connection); // 设置报文长度 = 3
if(append_message_id(connection, message_id) == 0) // 添加【报文标识符】字段
return fail_message(connection);
return fini_message(connection, MQTT_MSG_TYPE_PUBREL, 0, 1, 0); // 配置固定报头
}
//=====================================================================================================
// 配置【PUBCOMP】报文
//=====================================================================================================
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_pubcomp(mqtt_connection_t* connection, uint16_t message_id)
{
init_message(connection); // 设置报文长度 = 3
if(append_message_id(connection, message_id) == 0) // 添加【报文标识符】字段
return fail_message(connection);
return fini_message(connection, MQTT_MSG_TYPE_PUBCOMP, 0, 0, 0); // 配置固定报头
}
//=====================================================================================================
// 配置【SUBSCRIBE】报文:【参数2:订阅主题 / 参数3:订阅质量 / 参数4:报文标识符】
//=======================================================================================================================================
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_subscribe(mqtt_connection_t* connection, const char* topic, int qos, uint16_t* message_id)
{
init_message(connection); // 报文长度 = 3(固定头)
if(topic == NULL || topic[0] == '\0') // 主题无效
return fail_message(connection);
if((*message_id = append_message_id(connection, 0)) == 0) // 添加[报文标识符]
return fail_message(connection);
if(append_string(connection, topic, strlen(topic)) < 0) // 添加[主题]
return fail_message(connection);
if(connection->message.length + 1 > connection->buffer_length)// 判断报文长度
return fail_message(connection);
connection->buffer[connection->message.length++] = qos; // 设置消息质量QoS
// 设置【SUBSCRIBE】报文的固定报头
//-----------------------------------------------------------------
return fini_message(connection, MQTT_MSG_TYPE_SUBSCRIBE, 0, 1, 0);
}
//=======================================================================================================================================
// 配置【UNSUBSCRIBE】报文:【参数2:取消订阅主题 / 参数3:报文标识符】
//===============================================================================================================================
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_unsubscribe(mqtt_connection_t* connection, const char* topic, uint16_t* message_id)
{
init_message(connection); // 报文长度 = 3(固定头)
if(topic == NULL || topic[0] == '\0') // 主题无效
return fail_message(connection);
if((*message_id = append_message_id(connection, 0)) == 0) // 添加[报文标识符]
return fail_message(connection);
if(append_string(connection, topic, strlen(topic)) < 0) // 添加[主题]
return fail_message(connection);
// 设置【UNSUBSCRIBE】报文的固定报头
//-----------------------------------------------------------------
return fini_message(connection, MQTT_MSG_TYPE_UNSUBSCRIBE, 0, 1, 0);
}
//===============================================================================================================================
// 配置【PINGREQ】报文
//=================================================================================
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_pingreq(mqtt_connection_t* connection)
{
init_message(connection);
return fini_message(connection, MQTT_MSG_TYPE_PINGREQ, 0, 0, 0);
}
//=================================================================================
// 配置【PINGRESP】报文
//=================================================================================
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_pingresp(mqtt_connection_t* connection)
{
init_message(connection);
return fini_message(connection, MQTT_MSG_TYPE_PINGRESP, 0, 0, 0);
}
//=================================================================================
// 配置【DISCONNECT】报文
//====================================================================================
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_disconnect(mqtt_connection_t* connection)
{
init_message(connection);
return fini_message(connection, MQTT_MSG_TYPE_DISCONNECT, 0, 0, 0);
}
//====================================================================================