说明:MQTT客户端源码种类比较多,可以根据具体需求进行选择,这里采用linux C 编程方式和HTTP方式。

Ubuntu客户端源码

地址:https://docs.emqx.io/en/broker/latest/development/resource.html
image.png
选择该源码进行下载,自行解压到linux目录,解压后目录文件如下:

image.png
直接进行make编译

image.png
编译过程提示类型不匹配,需要将include/libemqtt.h 117行修改返回值类型
image.png
修改完成后继续make,即可编译成功。
image.png

代码编写流程

  1. //初始化mqtt,需要提供一个mqtt_broker_handle_t 链接参数结构体
  2. void mqtt_init(mqtt_broker_handle_t *broker,const char* clientid);
  3. //初始化认证信息,用户名,密码
  4. void mqtt_init_auth(mqtt_broker_handle_t* broker, const char* username, const char* password);
  5. //初始化socket,内部设置一些参数和方法给broker,主机IP,端口
  6. int init_socket(mqtt_broker_handle_t* broker, const char* hostname, short port);
  7. //连接服务器,将borker对象传进去
  8. int mqtt_connect(mqtt_broker_handle_t* broker);
  9. //接收数据,参数:超时时间,返回值:接收到的数据个数
  10. int read_packet(int timeout)
  11. //发布,brocker对象,topic地址,信息,0
  12. int mqtt_publish(mqtt_broker_handle_t* broker, const char* topic, const char* msg, uint8_t retain)
  13. //订阅函数
  14. int mqtt_subscribe(mqtt_broker_handle_t* broker, const char* topic, uint16_t* message_id)

发布服务器代码

  1. #include <libemqtt.h>
  2. #include <stdio.h>
  3. #include <string.h>
  4. #include <stdlib.h>
  5. #include <unistd.h>
  6. #include <arpa/inet.h>
  7. #include <linux/tcp.h>
  8. #include <signal.h>
  9. #define MAXBUF 1024
  10. int socket_id;
  11. int keepalive = 30;
  12. uint8_t read_buf[MAXBUF];
  13. int send_packet(void* socket_info, const void* buf, unsigned int count)
  14. {
  15. int fd = *((int*)socket_info);
  16. return send(fd, buf, count, 0);
  17. }
  18. int init_socket(mqtt_broker_handle_t* broker, const char* hostname, short port, int keepalive)
  19. {
  20. int flag = 1;
  21. // Create the socket
  22. if((socket_id = socket(PF_INET, SOCK_STREAM, 0)) < 0)
  23. return -1;
  24. // Disable Nagle Algorithm
  25. if (setsockopt(socket_id, IPPROTO_TCP, TCP_NODELAY, (char*)&flag, sizeof(flag)) < 0)
  26. return -2;
  27. struct sockaddr_in socket_address;
  28. // Create the stuff we need to connect
  29. socket_address.sin_family = AF_INET;
  30. socket_address.sin_port = htons(port);
  31. socket_address.sin_addr.s_addr = inet_addr(hostname);
  32. // Connect the socket
  33. if((connect(socket_id, (struct sockaddr*)&socket_address, sizeof(socket_address))) < 0)
  34. return -1;
  35. // MQTT stuffs
  36. mqtt_set_alive(broker, keepalive);
  37. broker->socket_info = (void*)&socket_id;
  38. broker->send = send_packet;
  39. return 0;
  40. }
  41. int read_packet(int timeout)
  42. {
  43. if(timeout > 0)
  44. {
  45. fd_set readfds;
  46. struct timeval tmv;
  47. // Initialize the file descriptor set
  48. FD_ZERO (&readfds);
  49. FD_SET (socket_id, &readfds);
  50. // Initialize the timeout data structure
  51. tmv.tv_sec = timeout;
  52. tmv.tv_usec = 0;
  53. // select returns 0 if timeout, 1 if input available, -1 if error
  54. if(select(1, &readfds, NULL, NULL, &tmv))
  55. return -2;
  56. }
  57. //定义临时变量
  58. int total_bytes = 0, bytes_rcvd, packet_length;
  59. //清理buf
  60. memset(read_buf, 0, sizeof(read_buf));
  61. if((bytes_rcvd = recv(socket_id, (read_buf+total_bytes), MAXBUF, 0)) <= 0) {
  62. return -1;
  63. }
  64. total_bytes += bytes_rcvd; // 赋值全部长度数
  65. if (total_bytes < 2) //少于2则退出
  66. return -1;
  67. uint16_t rem_len = mqtt_parse_rem_len(read_buf); //求剩余长度
  68. uint8_t rem_len_bytes = mqtt_num_rem_len_bytes(read_buf); //求剩余长度占用字节数
  69. packet_length = rem_len + rem_len_bytes + 1; //剩余长度 + 剩余长度本身 + 固定头
  70. while(total_bytes < packet_length) // 读包,如果读到的内容和整体长度不一致则继续接收
  71. {
  72. if((bytes_rcvd = recv(socket_id, (read_buf+total_bytes), MAXBUF, 0)) <= 0)
  73. return -1;
  74. total_bytes += bytes_rcvd; // 重新赋值总长度
  75. }
  76. //返回读到的数据长度
  77. return packet_length;
  78. }
  79. int connect_mqtt_check(int packet_length,char *buf)
  80. {
  81. //连接过程有返回数据,接收判断书否成功
  82. if(packet_length < 0)
  83. {
  84. fprintf(stderr, "Error(%d) on read packet!\n", packet_length);
  85. return -1;
  86. }
  87. //判断是否连接MQTT的ACK
  88. if(MQTTParseMessageType(buf) != MQTT_MSG_CONNACK)
  89. {
  90. fprintf(stderr, "CONNACK expected!\n");
  91. return -2;
  92. }
  93. //判断是否连接MQTT成功
  94. if(buf[3] != 0x00)
  95. {
  96. fprintf(stderr, "CONNACK failed!\n");
  97. return -2;
  98. }
  99. return 0;
  100. }
  101. int main(int argc,const char *argv[])
  102. {
  103. mqtt_broker_handle_t mqtt_info;
  104. //初始化mqtt数据
  105. mqtt_init(&mqtt_info,argv[1]);
  106. //填充验证信息
  107. mqtt_init_auth(&mqtt_info,"user","admin");
  108. //创建socket
  109. init_socket(&mqtt_info,"127.0.0.1",1883,keepalive);
  110. if(mqtt_connect(&mqtt_info) == -1)
  111. {
  112. printf("mqtt_connect is fail\n");
  113. }
  114. int packet_length;
  115. //连接有返回数据,接收一下,判断是否成功
  116. packet_length = read_packet(1);
  117. if(connect_mqtt_check(packet_length,read_buf))
  118. {
  119. printf("connect mqtt server is fail \n");
  120. return -1;
  121. }
  122. char top[30];
  123. char date[20];
  124. int i;
  125. for(i=0;i<10;i++)
  126. {
  127. sprintf(top,"public/test%d/topic",i);
  128. sprintf(date,"data num = %d",i);
  129. mqtt_publish(&mqtt_info,top , date, 0);
  130. usleep(500);
  131. }
  132. // mqtt_disconnect(&mqtt_info);
  133. //close_socket(&mqtt_info);
  134. return 0;
  135. }

STM32 基于LwIP - MQTT

以STM32F407x为例,首先进行网卡的基本配置:

image.png

image.pngimage.png
image.png
使能FREERTOS,是为了更方便的进行使用。

代码部分:

  1. //注意,提前修改MEMP_NUM_SYS_TIMEOUT 宏为128
  2. void StartDefaultTask(void const * argument)
  3. {
  4. /* init code for LWIP */
  5. MX_LWIP_Init();
  6. /* USER CODE BEGIN StartDefaultTask */
  7. ip_addr_t ppp;
  8. uint8_t user_data;
  9. struct mqtt_connect_client_info_t cloent_info;
  10. ppp.addr = inet_addr("192.168.1.178");
  11. mqtt_client_t *client_my = mqtt_client_new();
  12. cloent_info.client_user = "admin";
  13. cloent_info.client_pass = "admin";
  14. cloent_info.keep_alive = 30;
  15. cloent_info.client_id = "STM32";
  16. mqtt_client_connect(client_my,&ppp,LWIP_IANA_PORT_MQTT,NULL,&user_data,&cloent_info);
  17. uint8_t temp[] = "hello";
  18. /* Infinite loop */
  19. for(;;)
  20. {
  21. //发布
  22. mqtt_publish(client_my,"/sys/temp", (void *)&temp, sizeof(temp), 0,0,
  23. NULL, client_my);
  24. //5s发布一次数据
  25. osDelay(5000);
  26. }
  27. /* USER CODE END StartDefaultTask */
  28. }

image.png

ESP8266_RTOS_SDK

环境:ubuntu20
开发工具:IDF Python2 pip

1.环境搭建

SDK下载地:https://www.espressif.com/zh-hans/support/download/sdks-demos?keys=&field_type_tid%5B%5D=14
编译工具链下载:https://dl.espressif.com/dl/xtensa-lx106-elf-gcc8_4_0-esp-2020r3-linux-amd64.tar.gz
云盘连接:链接:https://pan.baidu.com/s/1sOaOx2mMgfvso7N80slfIQ
提取码:xkxg
image.png

①创建一个目录(mkdir exp8266)
image.png

②安装python2及pip

  1. sudo apt update #更新库
  2. sudo apt install python2 #安装python2
  3. sudo apt install curl #安装curl工具,方便下载其他内容
  4. sudo update-alternatives --install /usr/bin/python python /usr/bin/python2 1 #配置python2为默认python工具
  5. curl https://bootstrap.pypa.io/pip/2.7/get-pip.py --output get-pip.py #下载pip2包
  6. sudo python get-pip.py #安装pip

③拷贝及解压SDK、工具链

image.png

#进入ESP8266目录,执行安装依赖包
python -m pip install -r requirements.txt
git submodule add https://github.com/espressif/esp-mqtt.git components/espmqtt  #获取mqtt子模块

image.png

④添加环境变量

#终端输入如下命令,并将相应代码填入其中
#pwd获取当前目录位置
mqtt@mqtt-virtual-machine:~/esp8266/ESP8266_RTOS_SDK$ pwd
/home/mqtt/esp8266/ESP8266_RTOS_SDK  #这里应该与自己的路径对应,复制下来

mqtt@mqtt-virtual-machine:~/esp8266/ESP8266_RTOS_SDK$ gedit ~/.bashrc   #打开后在最后一行填写

image.png

#进入到编译工具链根目录,获取路径复制
mqtt@mqtt-virtual-machine:~/esp8266/xtensa-lx106-elf/bin$ pwd

image.png
image.png
添加完成后保存退出,关闭终端重新打开,输入xt 按两下tab键看到如下画面说明成功:
image.png

⑤编译测试

#进入到SDK中
cd examples/get-started/hello_world/   #进入helloworld工程当中

make menuconfig #配置工程

image.png
image.png
image.png
image.png
image.png

⑥配置工程

image.png

image.png

⑦编译工程

make all #编译工程,等待结束
#连接开发板到虚拟机,检查ls /dev/ttyUSB0 是否存在
make flash #烧写代码,如果遇到权限问题,sudo chmod 777 /dev/ttyUSB0 即可
make monitor  #打开串口监听数据
#结束监听 Ctrl + ] 即可

image.png

2.手册获取

SDK手册:https://docs.espressif.com/projects/esp8266-rtos-sdk/en/latest/get-started/index.html