MQTT在SpringBoot中的配置

1. 完整示例

  1. <!--第一步:就是在Maven中的需要添加的依赖-->
  2. <dependencies>
  3. <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
  4. <dependency>
  5. <groupId>com.alibaba</groupId>
  6. <artifactId>fastjson</artifactId>
  7. <version>1.2.68</version>
  8. </dependency>
  9. <!--mqtt依赖-->
  10. <dependency>
  11. <groupId>org.springframework.boot</groupId>
  12. <artifactId>spring-boot-starter-integration</artifactId>
  13. </dependency>
  14. <dependency>
  15. <groupId>org.springframework.integration</groupId>
  16. <artifactId>spring-integration-stream</artifactId>
  17. </dependency>
  18. <dependency>
  19. <groupId>org.springframework.integration</groupId>
  20. <artifactId>spring-integration-mqtt</artifactId>
  21. </dependency>
  22. <!--lombok-->
  23. <dependency>
  24. <groupId>org.projectlombok</groupId>
  25. <artifactId>lombok</artifactId>
  26. <optional>true</optional>
  27. </dependency>
  28. <dependency>
  29. <groupId>org.springframework.boot</groupId>
  30. <artifactId>spring-boot-starter-web</artifactId>
  31. </dependency>
  32. <dependency>
  33. <groupId>org.springframework.boot</groupId>
  34. <artifactId>spring-boot-starter-test</artifactId>
  35. <scope>test</scope>
  36. </dependency>
  37. </dependencies>
  1. # 第二步: 在application.yml中,添加关于mqtt的配置 (如果mqtt的IP、port、user、password在库中则需要在实现时候查表进行) #
  2. server:
  3. port: 8023
  4. spring:
  5. #给项目来个名字
  6. application:
  7. name: mqtt
  8. #MQTT-用户名
  9. mqtt:
  10. username: admin
  11. #MQTT-密码password: password
  12. password: public
  13. #MQTT-服务器连接地址,如果有多个,用逗号隔开,如:tcp://127.0.0.1:1883,tcp://192.168.2.133:1883
  14. url: tcp://127.0.0.1:1883
  15. #MQTT-连接服务器默认客户端ID
  16. client:
  17. id: mqttId
  18. #MQTT-默认的消息推送主题,实际可在调用接口时指定
  19. default:
  20. topic: topic
  21. yihonWQM: YIHON_WQ_M
  22. #连接超时
  23. completionTimeout: 3000
  1. /**
  2. * 第三步: MqttSenderAndReceiveConfig推送接受消息类
  3. */
  4. import cn.hutool.core.util.IdUtil;
  5. import com.zxk.mqtt.MqttQosEnum;
  6. import com.zxk.mqtt.handle.MqttReceiveHandle;
  7. import com.zxk.camera.pojo.dto.EmqInfoDTO;
  8. import com.zxk.camera.service.ConfigInfoService;
  9. import lombok.extern.slf4j.Slf4j;
  10. import org.apache.commons.lang3.StringUtils;
  11. import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
  12. import org.springframework.beans.factory.annotation.Value;
  13. import org.springframework.context.annotation.Bean;
  14. import org.springframework.context.annotation.Configuration;
  15. import org.springframework.integration.annotation.IntegrationComponentScan;
  16. import org.springframework.integration.annotation.ServiceActivator;
  17. import org.springframework.integration.channel.DirectChannel;
  18. import org.springframework.integration.core.MessageProducer;
  19. import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
  20. import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
  21. import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
  22. import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
  23. import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
  24. import org.springframework.messaging.MessageChannel;
  25. import org.springframework.messaging.MessageHandler;
  26. import java.util.Arrays;
  27. import java.util.List;
  28. @Slf4j
  29. @Configuration
  30. @IntegrationComponentScan
  31. public class MqttConfiguration {
  32. private final ConfigInfoService configInfoService;
  33. private final MqttReceiveHandle mqttReceiveHandle;
  34. /**
  35. * 消息驱动
  36. */
  37. private MqttPahoMessageDrivenChannelAdapter adapter;
  38. @Value("${spring.mqtt.username}")
  39. private String username;
  40. @Value("${spring.mqtt.password}")
  41. private String password;
  42. @Value("${spring.mqtt.url}")
  43. private String hostUrl;
  44. @Value("${spring.mqtt.client.id}")
  45. private String clientId;
  46. @Value("${spring.mqtt.default.topic}")
  47. private String defaultTopic;
  48. //水质设备主题
  49. @Value("${spring.mqtt.default.yihonWQM}")
  50. private String yihonWQM;
  51. @Value("${spring.mqtt.completionTimeout}")
  52. private int completionTimeout; //连接超时
  53. public MqttConfiguration(ConfigInfoService configInfoService, MqttReceiveHandle mqttReceiveHandle) {
  54. this.configInfoService = configInfoService;
  55. this.mqttReceiveHandle = mqttReceiveHandle;
  56. }
  57. /**
  58. * mqtt 连接配置属性
  59. */
  60. @Bean(value = "getMqttConnectOptions")
  61. public MqttConnectOptions getMqttConnectOptions1(){
  62. MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();
  63. // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
  64. mqttConnectOptions.setCleanSession(true);
  65. // 设置超时时间 单位为秒
  66. mqttConnectOptions.setConnectionTimeout(10);
  67. mqttConnectOptions.setAutomaticReconnect(true);
  68. mqttConnectOptions.setUserName(username);
  69. mqttConnectOptions.setPassword(password.toCharArray());
  70. mqttConnectOptions.setServerURIs(new String[]{hostUrl});
  71. // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线,但这个方法并没有重连的机制
  72. mqttConnectOptions.setKeepAliveInterval(10);
  73. // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
  74. //mqttConnectOptions.setWill("willTopic", WILL_DATA, 2, false);
  75. return mqttConnectOptions;
  76. /**
  77. * MQTT工厂
  78. */
  79. @Bean
  80. public MqttPahoClientFactory mqttClientFactory() {
  81. DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
  82. factory.setConnectionOptions(getMqttConnectOptions1());
  83. return factory;
  84. }
  85. /**
  86. * MQTT信息通道(生产者)
  87. */
  88. @Bean
  89. public MessageChannel mqttOutboundChannel() {
  90. return new DirectChannel();
  91. }
  92. /**
  93. * MQTT消息处理器(生产者)
  94. */
  95. @Bean
  96. @ServiceActivator(inputChannel = "mqttOutboundChannel")
  97. public MessageHandler mqttOutbound() {
  98. MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(IdUtil.simpleUUID(), mqttClientFactory());
  99. messageHandler.setAsync(true);
  100. messageHandler.setDefaultTopic(defaultTopic);
  101. messageHandler.setDefaultQos(2);
  102. return messageHandler;
  103. }
  104. @Bean
  105. public MessageChannel mqttInputChannel() {
  106. return new DirectChannel();
  107. }
  108. /**
  109. * 配置client,监听的topic
  110. * MQTT消息订阅绑定(消费者)
  111. */
  112. @Bean
  113. public MessageProducer inbound() {
  114. if (adapter == null) {
  115. adapter = new MqttPahoMessageDrivenChannelAdapter(IdUtil.simpleUUID() + "_inbound1", mqttClientFactory(), defaultTopic, yihonWQM);
  116. }
  117. String[] topics = listenTopic.split(",");
  118. for (String topic : topics) {
  119. if (!StringUtils.isEmpty(topic)) {
  120. adapter.addTopic(topic, 2);
  121. }
  122. }
  123. adapter.setCompletionTimeout(completionTimeout);
  124. DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter(2,false,"UTF-8");
  125. defaultPahoMessageConverter.setPayloadAsBytes(true);
  126. adapter.setConverter(defaultPahoMessageConverter);
  127. // 设置服务质量
  128. // 0 最多一次,数据可能丢失;
  129. // 1 至少一次,数据可能重复;
  130. // 2 只有一次,有且只有一次;最耗性能
  131. adapter.setQos(1);
  132. adapter.setOutputChannel(mqttInputChannel());
  133. return adapter;
  134. }
  135. /**
  136. * 增加监听的topic
  137. *
  138. * @param topicArr topic数组
  139. */
  140. public List<String> addListenTopic(String[] topicArr) {
  141. if (adapter == null) {
  142. adapter = new MqttPahoMessageDrivenChannelAdapter(IdUtil.simpleUUID() + "_inbound1", mqttClientFactory(),
  143. defaultTopic);
  144. }
  145. List<String> listTopic = Arrays.asList(adapter.getTopic());
  146. for (String topic : topicArr) {
  147. if (!StringUtils.isEmpty(topic)) {
  148. if (!listTopic.contains(topic)) {
  149. adapter.addTopic(topic, 2);
  150. }
  151. }
  152. }
  153. return Arrays.asList(adapter.getTopic());
  154. }
  155. /**
  156. * 移除一个监听的topic
  157. */
  158. public List<String> removeListenTopic(String topic) {
  159. if (adapter == null) {
  160. adapter = new MqttPahoMessageDrivenChannelAdapter(IdUtil.simpleUUID() + "_inbound1", mqttClientFactory(),
  161. defaultTopic);
  162. }
  163. List<String> listTopic = Arrays.asList(adapter.getTopic());
  164. if (listTopic.contains(topic)) {
  165. adapter.removeTopic(topic);
  166. }
  167. return Arrays.asList(adapter.getTopic());
  168. }
  169. /**
  170. * MQTT消息处理器(消费者)
  171. * **/
  172. @Bean
  173. @ServiceActivator(inputChannel = "mqttInputChannel")
  174. public MessageHandler handler() {
  175. return mqttReceiveHandle::handle;
  176. }
  177. }
  1. /**
  2. * 第四步: MqttGateway发送消息类
  3. */
  4. import org.springframework.integration.annotation.MessagingGateway;
  5. import org.springframework.integration.mqtt.support.MqttHeaders;
  6. import org.springframework.messaging.handler.annotation.Header;
  7. /**
  8. * mqtt发送消息
  9. * (defaultRequestChannel = "mqttOutboundChannel" 对应config配置)
  10. * **/
  11. @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
  12. public interface MqttGateway {
  13. /**
  14. * 发送信息到MQTT服务器
  15. *
  16. * @param data 发送的文本
  17. */
  18. void sendToMqtt(String data);
  19. /**
  20. * 发送信息到MQTT服务器
  21. *
  22. * @param topic 主题
  23. * @param payload 消息主体
  24. */
  25. void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
  26. String payload);
  27. /**
  28. * 发送信息到MQTT服务器
  29. *
  30. * @param topic 主题
  31. * @param qos 对消息处理的几种机制。
  32. * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
  33. * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
  34. * 2 多了一次去重的动作,确保订阅者收到的消息有一次。
  35. * @param payload 消息主体
  36. */
  37. void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
  38. @Header(MqttHeaders.QOS) int qos,
  39. String payload);
  40. }
  1. /**
  2. * 第五步: mqtt客户端消息处理
  3. */
  4. import com.alibaba.fastjson.JSON;
  5. import com.alibaba.fastjson.JSONObject;
  6. import lombok.extern.slf4j.Slf4j;
  7. import org.springframework.integration.mqtt.support.MqttHeaders;
  8. import org.springframework.messaging.Message;
  9. import org.springframework.stereotype.Component;
  10. /**
  11. * mqtt客户端消息处理类
  12. * **/
  13. @Slf4j
  14. @Component
  15. public class MqttReceiveHandle {
  16. public void handle(Message<?> message){
  17. log.info("主题:{},QOS:{},消息接收到的数据:{}", message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC), message.getHeaders().get(MqttHeaders.RECEIVED_QOS), message.getPayload());
  18. byte[] payLoadByte = (byte[]) message.getPayload();
  19. String payLoad = new String(payLoadByte);
  20. System.out.println("mqtt返回的信息有:" +JSON.toJSONString(payLoad));
  21. JSONObject jsonMessage = JSONObject.parseObject(payLoad);
  22. // todo: 当获取到返回信息后,继续往下执行不同返回的不同逻辑
  23. }
  24. }

在对具体的回调函数中,继续处理业务逻辑的时候:使用注解形式来获取对象和Service(如@Autowired、@Service),直接使用注解会报java.lang.NullException错误

解决方案: 通过上下文的方式,获取进行业务处理的service!

  1. /**
  2. * 第六步: 写一个方法,用来获取上下文
  3. */
  4. import org.springframework.beans.BeansException;
  5. import org.springframework.context.ApplicationContext;
  6. import org.springframework.context.ApplicationContextAware;
  7. import org.springframework.stereotype.Component;
  8. @Component
  9. public class GetUserDefinedServiceUtil implements ApplicationContextAware{
  10. private static ApplicationContext applicationContext = null;
  11. @Override
  12. public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
  13. if (null == GetUserDefinedServiceUtil.applicationContext) {
  14. GetUserDefinedServiceUtil.applicationContext = applicationContext;
  15. }
  16. }
  17. //获取applicationContext
  18. public static ApplicationContext getApplicationContext() {
  19. return applicationContext;
  20. }
  21. //通过class获取Bean.
  22. public static <T> T getBean(Class<T> clazz){
  23. return getApplicationContext().getBean(clazz);
  24. }
  25. //通过name,以及Clazz返回指定的Bean
  26. public static <T> T getBean(String name,Class<T> clazz){
  27. return getApplicationContext().getBean(name, clazz);
  28. }
  29. }
  1. /**
  2. * 在业务中,取出service对象
  3. */
  4. xxxxService xxxxService = GetUserDefinedServiceUtil.getBean(xxxxService.class);