1.Maven依赖
<!--rabbitmq--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2.RabbitConfig
package com.xlwy.fkudp.rabbitmq;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.core.*;import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/** * ClassName: mq配置 仅支持基于 topic 模式 * @author WHN * @date 2020/11/9 16:14 */@Configurationpublic class RabbitConfig { private final Logger log = LoggerFactory.getLogger(this.getClass()); /** * 注入rabbitmq连接池 * @author WHN * @date 2020/11/9 16:14 */ @Autowired private CachingConnectionFactory connectionFactory; /** * 交换机信息 */ public static final String EXCHANGE_TOPIC_PERSON = "exchange_topic_person"; /** * 队列1 */ public static final String QUEUE_ELECTRIC = "QUEUE_ELECTRIC"; /** * 队列秘钥规则 */ public static final String ROUTINGKEY_ELECTRIC = "electric.#"; /** * @Function 声明交换机 */ @Bean(EXCHANGE_TOPIC_PERSON) public Exchange exchangeTopicInform() { return ExchangeBuilder.topicExchange(EXCHANGE_TOPIC_PERSON).durable(true).build(); } /** * 获取队列 1个交换机可以绑定多个消息队列,也就是消息通过一个交换机,可以分发到不同的队列当中去。 */ @Bean(QUEUE_ELECTRIC) public Queue queueElectric() { return new Queue(QUEUE_ELECTRIC, true); } /** * @Function 交换机与队列绑定 */ @Bean public Binding queueElectricBinding(@Qualifier(QUEUE_ELECTRIC) Queue queue, @Qualifier(EXCHANGE_TOPIC_PERSON) Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(RabbitConfig.ROUTINGKEY_ELECTRIC).noargs(); } @Bean public RabbitTemplate rabbitTemplate() { connectionFactory.setPublisherConfirms(true); connectionFactory.setPublisherReturns(true); RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { log.info("消息推送到交换器成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause); } else { log.info("消息推送到交换器失败:correlationData({}),ack({}),cause({})", correlationData, ack, cause); } } }); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("消息推送到队列丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message); } }); return rabbitTemplate; }}
3.MqProduct生产者
package com.xlwy.fkudp.rabbitmq;import cn.hutool.json.JSONObject;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;/** * 生产者 * @author WHN * @date 2020/11/9 16:16 */@Componentpublic class MqProduct { @Autowired private RabbitTemplate rabbitTemplate; /** * 发送队列1设备上发数据 * @author WHN * @date 2020/11/9 16:42 */ public void sendElectric(String type, String electricType, JSONObject json) { JSONObject jsonObject = new JSONObject(); jsonObject.putOpt("type", type); jsonObject.putOpt("electricType", electricType); jsonObject.putOpt("data", json); this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_TOPIC_PERSON,"electric.pushMsg", jsonObject.toJSONString(0)); }}
4.MqListener监听消息类
package com.xlwy.fkudp.rabbitmq;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;/** * mq监听 * @author WHN * @date 2020/11/9 16:16 */@Componentpublic class MqListener { /** * 队列1的数据 */ @RabbitListener(queues = RabbitConfig.QUEUE_ELECTRIC) @RabbitHandler public void process(String message) { System.out.println("RabbitMq 监听获取数据 : " + message); }}
5.向订阅通道中推送消息
public class AnalyzeFubangService implements Analyze{ @Autowired private MqProduct mqProduct; public void analyzeTheHeader(String hex) { mqProduct.sendElectric("02", "队列1模拟量变动",JSONUtil.parseObj(hex, false, true)); }}
6.application.yml中配置
spring: rabbitmq: host: 服务器ip port: 服务器端口 username: 用户名 password: 密码 template: mandatory: true