# 1. 通过 pom 引入依赖

<!-- Netty -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.45.Final</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>
# 2. MQTT 引入方式
import cn.hutool.core.util.IdUtil;import com.dsa.videocontrol.mqtt.MqttQosEnum;import com.dsa.videocontrol.mqtt.handle.MqttReceiveHandle;import com.dsa.videocontrol.camera.pojo.dto.EmqInfoDTO;import com.dsa.videocontrol.camera.service.ConfigInfoService;import lombok.extern.slf4j.Slf4j;import org.apache.commons.lang3.StringUtils;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.integration.annotation.IntegrationComponentScan;import org.springframework.integration.annotation.ServiceActivator;import org.springframework.integration.channel.DirectChannel;import org.springframework.integration.core.MessageProducer;import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;import org.springframework.integration.mqtt.core.MqttPahoClientFactory;import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;import org.springframework.messaging.MessageChannel;import org.springframework.messaging.MessageHandler;import java.util.Arrays;import java.util.List;/** * 描述 * Title: MqttConfigdsa */@Slf4j@Configuration@IntegrationComponentScanpublic class MqttConfiguration { private final ConfigInfoService configInfoService; private final MqttReceiveHandle mqttReceiveHandle; /** * 消息驱动 */ private MqttPahoMessageDrivenChannelAdapter adapter; /** * 订阅的主题列表 */ @Value("${topic.subscribe}") private String defaultTopic; @Value("${topic.listen}") private String listenTopic; @Value("#{ T(java.lang.Math).random() * 1000.0 }") public String clientId; @Value(value = "200") public Long completionTimeout; public MqttConfiguration(ConfigInfoService configInfoService, MqttReceiveHandle mqttReceiveHandle) 
{ this.configInfoService = configInfoService; this.mqttReceiveHandle = mqttReceiveHandle; } /** * mqtt 连接配置属性 */ @Bean(value = "getMqttConnectOptions") public MqttConnectOptions getMqttConnectOptions1() { EmqInfoDTO emqInfo = configInfoService.getEmqInfo(); MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接 mqttConnectOptions.setCleanSession(false); // 设置超时时间 单位为秒 mqttConnectOptions.setConnectionTimeout(60); mqttConnectOptions.setAutomaticReconnect(true); mqttConnectOptions.setUserName(emqInfo.getUserName()); mqttConnectOptions.setPassword(emqInfo.getUserPassword().toCharArray()); mqttConnectOptions.setServerURIs(new String[]{"tcp://" + emqInfo.getIp() + ":" + emqInfo.getPort()}); // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线,但这个方法并没有重连的机制 mqttConnectOptions.setKeepAliveInterval(60); mqttConnectOptions.setMaxInflight(5000); // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。 //mqttConnectOptions.setWill("willTopic", WILL_DATA, 2, false); return mqttConnectOptions; } /** * MQTT工厂 */ @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(getMqttConnectOptions1()); return factory; } /** * MQTT信息通道(生产者) */ @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } /** * MQTT消息处理器(生产者) */ @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(IdUtil.simpleUUID(), mqttClientFactory()); messageHandler.setAsync(true); messageHandler.setDefaultTopic(defaultTopic+"#"); messageHandler.setDefaultQos(MqttQosEnum.QOS_2.getQosLevel()); return messageHandler; } @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } /** * 配置client,监听的topic * MQTT消息订阅绑定(消费者) */ @Bean public MessageProducer inbound() { if (adapter == null) { 
adapter = new MqttPahoMessageDrivenChannelAdapter(IdUtil.simpleUUID() + "_inbound1", mqttClientFactory(), defaultTopic+"#"); } String[] topics = listenTopic.split(","); for (String topic : topics) { if (!StringUtils.isEmpty(topic)) { adapter.addTopic(topic, MqttQosEnum.QOS_2.getQosLevel()); } } adapter.setCompletionTimeout(completionTimeout); DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter(MqttQosEnum.QOS_2.getQosLevel(),false,"UTF-8"); defaultPahoMessageConverter.setPayloadAsBytes(true); adapter.setConverter(defaultPahoMessageConverter); adapter.setQos(MqttQosEnum.QOS_2.getQosLevel()); adapter.setOutputChannel(mqttInputChannel()); return adapter; } /** * 增加监听的topic * * @param topicArr topic数组 */ public List<String> addListenTopic(String[] topicArr) { if (adapter == null) { adapter = new MqttPahoMessageDrivenChannelAdapter(IdUtil.simpleUUID() + "_inbound1", mqttClientFactory(), defaultTopic+"#"); } List<String> listTopic = Arrays.asList(adapter.getTopic()); for (String topic : topicArr) { if (!StringUtils.isEmpty(topic)) { if (!listTopic.contains(topic)) { adapter.addTopic(topic, MqttQosEnum.QOS_2.getQosLevel()); } } } return Arrays.asList(adapter.getTopic()); } /** * 移除一个监听的topic */ public List<String> removeListenTopic(String topic) { if (adapter == null) { adapter = new MqttPahoMessageDrivenChannelAdapter(IdUtil.simpleUUID() + "_inbound1", mqttClientFactory(), defaultTopic+"#"); } List<String> listTopic = Arrays.asList(adapter.getTopic()); if (listTopic.contains(topic)) { adapter.removeTopic(topic); } return Arrays.asList(adapter.getTopic()); } @Bean @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { return mqttReceiveHandle::handle; }}
# mqttReceiveHandle --mqtt的处理
import com.alibaba.fastjson.JSONObject;
import com.test.videocontrol.mqtt.service.MqttMessageBizService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

/**
 * Receives inbound MQTT messages, decodes the payload as UTF-8 JSON and forwards the
 * parsed object to {@link MqttMessageBizService#messagePush}.
 * Title: MqttReceiveHandle
 */
@Slf4j
@Component
public class MqttReceiveHandle {

    private final MqttMessageBizService mqttMessageBizService;

    public MqttReceiveHandle(MqttMessageBizService mqttMessageBizService) {
        this.mqttMessageBizService = mqttMessageBizService;
    }

    /**
     * Handles one inbound message. Blank payloads, payloads that are not valid JSON and
     * payloads that parse to {@code null} are dropped; everything else is pushed to the
     * business service.
     *
     * @param message the inbound message; its payload is expected to be a {@code byte[]}
     */
    public void handle(Message<?> message) {
        Object rawPayload = message.getPayload();
        // Decode explicitly as UTF-8 rather than relying on the platform default charset.
        String payLoad = rawPayload instanceof byte[]
                ? new String((byte[]) rawPayload, StandardCharsets.UTF_8)
                : String.valueOf(rawPayload);
        // Debug aid (requires MqttHeaders):
        // log.info("topic:{}, qos:{}, payload:{}", message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC),
        //         message.getHeaders().get(MqttHeaders.RECEIVED_QOS), payLoad);
        if (StringUtils.isBlank(payLoad)) {
            return;
        }
        JSONObject jsonMessage;
        try {
            jsonMessage = JSONObject.parseObject(payLoad);
        } catch (RuntimeException e) {
            // One malformed message must not propagate an exception back into the MQTT adapter.
            log.error("Discarding MQTT payload that is not valid JSON: {}", payLoad, e);
            return;
        }
        if (jsonMessage == null) {
            return;
        }
        mqttMessageBizService.messagePush(jsonMessage);
    }
}
# 定义一个MqttGateway的接口,用来发送消息
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

/**
 * Title: MqttGateway
 * Description: messaging gateway used to publish messages to MQTT. Every call is sent to
 * the {@code mqttOutboundChannel} request channel.
 */
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
@Component
public interface MqttGateway {

    /**
     * Publishes a message without topic or QoS headers.
     *
     * @param data text to send
     */
    void sendToMqtt(String data);

    /**
     * Publishes a message to the given topic.
     *
     * @param topic   target topic, passed as the {@link MqttHeaders#TOPIC} header
     * @param payload message body
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
                    String payload);

    /**
     * Publishes a message to the given topic with an explicit QoS level.
     *
     * @param topic   target topic, passed as the {@link MqttHeaders#TOPIC} header
     * @param qos     delivery guarantee, passed as the {@link MqttHeaders#QOS} header:
     *                0 - the message is not re-sent if the subscriber misses it, so it may be lost;
     *                1 - delivery is retried until acknowledged, so the subscriber may see duplicates;
     *                2 - adds a de-duplication step so the subscriber receives the message exactly once.
     * @param payload message body
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
                    @Header(MqttHeaders.QOS) int qos,
                    String payload);
}
import com.alibaba.fastjson.JSONObject;import com.test.videocontrol.mqtt.strategy.MqttJsonParseContext;import com.test.videocontrol.mqtt.strategy.MqttMessageStrategyFactory;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Service;@Slf4j@Servicepublic class MqttMessageBizServiceImpl { public void messagePush(JSONObject json) { int cmd = json.getIntValue("cmd"); try { MqttJsonParseContext mqttJsonParseContext = MqttJsonParseContext.builder().jsonObject(json).cmd(cmd).build(); MqttMessageStrategyFactory.strategyMethod(mqttJsonParseContext); } catch (Exception e) { e.printStackTrace(); log.error("处理MQTT消息发生异常:message:{}------------>stackTrace:{}", e.getMessage(), e.getStackTrace()); } }}
# 请求返回的处理策略
import com.test.videocontrol.mqtt.PtzCmdConstant;
import lombok.extern.slf4j.Slf4j;

/**
 * Static dispatcher that picks an {@link ExecuteMessageServiceStrategy} from the command
 * code of a parsed MQTT message and executes it. The {@code ... ...} branches are
 * placeholders left by the original text for module-specific strategies.
 */
@Slf4j
public class MqttMessageStrategyFactory {

    /** Not instantiable: this class only exposes a static entry point. */
    private MqttMessageStrategyFactory() {
    }

    /**
     * Selects the strategy matching {@code context.getCmd()} and runs it. Commands without a
     * strategy are ignored.
     *
     * @param context parsed message plus its command code; {@code null} is ignored with a warning
     */
    public static void strategyMethod(MqttJsonParseContext context) {
        if (context == null) {
            log.warn("Ignoring MQTT dispatch request without a context");
            return;
        }
        ExecuteMessageServiceStrategy executeMessageServiceStrategy = null;
        switch (context.getCmd()) {
            case PtzCmdConstant.PTZ_CHANNEL_CONTROL_RETURN_CMD:
                executeMessageServiceStrategy = new PtzControlReturnMessageStrategy();
                break;
            case PtzCmdConstant.PRESET_RETURN_CMD:
                ... ... // handle this module's own logic here
                break;
            case PtzCmdConstant.CAPTURE_RETURN_CMD:
                ... ...
                break;
            case PtzCmdConstant.REGION_ZOOM_RETURN_CMD:
                ... ...
                break;
            default:
                break;
        }
        if (executeMessageServiceStrategy != null) {
            MqttMessageContext mqttMessageContext = new MqttMessageContext(executeMessageServiceStrategy, context);
            mqttMessageContext.execute();
        }
    }
}
import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.JSONObject;import com.test.videocontrol.mqtt.PtzCmdEnum;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Service;@Slf4j@Servicepublic class PtzControlReturnMessageStrategy implements ExecuteMessageServiceStrategy { //private CenterService centerService; @Override public void executeMqttMessage(MqttJsonParseContext mqttJsonParseContext) { Integer cmdType = mqttJsonParseContext.getCmd(); if(PtzCmdEnum.PTZ_CHANNEL_CONTROL_RETURN_CMD.getCmd().equals(cmdType)) { JSONObject jsonObject = mqttJsonParseContext.getJsonObject(); log.info("云台控制操作mqtt返回消息:{}",JSON.toJSONString(jsonObject)); } }}