Spring Boot 集成 RabbitMQ 非常简单,如果只是简单的使用配置非常少,Spring Boot 提供了spring-boot-starter-amqp 项目对消息各种支持。
springboot集成rabbitmq
1.引入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
2.application.yml配置
spring:rabbitmq:addresses: localhost:5672virtual-host: /rabbitmqusername: adminpassword: admin
3.创建RabbitmqConfig.java
package com.itmck.springbootmq.config;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/*** 太阳当空照,花儿对我笑* <p>* Create by M ChangKe 2021/7/17 16:00**/@Configurationpublic class RabbitConfig {@Beanpublic Queue Queue() {/*** name:队列名* durable:是否持久化,默认false.持久化队列会被存储在磁盘上.当消息代理重启仍然存在* exclusive:默认false,只能被当前的连接使用.当前连接关闭后队列被删除,优先级高于durable* autoDelete:默认false.没有生产者或者消费者,队列会被删除* @return*/// return new Queue("hello");//源码可知当前和下面一样功能return new Queue("hello", true, false, false);}}
4.创建生产者
package com.itmck.springbootmq.compnent;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;/*** 太阳当空照,花儿对我笑* <p>* Create by M ChangKe 2021/7/17 16:01**/@Slf4j@Componentpublic class HelloSender {@Autowiredprivate AmqpTemplate rabbitTemplate;public void send(String message) {log.info("发送消息:{}",message);this.rabbitTemplate.convertAndSend("hello",message);}}
5.创建消费端
package com.itmck.springbootmq.compnent;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;/*** 太阳当空照,花儿对我笑* <p>* Create by M ChangKe 2021/7/17 16:00**/@Slf4j@Component@RabbitListener(queues = "hello")public class HelloReceiver {@RabbitHandlerpublic void process(String message) {log.info("消费端接收消息: {}", message);}}
测试如下:
package com.itmck.springbootmq;import com.itmck.springbootmq.compnent.HelloSender;import org.junit.jupiter.api.Test;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;@SpringBootTestpublic class RabbitMqHelloTest {@Autowiredprivate HelloSender helloSender;@Testpublic void hello() {helloSender.send("hello rabbitmq");}}
直接启动测试类,因为客生产者以及消费者都在一个服务里面,所以打印如下:
通过api进行创建队列
上面是通过配置进行队列的创建,现在使用api进行队列的创建,这种方式可动态创建多个队列.
代码如下:
@Testpublic void create() {String queueName = "mck_queue";String dlExchangeName = "mckExchange";ConnectionFactory connectionFactory = rabbitTemplate.getConnectionFactory();try (Connection connection = connectionFactory.createConnection();Channel channel = connection.createChannel(true)) {log.debug("创建交换机");channel.exchangeDeclare(dlExchangeName, "direct", true);//创建交换机channel.queueDeclare(queueName, true, false, false, null);//创建队列channel.queueBind(queueName, dlExchangeName, "DL_KEY", null);//将交换机和队列绑定channel.close();log.info("队列创建完成");} catch (Exception e) {log.error("队列创建失败", e);}}
步骤总结:
- 通过RabbitTemplate获取连接管道Channel实例
- 通过管道创建交换机 channel.exchangeDeclare(dlExchangeName, “direct”, true);
- 声明创建队列 channel.queueDeclare(queueName, true, false, false, null);
- 将交换机和队列绑定 channel.queueBind(queueName, dlExchangeName, “DL_KEY”, null);
控制台如下:
通过rabbitmq控制台可以看到创建完成,交换机如下:

队列如下

通过控制台进行创建队列
rabbitmq提供了控制台进行队列的操作登录控制台 ip:15672 注意:默认控制台端口是15672 不同于配置文件的5672
手动创建队列

点击队列名
创建交换机

同理队列

上面就是rabbitmq几种创建队列的方式
几种常用队列的使用
下面图示: P:代表publisher生产者 c:代表consumer消费者 红色长条:代表队列 X:代表交换机
1.简单模式
简单模式就是使用简单的队列进行监听.一个生产者一个消费者,一个队列.如下:

//通过javaconfig方式进行创建队列@Configurationpublic class RabbitConfig {@Beanpublic Queue queue() {return new Queue("hello", true, false, false);}}//创建生产者@Slf4j@Componentpublic class HelloSender {@Autowiredprivate RabbitTemplate rabbitTemplate;public void send(String message) {log.info("发送消息:{}",message);this.rabbitTemplate.convertAndSend("hello",message);}}//创建消费者(监听者),进行消费@Slf4j@Component@RabbitListener(queues = "hello")public class HelloReceiver {@RabbitHandlerpublic void process(String message) {log.info("消费端接收消息: {}", message);}}
2.work queues模式
工作队列方式:在工人之间分配任务(竞争消费者模式),能者多劳动 默认情况下,RabbitMQ 将按顺序将每条消息发送给下一个消费者。平均而言,每个消费者将获得相同数量的消息。这种分发消息的方式称为轮询。

代码与上述一样,开启多实例消费端
模拟一个生产者,两个消费者.默认情况下,轮询进行消费

idea开启多实例,配置如下.操作是先进行一个实例启动.然后修改端口再次运行.


3.Publish/Subscribe
发布/订阅模式:同时将向多个消费者传递一条消息。这种模式被称为“发布/订阅”。或者说,发布一条消息同时被多个消费者进行监听. 扇形交换机,该交换机会把消息发送到所有binding到该交换机上的queue。这种是publisher/subcribe模式。用来做广播最好。
所有该exchagne上指定的routing-key都会被ignore掉。

/*** 太阳当空照,花儿对我笑* <p>* Create by M ChangKe 2021/7/17 16:01**/@Slf4j@Componentpublic class HelloSender {@Autowiredprivate RabbitTemplate rabbitTemplate;public void send(String message) {String context = "hi, fanout msg ";System.out.println("Sender : " + context);this.rabbitTemplate.convertAndSend("fanoutExchange","", context);}}@Slf4j@Component@RabbitListener(queues = "fanout.A")public class FanOutHelloReceiver1 {@RabbitHandlerpublic void process(String message) {log.info("消费端接收消息: {}", message);}}@Slf4j@Component@RabbitListener(queues = "fanout.B")public class FanOutHelloReceiver2 {@RabbitHandlerpublic void process(String message) {log.info("消费端接收消息: {}", message);}}
4.Routing路由方式
路由方式这里多了一个路由键,根据不同的路由键,可以将消息分别送到不同的队列,进行消费
Directed Exchange
路由键exchange,该交换机收到消息后会把消息发送到指定routing-key的queue中。那消息交换机是怎么知道的呢?其实,producer deliver消息的时候会把routing-key add到 message header中。routing-key只是一个messgae的attribute。
Default Exchange
这种是特殊的Direct Exchange,是rabbitmq内部默认的一个交换机。该交换机的name是空字符串,所有queue都默认binding 到该交换机上。所有binding到该交换机上的queue,routing-key都和queue的name一样。

@Configurationpublic class RabbitConfig {@Beanpublic Queue directQueue() {return new Queue("direct_queue");}@Beanpublic DirectExchange directExchange() {return new DirectExchange("direct_Exchange");}@Beanpublic Binding bindingDirectExchange() { //直连交换机return BindingBuilder.bind(directQueue()).to(directExchange()).with("direct_key");}}
5.Topics主题模式
通配符交换机,exchange会把消息发送到一个或者多个满足通配符规则的routing-key的queue。其中表号匹配一个word,#匹配多个word和路径,路径之间通过.隔开。如满足a..c的routing-key有a.hello.c;满足#.hello的routing-key有a.b.c.helo。

@Configurationpublic class TopicRabbitConfig {final static String message = "topic.message";final static String messages = "topic.messages";@Beanpublic Queue queueMessage() {return new Queue(TopicRabbitConfig.message);}@Beanpublic Queue queueMessages() {return new Queue(TopicRabbitConfig.messages);}@BeanTopicExchange exchange() {return new TopicExchange("exchange");}@BeanBinding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");}@BeanBinding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");}}public void send1() {String context = "hi, i am message 1";System.out.println("Sender : " + context);this.rabbitTemplate.convertAndSend("exchange", "topic.message", context);}public void send2() {String context = "hi, i am messages 2";System.out.println("Sender : " + context);this.rabbitTemplate.convertAndSend("exchange", "topic.messages", context);}@Slf4j@Component@RabbitListener(queues = "topic.message")public class FanOutHelloReceiver1 {@RabbitHandlerpublic void process(String message) {log.info("topic.message消费端接收消息: {}", message);}}@Slf4j@Component@RabbitListener(queues = "topic.messages")public class FanOutHelloReceiver2 {@RabbitHandlerpublic void process(String message) {log.info("topic.messages 消费端接收消息: {}", message);}}
运行send1此时 topic.message与topic.messages都可以收到消息

运行send2 此时只有topic.messages可以收到消息

6.RPC模式
不怎么用,省略
7.Publisher Confirms
发布确认模式:包括,是否发送到交换机的确认,以及消息是否路由到队列确认.消费者的nack/ack机制
先看一下知乎帖子:
其实这个确认机制如果不让回原队列,可以配置死信队列进行使用.nack消息,进入死信队列. 如果是程序性bug导致nack,那程序修复后,可以使死信队列中的消息重新入队列,二次消费即可
application.yml
spring:rabbitmq:addresses: localhost:5672virtual-host: /rabbitmqusername: adminpassword: adminpublisher-confirms: truepublisher-returns: truetemplate:mandatory: true
RabbitmqConfig.java
使用javaconfig方式进行队列的创建与绑定
@Configuration@EnableRabbitpublic class RabbitmqConfig {//创建队列@Beanpublic Queue ttlQueue() {Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", "spring.direct.deadExchange");args.put("x-dead-letter-routing-key", "spring.deadRouting");//args.put("x-message-ttl",10000); // 设置消息过期时间无效果//args.put("x-expires",10000); // 设置队列过期时间无效果return new Queue("spring.direct.ttl.queue", true, false, false, args);}//创建死信队列@Beanpublic Queue deadQueue() {return new Queue("spring.direct.deadQueue");}//创建交换机@Beanpublic DirectExchange directExchange() {return new DirectExchange("spring.direct.exchange");}// 这里直接将死信队列以及正常消费队列都绑定到一个交换机上@Beanpublic Binding queueBinding1() {return BindingBuilder.bind(ttlQueue()).to(directExchange()).with("spring.routing");}@Beanpublic Binding queueBinding2() {return BindingBuilder.bind(deadQueue()).to(directExchange()).with("spring.deadRouting");}// 这里直接将死信队列以及正常消费队列都绑定到一个交换机上@Beanpublic RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {return new RabbitAdmin(connectionFactory);}/**** 这是生产端,使用RabbitTemplate连接** @param connectionFactory 连接工厂* @return*/@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate template = new RabbitTemplate(connectionFactory);template.setMessageConverter(new Jackson2JsonMessageConverter());return template;}/*** 这是配置在消费端,SimpleRabbitListenerContainerFactory进行连接* @param connectionFactory 连接工厂* @return*/@Beanpublic SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(new Jackson2JsonMessageConverter());return factory;}}
生产者
@Component@Slf4jpublic class RabbitOrderSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;// @Autowired// private IBrokerMessageLogService brokerMessageLogService;@Autowiredprivate BrokerMessageLogServiceImpl brokerMessageLogService2;/*** 发送消息, 构建自定义对象消息** @param order*/public void sendOrder(TOrder order) {rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnCallback(this);/*** 自定义对象,消息唯一ID,* 每个发送的消息都需要配备一个 CorrelationData 相关数据对象,* CorrelationData 对象内部只有一个 id 属性,用来表示当前消息唯一性。* 真实场景是需要做业务无关消息 ID 生成,同时要记录下这个 id 用来纠错和对账。*/CorrelationData correlationData = new CorrelationData(order.getMessageId());// 发送消息rabbitTemplate.convertAndSend(Constant.ORDER_EXCHANGE, Constant.ORDER_ROUTING, order, correlationData);log.info("消息已发送,messageId={}", order.getMessageId());}/*** 成功接收后回调, 确认消息被rabbitmq成功接收*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String s) {String messageId = correlationData.getId();if (ack) {// 如果成功接收了,就修改消息记录表的状态为success。BrokerMessageLog brokerMessageLog = new BrokerMessageLog();brokerMessageLog.setStatus(Constant.ORDER_SEND_SUCCESS);brokerMessageLog.setUpdateTime(LocalDateTime.now());QueryWrapper<BrokerMessageLog> queryWrapper = new QueryWrapper<BrokerMessageLog>();queryWrapper.eq("message_id", messageId);brokerMessageLogService2.update(brokerMessageLog, queryWrapper);log.info("消息发送成功, messageId={}", messageId);} else {//失败则进行具体的后续操作:重试 或者补偿等手段log.error("消息发送失败, messageId={}", messageId);}}/*** 失败后回调*/@Overridepublic void returnedMessage(Message message, int i, String s, String s1, String s2) {byte[] body = message.getBody();MessageProperties messageProperties = message.getMessageProperties();log.error("消息发送失败, body={}", new String(body));}}
消费端
package com.guoj.rabbitmq.receive;import com.guoj.rabbitmq.entity.TOrder;import com.guoj.rabbitmq.utils.Constant;import com.rabbitmq.client.Channel;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.annotation.*;import org.springframework.amqp.support.AmqpHeaders;import org.springframework.messaging.handler.annotation.Headers;import org.springframework.messaging.handler.annotation.Payload;import org.springframework.stereotype.Component;import java.util.Map;@Component@Slf4jpublic class OrderReceive {/*** 使用这个注解,可以自动在rabbitmq中创建出交换机、队列及routingKey的绑定关系。* 使用时,可以先启动消费方把这些关系都自动创建出来。* <p>* exchange 交换机* ---------------------* name:交换机名称* durable: 是否持久化* type: 消息模式(direct、topic、fanout、header)* ignoreDeclarationExceptions: 忽略声明异常* <p>* value 队列* --------------------* value: 哪个队列* durable:是否持久化* <p>* key 路由key*/@RabbitHandler@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(name = Constant.ORDER_EXCHANGE,durable = Constant.DURABLE,type = Constant.MESSAGE_TYPE,ignoreDeclarationExceptions = Constant.IGNORE_DECLARATION_EXCEPTIONS),value = @Queue(value = Constant.ORDER_QUEUE,durable = Constant.DURABLE),key = Constant.ORDER_CONSUMER_KEY))public void onOrderMessage(@Payload TOrder order, Channel channel,@Headers Map<String, Object> headers)throws Exception {// 消费消息log.info("-----------------------收到消息, 开始消费-----------------------");log.info("订单id: {}", order.getId());// 代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 ChannelLong deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);/*** 手工ACK,实际中一般使用手工签收,自动签收容易丢失消息。* 这行如果注释掉,也能收到消息,但是rabbitmq的消息还在,没有没签收。* 第二个参数是multiple:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息*/channel.basicAck(deliveryTag, false);/*** 手工nack** requeue 第三个参数:再次入队列,如果配合死信队列,这里设置为false.直接进入死信队列**/channel.basicNack(deliveryTag,false,false);}}
当channel.basicNack(deliveryTag,false,false); 时,消息会进入死信队列
常见问题
消息持久化
在生产环境中,我们需要考虑万一生产者挂了,消费者挂了,或者 rabbitmq 挂了怎么样。一般来说,如果生产者挂了或者消费者挂了,其实是没有影响,因为消息就在队列里面。那么万一 rabbitmq 挂了,之前在队列里面的消息怎么办,其实可以做消息持久化,RabbitMQ 会把信息保存在磁盘上。
做法是可以先从 Connection 对象中拿到一个 Channel 信道对象,然后再可以通过该对象设置 消息持久化。
生产者或者消费者断线重连
ACK 确认机制
每个Consumer可能需要一段时间才能处理完收到的数据。如果在这个过程中,Consumer出错了,异常退出了,而数据还没有处理完成,那么 非常不幸,这段数据就丢失了。因为我们采用no-ack的方式进行确认,也就是说,每次Consumer接到数据后,而不管是否处理完 成,RabbitMQ Server会立即把这个Message标记为完成,然后从queue中删除了。
如果一个Consumer异常退出了,它处理的数据能够被另外的Consumer处理,这样数据在这种情况下就不会丢失了(注意是这种情况下)。
为了保证数据不被丢失,RabbitMQ支持消息确认机制,即acknowledgments。为了保证数据能被正确处理而不仅仅是被Consumer收到,那么我们不能采用no-ack。而应该是在处理完数据后发送ack。
在处理数据后发送的ack,就是告诉RabbitMQ数据已经被接收,处理完成,RabbitMQ可以去安全的删除它了。
如果Consumer退出了但是没有发送ack,那么RabbitMQ就会把这个Message发送到下一个Consumer。这样就保证了在Consumer异常退出的情况下数据也不会丢失。
