MQTT在SpringBoot中的配置
1. 完整示例
<!--第一步:就是在Maven中的需要添加的依赖--><dependencies><!-- https://mvnrepository.com/artifact/com.alibaba/fastjson --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.68</version></dependency><!--mqtt依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency><!--lombok--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies>
# 第二步: 在application.yml中,添加关于mqtt的配置 (如果mqtt的IP、port、user、password在库中则需要在实现时候查表进行) #server:port: 8023spring:#给项目来个名字application:name: mqtt#MQTT-用户名mqtt:username: admin#MQTT-密码password: passwordpassword: public#MQTT-服务器连接地址,如果有多个,用逗号隔开,如:tcp://127.0.0.1:1883,tcp://192.168.2.133:1883url: tcp://127.0.0.1:1883#MQTT-连接服务器默认客户端IDclient:id: mqttId#MQTT-默认的消息推送主题,实际可在调用接口时指定default:topic: topicyihonWQM: YIHON_WQ_M#连接超时completionTimeout: 3000
/*** 第三步: MqttSenderAndReceiveConfig推送接受消息类*/import cn.hutool.core.util.IdUtil;import com.zxk.mqtt.MqttQosEnum;import com.zxk.mqtt.handle.MqttReceiveHandle;import com.zxk.camera.pojo.dto.EmqInfoDTO;import com.zxk.camera.service.ConfigInfoService;import lombok.extern.slf4j.Slf4j;import org.apache.commons.lang3.StringUtils;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.integration.annotation.IntegrationComponentScan;import org.springframework.integration.annotation.ServiceActivator;import org.springframework.integration.channel.DirectChannel;import org.springframework.integration.core.MessageProducer;import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;import org.springframework.integration.mqtt.core.MqttPahoClientFactory;import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;import org.springframework.messaging.MessageChannel;import org.springframework.messaging.MessageHandler;import java.util.Arrays;import java.util.List;@Slf4j@Configuration@IntegrationComponentScanpublic class MqttConfiguration {private final ConfigInfoService configInfoService;private final MqttReceiveHandle mqttReceiveHandle;/*** 消息驱动*/private MqttPahoMessageDrivenChannelAdapter adapter;@Value("${spring.mqtt.username}")private String username;@Value("${spring.mqtt.password}")private String password;@Value("${spring.mqtt.url}")private String hostUrl;@Value("${spring.mqtt.client.id}")private String clientId;@Value("${spring.mqtt.default.topic}")private String defaultTopic;//水质设备主题@Value("${spring.mqtt.default.yihonWQM}")private String yihonWQM;@Value("${spring.mqtt.completionTimeout}")private int completionTimeout; //连接超时public MqttConfiguration(ConfigInfoService configInfoService, MqttReceiveHandle mqttReceiveHandle) {this.configInfoService = configInfoService;this.mqttReceiveHandle = mqttReceiveHandle;}/*** mqtt 连接配置属性*/@Bean(value = "getMqttConnectOptions")public MqttConnectOptions getMqttConnectOptions1(){MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接mqttConnectOptions.setCleanSession(true);// 设置超时时间 单位为秒mqttConnectOptions.setConnectionTimeout(10);mqttConnectOptions.setAutomaticReconnect(true);mqttConnectOptions.setUserName(username);mqttConnectOptions.setPassword(password.toCharArray());mqttConnectOptions.setServerURIs(new String[]{hostUrl});// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线,但这个方法并没有重连的机制mqttConnectOptions.setKeepAliveInterval(10);// 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。//mqttConnectOptions.setWill("willTopic", WILL_DATA, 2, false);return mqttConnectOptions;/*** MQTT工厂*/@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();factory.setConnectionOptions(getMqttConnectOptions1());return factory;}/*** MQTT信息通道(生产者)*/@Beanpublic MessageChannel mqttOutboundChannel() {return new DirectChannel();}/*** MQTT消息处理器(生产者)*/@Bean@ServiceActivator(inputChannel = "mqttOutboundChannel")public MessageHandler mqttOutbound() {MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(IdUtil.simpleUUID(), mqttClientFactory());messageHandler.setAsync(true);messageHandler.setDefaultTopic(defaultTopic);messageHandler.setDefaultQos(2);return messageHandler;}@Beanpublic MessageChannel mqttInputChannel() {return new DirectChannel();}/*** 配置client,监听的topic* MQTT消息订阅绑定(消费者)*/@Beanpublic MessageProducer inbound() {if (adapter == null) {adapter = new MqttPahoMessageDrivenChannelAdapter(IdUtil.simpleUUID() + "_inbound1", mqttClientFactory(), defaultTopic, yihonWQM);}String[] topics = listenTopic.split(",");for (String topic : topics) {if (!StringUtils.isEmpty(topic)) {adapter.addTopic(topic, 2);}}adapter.setCompletionTimeout(completionTimeout);DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter(2,false,"UTF-8");defaultPahoMessageConverter.setPayloadAsBytes(true);adapter.setConverter(defaultPahoMessageConverter);// 设置服务质量// 0 最多一次,数据可能丢失;// 1 至少一次,数据可能重复;// 2 只有一次,有且只有一次;最耗性能adapter.setQos(1);adapter.setOutputChannel(mqttInputChannel());return adapter;}/*** 增加监听的topic** @param topicArr topic数组*/public List<String> addListenTopic(String[] topicArr) {if (adapter == null) {adapter = new MqttPahoMessageDrivenChannelAdapter(IdUtil.simpleUUID() + "_inbound1", mqttClientFactory(),defaultTopic);}List<String> listTopic = Arrays.asList(adapter.getTopic());for (String topic : topicArr) {if (!StringUtils.isEmpty(topic)) {if (!listTopic.contains(topic)) {adapter.addTopic(topic, 2);}}}return Arrays.asList(adapter.getTopic());}/*** 移除一个监听的topic*/public List<String> removeListenTopic(String topic) {if (adapter == null) {adapter = new MqttPahoMessageDrivenChannelAdapter(IdUtil.simpleUUID() + "_inbound1", mqttClientFactory(),defaultTopic);}List<String> listTopic = Arrays.asList(adapter.getTopic());if (listTopic.contains(topic)) {adapter.removeTopic(topic);}return Arrays.asList(adapter.getTopic());}/*** MQTT消息处理器(消费者)* **/@Bean@ServiceActivator(inputChannel = "mqttInputChannel")public MessageHandler handler() {return mqttReceiveHandle::handle;}}
/*** 第四步: MqttGateway发送消息类*/import org.springframework.integration.annotation.MessagingGateway;import org.springframework.integration.mqtt.support.MqttHeaders;import org.springframework.messaging.handler.annotation.Header;/*** mqtt发送消息* (defaultRequestChannel = "mqttOutboundChannel" 对应config配置)* **/@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")public interface MqttGateway {/*** 发送信息到MQTT服务器** @param data 发送的文本*/void sendToMqtt(String data);/*** 发送信息到MQTT服务器** @param topic 主题* @param payload 消息主体*/void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,String payload);/*** 发送信息到MQTT服务器** @param topic 主题* @param qos 对消息处理的几种机制。* 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。* 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。* 2 多了一次去重的动作,确保订阅者收到的消息有一次。* @param payload 消息主体*/void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,@Header(MqttHeaders.QOS) int qos,String payload);}
/*** 第五步: mqtt客户端消息处理*/import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.JSONObject;import lombok.extern.slf4j.Slf4j;import org.springframework.integration.mqtt.support.MqttHeaders;import org.springframework.messaging.Message;import org.springframework.stereotype.Component;/*** mqtt客户端消息处理类* **/@Slf4j@Componentpublic class MqttReceiveHandle {public void handle(Message<?> message){log.info("主题:{},QOS:{},消息接收到的数据:{}", message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC), message.getHeaders().get(MqttHeaders.RECEIVED_QOS), message.getPayload());byte[] payLoadByte = (byte[]) message.getPayload();String payLoad = new String(payLoadByte);System.out.println("mqtt返回的信息有:" +JSON.toJSONString(payLoad));JSONObject jsonMessage = JSONObject.parseObject(payLoad);// todo: 当获取到返回信息后,继续往下执行不同返回的不同逻辑}}
在对具体的回调函数中,继续处理业务逻辑的时候:使用注解形式来获取对象和Service(如@Autowired、@Service),直接使用注解会报java.lang.NullException错误
解决方案: 通过上下文的方式,获取进行业务处理的service!
/*** 第六步: 写一个方法,用来获取上下文*/import org.springframework.beans.BeansException;import org.springframework.context.ApplicationContext;import org.springframework.context.ApplicationContextAware;import org.springframework.stereotype.Component;@Componentpublic class GetUserDefinedServiceUtil implements ApplicationContextAware{private static ApplicationContext applicationContext = null;@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {if (null == GetUserDefinedServiceUtil.applicationContext) {GetUserDefinedServiceUtil.applicationContext = applicationContext;}}//获取applicationContextpublic static ApplicationContext getApplicationContext() {return applicationContext;}//通过class获取Bean.public static <T> T getBean(Class<T> clazz){return getApplicationContext().getBean(clazz);}//通过name,以及Clazz返回指定的Beanpublic static <T> T getBean(String name,Class<T> clazz){return getApplicationContext().getBean(name, clazz);}}
/*** 在业务中,取出service对象*/xxxxService xxxxService = GetUserDefinedServiceUtil.getBean(xxxxService.class);
