依赖

  1. <dependency>
  2. <groupId>com.rabbitmq</groupId>
  3. <artifactId>amqp-client</artifactId>
  4. <version>5.7.3</version>
  5. </dependency>

Producer

  1. public class Producer {
  2. public static void main(String[] args) throws Exception {
  3. // 假数据框架 Facker
  4. Faker faker = new Faker(Locale.CHINA);
  5. ConnectionFactory connectionFactory = null;
  6. Connection connection = null;
  7. Channel channel = null;
  8. try {
  9. // 1. 创建 ConnectionFactory
  10. connectionFactory = new ConnectionFactory();
  11. connectionFactory.setHost("127.0.0.1");
  12. connectionFactory.setPort(5672);
  13. connectionFactory.setVirtualHost("/");
  14. // 2. 通过 ConnectionFactory 创建 Connect
  15. connection = connectionFactory.newConnection();
  16. // 3. 通过 Connect 创建一个 Channel
  17. channel = connection.createChannel();
  18. // 4. 通过 Channel 发送 msg
  19. String exchange = "test";
  20. String routingKey = "test01";
  21. AMQP.BasicProperties basicProperties = null;
  22. for (int i = 0; i < 10000; i++) {
  23. String msg = faker.name().fullName();
  24. byte[] msgBody = msg.getBytes(StandardCharsets.UTF_8);
  25. channel.basicPublish(
  26. exchange, // exchange
  27. routingKey, // routingKey
  28. basicProperties, // properties
  29. msgBody // body
  30. );
  31. }
  32. } finally {
  33. // 5. 关闭 Channel 和 Connection
  34. if (channel != null) {
  35. channel.close();
  36. }
  37. if (connection != null) {
  38. connection.close();
  39. }
  40. }
  41. }
  42. }

Confirm Listener

  • 在 channel 上开启确认模式

    1. channel.confirmSelect();
  • 在 channel 上添加监听

    1. // 监听回应
    2. channel.addConfirmListener(new ConfirmListener() {
    3. @Override
    4. public void handleAck(long deliveryTag, boolean multiple) throws IOException {
    5. System.out.println("ack!");
    6. }
    7. @Override
    8. public void handleNack(long deliveryTag, boolean multiple) throws IOException {
    9. System.out.println("no ack");
    10. }
    11. });
    • 监听成功和失败的返回结果,根据具体的结果对消息进行重新发送,或者记录日志

Return Listener

  • 接受路由不可达的响应
    • routingKey 不符合
  • 在发送时候需要设置 mandatory 为true
    • true 则监听器会接受到路由不可达的消息
    • 如果为默认的 false,则 broker 端会自动删除该消息
  1. // 方法声明
  2. void basicPublish(
  3. String exchange,
  4. String routingKey,
  5. boolean mandatory,
  6. BasicProperties props,
  7. byte[] body)throws IOException;
  • 添加 returnListener
  1. channel.addReturnListener(new ReturnListener() {
  2. @Override
  3. public void handleReturn(int replyCode,
  4. String replyText,
  5. String exchange,
  6. String routingKey,
  7. AMQP.BasicProperties properties,
  8. byte[] body) throws IOException {
  9. System.out.println("不可达");
  10. }
  11. });

Consumer

  1. public class FanoutConsumer {
  2. public static void main(String[] args) throws Exception {
  3. ConnectionFactory connectionFactory = null;
  4. Connection connection = null;
  5. Channel channel = null;
  6. try {
  7. // 1. 创建 ConnectionFactory
  8. connectionFactory = new ConnectionFactory();
  9. connectionFactory.setHost("127.0.0.1");
  10. connectionFactory.setPort(5672);
  11. connectionFactory.setVirtualHost("/");
  12. // 2. 通过 ConnectionFactory 创建 Connect
  13. connection = connectionFactory.newConnection();
  14. // 3. 通过 Connect 创建一个 Channel
  15. channel = connection.createChannel();
  16. // 4. 声明 channel 属性
  17. String exchangeName = "test";
  18. // 设置 exchange 的属性 fanout
  19. String exchangeType = "fanout";
  20. // fanout 属性下,routingKey 是无所谓的
  21. String routingKey = "wtf";
  22. String queueName = "test_fanout_queue";
  23. // queue 声明
  24. channel.queueDeclare(
  25. queueName,
  26. false, // durable 队列是否持久化,重启可用
  27. false, // exclusive 是否独占,仅一个连接可用
  28. false, // autoDelete 没有东西和该 queue 绑定,则会自动删除
  29. null); // arguments
  30. // exchange 声明
  31. channel.exchangeDeclare(
  32. exchangeName,
  33. exchangeType,
  34. true, // durable 是否持久化消息
  35. false, // autoDelete
  36. false, // internal
  37. null // arguments
  38. );
  39. // 绑定 queue 和 exchange
  40. channel.queueBind(queueName, exchangeName, routingKey);
  41. // 创建消费者
  42. // 》》》》 新版本通过重写 DefaultConsumer#handleDelivery 进行数据消费
  43. MyConsumer consumer = new MyConsumer(channel);
  44. // 绑定 channel 和 Consumer
  45. channel.basicConsume(
  46. queueName,
  47. true, // 是否 autoAck
  48. consumer);
  49. // 消费消息
  50. while(true) {
  51. }
  52. } finally {
  53. // 5. 关闭 Channel 和 Connection
  54. if (channel != null) {
  55. channel.close();
  56. }
  57. if (connection != null) {
  58. connection.close();
  59. }
  60. }
  61. }
  62. }
  1. class MyConsumer extends DefaultConsumer {
  2. private Channel channel;
  3. public MyConsumer(Channel channel){
  4. super(channel);
  5. this.channel = channel;
  6. }
  7. @Override
  8. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  9. System.err.println("-----------consume message----------");
  10. System.err.println("consumerTag: " + consumerTag);
  11. System.err.println("envelope: " + envelope);
  12. System.err.println("properties: " + properties);
  13. System.err.println("body: " + new String(body));
  14. // 手动 ack
  15. // channel.basicAck(envelope.getDeliveryTag(), false);
  16. }
  17. }

autoAck

  • autoAck=true,即 自动确认模式,一旦 rabbitmq 将消息分发给消费者,就会直接删除该消息
    • 如果消费者拿到消息却没有做处理,此时杀死正在执行的消费者,会丢失正在处理的消息
  • autoAck=false, 即 手动模式,一旦消费者挂掉,但是没有手动ack,则该消息会交付给其他消费者
    • 当消费者发送消息应答(ack),则 rabbitmq 会删除该消息

限流 qos

  • qos 服务质量保证。如果一定瞬目的消息未被确认前,不进行消费新的消息
  • 前提,要求 no_ask = fasle(关闭自动签收)

    1. channel.basicConsume(queueName,
    2. false, // 关闭自动签收
    3. consumer);
  • channel 设置 basicQos ```java channel.basicQos(0,

    1. 10,
    2. false);

/**

  1. * Request specific "quality of service" settings.
  2. *
  3. * These settings impose limits on the amount of data the server
  4. * will deliver to consumers before requiring acknowledgements.
  5. * Thus they provide a means of consumer-initiated flow control.
  6. * @see com.rabbitmq.client.AMQP.Basic.Qos
  7. * @param prefetchSize maximum amount of content (measured in
  8. * octets) that the server will deliver, 0 if unlimited
  9. * 消息体大小限制,0表示不限制
  10. * @param prefetchCount maximum number of messages that the server
  11. * will deliver, 0 if unlimited
  12. * 同时给 consumer 同时推送的最大消息数量
  13. * 如果该数量的消息没有 ack,则 consumer block until 消息 ack
  14. * @param global true if the settings should be applied to the
  15. * entire channel rather than each consumer
  16. * Qos 设置是否 channel 全局启用?
  17. * false 表示仅在当前 Consumer 启用, 即作用在绑定的 consumer
  18. * @throws java.io.IOException if an error is encountered

*/ void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;

  1. <a name="jSfgn"></a>
  2. ### Nack 和 重回队列
  3. - nack 表示告诉 broker 消费失败
  4. - **ackAck 肯定是 false**
  5. - 重回队列是为了对没有处理成功的消息,讲消息重新投递到 broker 的尾端
  6. - **一般都会设置 false(默认 false)**
  7. ```java
  8. // 消费端消费消息时,给 broker 的响应
  9. channel.basicNack(envelope.getDeliveryTag(),
  10. false, // autoAck
  11. false); // 重回队列,建议 false
  12. /**
  13. * Reject one or several received messages.
  14. *
  15. * Supply the <code>deliveryTag</code> from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
  16. * or {@link com.rabbitmq.client.AMQP.Basic.GetOk} method containing the message to be rejected.
  17. * @see com.rabbitmq.client.AMQP.Basic.Nack
  18. * @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
  19. * @param multiple true to reject all messages up to and including
  20. * the supplied delivery tag; false to reject just the supplied
  21. * delivery tag.
  22. * @param requeue true if the rejected message(s) should be requeued rather
  23. * than discarded/dead-lettered
  24. * @throws java.io.IOException if an error is encountered
  25. */
  26. void basicNack(long deliveryTag, boolean multiple, boolean requeue)
  27. throws IOException;

TTL

  • time to live, 生存时间

执行消息的TTL

  • 设置消息的 expiration 属性
    • 过期时间内未被消费会被 broker 自动删除
      1. // 额外参数
      2. AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
      3. .deliveryMode(2) // 持久化
      4. .contentEncoding("UTF-8") // 编码格式
      5. .expiration("10000") // ******** 过期时间 ***********
      6. .headers(new HashMap<String, Object>(){ // 自定义参数
      7. {
      8. put("key01", "value1");
      9. put("key02", "value2");
      10. }
      11. })
      12. .build();
  1. channel.basicPublish("normal_exchange", // exchange
  2. "routingKey", // routing key
  3. true , // mandatory 开启 return,监听是否可达
  4. properties, // 设置 proeprties
  5. msg.getBytes());

设置队列为过期队列

  • 从消息入队列开始计算,只要超过了队列的超时时间配置,那么消息会自动的清除
  • 队列创建时设置 arguments 中的项 x-message-ttl

image.png


Exchange

  • 接受消息,根据路由键(routing key)转发消息所绑定的队列
  • 属性
    • name: 交换机名称
    • type: 交换机类型
      • direct
      • topic
      • fanout
      • headers
    • durability: 是否需要持久化,server 重启的时候是否保留
    • auto delete: 当最后一个绑定到 exchange 的队列删除后,是否自动删除该 exchange
    • internal: 当前 exchange 是否用于 rabbitmq 内部使用,默认为 false
      • 基本就是 false
    • argument:扩展参数,用于扩展 amqp 协议自定义化使用

direct exchange

  • 和 queue 之间的 binding 的 routingkey 名称需要全部命中才会转发

topic exchange

  • 所有关心 routing key 中指定的 topic 的 queue 都会转发
    • 其实就是 Message 用指定规则的 routingKey,Consumer 端进行 routingKey 的模糊匹配
  • 可以使用通配符进行模糊匹配
    • # 匹配一个或者多个
    • * 匹配一个
      • demo.* 能够匹配 demo.a, demo.wtf,无法匹配 demo.a.b
      • #.demo 可以匹配 a.b.c.d.demo

fanout exchange

  • 不处理 routing key, 只是简单将队列绑定到 exchange 上
  • 发送到 exchange 的消息都会转发到与 exchange 绑定的所有队列上
    • 类似广播
  • fanout exchange 的速度是最快的

Queue

  • 消息队列,实际存储消息数据
  • 属性
    • durability: 是否持久化, durable 是, transient 否
    • auto delete : yes 则表示当最后一个监听被移除后,该 queue 会自动删除

Message

  • server 和 client 之间传送的数据
  • 由 properties 和 payload(body) 组成
  • 常用属性
    • dilivery mode
    • haders(自定义属性)
  • 其他属性

    • correlation_id、reply_to、expiration, message_id
    • timestamp、type、user_id、appid、cluster_id
      1. // com.rabbitmq.client.AMQP.BasicProperties
      2. public static class BasicProperties extends AMQBasicProperties {
      3. private String contentType;
      4. private String contentEncoding;
      5. private Map<String, Object> headers;
      6. private Integer deliveryMode;
      7. private Integer priority;
      8. private String correlationId;
      9. private String replyTo;
      10. private String expiration;
      11. private String messageId;
      12. private Date timestamp;
      13. private String type;
      14. private String userId;
      15. private String appId;
      16. private String clusterId;
      17. }
  • 生产端设置 properties 即可

    1. AMQP.BasicProperties basicProperties =
    2. new AMQP.BasicProperties.Builder()
    3. .deliveryMode(2)
    4. .contentEncoding(StandardCharsets.UTF_8.toString())
    5. .expiration("20000")
    6. // 进行设置
    7. .build();

virtual host

  • 虚拟地址,用于逻辑隔离,最上层的消息路由,类似 redis 不同的 db
  • 一个 virtual host 里面可以有若干个 exchange 和 queue
  • 一个 virtual host 里面不能有相同名称的 exchange 或 queue

死信队列

  • Dead-Letter-Exchange
  • 利用 DLX,当消息在一个队列中变成死信(dead message)之后,它能被重新 publish 到另一个 Exchange,这个 Exchange 就是DLX

    进入的时机

  • 消息被拒绝(basic.reject/basic.nack) 且 requeue=false(重回队列false)

  • 消息 ttl 过期
  • 队列达到最大长度

设置死信队列

  1. // 死信队列的 exchange 是个普通 exchange
  2. channel.exchangeDeclare(
  3. "dlx.exchange", // exhcangeName
  4. "topic", // type
  5. true, // durable
  6. false, // autoDelete
  7. null // arguments
  8. );
  9. // 对应的 queue 也是一个普通的 queue
  10. channel.queueDeclare(
  11. "dlx.queue", // queueName
  12. true, // durable
  13. false, // exclusive
  14. false, // autoDelete
  15. null
  16. ); // arguments
  17. // binding
  18. //////////////////////////////////////////////////////
  19. //注意 routing key 可以接受所有消息
  20. /////////////////////////////////////////////////////
  21. channel.queueBind("dlx.queue", "dlx.exchange", "#");
  • 注意死信队列的 routingKey 是 #

绑定死信队列

  • 需要死信队列的队列加上属性 arguments.put("x-dead-letter-exchange", "dlx.exchange"); ```java String exchangeName = “normal_exchange”; String exchangeType = “topic”; String queueName = “normal_queue”;

// 声明一个普通的 exchange channel.exchangeDeclare( exchangeName, exchangeType, true, // durable false, // autoDelete null // argument );

Map arguments = new HashMap<>(); // 绑定死信队列, 死信将会到达死信队列 arguments.put(“x-dead-letter-exchange”, “dlx.exchange”);

// 声明一个普通的 queue channel.queueDeclare( queueName,
true, // durable false, // exclusive 独占模式 false, // autoDelete arguments // 设置该普通队列拥有死信队列 ); channel.queueBind(queueName, exchangeName, “routingKey”); ```