概念、基本结构
是目前非常热门的一款开源消息中间件,互联网行业、传统行业都广泛使用,每秒处理几万条消息
整体逻辑架构
exchange类型
- fanout 把所有发送到该交换器的消息路由到所有与该交换器绑定的队列
- direct 把消息路由到BindingKey和RoutingKey完全匹配的队列
- topic 消息路由到BindingKey和RoutingKey基于通配符匹配的队列,”*”匹配一个单词,”#”匹配多个单词
- headers 一般不使用,性能差
安装和配置
原生案例
Spring整合rabbitmq
高级特性
消息可靠性
保证措施:
- 客户端代码中的异常捕获,包括生产者和消费者
- AMQP/RabbitMQ的事务机制
- 发送端确认机制
- 消息持久化机制
- Broker端的高可用集群
- 消费者确认机制
- 消费端限流
- 消息幂等性
可靠性分析
TTL机制
Time to Live 的简称,即过期时间
设置消息的TTL方式:
- 通过Queue属性设置,队列中所有消息都有相同的过期时间
- 对消息自身进行单独设置,每条消息的TTL 可以不同
如果两种方法一起使用,则消息的TTL 以两者之间较小数值为准
原生案例
// 创建队列(实际上使用的是AMQP default这个direct类型的交换器)
// 设置队列属性
Map<String, Object> arguments = new HashMap<>();
// 设置队列的TTL
arguments.put("x-message-ttl", 30000);
// 设置队列的空闲存活时间(如该队列根本没有消费者,一直没有使用,队列可以存活多久)
arguments.put("x-expires", 10000);
channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);
String message = "Hello World!" ;
channel.basicPublish("",
QUEUE_NAME,
new AMQP.BasicProperties().builder().expiration("30000").build(),
message.getBytes());
死信队列
死信交换机和死信队列其实就是普通的交换机和队列,只是通过特殊的参数(x-dead-letter-exchange,x-dead-letter-routing-key),其他普通队列与死信交换机进行了绑定而已。
消息变成死信的情况:
- 消息被拒绝(Basic.Reject/Basic.Nack),并且设置requeue参数为false;
- 消息过期;
- 队列达到最大长度。
案例
@Configuration
public class RabbitConfig {
@Bean
public Queue queue() {
Map<String, Object> props = new HashMap<>();
// 消息的生存时间 10s props.put("x-message-ttl", 10000);
// 设置该队列所关联的死信交换器(当队列消息TTL到期后依然没有消费,则加入死信队列)
props.put("x-dead-letter-exchange", "ex.go.dlx");
// 设置该队列所关联的死信交换器的routingKey,如果没有特殊指定,使用原 队列的routingKey
props.put("x-dead-letter-routing-key", "go.dlx");
Queue queue = new Queue("q.go", true, false, false, props);
return queue;
}
@Bean
public Queue queueDlx() {
Queue queue = new Queue("q.go.dlx", true, false, false);
return queue;
}
@Bean
public Exchange exchange() {
DirectExchange exchange = new DirectExchange("ex.go", true, false, null);
return exchange;
}
/**
* 死信交换器
* @return
*/
@Bean
public Exchange exchangeDlx() {
DirectExchange exchange = new DirectExchange("ex.go.dlx", true, false, null);
return exchange;
}
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(exchange()).with("go").noargs();
}
/**
* 死信交换器绑定死信队列
*
* @return
*/
@Bean
public Binding bindingDlx() {
return BindingBuilder.bind(queueDlx()).to(exchangeDlx()).with("go.dlx").noargs();
}
延迟队列
考虑通过指定消息过期时间,通过死信队列在死信队列中消费消息