一、概述
RabbitMQ是一个被广泛使用的开源消息队列。它是轻量级且易于部署的,它能支持多种消息协议。RabbitMQ可以部署在分布式和联合配置中,以满足高规模、高可用性的需求。
二、配置
依赖:
<!--消息队列相关依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置:
spring:
rabbitmq:
host: localhost # rabbitmq的连接地址
port: 5672 # rabbitmq的连接端口号
virtual-host: /mall # rabbitmq的虚拟host
username: mall # rabbitmq的用户名
password: mall # rabbitmq的密码
publisher-confirms: true #如果对异步消息需要回调必须设置为true
三、Direct Exchange
Direct Exchange 是 RabbitMQ 默认的交换机模式,也是最简单的模式,根据key全文匹配去寻找队列。
第一个 X - Q1 就有一个 binding key,名字为 orange; X - Q2 就有 2 个 binding key,名字为 black 和 green。当消息中的 路由键 和 这个 binding key 对应上的时候,那么就知道了该消息去到哪一个队列中。
Ps:为什么 X 到 Q2 要有 black,green,2个 binding key呢,一个不就行了吗? - 这个主要是因为可能又有 Q3,而Q3只接受 black 的信息,而Q2不仅接受black 的信息,还接受 green 的信息。
队列配置
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
@Bean
public Queue Queue() {
return new Queue("hello");
}
@Bean
public Queue one2manyQueue() {
return new Queue("one2many");
}
@Bean
public Queue many2manyQueue() {
return new Queue("many2many");
}
}
以上, 定义了 一对一、一对多、多对多 的队列
一对一
配置一个发送者和一个接收者。
发送者:
@Component
public class HelloSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String context = "hello " + new Date();
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("hello", context);
}
}
接收者:
@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {
@RabbitHandler
public void process(String hello) {
System.out.println("Receiver : " + hello);
}
}
:::info 注意:接收者注解中的 queues 一定要与发送者 convertAndSend 中指定的名字一致, 并且需要在 RabbitConfig 中进行定义 :::
在控制器中测试:
@Autowired
private HelloSender helloSender;
@RequestMapping("/one2one")
public String one2one () {
helloSender.send();
return "ok";
}
为什么要在控制器中测试而不在单元测试中测试呢? 因为单元测试中只能看到发送者的信息, 还没等接收者接收完毕程序就挂了…
一对多
无非就是定义多个接收者:
@Component
@RabbitListener(queues = "one2many")
public class One2ManyReceiver1 {
@RabbitHandler
public void process(String msg) {
System.out.println("Receiver 1: " + msg);
}
}
@Component
@RabbitListener(queues = "one2many")
public class One2ManyReceiver2 {
@RabbitHandler
public void process(String msg) {
System.out.println("Receiver 2: " + msg);
}
}
发送者跟前面的一致:
@Component
public class One2ManySender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send(int i) {
String context = "queue " + i;
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("one2many", context);
}
}
在控制器中测试:
@Autowired
private One2ManySender one2ManySender;
@RequestMapping("/one2many")
public String one2many () {
for (int i=0;i<100;i++){
one2ManySender.send(i);
}
return "ok";
}
可以看到,打印出的结果基本上是 Receiver 1
和 Receiver 2
交替接收的
多对多
增加一个发送者:
@Component
public class Many2ManySender1 {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send(int i) {
String context = "queue " + i;
System.out.println("Sender1 : " + context);
this.rabbitTemplate.convertAndSend("many2many", context);
}
}
@Component
public class Many2ManySender2 {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send(int i) {
String context = "queue " + i;
System.out.println("Sender2 : " + context);
this.rabbitTemplate.convertAndSend("many2many", context);
}
}
接收者仍然为2个, 不赘述了, 注意将注解中的 queues 改为 ‘many2many’ 即可
控制器中测试:
@Autowired
private Many2ManySender1 many2ManySender1;
@Autowired
private Many2ManySender2 many2ManySender2;
@RequestMapping("/many2many")
public String many2many () {
for (int i=0;i<100;i++){
many2ManySender1.send(i);
many2ManySender2.send(i);
}
return "ok";
}
四、Topic Exchange
topic 是 RabbitMQ 中最灵活的一种方式,可以根据 routing_key 自由的绑定不同的队列
Topic Exchange 转发消息主要是根据通配符。 在这种交换机下,队列和交换机的绑定会定义一种路由模式,那么,通配符就要在这种路由模式和路由键之间匹配后交换机才能转发消息。
在这种交换机模式下:
- 路由键必须是一串字符,用句号(
.
) 隔开,比如说 agreements.us,或者 agreements.eu.stockholm 等。 - 路由模式必须包含一个 星号(
*
),主要用于匹配路由键指定位置的一个单词,比如说,一个路由模式是这样子:agreements..b.*,那么就只能匹配路由键是这样子的:第一个单词是 agreements,第四个单词是 b。 井号(#)就表示相当于一个或者多个单词,例如一个匹配模式是 agreements.eu.berlin.#,那么,以agreements.eu.berlin 开头的路由键都是可以的。
具体代码发送的时候还是一样,第一个参数表示交换机,第二个参数表示 routing key,第三个参数即消息。如下:
rabbitTemplate.convertAndSend("testTopicExchange", "key1.a.c.key2", "this is RabbitMQ!");
topic 和 direct 类似, 只是匹配上支持了”模式”, 在”点分”的 routing_key 形式中, 可以使用两个通配符:
*
表示一个词.#
表示零个或多个词.
Topic 相关配置:
@Configuration
public class TopicRabbitConfig {
final static String message = "topic.message";
final static String messages = "topic.messages";
@Bean
public Queue queueMessage() {
return new Queue(TopicRabbitConfig.message);
}
@Bean
public Queue queueMessages() {
return new Queue(TopicRabbitConfig.messages);
}
@Bean
TopicExchange exchange() {
return new TopicExchange("exchange");
}
@Bean
Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
}
@Bean
Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
}
}
使用 queueMessages 同时匹配两个队列 (topic.#
代表topic下的所有队列),queueMessage 只匹配 topic.message
队列
发送者:
@Component
public class TopicSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send1() {
String context = "hi, i am message 1";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("exchange", "topic.message", context);
}
public void send2() {
String context = "hi, i am messages 2";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("exchange", "topic.messages", context);
}
}
接收者1:
@Component
@RabbitListener(queues = "topic.message")
public class TopicReceiver {
@RabbitHandler
public void process(String message) {
System.out.println("Topic Receiver1 : " + message);
}
}
接收者2:
@Component
@RabbitListener(queues = "topic.messages")
public class TopicReceiver2 {
@RabbitHandler
public void process(String message) {
System.out.println("Topic Receiver2 : " + message);
}
}
控制器中测试:
@Component
public class TopicSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send1() {
String context = "hi, i am message 1";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("exchange", "topic.message", context);
}
public void send2() {
String context = "hi, i am messages 2";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("exchange", "topic.messages", context);
}
}
topic1
打印:
Sender : hi, i am message 1
Topic Receiver2 : hi, i am message 1
Topic Receiver1 : hi, i am message 1
topic2
打印:
Sender : hi, i am messages 2
Topic Receiver2 : hi, i am messages 2
发送 send1 会匹配到 topic.#
和 topic.message
两个Receiver都可以收到消息,发送send2只有 topic.#
可以匹配所有只有Receiver2监听到消息
当然, 由于 topic.#
可以匹配所有 topic
下的队列, 换成下面的形式, 也只有Receiver2能够接收得到:
public void send() {
String context = "hi, i am message all";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("topicExchange", "topic.1", context);
}
五、Fanout Exchange
Fanout 就是我们熟悉的广播模式或者订阅模式,给 Fanout 交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。
Fanout 相关配置
@Configuration
public class FanoutRabbitConfig {
@Bean
public Queue AMessage() {
return new Queue("fanout.A");
}
@Bean
public Queue BMessage() {
return new Queue("fanout.B");
}
@Bean
public Queue CMessage() {
return new Queue("fanout.C");
}
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
@Bean
Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange) {
return BindingBuilder.bind(AMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(BMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(CMessage).to(fanoutExchange);
}
}
这里使用了 A、B、C 三个队列绑定到 Fanout 交换机上面,发送端的 routing_key 写任何字符都会被忽略:
@Component
public class FanoutSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String context = "hi, fanout msg ";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("fanoutExchange", "", context);
}
}
三个接收者如下:
@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA {
@RabbitHandler
public void process(String message) {
System.out.println("fanout Receiver A: " + message);
}
}
@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB {
@RabbitHandler
public void process(String message) {
System.out.println("fanout Receiver B: " + message);
}
}
@Component
@RabbitListener(queues = "fanout.C")
public class FanoutReceiverC {
@RabbitHandler
public void process(String message) {
System.out.println("fanout Receiver C: " + message);
}
}
控制器中测试:
@RequestMapping("/fanout")
public String fanout () {
fanoutSender.send();
return "ok";
}
结果如下:
Sender : hi, fanout msg
...
fanout Receiver B: hi, fanout msg
fanout Receiver A : hi, fanout msg
fanout Receiver C: hi, fanout msg
结果说明,绑定到 fanout 交换机上面的队列都收到了消息
六、Headers Exchange
headers 也是根据规则匹配, 相较于 direct 和 topic 固定地使用 routing_key , headers 则是一个自定义匹配规则的类型. 在队列与交换器绑定时, 会设定一组键值对规则, 消息中也包括一组键值对( headers 属性), 当这些键值对有一对, 或全部匹配时, 消息被投送到对应队列.
七、对象的支持
Spring Boot 以及完美的支持对象的发送和接收,不需要格外的配置。
// 发送者
public void send(User user) {
System.out.println("Sender object: " + user.toString());
this.rabbitTemplate.convertAndSend("object", user);
}
...
// 接收者
@RabbitHandler
public void process(User user) {
System.out.println("Receiver object : " + user);
}
结果如下:
Sender object: User{name='neo', pass='123456'}
Receiver object : User{name='neo', pass='123456'}
八、实例:订单延迟失效消息
需求:用户下单后,生成订单信息,如果用户超过一定时间没有进行支付,则取消此订单。
添加配置类
首先创建一个消息队列枚举配置:
@Getter
@AllArgsConstructor
public enum QueueEnum {
// 消息通知队列
QUEUE_ORDER_CANCEL("mall.order.direct", "mall.order.cancel", "mall.order.cancel"),
// 消息通知ttl队列
QUEUE_TTL_ORDER_CANCEL("mall.order.direct.ttl", "mall.order.cancel.ttl", "mall.order.cancel.ttl");
private String exchange; // 交换机名称
private String name; // 队列名称
private String routeKey; // 路由键
}
再创建一个RabbitMQ的配置类:
package com.example.test.configuration;
import com.example.test.dto.QueueEnum;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 消息队列配置
*/
@Configuration
public class RabbitMqConfig {
/**
* 订单消息实际消费队列所绑定的交换机
*/
@Bean
DirectExchange orderDirect() {
return (DirectExchange) ExchangeBuilder
.directExchange(QueueEnum.QUEUE_ORDER_CANCEL.getExchange())
.durable(true)
.build();
}
/**
* 订单延迟队列所绑定的交换机
*/
@Bean
DirectExchange orderTtlDirect() {
return (DirectExchange) ExchangeBuilder
.directExchange(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange())
.durable(true)
.build();
}
/**
* 订单实际消费队列
*/
@Bean
public Queue orderQueue() {
return new Queue(QueueEnum.QUEUE_ORDER_CANCEL.getName());
}
/**
* 订单延迟队列(死信队列)
*/
@Bean
public Queue orderTtlQueue() {
return QueueBuilder
.durable(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getName())
.withArgument("x-dead-letter-exchange", QueueEnum.QUEUE_ORDER_CANCEL.getExchange())//到期后转发的交换机
.withArgument("x-dead-letter-routing-key", QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey())//到期后转发的路由键
.build();
}
/**
* 将订单队列绑定到交换机
*/
@Bean
Binding orderBinding(DirectExchange orderDirect,Queue orderQueue){
return BindingBuilder
.bind(orderQueue)
.to(orderDirect)
.with(QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey());
}
/**
* 将订单延迟队列绑定到交换机
*/
@Bean
Binding orderTtlBinding(DirectExchange orderTtlDirect,Queue orderTtlQueue){
return BindingBuilder
.bind(orderTtlQueue)
.to(orderTtlDirect)
.with(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey());
}
}
在RabbitMQ管理页面可以看到以下交换机和队列:
交换机及队列说明:
mall.order.direct(取消订单消息队列所绑定的交换机)
绑定的队列为mall.order.cancel,一旦有消息以mall.order.cancel为路由键发过来,会发送到此队列。mall.order.direct.ttl(订单延迟消息队列所绑定的交换机)
绑定的队列为mall.order.cancel.ttl,一旦有消息以mall.order.cancel.ttl为路由键发送过来,会转发到此队列,并在此队列保存一定时间,等到超时后会自动将消息发送到mall.order.cancel(取消订单消息消费队列)。
发送者与接收者
发送者:
@Component
public class CancelOrderSender {
private static Logger LOGGER = LoggerFactory.getLogger(CancelOrderSender.class);
@Autowired
private AmqpTemplate amqpTemplate;
public void sendMessage(Long orderId,final long delayTimes){
// 给延迟队列发送消息
amqpTemplate.convertAndSend(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange(), QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey(), orderId, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 给消息设置延迟毫秒值
message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
return message;
}
});
LOGGER.info("send delay message orderId:{}",orderId);
}
}
接收者:
@Component
@RabbitListener(queues = "mall.order.cancel")
public class CancelOrderReceiver {
private static Logger LOGGER = LoggerFactory.getLogger(CancelOrderReceiver.class);
@Autowired
private OmsPortalOrderService portalOrderService;
@RabbitHandler
public void handle(Long orderId){
LOGGER.info("receive delay message orderId:{}",orderId);
// 执行取消订单的操作
portalOrderService.cancelOrder(orderId);
}
}
创建Service
public interface OmsPortalOrderService {
// 根据提交信息生成订单
@Transactional
CommonResult generateOrder(OrderParam orderParam);
// 取消单个超时订单
@Transactional
void cancelOrder(Long orderId);
}
服务的实现类:
@Service
public class OmsPortalOrderServiceImpl implements OmsPortalOrderService {
private static Logger LOGGER = LoggerFactory.getLogger(OmsPortalOrderServiceImpl.class);
@Autowired
private CancelOrderSender cancelOrderSender;
@Override
public CommonResult generateOrder(OrderParam orderParam) {
// todo 执行一系列下单操作
LOGGER.info("process generateOrder");
// 下单完成后开启一个延迟消息,用于当用户没有付款时取消订单(orderId应该在下单后生成)
sendDelayMessageCancelOrder(11L);
return CommonResult.success(null, "下单成功");
}
@Override
public void cancelOrder(Long orderId) {
// todo 执行一系列取消订单操作
LOGGER.info("process cancelOrder orderId:{}",orderId);
}
private void sendDelayMessageCancelOrder(Long orderId) {
// 获取订单超时时间,假设为60分钟(测试用的30秒)
long delayTimes = 30 * 1000;
// 发送延迟消息
cancelOrderSender.sendMessage(orderId, delayTimes);
}
}
其中OrderParam类为接收订单参数的dto:
@Data
public class OrderParam {
// 收货地址id
private Long memberReceiveAddressId;
// 优惠券id
private Long couponId;
// 使用的积分数
private Integer useIntegration;
// 支付方式
private Integer payType;
}
在控制器中使用
@Controller
@RequestMapping("/order")
public class OmsPortalOrderController {
@Autowired
private OmsPortalOrderService portalOrderService;
@RequestMapping(value = "/generateOrder", method = RequestMethod.POST)
@ResponseBody
public Object generateOrder(@RequestBody OrderParam orderParam) {
return portalOrderService.generateOrder(orderParam);
}
}
测试:
2020-03-27 16:37:24.763 INFO 7896 --- [io-10010-exec-4] c.e.t.s.impl.OmsPortalOrderServiceImpl : process generateOrder
2020-03-27 16:37:24.769 INFO 7896 --- [io-10010-exec-4] c.e.test.component.CancelOrderSender : send delay message orderId:11
2020-03-27 16:37:54.778 INFO 7896 --- [ntContainer#0-1] c.e.test.component.CancelOrderReceiver : receive delay message orderId:11
2020-03-27 16:37:54.778 INFO 7896 --- [ntContainer#0-1] c.e.t.s.impl.OmsPortalOrderServiceImpl : process cancelOrder orderId:11