1、消息发送方(发布者)
1)添加maven依赖
<!-- springboot rabbitmq 使用--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
2)配置RabbitMQ配置(application.properties)
#RabbitMQ 服务配置,不写默认走本地ipspring.rabbitmq.host=192.168.0.3spring.rabbitmq.port=5672spring.rabbitmq.username=guestspring.rabbitmq.password=guest
3)创建发送方法
package com.example.provide.rabbitmq;import com.alibaba.fastjson.JSON;import com.example.provide.dto.UserDTO;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import java.util.List;import static org.springframework.integration.jmx.JmxHeaders.OPERATION_NAME;/** @auth yuesf* @data 2019/11/4*/@Componentpublic class Sender {private static final Logger logger = LoggerFactory.getLogger(Sender.class);@Autowiredprivate RabbitTemplate rabbitTemplate;public void storeInfoWindQSend(User user) {String message = JSON.toJSONString(user);logger.info("RabbitMQ: 发送消息={}", message);//指定交换机和路由的routingkeyrabbitTemplate.convertAndSend("demo.direct.exchange", "demo.direct", message);logger.info("发送消息完成 message={}", message);}}
2、消息接收方(订阅者)
1)添加maven依赖
<!-- springboot rabbitmq 使用--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
2)配置RabbitMQ配置(application.properties)
#RabbitMQ 服务配置,不写默认走本地ipspring.rabbitmq.host=192.168.0.3spring.rabbitmq.port=5672spring.rabbitmq.username=guestspring.rabbitmq.password=guest
3)声明RabbitMQ
示例中使用的直连交换机,声明一个交换机,一个队列。交换机与队列绑定关系
package com.example.consume.listener;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.Queue;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/** Rabbitmq的配置示例* @auth yuesf* @data 2019/11/4*/@Configurationpublic class RabbitConfigDemo {/*** 示例交换机** @return*/@Beanpublic DirectExchange demoExchange() {return new DirectExchange("demo.direct.exchange", true, false);}/*** 示例队列** @return*/@Beanpublic Queue demoQueue() {return new Queue("demo.queue", true, false, false);}/*** 交换机与队列的绑定关系** @param demoQueue* @param demoExchange* @return*/@Beanpublic Binding bindingDemoQueue(@Qualifier("demoQueue") Queue demoQueue,@Qualifier("demoExchange") DirectExchange demoExchange) {return BindingBuilder.bind(demoQueue).to(demoExchange).with("demo.direct");}}
4)监听方法
package com.example.consume.listener;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;/** @auth yuesf* @data 2019/11/4*/@Componentpublic class RabbitDemoListener {private static final Logger logger = LoggerFactory.getLogger(RabbitDemoListener.class);@RabbitListener(queues = "demo.queue")public void goodsListenerProcess(Object message) {logger.info("接收消息 message={}", message);}}
3、手动ACK指定搭配其他注解使用方式(订阅者)
配置类:
//配置工厂类@Slf4j@Configurationpublic class RabbitConfig {@Bean(name = "oneConnectionFactory")public ConnectionFactory oneConnectionFactory(@Value("${spring.rabbitmq.host}") String host,@Value("${spring.rabbitmq.port}") int port,@Value("${spring.rabbitmq.username}") String username,@Value("${spring.rabbitmq.password}") String password) {log.info("mq队列连接信息 host={}, port={}, username={}", host, port, username);CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setHost(host);connectionFactory.setPort(port);connectionFactory.setUsername(username);connectionFactory.setPassword(password);connectionFactory.setPublisherConfirms(true);connectionFactory.setPublisherReturns(true);return connectionFactory;}@Bean(name = "oneFactory")public SimpleRabbitListenerContainerFactory oneFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,@Qualifier("oneConnectionFactory") ConnectionFactory oneConnectionFactory) {log.info("初始化比分 scoreFactory 实例");SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();//手动factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);configurer.configure(factory, oneConnectionFactory);return factory;}}
监听类:
//监听类@Componentpublic class Listener {//指定交换机、队列、路由routingKey//ignoreDeclarationExceptions已有交换机可能会有异常忽略掉,从源码上看不存在交换机和队列会自动创建@RabbitHandler@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "demo.queue", durable = "true"),exchange = @Exchange(value = "demoExchange",ignoreDeclarationExceptions = "true",type = ExchangeTypes.Direct),key = {"demo.direct"}),containerFactory = "oneFactory")public void listen(String msg, Channel channel, Message message) throws IOException {try {User user = JSON.parseObject(msg, User.class);// 确认收到消息,只确认当前消费者的一个消息收到channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {if (message.getMessageProperties().getRedelivered()) {log.info("消息已经回滚过,拒绝接收消息 : {}", msg);channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);} else {log.info("消息即将返回队列重新处理 :{}", msg);//设置消息重新回到队列处理,requeue表示是否重新回到队列,true重新入队channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}log.error("消息消费异常, msg={},e={}", msg,e.getStackTrace());}}}
配置文件
spring:rabbitmq:host: mq.dev.qiuhui.comport: 5672username: adminpassword: TY111111listener:simple:#指定消息确认模式为手动确认acknowledge-mode: manual
