概念、基本结构
是目前非常热门的一款开源消息中间件,互联网行业、传统行业都广泛使用,每秒处理几万条消息
整体逻辑架构

exchange类型
- fanout 把所有发送到该交换器的消息路由到所有与该交换器绑定的队列
- direct 把消息路由到BindingKey和RoutingKey完全匹配的队列
- topic 消息路由到BindingKey和RoutingKey基于通配符匹配的队列,”*”匹配一个单词,”#”匹配多个单词
- headers 一般不使用,性能差
安装和配置
原生案例
Spring整合rabbitmq
高级特性
消息可靠性
保证措施:
- 客户端代码中的异常捕获,包括生产者和消费者
- AMQP/RabbitMQ的事务机制
- 发送端确认机制
- 消息持久化机制
- Broker端的高可用集群
- 消费者确认机制
- 消费端限流
- 消息幂等性
可靠性分析
TTL机制
Time to Live 的简称,即过期时间
设置消息的TTL方式:
- 通过Queue属性设置,队列中所有消息都有相同的过期时间
- 对消息自身进行单独设置,每条消息的TTL 可以不同
如果两种方法一起使用,则消息的TTL 以两者之间较小数值为准
原生案例
// 创建队列(实际上使用的是AMQP default这个direct类型的交换器)// 设置队列属性Map<String, Object> arguments = new HashMap<>();// 设置队列的TTLarguments.put("x-message-ttl", 30000);// 设置队列的空闲存活时间(如该队列根本没有消费者,一直没有使用,队列可以存活多久)arguments.put("x-expires", 10000);channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);String message = "Hello World!" ;channel.basicPublish("",QUEUE_NAME,new AMQP.BasicProperties().builder().expiration("30000").build(),message.getBytes());
死信队列
死信交换机和死信队列其实就是普通的交换机和队列,只是通过特殊的参数(x-dead-letter-exchange,x-dead-letter-routing-key),其他普通队列与死信交换机进行了绑定而已。
消息变成死信的情况:
- 消息被拒绝(Basic.Reject/Basic.Nack),并且设置requeue参数为false;
- 消息过期;
- 队列达到最大长度。
案例
@Configurationpublic class RabbitConfig {@Beanpublic Queue queue() {Map<String, Object> props = new HashMap<>();// 消息的生存时间 10s props.put("x-message-ttl", 10000);// 设置该队列所关联的死信交换器(当队列消息TTL到期后依然没有消费,则加入死信队列)props.put("x-dead-letter-exchange", "ex.go.dlx");// 设置该队列所关联的死信交换器的routingKey,如果没有特殊指定,使用原 队列的routingKeyprops.put("x-dead-letter-routing-key", "go.dlx");Queue queue = new Queue("q.go", true, false, false, props);return queue;}@Beanpublic Queue queueDlx() {Queue queue = new Queue("q.go.dlx", true, false, false);return queue;}@Beanpublic Exchange exchange() {DirectExchange exchange = new DirectExchange("ex.go", true, false, null);return exchange;}/*** 死信交换器* @return*/@Beanpublic Exchange exchangeDlx() {DirectExchange exchange = new DirectExchange("ex.go.dlx", true, false, null);return exchange;}@Beanpublic Binding binding() {return BindingBuilder.bind(queue()).to(exchange()).with("go").noargs();}/*** 死信交换器绑定死信队列** @return*/@Beanpublic Binding bindingDlx() {return BindingBuilder.bind(queueDlx()).to(exchangeDlx()).with("go.dlx").noargs();}
延迟队列
考虑通过指定消息过期时间,通过死信队列在死信队列中消费消息
