# 1.通过pom引入依赖
<!-- Netty -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.45.Final</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
# 2.mqtt引入方式
import cn.hutool.core.util.IdUtil;
import com.dsa.videocontrol.mqtt.MqttQosEnum;
import com.dsa.videocontrol.mqtt.handle.MqttReceiveHandle;
import com.dsa.videocontrol.camera.pojo.dto.EmqInfoDTO;
import com.dsa.videocontrol.camera.service.ConfigInfoService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import java.util.Arrays;
import java.util.List;
/**
 * Spring Integration wiring for the MQTT client: connection options, the Paho client
 * factory, the outbound (publishing) handler and the inbound (subscribing) adapter.
 * Also exposes helpers to add or remove subscribed topics at runtime.
 * Title: MqttConfigdsa
 */
@Slf4j
@Configuration
@IntegrationComponentScan
public class MqttConfiguration {

    /** Suffix appended to the generated client id of the inbound adapter. */
    private static final String INBOUND_CLIENT_SUFFIX = "_inbound1";

    /** Multi-level MQTT wildcard; only valid in subscription filters, never in publish topics. */
    private static final String MULTI_LEVEL_WILDCARD = "#";

    private final ConfigInfoService configInfoService;
    private final MqttReceiveHandle mqttReceiveHandle;

    /**
     * Message-driven inbound adapter shared by {@link #inbound()},
     * {@link #addListenTopic(String[])} and {@link #removeListenTopic(String)}.
     */
    private MqttPahoMessageDrivenChannelAdapter adapter;

    /**
     * Topic prefix from {@code topic.subscribe}; subscribed to as {@code prefix + "#"}
     * and used (without the wildcard) as the default publish topic.
     */
    @Value("${topic.subscribe}")
    private String defaultTopic;

    /** Comma-separated list of additional topics to subscribe to, from {@code topic.listen}. */
    @Value("${topic.listen}")
    private String listenTopic;

    /** Random value produced by a SpEL expression; not referenced inside this class. */
    @Value("#{ T(java.lang.Math).random() * 1000.0 }")
    public String clientId;

    /** Completion timeout handed to the inbound adapter. */
    @Value(value = "200")
    public Long completionTimeout;

    public MqttConfiguration(ConfigInfoService configInfoService, MqttReceiveHandle mqttReceiveHandle) {
        this.configInfoService = configInfoService;
        this.mqttReceiveHandle = mqttReceiveHandle;
    }

    /**
     * Builds the MQTT connection options from the broker settings returned by
     * {@code ConfigInfoService#getEmqInfo()}.
     *
     * @return the connection options shared by every client created by the factory
     */
    @Bean(value = "getMqttConnectOptions")
    public MqttConnectOptions getMqttConnectOptions1() {
        EmqInfoDTO emqInfo = configInfoService.getEmqInfo();
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        // false: the broker keeps the session state of this client id between connections;
        // true would make every connection start from a clean session.
        // NOTE(review): the client ids used below are random UUIDs generated per start-up, so a
        // persistent session can never be resumed after a restart - confirm this is intended.
        mqttConnectOptions.setCleanSession(false);
        // Connection timeout, in seconds.
        mqttConnectOptions.setConnectionTimeout(60);
        mqttConnectOptions.setAutomaticReconnect(true);
        mqttConnectOptions.setUserName(emqInfo.getUserName());
        // Guard against a missing password (e.g. broker with anonymous access) instead of
        // failing with a NullPointerException on toCharArray().
        String password = emqInfo.getUserPassword();
        if (password != null) {
            mqttConnectOptions.setPassword(password.toCharArray());
        }
        mqttConnectOptions.setServerURIs(new String[]{"tcp://" + emqInfo.getIp() + ":" + emqInfo.getPort()});
        // Keep-alive (heartbeat) interval, in seconds.
        mqttConnectOptions.setKeepAliveInterval(60);
        mqttConnectOptions.setMaxInflight(5000);
        // A "last will" message could be configured here; the broker publishes it when the
        // connection is lost unexpectedly.
        //mqttConnectOptions.setWill("willTopic", WILL_DATA, 2, false);
        return mqttConnectOptions;
    }

    /**
     * MQTT client factory using the options from {@link #getMqttConnectOptions1()}.
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions1());
        return factory;
    }

    /**
     * Channel that carries messages to be published (producer side).
     */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /**
     * Handler that publishes every message arriving on {@code mqttOutboundChannel}.
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(IdUtil.simpleUUID(), mqttClientFactory());
        messageHandler.setAsync(true);
        // A publish topic must not contain wildcards, so the default topic is the configured
        // prefix itself rather than prefix + "#" (which is only legal as a subscription filter).
        messageHandler.setDefaultTopic(defaultTopic);
        messageHandler.setDefaultQos(MqttQosEnum.QOS_2.getQosLevel());
        return messageHandler;
    }

    /**
     * Channel that carries messages received from the broker (consumer side).
     */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    /**
     * Configures the inbound adapter: subscribes to {@code defaultTopic + "#"} plus every
     * entry of the comma-separated {@code topic.listen} property, delivers payloads as
     * bytes and routes them to {@link #mqttInputChannel()}.
     *
     * @return the configured inbound adapter
     */
    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter inboundAdapter = listenerAdapter();
        int qos = MqttQosEnum.QOS_2.getQosLevel();
        // Strip surrounding whitespace so "a, b" subscribes to "b" rather than " b".
        subscribeMissing(inboundAdapter, StringUtils.stripAll(listenTopic.split(",")));
        inboundAdapter.setCompletionTimeout(completionTimeout);
        DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(qos, false, "UTF-8");
        converter.setPayloadAsBytes(true);
        inboundAdapter.setConverter(converter);
        inboundAdapter.setQos(qos);
        inboundAdapter.setOutputChannel(mqttInputChannel());
        return inboundAdapter;
    }

    /**
     * Subscribes to additional topics at runtime. Empty entries and topics that are
     * already subscribed (including duplicates inside {@code topicArr}) are skipped.
     *
     * @param topicArr topics to add; {@code null} is treated as an empty array
     * @return the topics subscribed after the call
     */
    public List<String> addListenTopic(String[] topicArr) {
        MqttPahoMessageDrivenChannelAdapter inboundAdapter = listenerAdapter();
        subscribeMissing(inboundAdapter, topicArr);
        return Arrays.asList(inboundAdapter.getTopic());
    }

    /**
     * Unsubscribes from a single topic if it is currently subscribed.
     *
     * @param topic topic to remove
     * @return the topics subscribed after the call
     */
    public List<String> removeListenTopic(String topic) {
        MqttPahoMessageDrivenChannelAdapter inboundAdapter = listenerAdapter();
        if (Arrays.asList(inboundAdapter.getTopic()).contains(topic)) {
            inboundAdapter.removeTopic(topic);
        }
        return Arrays.asList(inboundAdapter.getTopic());
    }

    /**
     * Handler bound to {@code mqttInputChannel}; delegates to {@code MqttReceiveHandle#handle}.
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return mqttReceiveHandle::handle;
    }

    /**
     * Returns the shared inbound adapter, creating it on first use with a generated client
     * id and the {@code defaultTopic + "#"} subscription. Converter, QoS and output channel
     * are only applied by {@link #inbound()}.
     */
    private MqttPahoMessageDrivenChannelAdapter listenerAdapter() {
        if (adapter == null) {
            adapter = new MqttPahoMessageDrivenChannelAdapter(IdUtil.simpleUUID() + INBOUND_CLIENT_SUFFIX,
                    mqttClientFactory(), defaultTopic + MULTI_LEVEL_WILDCARD);
        }
        return adapter;
    }

    /**
     * Adds every non-empty candidate topic that the adapter is not yet subscribed to.
     * The current subscriptions are re-read for each candidate so that a topic repeated
     * inside {@code candidates} is added only once.
     */
    private static void subscribeMissing(MqttPahoMessageDrivenChannelAdapter target, String[] candidates) {
        if (candidates == null) {
            return;
        }
        int qos = MqttQosEnum.QOS_2.getQosLevel();
        for (String candidate : candidates) {
            if (StringUtils.isEmpty(candidate)) {
                continue;
            }
            if (!Arrays.asList(target.getTopic()).contains(candidate)) {
                target.addTopic(candidate, qos);
            }
        }
    }
}
# MqttReceiveHandle -- mqtt消息的处理
import com.alibaba.fastjson.JSONObject;
import com.test.videocontrol.mqtt.service.MqttMessageBizService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
/**
* 描述
* Title: MqttReceiveHandle
*/
@Slf4j
@Component
public class MqttReceiveHandle {
private final MqttMessageBizService mqttMessageBizService;
public MqttReceiveHandle(MqttMessageBizService mqttMessageBizService) {
this.mqttMessageBizService = mqttMessageBizService;
}
public void handle(Message<?> message) {
byte[] payLoadByte = (byte[]) message.getPayload();
String payLoad = new String(payLoadByte);
/*log.info("主题:{},QOS:{},消息接收到的数据:{}", message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC),
message.getHeaders().get(MqttHeaders.RECEIVED_QOS), payLoad); */
if (StringUtils.isEmpty(payLoad)) {
return;
}
JSONObject jsonMessage = JSONObject.parseObject(payLoad);
mqttMessageBizService.messagePush(jsonMessage);
}
}
# 定义一个MqttGateway的接口,用来发送消息
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
/**
 * Title: MqttGateway
 * Description: messaging gateway used to publish messages to the MQTT broker through
 * the {@code mqttOutboundChannel} channel.
 */
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {

    /**
     * Publishes a message without specifying a topic.
     *
     * @param data text to send
     */
    void sendToMqtt(String data);

    /**
     * Publishes a message to the given topic.
     *
     * @param topic   target topic
     * @param payload message body
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);

    /**
     * Publishes a message to the given topic with an explicit quality of service.
     *
     * @param topic   target topic
     * @param qos     delivery guarantee:
     *                0 - the message is not re-sent if the subscriber misses it, so it may be lost;
     *                1 - delivery is retried until acknowledged, so the subscriber may see duplicates;
     *                2 - adds de-duplication so the subscriber receives the message exactly once
     * @param payload message body
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}
import com.alibaba.fastjson.JSONObject;
import com.test.videocontrol.mqtt.strategy.MqttJsonParseContext;
import com.test.videocontrol.mqtt.strategy.MqttMessageStrategyFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class MqttMessageBizServiceImpl {
public void messagePush(JSONObject json) {
int cmd = json.getIntValue("cmd");
try {
MqttJsonParseContext mqttJsonParseContext = MqttJsonParseContext.builder().jsonObject(json).cmd(cmd).build();
MqttMessageStrategyFactory.strategyMethod(mqttJsonParseContext);
} catch (Exception e) {
e.printStackTrace();
log.error("处理MQTT消息发生异常:message:{}------------>stackTrace:{}", e.getMessage(), e.getStackTrace());
}
}
}
# 请求返回的处理策略
import com.test.videocontrol.mqtt.PtzCmdConstant;
import lombok.extern.slf4j.Slf4j;
/**
 * Selects the strategy that handles a returned MQTT message based on its command code
 * and executes it.
 */
@Slf4j
public class MqttMessageStrategyFactory {
/**
 * Picks an ExecuteMessageServiceStrategy for the command code of the given context and,
 * when one was selected, runs it through a MqttMessageContext. Unknown command codes
 * are ignored.
 *
 * @param context parsed message together with its command code
 */
public static void strategyMethod(MqttJsonParseContext context) {
ExecuteMessageServiceStrategy executeMessageServiceStrategy = null;
switch (context.getCmd()) {
case PtzCmdConstant.PTZ_CHANNEL_CONTROL_RETURN_CMD:
// NOTE(review): PtzControlReturnMessageStrategy is annotated @Service but is created
// with "new" here, so this instance is not the Spring-managed bean - confirm intent.
executeMessageServiceStrategy = new PtzControlReturnMessageStrategy();
break;
case PtzCmdConstant.PRESET_RETURN_CMD:
... ... // handle this module's own logic (elided placeholder)
break;
case PtzCmdConstant.CAPTURE_RETURN_CMD:
... ...
break;
case PtzCmdConstant.REGION_ZOOM_RETURN_CMD:
... ...
break;
default:
// No strategy for this command code; nothing is executed below.
break;
}
if (executeMessageServiceStrategy != null) {
MqttMessageContext mqttMessageContext = new MqttMessageContext(executeMessageServiceStrategy, context);
mqttMessageContext.execute();
}
}
}
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.test.videocontrol.mqtt.PtzCmdEnum;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
/**
 * Strategy for the message returned after a PTZ channel control operation: logs the
 * message body when the command code matches.
 */
@Slf4j
@Service
public class PtzControlReturnMessageStrategy implements ExecuteMessageServiceStrategy {

    /**
     * Logs the JSON body of the message if its command code equals
     * {@code PtzCmdEnum.PTZ_CHANNEL_CONTROL_RETURN_CMD}; otherwise does nothing.
     *
     * @param mqttJsonParseContext parsed message together with its command code
     */
    @Override
    public void executeMqttMessage(MqttJsonParseContext mqttJsonParseContext) {
        Integer receivedCmd = mqttJsonParseContext.getCmd();
        boolean isPtzControlReturn = PtzCmdEnum.PTZ_CHANNEL_CONTROL_RETURN_CMD.getCmd().equals(receivedCmd);
        if (!isPtzControlReturn) {
            return;
        }
        JSONObject body = mqttJsonParseContext.getJsonObject();
        log.info("云台控制操作mqtt返回消息:{}", JSON.toJSONString(body));
    }
}