概念、基本结构

是目前非常热门的一款开源消息中间件,互联网行业、传统行业都广泛使用,每秒处理几万条消息

整体逻辑架构

image.png

exchange类型

  1. fanout 把所有发送到该交换器的消息路由到所有与该交换器绑定的队列
  2. direct 把消息路由到BindingKey和RoutingKey完全匹配的队列
  3. topic 消息路由到BindingKey和RoutingKey基于通配符匹配的队列,”*”匹配一个单词,”#”匹配多个单词
  4. headers 一般不使用,性能差

安装和配置

原生案例

Spring整合rabbitmq

高级特性

消息可靠性

保证措施:

  1. 客户端代码中的异常捕获,包括生产者和消费者
  2. AMQP/RabbitMQ的事务机制
  3. 发送端确认机制
  4. 消息持久化机制
  5. Broker端的高可用集群
  6. 消费者确认机制
  7. 消费端限流
  8. 消息幂等性

可靠性分析

TTL机制

Time to Live 的简称,即过期时间

设置消息的TTL方式:

  1. 通过Queue属性设置,队列中所有消息都有相同的过期时间
  2. 对消息自身进行单独设置,每条消息的TTL 可以不同

如果两种方法一起使用,则消息的TTL 以两者之间较小数值为准

原生案例

  1. // 创建队列(实际上使用的是AMQP default这个direct类型的交换器)
  2. // 设置队列属性
  3. Map<String, Object> arguments = new HashMap<>();
  4. // 设置队列的TTL
  5. arguments.put("x-message-ttl", 30000);
  6. // 设置队列的空闲存活时间(如该队列根本没有消费者,一直没有使用,队列可以存活多久)
  7. arguments.put("x-expires", 10000);
  8. channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);
  9. String message = "Hello World!" ;
  10. channel.basicPublish("",
  11. QUEUE_NAME,
  12. new AMQP.BasicProperties().builder().expiration("30000").build(),
  13. message.getBytes());

死信队列

死信交换机和死信队列其实就是普通的交换机和队列,只是通过特殊的参数(x-dead-letter-exchange,x-dead-letter-routing-key),其他普通队列与死信交换机进行了绑定而已。

消息变成死信的情况:

  1. 消息被拒绝(Basic.Reject/Basic.Nack),并且设置requeue参数为false;
  2. 消息过期;
  3. 队列达到最大长度。

案例

  1. @Configuration
  2. public class RabbitConfig {
  3. @Bean
  4. public Queue queue() {
  5. Map<String, Object> props = new HashMap<>();
  6. // 消息的生存时间 10s props.put("x-message-ttl", 10000);
  7. // 设置该队列所关联的死信交换器(当队列消息TTL到期后依然没有消费,则加入死信队列)
  8. props.put("x-dead-letter-exchange", "ex.go.dlx");
  9. // 设置该队列所关联的死信交换器的routingKey,如果没有特殊指定,使用原 队列的routingKey
  10. props.put("x-dead-letter-routing-key", "go.dlx");
  11. Queue queue = new Queue("q.go", true, false, false, props);
  12. return queue;
  13. }
  14. @Bean
  15. public Queue queueDlx() {
  16. Queue queue = new Queue("q.go.dlx", true, false, false);
  17. return queue;
  18. }
  19. @Bean
  20. public Exchange exchange() {
  21. DirectExchange exchange = new DirectExchange("ex.go", true, false, null);
  22. return exchange;
  23. }
  24. /**
  25. * 死信交换器
  26. * @return
  27. */
  28. @Bean
  29. public Exchange exchangeDlx() {
  30. DirectExchange exchange = new DirectExchange("ex.go.dlx", true, false, null);
  31. return exchange;
  32. }
  33. @Bean
  34. public Binding binding() {
  35. return BindingBuilder.bind(queue()).to(exchange()).with("go").noargs();
  36. }
  37. /**
  38. * 死信交换器绑定死信队列
  39. *
  40. * @return
  41. */
  42. @Bean
  43. public Binding bindingDlx() {
  44. return BindingBuilder.bind(queueDlx()).to(exchangeDlx()).with("go.dlx").noargs();
  45. }

延迟队列

考虑通过指定消息过期时间,通过死信队列在死信队列中消费消息