其中 pom.xml 文件的依赖配置、基础连接配置,以及 application.properties 文件中的 MQTT 服务器配置均与前文相同。
一、实现 MQTT 消息的订阅
动态监听topic没有实现!
package com.gyz.mqtt.config;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;

/**
 * MQTT configuration for the consumer (subscriber) side.
 *
 * <p>Declares the inbound channel, the channel adapter that subscribes to the
 * configured topics, and a handler that prints every received message.
 *
 * @author gong_yuzhuo
 */
@Configuration
public class MqttReceiverConfig {

    /** Bean name of the inbound (subscription) channel. */
    public static final String CHANNEL_NAME_IN = "mqttInboundChannel";

    /** Message header the handler reads to find the topic a message arrived on. */
    private static final String RECEIVED_TOPIC_HEADER = "mqtt_receivedTopic";

    /**
     * Inbound adapter created by {@link #inbound(MqttPahoClientFactory)};
     * {@code null} until that bean method has been invoked.
     */
    public MqttPahoMessageDrivenChannelAdapter adapter;

    /**
     * "Last will" payload: when the connection between client and server is
     * interrupted unexpectedly, the server publishes the client's will message.
     * Not referenced elsewhere in this class. Encoded explicitly as UTF-8 so the
     * bytes do not depend on the platform default charset.
     */
    private static final byte[] WILL_DATA = "offline".getBytes(StandardCharsets.UTF_8);

    // The three connection properties below are injected but not read in this
    // class; they are kept so the same properties remain required at startup.
    @Value("${mqtt.username}")
    private String username;

    @Value("${mqtt.password}")
    private String password;

    @Value("${mqtt.host}")
    private String url;

    /** Client id used by the subscribing adapter. */
    @Value("${mqtt.receiver.clientId}")
    private String clientId;

    /** Comma-separated list of topics to subscribe to at startup. */
    @Value("${mqtt.receiver.defaultTopic}")
    private String defaultTopic;

    /**
     * MQTT message channel (consumer).
     *
     * @return a new {@link DirectChannel} registered under {@link #CHANNEL_NAME_IN}
     */
    @Bean(name = CHANNEL_NAME_IN)
    public MessageChannel mqttInboundChannel() {
        return new DirectChannel();
    }

    /**
     * MQTT subscription binding (consumer).
     *
     * <p>Creates the inbound adapter for the topics listed in
     * {@code mqtt.receiver.defaultTopic}, stores it in {@link #adapter} and wires
     * its output to the inbound channel.
     *
     * @param factory client factory used by the adapter to create MQTT clients
     * @return the configured adapter
     */
    @Bean
    public MessageProducer inbound(MqttPahoClientFactory factory) {
        // Several topics can be consumed (subscribed) at the same time.
        adapter = new MqttPahoMessageDrivenChannelAdapter(clientId, factory, parseTopics(defaultTopic));
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        // Route received messages to the subscription channel.
        adapter.setOutputChannel(mqttInboundChannel());
        return adapter;
    }

    /**
     * MQTT message handler (consumer): prints the topic and payload of every
     * message arriving on {@link #CHANNEL_NAME_IN} to standard output.
     *
     * @return the handler bound to the inbound channel
     */
    @Bean
    @ServiceActivator(inputChannel = CHANNEL_NAME_IN)
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                // String.valueOf tolerates a missing header instead of throwing an NPE.
                String topic = String.valueOf(message.getHeaders().get(RECEIVED_TOPIC_HEADER));
                String msg = message.getPayload().toString();
                System.out.println("\n--------------------START-------------------\n"
                        + "接收到订阅消息:\ntopic:" + topic + "\nmessage:" + msg
                        + "\n---------------------END--------------------");
            }
        };
    }

    /**
     * Splits a comma-separated topic list into individual topic names.
     *
     * <p>Surrounding whitespace is removed from each entry and blank entries are
     * dropped, so a value such as {@code "a, b"} yields {@code "a"} and
     * {@code "b"} rather than {@code "a"} and {@code " b"}.
     *
     * @param topicList comma-separated topic names; may be {@code null}
     * @return the cleaned topic names; empty when {@code topicList} is
     *         {@code null} or contains no non-blank entry
     */
    private static String[] parseTopics(String topicList) {
        String[] rawTopics = StringUtils.split(topicList, ",");
        if (rawTopics == null) {
            return new String[0];
        }
        List<String> topics = new ArrayList<>(rawTopics.length);
        for (String rawTopic : rawTopics) {
            String topic = rawTopic.trim();
            if (!topic.isEmpty()) {
                topics.add(topic);
            }
        }
        return topics.toArray(new String[0]);
    }
}
二、测试运行
项目启动后,我们使用 MQTTBox 对“hangge”这个主题,发送一条消息:

可以看到 Spring Boot 项目这边成功接收到消息并打印出来:

发送“hello”主题的消息:

可以看到“hello”主题的消息也已被成功接收。

注意主题配置

