1.Maven依赖
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.RabbitConfig
package com.xlwy.fkudp.rabbitmq;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* ClassName: mq配置 仅支持基于 topic 模式
* @author WHN
* @date 2020/11/9 16:14
*/
@Configuration
public class RabbitConfig {
private final Logger log = LoggerFactory.getLogger(this.getClass());
/**
* 注入rabbitmq连接池
* @author WHN
* @date 2020/11/9 16:14
*/
@Autowired
private CachingConnectionFactory connectionFactory;
/**
* 交换机信息
*/
public static final String EXCHANGE_TOPIC_PERSON = "exchange_topic_person";
/**
* 队列1
*/
public static final String QUEUE_ELECTRIC = "QUEUE_ELECTRIC";
/**
* 队列秘钥规则
*/
public static final String ROUTINGKEY_ELECTRIC = "electric.#";
/**
* @Function 声明交换机
*/
@Bean(EXCHANGE_TOPIC_PERSON)
public Exchange exchangeTopicInform() {
return ExchangeBuilder.topicExchange(EXCHANGE_TOPIC_PERSON).durable(true).build();
}
/**
* 获取队列 1个交换机可以绑定多个消息队列,也就是消息通过一个交换机,可以分发到不同的队列当中去。
*/
@Bean(QUEUE_ELECTRIC)
public Queue queueElectric() {
return new Queue(QUEUE_ELECTRIC, true);
}
/**
* @Function 交换机与队列绑定
*/
@Bean
public Binding queueElectricBinding(@Qualifier(QUEUE_ELECTRIC) Queue queue,
@Qualifier(EXCHANGE_TOPIC_PERSON) Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(RabbitConfig.ROUTINGKEY_ELECTRIC).noargs();
}
@Bean
public RabbitTemplate rabbitTemplate() {
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
log.info("消息推送到交换器成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
} else {
log.info("消息推送到交换器失败:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
}
}
});
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("消息推送到队列丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message);
}
});
return rabbitTemplate;
}
}
3.MqProduct生产者
package com.xlwy.fkudp.rabbitmq;
import cn.hutool.json.JSONObject;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 生产者
* @author WHN
* @date 2020/11/9 16:16
*/
@Component
public class MqProduct {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送队列1设备上发数据
* @author WHN
* @date 2020/11/9 16:42
*/
public void sendElectric(String type, String electricType, JSONObject json) {
JSONObject jsonObject = new JSONObject();
jsonObject.putOpt("type", type);
jsonObject.putOpt("electricType", electricType);
jsonObject.putOpt("data", json);
this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_TOPIC_PERSON,"electric.pushMsg", jsonObject.toJSONString(0));
}
}
4.MqListener监听消息类
package com.xlwy.fkudp.rabbitmq;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* mq监听
* @author WHN
* @date 2020/11/9 16:16
*/
@Component
public class MqListener {
/**
* 队列1的数据
*/
@RabbitListener(queues = RabbitConfig.QUEUE_ELECTRIC)
@RabbitHandler
public void process(String message) {
System.out.println("RabbitMq 监听获取数据 : " + message);
}
}
5.向订阅通道中推送消息
public class AnalyzeFubangService implements Analyze{
@Autowired
private MqProduct mqProduct;
public void analyzeTheHeader(String hex) {
mqProduct.sendElectric("02", "队列1模拟量变动",JSONUtil.parseObj(hex, false, true));
}
}
6.application.yml中配置
spring:
rabbitmq:
host: 服务器ip
port: 服务器端口
username: 用户名
password: 密码
template:
mandatory: true