# 1. 通过pom引入依赖
  2. <!-- Netty -->
  3. <dependency>
  4. <groupId>io.netty</groupId>
  5. <artifactId>netty-all</artifactId>
  6. <version>4.1.45.Final</version>
  7. </dependency>
  8. <dependency>
  9. <groupId>org.springframework.boot</groupId>
  10. <artifactId>spring-boot-starter-integration</artifactId>
  11. </dependency>
  12. <dependency>
  13. <groupId>org.springframework.integration</groupId>
  14. <artifactId>spring-integration-stream</artifactId>
  15. </dependency>
  16. <dependency>
  17. <groupId>org.springframework.integration</groupId>
  18. <artifactId>spring-integration-mqtt</artifactId>
  19. </dependency>

# 2. MQTT引入方式

  1. import cn.hutool.core.util.IdUtil;
  2. import com.dsa.videocontrol.mqtt.MqttQosEnum;
  3. import com.dsa.videocontrol.mqtt.handle.MqttReceiveHandle;
  4. import com.dsa.videocontrol.camera.pojo.dto.EmqInfoDTO;
  5. import com.dsa.videocontrol.camera.service.ConfigInfoService;
  6. import lombok.extern.slf4j.Slf4j;
  7. import org.apache.commons.lang3.StringUtils;
  8. import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
  9. import org.springframework.beans.factory.annotation.Value;
  10. import org.springframework.context.annotation.Bean;
  11. import org.springframework.context.annotation.Configuration;
  12. import org.springframework.integration.annotation.IntegrationComponentScan;
  13. import org.springframework.integration.annotation.ServiceActivator;
  14. import org.springframework.integration.channel.DirectChannel;
  15. import org.springframework.integration.core.MessageProducer;
  16. import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
  17. import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
  18. import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
  19. import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
  20. import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
  21. import org.springframework.messaging.MessageChannel;
  22. import org.springframework.messaging.MessageHandler;
  23. import java.util.Arrays;
  24. import java.util.List;
  25. /**
  26. * 描述
  27. * Title: MqttConfigdsa
  28. */
  29. @Slf4j
  30. @Configuration
  31. @IntegrationComponentScan
  32. public class MqttConfiguration {
  33. private final ConfigInfoService configInfoService;
  34. private final MqttReceiveHandle mqttReceiveHandle;
  35. /**
  36. * 消息驱动
  37. */
  38. private MqttPahoMessageDrivenChannelAdapter adapter;
  39. /**
  40. * 订阅的主题列表
  41. */
  42. @Value("${topic.subscribe}")
  43. private String defaultTopic;
  44. @Value("${topic.listen}")
  45. private String listenTopic;
  46. @Value("#{ T(java.lang.Math).random() * 1000.0 }")
  47. public String clientId;
  48. @Value(value = "200")
  49. public Long completionTimeout;
  50. public MqttConfiguration(ConfigInfoService configInfoService, MqttReceiveHandle mqttReceiveHandle) {
  51. this.configInfoService = configInfoService;
  52. this.mqttReceiveHandle = mqttReceiveHandle;
  53. }
  54. /**
  55. * mqtt 连接配置属性
  56. */
  57. @Bean(value = "getMqttConnectOptions")
  58. public MqttConnectOptions getMqttConnectOptions1() {
  59. EmqInfoDTO emqInfo = configInfoService.getEmqInfo();
  60. MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
  61. // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
  62. mqttConnectOptions.setCleanSession(false);
  63. // 设置超时时间 单位为秒
  64. mqttConnectOptions.setConnectionTimeout(60);
  65. mqttConnectOptions.setAutomaticReconnect(true);
  66. mqttConnectOptions.setUserName(emqInfo.getUserName());
  67. mqttConnectOptions.setPassword(emqInfo.getUserPassword().toCharArray());
  68. mqttConnectOptions.setServerURIs(new String[]{"tcp://" + emqInfo.getIp() + ":" + emqInfo.getPort()});
  69. // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线,但这个方法并没有重连的机制
  70. mqttConnectOptions.setKeepAliveInterval(60);
  71. mqttConnectOptions.setMaxInflight(5000);
  72. // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
  73. //mqttConnectOptions.setWill("willTopic", WILL_DATA, 2, false);
  74. return mqttConnectOptions;
  75. }
  76. /**
  77. * MQTT工厂
  78. */
  79. @Bean
  80. public MqttPahoClientFactory mqttClientFactory() {
  81. DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
  82. factory.setConnectionOptions(getMqttConnectOptions1());
  83. return factory;
  84. }
  85. /**
  86. * MQTT信息通道(生产者)
  87. */
  88. @Bean
  89. public MessageChannel mqttOutboundChannel() {
  90. return new DirectChannel();
  91. }
  92. /**
  93. * MQTT消息处理器(生产者)
  94. */
  95. @Bean
  96. @ServiceActivator(inputChannel = "mqttOutboundChannel")
  97. public MessageHandler mqttOutbound() {
  98. MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(IdUtil.simpleUUID(), mqttClientFactory());
  99. messageHandler.setAsync(true);
  100. messageHandler.setDefaultTopic(defaultTopic+"#");
  101. messageHandler.setDefaultQos(MqttQosEnum.QOS_2.getQosLevel());
  102. return messageHandler;
  103. }
  104. @Bean
  105. public MessageChannel mqttInputChannel() {
  106. return new DirectChannel();
  107. }
  108. /**
  109. * 配置client,监听的topic
  110. * MQTT消息订阅绑定(消费者)
  111. */
  112. @Bean
  113. public MessageProducer inbound() {
  114. if (adapter == null) {
  115. adapter = new MqttPahoMessageDrivenChannelAdapter(IdUtil.simpleUUID() + "_inbound1", mqttClientFactory(), defaultTopic+"#");
  116. }
  117. String[] topics = listenTopic.split(",");
  118. for (String topic : topics) {
  119. if (!StringUtils.isEmpty(topic)) {
  120. adapter.addTopic(topic, MqttQosEnum.QOS_2.getQosLevel());
  121. }
  122. }
  123. adapter.setCompletionTimeout(completionTimeout);
  124. DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter(MqttQosEnum.QOS_2.getQosLevel(),false,"UTF-8");
  125. defaultPahoMessageConverter.setPayloadAsBytes(true);
  126. adapter.setConverter(defaultPahoMessageConverter);
  127. adapter.setQos(MqttQosEnum.QOS_2.getQosLevel());
  128. adapter.setOutputChannel(mqttInputChannel());
  129. return adapter;
  130. }
  131. /**
  132. * 增加监听的topic
  133. *
  134. * @param topicArr topic数组
  135. */
  136. public List<String> addListenTopic(String[] topicArr) {
  137. if (adapter == null) {
  138. adapter = new MqttPahoMessageDrivenChannelAdapter(IdUtil.simpleUUID() + "_inbound1", mqttClientFactory(),
  139. defaultTopic+"#");
  140. }
  141. List<String> listTopic = Arrays.asList(adapter.getTopic());
  142. for (String topic : topicArr) {
  143. if (!StringUtils.isEmpty(topic)) {
  144. if (!listTopic.contains(topic)) {
  145. adapter.addTopic(topic, MqttQosEnum.QOS_2.getQosLevel());
  146. }
  147. }
  148. }
  149. return Arrays.asList(adapter.getTopic());
  150. }
  151. /**
  152. * 移除一个监听的topic
  153. */
  154. public List<String> removeListenTopic(String topic) {
  155. if (adapter == null) {
  156. adapter = new MqttPahoMessageDrivenChannelAdapter(IdUtil.simpleUUID() + "_inbound1", mqttClientFactory(),
  157. defaultTopic+"#");
  158. }
  159. List<String> listTopic = Arrays.asList(adapter.getTopic());
  160. if (listTopic.contains(topic)) {
  161. adapter.removeTopic(topic);
  162. }
  163. return Arrays.asList(adapter.getTopic());
  164. }
  165. @Bean
  166. @ServiceActivator(inputChannel = "mqttInputChannel")
  167. public MessageHandler handler() {
  168. return mqttReceiveHandle::handle;
  169. }
  170. }
# MqttReceiveHandle —— MQTT消息的处理
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.test.videocontrol.mqtt.service.MqttMessageBizService;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.apache.commons.lang3.StringUtils;
  6. import org.springframework.messaging.Message;
  7. import org.springframework.stereotype.Component;
  8. /**
  9. * 描述
  10. * Title: MqttReceiveHandle
  11. */
  12. @Slf4j
  13. @Component
  14. public class MqttReceiveHandle {
  15. private final MqttMessageBizService mqttMessageBizService;
  16. public MqttReceiveHandle(MqttMessageBizService mqttMessageBizService) {
  17. this.mqttMessageBizService = mqttMessageBizService;
  18. }
  19. public void handle(Message<?> message) {
  20. byte[] payLoadByte = (byte[]) message.getPayload();
  21. String payLoad = new String(payLoadByte);
  22. /*log.info("主题:{},QOS:{},消息接收到的数据:{}", message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC),
  23. message.getHeaders().get(MqttHeaders.RECEIVED_QOS), payLoad); */
  24. if (StringUtils.isEmpty(payLoad)) {
  25. return;
  26. }
  27. JSONObject jsonMessage = JSONObject.parseObject(payLoad);
  28. mqttMessageBizService.messagePush(jsonMessage);
  29. }
  30. }
# 定义一个MqttGateway的接口,用来发送消息
  2. import org.springframework.integration.annotation.MessagingGateway;
  3. import org.springframework.integration.mqtt.support.MqttHeaders;
  4. import org.springframework.messaging.handler.annotation.Header;
  5. import org.springframework.stereotype.Component;
  6. /**
  7. * Title: MqttGateway
  8. * Description: mqtt 推送消息类
  9. */
  10. @Component
  11. @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
  12. public interface MqttGateway {
  13. /**
  14. * 发送信息到MQTT服务器
  15. *
  16. * @param data 发送的文本
  17. */
  18. void sendToMqtt(String data);
  19. /**
  20. * 发送信息到MQTT服务器
  21. *
  22. * @param topic 主题
  23. * @param payload 消息主体
  24. */
  25. void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
  26. String payload);
  27. /**
  28. * 发送信息到MQTT服务器
  29. *
  30. * @param topic 主题
  31. * @param qos 对消息处理的几种机制。
  32. * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
  33. * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
  34. * 2 多了一次去重的动作,确保订阅者收到的消息有一次。
  35. * @param payload 消息主体
  36. */
  37. void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
  38. @Header(MqttHeaders.QOS) int qos,
  39. String payload);
  40. }
  1. import com.alibaba.fastjson.JSONObject;
  2. import com.test.videocontrol.mqtt.strategy.MqttJsonParseContext;
  3. import com.test.videocontrol.mqtt.strategy.MqttMessageStrategyFactory;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.springframework.stereotype.Service;
  6. @Slf4j
  7. @Service
  8. public class MqttMessageBizServiceImpl {
  9. public void messagePush(JSONObject json) {
  10. int cmd = json.getIntValue("cmd");
  11. try {
  12. MqttJsonParseContext mqttJsonParseContext = MqttJsonParseContext.builder().jsonObject(json).cmd(cmd).build();
  13. MqttMessageStrategyFactory.strategyMethod(mqttJsonParseContext);
  14. } catch (Exception e) {
  15. e.printStackTrace();
  16. log.error("处理MQTT消息发生异常:message:{}------------>stackTrace:{}", e.getMessage(), e.getStackTrace());
  17. }
  18. }
  19. }
# 请求返回的处理策略
  2. import com.test.videocontrol.mqtt.PtzCmdConstant;
  3. import lombok.extern.slf4j.Slf4j;
  /**
   * Selects and runs the strategy that handles a parsed MQTT message, based on its command code.
   * NOTE(review): the "... ..." lines below are placeholders left by the author for
   * module-specific code, so this snippet is illustrative and does not compile as written.
   */
  4. @Slf4j
  5. public class MqttMessageStrategyFactory {
  /**
   * Switches on context.getCmd() to choose an ExecuteMessageServiceStrategy and, when one was
   * chosen, executes it through a MqttMessageContext. If no strategy was assigned (for example
   * in the default branch), nothing is executed.
   *
   * @param context the parsed message together with its command code
   */
  6. public static void strategyMethod(MqttJsonParseContext context) {
  7. ExecuteMessageServiceStrategy executeMessageServiceStrategy = null;
  8. switch (context.getCmd()) {
  9. case PtzCmdConstant.PTZ_CHANNEL_CONTROL_RETURN_CMD:
  10. executeMessageServiceStrategy = new PtzControlReturnMessageStrategy();
  11. break;
  12. case PtzCmdConstant.PRESET_RETURN_CMD:
  13. ... ... // handle this module's own logic
  14. break;
  15. case PtzCmdConstant.CAPTURE_RETURN_CMD:
  16. ... ...
  17. break;
  18. case PtzCmdConstant.REGION_ZOOM_RETURN_CMD:
  19. ... ...
  20. break;
  21. default:
  22. break;
  23. }
  // Only run when one of the cases above picked a strategy.
  24. if (executeMessageServiceStrategy != null) {
  25. MqttMessageContext mqttMessageContext = new MqttMessageContext(executeMessageServiceStrategy, context);
  26. mqttMessageContext.execute();
  27. }
  28. }
  29. }
  1. import com.alibaba.fastjson.JSON;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.test.videocontrol.mqtt.PtzCmdEnum;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.springframework.stereotype.Service;
  6. @Slf4j
  7. @Service
  8. public class PtzControlReturnMessageStrategy implements ExecuteMessageServiceStrategy {
  9. //private CenterService centerService;
  10. @Override
  11. public void executeMqttMessage(MqttJsonParseContext mqttJsonParseContext) {
  12. Integer cmdType = mqttJsonParseContext.getCmd();
  13. if(PtzCmdEnum.PTZ_CHANNEL_CONTROL_RETURN_CMD.getCmd().equals(cmdType)) {
  14. JSONObject jsonObject = mqttJsonParseContext.getJsonObject();
  15. log.info("云台控制操作mqtt返回消息:{}",JSON.toJSONString(jsonObject));
  16. }
  17. }
  18. }