其中 pom.xml 文件的依赖配置、基础连接配置,以及 application.properties 文件中的 MQTT 服务器配置,均与前文相同。
一、实现 MQTT 消息的订阅
注意:本文尚未实现动态监听 topic 的功能!
package com.gyz.mqtt.config;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
/**
 * MQTT consumer (subscriber) configuration.
 *
 * <p>Declares the inbound message channel, the Paho message-driven adapter that subscribes to the
 * configured topics, and a handler that prints every received message to standard output.
 *
 * @author gong_yuzhuo
 */
@Configuration
public class MqttReceiverConfig {

    /** Bean name of the inbound (subscription) channel. */
    public static final String CHANNEL_NAME_IN = "mqttInboundChannel";

    /** Header key from which the handler reads the topic of a received message. */
    private static final String RECEIVED_TOPIC_HEADER = "mqtt_receivedTopic";

    /**
     * The inbound adapter created by {@link #inbound(MqttPahoClientFactory)}; {@code null} until
     * that bean method has run. Kept as a public field for backward compatibility.
     */
    public MqttPahoMessageDrivenChannelAdapter adapter;

    /**
     * "Last will" payload: if the connection between client and server is interrupted
     * unexpectedly, the server publishes the client's will message. Not referenced elsewhere in
     * this class. Encoded explicitly as UTF-8 so the bytes do not depend on the platform charset.
     */
    private static final byte[] WILL_DATA =
            "offline".getBytes(java.nio.charset.StandardCharsets.UTF_8);

    /** Value of the {@code mqtt.username} property. */
    @Value("${mqtt.username}")
    private String username;

    /** Value of the {@code mqtt.password} property. */
    @Value("${mqtt.password}")
    private String password;

    /** Value of the {@code mqtt.host} property. */
    @Value("${mqtt.host}")
    private String url;

    /** Value of the {@code mqtt.receiver.clientId} property; used as the adapter's client id. */
    @Value("${mqtt.receiver.clientId}")
    private String clientId;

    /** Value of the {@code mqtt.receiver.defaultTopic} property; a comma-separated topic list. */
    @Value("${mqtt.receiver.defaultTopic}")
    private String defaultTopic;

    /**
     * MQTT message channel (consumer side).
     *
     * @return a new {@link DirectChannel} registered under {@link #CHANNEL_NAME_IN}
     */
    @Bean(name = CHANNEL_NAME_IN)
    public MessageChannel mqttInboundChannel() {
        return new DirectChannel();
    }

    /**
     * MQTT subscription binding (consumer side).
     *
     * <p>Several topics can be subscribed at once by listing them comma-separated in
     * {@code mqtt.receiver.defaultTopic}.
     *
     * @param factory the Paho client factory used by the adapter to connect
     * @return the configured inbound adapter (also stored in {@link #adapter})
     */
    @Bean
    public MessageProducer inbound(MqttPahoClientFactory factory) {
        adapter = new MqttPahoMessageDrivenChannelAdapter(
                clientId, factory, parseTopics(defaultTopic));
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        // Route received messages to the inbound channel.
        adapter.setOutputChannel(mqttInboundChannel());
        return adapter;
    }

    /**
     * MQTT message handler (consumer side): prints the topic and payload of each message that
     * arrives on {@link #CHANNEL_NAME_IN}.
     *
     * @return the handler bound to the inbound channel
     */
    @Bean
    @ServiceActivator(inputChannel = CHANNEL_NAME_IN)
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                // String.valueOf tolerates a missing topic header instead of throwing an NPE.
                String topic = String.valueOf(message.getHeaders().get(RECEIVED_TOPIC_HEADER));
                String msg = String.valueOf(message.getPayload());
                System.out.println("\n--------------------START-------------------\n" +
                        "接收到订阅消息:\ntopic:" + topic + "\nmessage:" + msg +
                        "\n---------------------END--------------------");
            }
        };
    }

    /**
     * Splits a comma-separated topic list, trimming surrounding whitespace from each entry and
     * dropping entries that are blank, so that a value such as {@code "a, b"} yields {@code "a"}
     * and {@code "b"} rather than {@code "a"} and {@code " b"}.
     *
     * @param commaSeparated the raw property value; may be {@code null} or empty
     * @return the cleaned topics, never {@code null}
     */
    private static String[] parseTopics(String commaSeparated) {
        String[] tokens = StringUtils.split(commaSeparated, ",");
        if (tokens == null) {
            return new String[0];
        }
        String[] topics = new String[tokens.length];
        int count = 0;
        for (String token : tokens) {
            String topic = StringUtils.trimToNull(token);
            if (topic != null) {
                topics[count++] = topic;
            }
        }
        return java.util.Arrays.copyOf(topics, count);
    }
}
二、测试运行
项目启动后,我们使用 MQTTBox
对“hangge”这个主题,发送一条消息:
可以看到 Spring Boot 项目这边成功接收到消息并打印出来:
发送“hello”主题的消息:
可以看到“hello”主题的消息也已被成功接收。
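注意主题配置:需要订阅的主题必须写在 application.properties 的 mqtt.receiver.defaultTopic 配置项中,多个主题之间用英文逗号分隔(例如 hangge,hello)。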