一、安装配置

1、依赖配置

编辑项目的 pom.xml 文件,添加如下依赖:

  1. <dependency>
  2. <groupId>org.springframework.integration</groupId>
  3. <artifactId>spring-integration-stream</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>org.springframework.integration</groupId>
  7. <artifactId>spring-integration-mqtt</artifactId>
  8. </dependency>

2、配置 MQTT 服务器基本信息

编辑项目的 application.properties 文件,增加 MQTT 服务器配置信息:

  1. server.port=10001
  2. # 用户名(这里为空)
  3. mqtt.username=
  4. # 密码(这里为空)
  5. mqtt.password=
  6. # 推送信息的连接地址,如果有多个,用逗号隔开,如:tcp://127.0.0.1:1883,tcp://x.x.x.x:1883
  7. mqtt.host=tcp://x.x.x.x:1883
  8. # 生产者连接服务器默认客户端ID
  9. mqtt.sender.clientId=mqttProducer
  10. # 默认的推送主题,实际可在调用接口时指定
  11. mqtt.sender.defaultTopic=hangge
  12. # 消费者连接服务器默认客户端ID(这里使用随机数)
  13. mqtt.receiver.clientId=${random.value}
  14. # 默认的接收主题,实际可在调用接口时指定
  15. mqtt.receiver.defaultTopic=hangge

二、实现 MQTT 消息的发送

基础连接配置

  1. package com.gyz.mqtt.config;
  2. import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
  3. import org.springframework.beans.factory.annotation.Value;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
  7. import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
  8. /**
  9. * 配置连接
  10. *
  11. * @author gong_yuzhuo
  12. */
  13. @Configuration
  14. public class MqttBaseConfig {
  15. /**
  16. * 发布的bean名称
  17. */
  18. public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";
  19. /** 客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息 */
  20. private static final byte[] WILL_DATA;
  21. static {
  22. WILL_DATA = "offline".getBytes();
  23. }
  24. @Value("${mqtt.username}")
  25. private String username;
  26. @Value("${mqtt.password}")
  27. private String password;
  28. @Value("${mqtt.host}")
  29. private String url;
  30. @Value("${mqtt.sender.clientId}")
  31. private String clientId;
  32. @Value("${mqtt.sender.defaultTopic}")
  33. private String defaultTopic;
  34. @Bean
  35. public MqttPahoClientFactory factory() {
  36. DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
  37. MqttConnectOptions options = new MqttConnectOptions();
  38. // 设置连接的用户名
  39. if (!username.trim().equals("")) {
  40. options.setUserName(username);
  41. }
  42. // 设置连接的密码
  43. options.setPassword(password.toCharArray());
  44. // 设置连接的地址
  45. options.setServerURIs(new String[]{url});
  46. // 设置超时时间 单位为秒
  47. options.setConnectionTimeout(10);
  48. // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线
  49. // 但这个方法并没有重连的机制
  50. options.setKeepAliveInterval(20);
  51. // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
  52. options.setWill("willTopic", WILL_DATA, 2, false);
  53. factory.setConnectionOptions(options);
  54. return factory;
  55. }
  56. }

MqttSenderConfig.java(MQTT 消息推送配置类)

  1. package com.gyz.mqtt.config;
  2. import org.springframework.beans.factory.annotation.Value;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. import org.springframework.integration.annotation.ServiceActivator;
  6. import org.springframework.integration.channel.DirectChannel;
  7. import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
  8. import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
  9. import org.springframework.messaging.MessageChannel;
  10. import org.springframework.messaging.MessageHandler;
  11. /**
  12. * MQTT配置,生产者
  13. */
  14. @Configuration
  15. public class MqttSenderConfig {
  16. /**
  17. * 发布的bean名称
  18. */
  19. public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";
  20. /** 客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息 */
  21. private static final byte[] WILL_DATA;
  22. static {
  23. WILL_DATA = "offline".getBytes();
  24. }
  25. @Value("${mqtt.username}")
  26. private String username;
  27. @Value("${mqtt.password}")
  28. private String password;
  29. @Value("${mqtt.host}")
  30. private String url;
  31. @Value("${mqtt.sender.clientId}")
  32. private String clientId;
  33. @Value("${mqtt.sender.defaultTopic}")
  34. private String defaultTopic;
  35. /**
  36. * MQTT信息通道(生产者)
  37. */
  38. @Bean(name = CHANNEL_NAME_OUT)
  39. public MessageChannel mqttOutboundChannel() {
  40. return new DirectChannel();
  41. }
  42. /**
  43. * MQTT消息处理器(生产者)
  44. */
  45. @Bean
  46. @ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
  47. public MessageHandler mqttOutbound(MqttPahoClientFactory factory) {
  48. MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
  49. clientId,
  50. factory);
  51. messageHandler.setAsync(true);
  52. messageHandler.setDefaultTopic(defaultTopic);
  53. return messageHandler;
  54. }
  55. }

2、IMqttSender(消息推送接口类)

  1. package com.gyz.mqtt.service;
  2. import com.gyz.mqtt.config.MqttSenderConfig;
  3. import org.springframework.integration.annotation.MessagingGateway;
  4. import org.springframework.integration.mqtt.support.MqttHeaders;
  5. import org.springframework.messaging.handler.annotation.Header;
  6. import org.springframework.stereotype.Component;
  7. /**
  8. * @author gong_yuzhuo
  9. */
  10. @Component
  11. @MessagingGateway(defaultRequestChannel = MqttSenderConfig.CHANNEL_NAME_OUT)
  12. public interface IMqttSender {
  13. /**
  14. * 发送信息到MQTT服务器
  15. *
  16. * @param data 发送的文本
  17. */
  18. void sendToMqtt(String data);
  19. /**
  20. * 发送信息到MQTT服务器
  21. *
  22. * @param topic 主题
  23. * @param payload 消息主体
  24. */
  25. void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
  26. /**
  27. * 发送信息到MQTT服务器
  28. *
  29. * @param topic 主题
  30. * @param QOS 对消息处理的几种机制:
  31. * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
  32. * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
  33. * 2 多了一次去重的动作,确保订阅者收到的消息有一次。
  34. * @param payload 消息主体
  35. */
  36. void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
  37. @Header(MqttHeaders.QOS) int QOS,
  38. String payload);
  39. }

3、HelloController.java(测试类)

  1. package com.gyz.mqtt.controller;
  2. import com.gyz.mqtt.service.IMqttSender;
  3. import org.springframework.web.bind.annotation.PathVariable;
  4. import org.springframework.web.bind.annotation.RequestMapping;
  5. import org.springframework.web.bind.annotation.RestController;
  6. import javax.annotation.Resource;
  7. /**
  8. * 测试发送消息
  9. *
  10. * @author gong_yuzhuo
  11. */
  12. @RestController
  13. public class HelloController {
  14. /**
  15. * 注入发送MQTT的Bean
  16. */
  17. @Resource
  18. private IMqttSender imqttSender;
  19. /**
  20. * 发送自定义消息内容(使用默认主题)
  21. * @param data
  22. */
  23. @RequestMapping("/test1/{data}")
  24. public void test1(@PathVariable("data") String data) {
  25. imqttSender.sendToMqtt(data);
  26. }
  27. /**
  28. * 发送自定义消息内容,且指定主题
  29. * @param topic
  30. * @param data
  31. */
  32. @RequestMapping("/test2/{topic}/{data}")
  33. public void test2(@PathVariable("topic") String topic, @PathVariable("data") String data) {
  34. imqttSender.sendToMqtt(topic, data);
  35. }
  36. }

4、测试运行

  1. 项目启动后,使用 MQTTBox订阅“hangge”这个主题,然后使用浏览器访问 http://localhost:10001/test1/abcd1234 可以看到 MQTTBox 这边可以成功接收到发布的消息:

MQTTBox的安装和使用

五、SpringBoot - 集成MQTT(发布消息) - 图1

  1. 接着我们访问 http://localhost:10001/test2/china/abcd1234,这次除了发送自定消息外还指定了主题(china,而不是使用默认主题),MQTTBox 这边订阅该主题并显示结果:

五、SpringBoot - 集成MQTT(发布消息) - 图2

参考文章:

https://www.hangge.com/blog/cache/detail_2610.html

https://www.hangge.com/blog/cache/detail_2611.html