其中 pom.xml 文件的依赖配置、基础连接配置、 application.properties 文件的 MQTT 服务器配置同前文一样。

一、实现 MQTT 消息的订阅

动态监听topic没有实现!

  1. package com.gyz.mqtt.config;
  2. import org.apache.commons.lang3.StringUtils;
  3. import org.springframework.beans.factory.annotation.Value;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. import org.springframework.integration.annotation.ServiceActivator;
  7. import org.springframework.integration.channel.DirectChannel;
  8. import org.springframework.integration.core.MessageProducer;
  9. import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
  10. import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
  11. import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
  12. import org.springframework.messaging.Message;
  13. import org.springframework.messaging.MessageChannel;
  14. import org.springframework.messaging.MessageHandler;
  15. import org.springframework.messaging.MessagingException;
  16. /**
  17. * MQTT配置,消费者
  18. *
  19. * @author gong_yuzhuo
  20. */
  21. @Configuration
  22. public class MqttReceiverConfig {
  23. /**
  24. * 订阅的bean名称
  25. */
  26. public static final String CHANNEL_NAME_IN = "mqttInboundChannel";
  27. public MqttPahoMessageDrivenChannelAdapter adapter;
  28. /** 客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息 */
  29. private static final byte[] WILL_DATA;
  30. static {
  31. WILL_DATA = "offline".getBytes();
  32. }
  33. @Value("${mqtt.username}")
  34. private String username;
  35. @Value("${mqtt.password}")
  36. private String password;
  37. @Value("${mqtt.host}")
  38. private String url;
  39. @Value("${mqtt.receiver.clientId}")
  40. private String clientId;
  41. @Value("${mqtt.receiver.defaultTopic}")
  42. private String defaultTopic;
  43. /**
  44. * MQTT信息通道(消费者)
  45. */
  46. @Bean(name = CHANNEL_NAME_IN)
  47. public MessageChannel mqttInboundChannel() {
  48. return new DirectChannel();
  49. }
  50. /**
  51. * MQTT消息订阅绑定(消费者)
  52. */
  53. @Bean
  54. public MessageProducer inbound(MqttPahoClientFactory factory) {
  55. // 可以同时消费(订阅)多个Topic
  56. adapter = new MqttPahoMessageDrivenChannelAdapter(
  57. clientId, factory,
  58. StringUtils.split(defaultTopic, ","));
  59. adapter.setCompletionTimeout(5000);
  60. adapter.setConverter(new DefaultPahoMessageConverter());
  61. adapter.setQos(1);
  62. // 设置订阅通道
  63. adapter.setOutputChannel(mqttInboundChannel());
  64. return adapter;
  65. }
  66. /**
  67. * MQTT消息处理器(消费者)
  68. */
  69. @Bean
  70. @ServiceActivator(inputChannel = CHANNEL_NAME_IN)
  71. public MessageHandler handler() {
  72. return new MessageHandler() {
  73. @Override
  74. public void handleMessage(Message<?> message) throws MessagingException {
  75. String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
  76. String msg = message.getPayload().toString();
  77. System.out.println("\n--------------------START-------------------\n" +
  78. "接收到订阅消息:\ntopic:" + topic + "\nmessage:" + msg +
  79. "\n---------------------END--------------------");
  80. }
  81. };
  82. }
  83. }

二、测试运行

项目启动后,我们使用 MQTTBox 对“hangge”这个主题,发送一条消息:

六、SpringBoot - 集成MQTT(订阅消息) - 图1

可以看到 SprinBoot 项目这边成功接收到消息并打印出来:

六、SpringBoot - 集成MQTT(订阅消息) - 图2

发送“hello”主题的消息:

六、SpringBoot - 集成MQTT(订阅消息) - 图3

可以看到“hello”主题消息也已被接收成功

六、SpringBoot - 集成MQTT(订阅消息) - 图4

注意主题配置

六、SpringBoot - 集成MQTT(订阅消息) - 图5