1 Work Queues工作队列模式

1.1 模式说明

Work Queues工作队列模式.png

  • Work Queues和入门案例中的简单模式相对,多个一个或一些消费端,多个消费端共同消费同一个队列中的消息。
  • 应用场景:对于任务过重或者任务较多的情况使用工作队列模式可以提高任务处理的速度。

1.2 应用示例

  • Work Queues和入门案例中的简单模式的代码是几乎有一样的,可以完全复制,并复制多一个消费者进行多个消费者同时消费消息的测试。
  • 示例:
  • 连接工具类:
  1. package com.xudaxian;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. import java.io.IOException;
  5. import java.util.concurrent.TimeoutException;
  6. /**
  7. * @version 1.0
  8. * @since 2021-02-04 14:07
  9. */
  10. public class ConnectionUtils {
  11. /**
  12. * 获取 Connection
  13. *
  14. * @return
  15. * @throws IOException
  16. * @throws TimeoutException
  17. */
  18. public static Connection getConnection() throws IOException, TimeoutException {
  19. //创建连接工厂
  20. ConnectionFactory connectionFactory = new ConnectionFactory();
  21. //主机地址,默认是localhost
  22. connectionFactory.setHost("192.168.18.120");
  23. //连接端口
  24. connectionFactory.setPort(5672);
  25. //虚拟主机的名称,默认为/
  26. connectionFactory.setVirtualHost("/xudaxian");
  27. //连接的用户名,默认为guest
  28. connectionFactory.setUsername("xudaxian");
  29. //连接的密码:默认为guest
  30. connectionFactory.setPassword("123456");
  31. //创建连接,并返回
  32. return connectionFactory.newConnection();
  33. }
  34. }
  • 生产者:
  1. package com.xudaxian;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import java.io.IOException;
  5. import java.nio.charset.StandardCharsets;
  6. import java.util.HashMap;
  7. import java.util.concurrent.TimeoutException;
  8. /**
  9. * 生产者
  10. *
  11. * @version 1.0
  12. * @since 2021-02-04 10:40
  13. */
  14. public class Producer {
  15. public static String QUEUE_NAME = "work_queue";
  16. public static void main(String[] args) throws IOException, TimeoutException {
  17. Connection connection = ConnectionUtils.getConnection();
  18. //创建信道
  19. Channel channel = connection.createChannel();
  20. //声明创建队列
  21. //参数1:队列名称
  22. //参数2:是否持久化队列
  23. //参数3:是否独占本地连接
  24. //参数4:是否在不使用的时候自动删除队列
  25. //参数5:队列的其他参数
  26. channel.queueDeclare(QUEUE_NAME, true, false, false, new HashMap<>());
  27. for (int i = 0; i < 30; i++) {
  28. //定义要发送的消息
  29. String message = "你好啊,RabbitMQ" + i;
  30. channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
  31. System.out.println("已经发送消息:" + message);
  32. }
  33. //释放资源
  34. channel.close();
  35. connection.close();
  36. }
  37. }
  • 消费者1:
  1. package com.xudaxian;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.nio.charset.StandardCharsets;
  5. import java.util.HashMap;
  6. import java.util.concurrent.TimeoutException;
  7. /**
  8. * 消费者
  9. *
  10. * @version 1.0
  11. * @since 2021-02-04 11:08
  12. */
  13. public class Consumer1 {
  14. public static String QUEUE_NAME = "work_queue";
  15. public static void main(String[] args) throws IOException, TimeoutException {
  16. Connection connection = ConnectionUtils.getConnection();
  17. //创建信道
  18. Channel channel = connection.createChannel();
  19. //声明创建队列
  20. //参数1:队列名称
  21. //参数2:是否持久化队列
  22. //参数3:是否独占本地连接
  23. //参数4:是否在不使用的时候自动删除队列
  24. //参数5:队列的其他参数
  25. channel.queueDeclare(QUEUE_NAME, true, false, false, new HashMap<>());
  26. //一次只能接受并处理一个消息
  27. channel.basicQos(1);
  28. //监听消息的消费者
  29. DefaultConsumer consumer = new DefaultConsumer(channel) {
  30. /**
  31. * @param consumerTag 消费者标签,在channel.basicConsume的时候可以指定
  32. * @param envelope 消息包内容,可以从其中获取消息的id,消息routing key,交换机、消息和重试标记(收到消息失败后是否需要重新发送)
  33. * @param properties 消息属性
  34. * @param body 消息
  35. * @throws IOException IO异常
  36. */
  37. @Override
  38. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  39. System.out.println("路由key:" + envelope.getRoutingKey());
  40. System.out.println("交换机:" + envelope.getExchange());
  41. System.out.println("消息id:" + envelope.getDeliveryTag());
  42. System.out.println("消费者1 接收到的消息:" + new String(body, StandardCharsets.UTF_8));
  43. System.out.println();
  44. System.out.println("============================================");
  45. System.out.println();
  46. try {
  47. Thread.sleep(1000);
  48. channel.basicAck(envelope.getDeliveryTag(), false);
  49. } catch (InterruptedException e) {
  50. e.printStackTrace();
  51. }
  52. }
  53. };
  54. //监听消息
  55. //参数1:队列名称
  56. //参数2:是否自动确认,设置为true表示消息接收到消息自动向MQ回复接收到了,MQ收到消息后会删除消息,设置为false,需要手动确认
  57. //参数3:监听消息的消费者
  58. channel.basicConsume(QUEUE_NAME, false, consumer);
  59. //不需要释放资源,应该保持一直监听消息
  60. // channel.close();
  61. // connection.close();
  62. }
  63. }
  • 消费者2:
  1. package com.xudaxian;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.nio.charset.StandardCharsets;
  5. import java.util.HashMap;
  6. import java.util.concurrent.TimeoutException;
  7. /**
  8. * 消费者
  9. *
  10. * @version 1.0
  11. * @since 2021-02-04 11:08
  12. */
  13. public class Consumer2 {
  14. public static String QUEUE_NAME = "work_queue";
  15. public static void main(String[] args) throws IOException, TimeoutException {
  16. Connection connection = ConnectionUtils.getConnection();
  17. //创建信道
  18. Channel channel = connection.createChannel();
  19. //声明创建队列
  20. //参数1:队列名称
  21. //参数2:是否持久化队列
  22. //参数3:是否独占本地连接
  23. //参数4:是否在不使用的时候自动删除队列
  24. //参数5:队列的其他参数
  25. channel.queueDeclare(QUEUE_NAME, true, false, false, new HashMap<>());
  26. //一次只能接受并处理一个消息
  27. channel.basicQos(1);
  28. //监听消息的消费者
  29. DefaultConsumer consumer = new DefaultConsumer(channel) {
  30. /**
  31. * @param consumerTag 消费者标签,在channel.basicConsume的时候可以指定
  32. * @param envelope 消息包内容,可以从其中获取消息的id,消息routing key,交换机、消息和重试标记(收到消息失败后是否需要重新发送)
  33. * @param properties 消息属性
  34. * @param body 消息
  35. * @throws IOException IO异常
  36. */
  37. @Override
  38. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  39. System.out.println("路由key:" + envelope.getRoutingKey());
  40. System.out.println("交换机:" + envelope.getExchange());
  41. System.out.println("消息id:" + envelope.getDeliveryTag());
  42. System.out.println("消费者2 接收到的消息:" + new String(body, StandardCharsets.UTF_8));
  43. System.out.println();
  44. System.out.println("============================================");
  45. System.out.println();
  46. try {
  47. Thread.sleep(1000);
  48. channel.basicAck(envelope.getDeliveryTag(), false);
  49. } catch (InterruptedException e) {
  50. e.printStackTrace();
  51. }
  52. }
  53. };
  54. //监听消息
  55. //参数1:队列名称
  56. //参数2:是否自动确认,设置为true表示消息接收到消息自动向MQ回复接收到了,MQ收到消息后会删除消息,设置为false,需要手动确认
  57. //参数3:监听消息的消费者
  58. channel.basicConsume(QUEUE_NAME, false, consumer);
  59. //不需要释放资源,应该保持一直监听消息
  60. // channel.close();
  61. // connection.close();
  62. }
  63. }

1.3 测试

  • 先启动两个消费者,然后再启动生产者,让其发送消息,到IDEA的两个消费者的控制台查看是否竞争性的接受到消息。

Work Queues工作队列模式测试之消费者1.png

Work Queues工作队列模式测试之消费者2.png

1.4 小结

  • 在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。

2 订阅模式类型

  • 订阅模式示例图:

订阅模型示例图.png

  • 在之前的案例中,只有3个角色:
    • P:生产者,也就是要发送消息的程序。
    • C:消费者,消息的接受者,会一直等待消息的到来。
    • queue:消息队列,图中红色的部分。
  • 在订阅模型中,多个一个exchange的角色,而且过程略有变化。
    • P:生产者,也就是要发送消息的程序,但是不再发送给队列中,而是发送给X(交换机)。
    • C:消费者,消息的接受者,会一直等待消息的到来。
    • Queue:消息队列,接受消息,缓存消息。
    • Exchange:交换机,图中的X。一方面,接受生产者发送的消息,另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange一般有如下的3种:
      • Fanout:广播,将消息交给所有绑定到交换机的队列。
      • Direct:定向,将消息交给符合指定routing key的队列。
      • Topic:通配符,将消息交给符合routing pattern(路由模式)的队列。
  • Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列和Exchange绑定,或者没有符合路由规则的队列,那么消息就会丢失。

3 Publish/Subscribe发布与订阅模式

3.1 模式说明

Publish-Subscribe发布与订阅模式.png

  • 发布订阅模式:
  • RabbitMQ的工作模式 - 图6每个消费者监听自己的队列。
  • RabbitMQ的工作模式 - 图7生产者将消息发送给Broker,由交换机将消息转发到绑定到此交换机的每个队列,每个绑定的交换机的队列都将接收到消息。

  • 举例:

FanoutExchange举例.png

3.2 应用示例

  • 示例:
  • 连接工具类:
  1. package com.xudaxian;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. import java.io.IOException;
  5. import java.util.concurrent.TimeoutException;
  6. /**
  7. * @version 1.0
  8. * @since 2021-02-04 14:07
  9. */
  10. public class ConnectionUtils {
  11. /**
  12. * 获取 Connection
  13. *
  14. * @return
  15. * @throws IOException
  16. * @throws TimeoutException
  17. */
  18. public static Connection getConnection() throws IOException, TimeoutException {
  19. //创建连接工厂
  20. ConnectionFactory connectionFactory = new ConnectionFactory();
  21. //主机地址,默认是localhost
  22. connectionFactory.setHost("192.168.18.120");
  23. //连接端口
  24. connectionFactory.setPort(5672);
  25. //虚拟主机的名称,默认为/
  26. connectionFactory.setVirtualHost("/xudaxian");
  27. //连接的用户名,默认为guest
  28. connectionFactory.setUsername("xudaxian");
  29. //连接的密码:默认为guest
  30. connectionFactory.setPassword("123456");
  31. //创建连接,并返回
  32. return connectionFactory.newConnection();
  33. }
  34. }
  • 生产者:
  1. package com.xudaxian;
  2. import com.rabbitmq.client.BuiltinExchangeType;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. import java.io.IOException;
  6. import java.nio.charset.StandardCharsets;
  7. import java.util.HashMap;
  8. import java.util.concurrent.TimeoutException;
  9. /**
  10. * 发布和订阅模式使用的是Fanout
  11. * @version 1.0
  12. * @since 2021-02-04 10:40
  13. */
  14. public class Producer {
  15. //交换机的名称
  16. public static String FANOUT_EXCHANGE = "fanout_exchange";
  17. //队列名称
  18. public static String FANOUT_QUEUE_1 = "fanout_queue_1";
  19. //队列名称
  20. public static String FANOUT_QUEUE_2 = "fanout_queue_2";
  21. public static void main(String[] args) throws IOException, TimeoutException {
  22. Connection connection = ConnectionUtils.getConnection();
  23. //创建信道
  24. Channel channel = connection.createChannel();
  25. //声明交换机
  26. //参数1: 交换机名称
  27. //参数2:交换机类型 fanout topic direct headers(已经不使用了)
  28. channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);
  29. //声明创建队列
  30. //参数1:队列名称
  31. //参数2:是否持久化队列
  32. //参数3:是否独占本地连接
  33. //参数4:是否在不使用的时候自动删除队列
  34. //参数5:队列的其他参数
  35. channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, new HashMap<>());
  36. channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, new HashMap<>());
  37. //队列绑定交换机
  38. channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHANGE, "");
  39. channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHANGE, "");
  40. for (int i = 0; i < 30; i++) {
  41. //定义要发送的消息
  42. String message = "你好啊,RabbitMQ,发送订阅模式" + i;
  43. //参数1:交换机的名称,如果没有指定则使用默认的Default Exchange
  44. //参数2:路由key,简单模式可以传队列名称
  45. //参数3:消息的其他属性
  46. //参数4:消息的内容
  47. channel.basicPublish(FANOUT_EXCHANGE, "", null, message.getBytes(StandardCharsets.UTF_8));
  48. System.out.println("已经发送消息:" + message);
  49. }
  50. //释放资源
  51. channel.close();
  52. connection.close();
  53. }
  54. }
  • 消费者1:
  1. package com.xudaxian;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.nio.charset.StandardCharsets;
  5. import java.util.HashMap;
  6. import java.util.concurrent.TimeoutException;
  7. /**
  8. * 消费者
  9. *
  10. * @version 1.0
  11. * @since 2021-02-04 11:08
  12. */
  13. public class Consumer1 {
  14. //交换机的名称
  15. public static String FANOUT_EXCHANGE = "fanout_exchange";
  16. //队列的名称
  17. public static String FANOUT_QUEUE_1 = "fanout_queue_1";
  18. public static void main(String[] args) throws IOException, TimeoutException {
  19. Connection connection = ConnectionUtils.getConnection();
  20. //创建信道
  21. Channel channel = connection.createChannel();
  22. //声明交换机
  23. //参数1: 交换机名称
  24. //参数2:交换机类型 fanout topic direct headers(已经不使用了)
  25. channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);
  26. //声明创建队列
  27. //参数1:队列名称
  28. //参数2:是否持久化队列
  29. //参数3:是否独占本地连接
  30. //参数4:是否在不使用的时候自动删除队列
  31. //参数5:队列的其他参数
  32. channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, new HashMap<>());
  33. //队列绑定交换机
  34. channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHANGE, "");
  35. //监听消息的消费者
  36. DefaultConsumer consumer = new DefaultConsumer(channel) {
  37. /**
  38. * @param consumerTag 消费者标签,在channel.basicConsume的时候可以指定
  39. * @param envelope 消息包内容,可以从其中获取消息的id,消息routing key,交换机、消息和重试标记(收到消息失败后是否需要重新发送)
  40. * @param properties 消息属性
  41. * @param body 消息
  42. * @throws IOException IO异常
  43. */
  44. @Override
  45. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  46. System.out.println("路由key:" + envelope.getRoutingKey());
  47. System.out.println("交换机:" + envelope.getExchange());
  48. System.out.println("消息id:" + envelope.getDeliveryTag());
  49. System.out.println("消费者1 接收到的消息:" + new String(body, StandardCharsets.UTF_8));
  50. System.out.println();
  51. System.out.println("============================================");
  52. System.out.println();
  53. }
  54. };
  55. //监听消息
  56. //参数1:队列名称
  57. //参数2:是否自动确认,设置为true表示消息接收到消息自动向MQ回复接收到了,MQ收到消息后会删除消息,设置为false,需要手动确认
  58. //参数3:监听消息的消费者
  59. channel.basicConsume(FANOUT_QUEUE_1, true, consumer);
  60. //不需要释放资源,应该保持一直监听消息
  61. // channel.close();
  62. // connection.close();
  63. }
  64. }
  • 消费者2:
  1. package com.xudaxian;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.nio.charset.StandardCharsets;
  5. import java.util.HashMap;
  6. import java.util.concurrent.TimeoutException;
  7. /**
  8. * 消费者
  9. *
  10. * @version 1.0
  11. * @since 2021-02-04 11:08
  12. */
  13. public class Consumer2 {
  14. //交换机的名称
  15. public static String FANOUT_EXCHANGE = "fanout_exchange";
  16. //队列的名称
  17. public static String FANOUT_QUEUE_2 = "fanout_queue_2";
  18. public static void main(String[] args) throws IOException, TimeoutException {
  19. Connection connection = ConnectionUtils.getConnection();
  20. //创建信道
  21. Channel channel = connection.createChannel();
  22. //声明交换机
  23. //参数1: 交换机名称
  24. //参数2:交换机类型 fanout topic direct headers(已经不使用了)
  25. channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);
  26. //声明创建队列
  27. //参数1:队列名称
  28. //参数2:是否持久化队列
  29. //参数3:是否独占本地连接
  30. //参数4:是否在不使用的时候自动删除队列
  31. //参数5:队列的其他参数
  32. channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, new HashMap<>());
  33. //队列绑定交换机
  34. channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHANGE, "");
  35. //监听消息的消费者
  36. DefaultConsumer consumer = new DefaultConsumer(channel) {
  37. /**
  38. * @param consumerTag 消费者标签,在channel.basicConsume的时候可以指定
  39. * @param envelope 消息包内容,可以从其中获取消息的id,消息routing key,交换机、消息和重试标记(收到消息失败后是否需要重新发送)
  40. * @param properties 消息属性
  41. * @param body 消息
  42. * @throws IOException IO异常
  43. */
  44. @Override
  45. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  46. System.out.println("路由key:" + envelope.getRoutingKey());
  47. System.out.println("交换机:" + envelope.getExchange());
  48. System.out.println("消息id:" + envelope.getDeliveryTag());
  49. System.out.println("消费者2 接收到的消息:" + new String(body, StandardCharsets.UTF_8));
  50. System.out.println();
  51. System.out.println("============================================");
  52. System.out.println();
  53. }
  54. };
  55. //监听消息
  56. //参数1:队列名称
  57. //参数2:是否自动确认,设置为true表示消息接收到消息自动向MQ回复接收到了,MQ收到消息后会删除消息,设置为false,需要手动确认
  58. //参数3:监听消息的消费者
  59. channel.basicConsume(FANOUT_QUEUE_2, true, consumer);
  60. //不需要释放资源,应该保持一直监听消息
  61. // channel.close();
  62. // connection.close();
  63. }
  64. }

3.3 测试

  • 启动所有的消费者,然后使用生产者发送消息,在每个消费者对应的控制台可以看到生产者发送的所有消息,明显出现广播的效果。

广播模式之消费者1.png

广播模式之消费者2.png

3.4 小结

  • 交换器需要和队列进行绑定,绑定之后,一个消息可以被多个消费者接收到。

3.5 发布订阅模式和工作队列的区别

  • RabbitMQ的工作模式 - 图11工作队列模式不用定义交换机,而发布订阅模式需要定义交换机。
  • RabbitMQ的工作模式 - 图12发布订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方式面向队列发送消息(底层使用默认的交换机)。
  • RabbitMQ的工作模式 - 图13发布订阅模式需要设置交换机和队列的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑定到默认的交换机上。

4 Routing路由模式

4.1 模式说明

  • 路由模式的特点:
  • RabbitMQ的工作模式 - 图14队列和交换机的绑定,不能是任意的绑定了,而是要指定一个routing key
  • RabbitMQ的工作模式 - 图15消息的发送方在向Exchange发送消息的时候,也必须指定消息的routing key
  • RabbitMQ的工作模式 - 图16Exchange不再把消息交给每一个绑定的队列,而是根据消息的routing key进行判断,只有队列的routing key和消息的routing key完全一致,才会接受到消息。

路由模式.png

  • 图解:
  • P:生产者,向Exchange发送消息,发送消息的时候,会指定一个routing key
  • X:Exchange(交换机),接受生产者的消息,然后把消息传递给和routing key完全匹配的队列。
  • C1:消费者,其所在的队列指定了需要routing keyerror的消息。
  • C2:消费者,其所在的队列指定了需要routing keyinfoerrorwarning的消息。

  • 举例:

DirectExchange举例.jpg

4.2 应用示例

  • 在编码上,和发布订阅模式的区别是交换机的类型为Direct,还有队列绑定交换机的时候需要指定routing key
  • 示例:
  • 连接工具类:
  1. package com.xudaxian;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. import java.io.IOException;
  5. import java.util.concurrent.TimeoutException;
  6. /**
  7. * @version 1.0
  8. * @since 2021-02-04 14:07
  9. */
  10. public class ConnectionUtils {
  11. /**
  12. * 获取 Connection
  13. *
  14. * @return
  15. * @throws IOException
  16. * @throws TimeoutException
  17. */
  18. public static Connection getConnection() throws IOException, TimeoutException {
  19. //创建连接工厂
  20. ConnectionFactory connectionFactory = new ConnectionFactory();
  21. //主机地址,默认是localhost
  22. connectionFactory.setHost("192.168.18.120");
  23. //连接端口
  24. connectionFactory.setPort(5672);
  25. //虚拟主机的名称,默认为/
  26. connectionFactory.setVirtualHost("/xudaxian");
  27. //连接的用户名,默认为guest
  28. connectionFactory.setUsername("xudaxian");
  29. //连接的密码:默认为guest
  30. connectionFactory.setPassword("123456");
  31. //创建连接,并返回
  32. return connectionFactory.newConnection();
  33. }
  34. }
  • 生产者:
  1. package com.xudaxian;
  2. import com.rabbitmq.client.BuiltinExchangeType;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. import java.io.IOException;
  6. import java.nio.charset.StandardCharsets;
  7. import java.util.HashMap;
  8. import java.util.concurrent.TimeoutException;
  9. /**
  10. * 路由模式的交换机类型为Direct
  11. *
  12. * @version 1.0
  13. * @since 2021-02-04 10:40
  14. */
  15. public class Producer {
  16. //交换机的名称
  17. public static String DIRECT_EXCHANGE = "direct_exchange";
  18. //队列名称
  19. public static String DIRECT_QUEUE_1 = "direct_queue_1";
  20. //队列名称
  21. public static String DIRECT_QUEUE_2 = "direct_queue_2";
  22. public static void main(String[] args) throws IOException, TimeoutException {
  23. Connection connection = ConnectionUtils.getConnection();
  24. //创建信道
  25. Channel channel = connection.createChannel();
  26. //声明交换机
  27. //参数1: 交换机名称
  28. //参数2:交换机类型 fanout topic direct headers(已经不使用了)
  29. channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT);
  30. //声明创建队列
  31. //参数1:队列名称
  32. //参数2:是否持久化队列
  33. //参数3:是否独占本地连接
  34. //参数4:是否在不使用的时候自动删除队列
  35. //参数5:队列的其他参数
  36. channel.queueDeclare(DIRECT_QUEUE_1, true, false, false, new HashMap<>());
  37. channel.queueDeclare(DIRECT_QUEUE_2, true, false, false, new HashMap<>());
  38. //队列绑定交换机
  39. channel.queueBind(DIRECT_QUEUE_1, DIRECT_EXCHANGE, "error");
  40. channel.queueBind(DIRECT_QUEUE_2, DIRECT_EXCHANGE, "info");
  41. channel.queueBind(DIRECT_QUEUE_2, DIRECT_EXCHANGE, "error");
  42. channel.queueBind(DIRECT_QUEUE_2, DIRECT_EXCHANGE, "warning");
  43. //发送消息
  44. channel.basicPublish(DIRECT_EXCHANGE, "error", null, "路由模式为error".getBytes(StandardCharsets.UTF_8));
  45. channel.basicPublish(DIRECT_EXCHANGE, "info", null, "路由模式为info".getBytes(StandardCharsets.UTF_8));
  46. channel.basicPublish(DIRECT_EXCHANGE, "warning", null, "路由模式为warning".getBytes(StandardCharsets.UTF_8));
  47. //释放资源
  48. channel.close();
  49. connection.close();
  50. }
  51. }
  • 消费者1:
  1. package com.xudaxian;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.nio.charset.StandardCharsets;
  5. import java.util.HashMap;
  6. import java.util.concurrent.TimeoutException;
  7. /**
  8. * 消费者
  9. *
  10. * @version 1.0
  11. * @since 2021-02-04 11:08
  12. */
  13. public class Consumer1 {
  14. //交换机的名称
  15. public static String DIRECT_EXCHANGE = "direct_exchange";
  16. //队列名称
  17. public static String DIRECT_QUEUE_1 = "direct_queue_1";
  18. public static void main(String[] args) throws IOException, TimeoutException {
  19. Connection connection = ConnectionUtils.getConnection();
  20. //创建信道
  21. Channel channel = connection.createChannel();
  22. //声明交换机
  23. //参数1: 交换机名称
  24. //参数2:交换机类型 fanout topic direct headers(已经不使用了)
  25. channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT);
  26. //声明创建队列
  27. //参数1:队列名称
  28. //参数2:是否持久化队列
  29. //参数3:是否独占本地连接
  30. //参数4:是否在不使用的时候自动删除队列
  31. //参数5:队列的其他参数
  32. channel.queueDeclare(DIRECT_QUEUE_1, true, false, false, new HashMap<>());
  33. //队列绑定交换机
  34. channel.queueBind(DIRECT_QUEUE_1, DIRECT_EXCHANGE, "error");
  35. //监听消息的消费者
  36. DefaultConsumer consumer = new DefaultConsumer(channel) {
  37. /**
  38. * @param consumerTag 消费者标签,在channel.basicConsume的时候可以指定
  39. * @param envelope 消息包内容,可以从其中获取消息的id,消息routing key,交换机、消息和重试标记(收到消息失败后是否需要重新发送)
  40. * @param properties 消息属性
  41. * @param body 消息
  42. * @throws IOException IO异常
  43. */
  44. @Override
  45. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  46. System.out.println("路由key:" + envelope.getRoutingKey());
  47. System.out.println("交换机:" + envelope.getExchange());
  48. System.out.println("消息id:" + envelope.getDeliveryTag());
  49. System.out.println("消费者1 接收到的消息:" + new String(body, StandardCharsets.UTF_8));
  50. System.out.println();
  51. System.out.println("============================================");
  52. System.out.println();
  53. }
  54. };
  55. //监听消息
  56. //参数1:队列名称
  57. //参数2:是否自动确认,设置为true表示消息接收到消息自动向MQ回复接收到了,MQ收到消息后会删除消息,设置为false,需要手动确认
  58. //参数3:监听消息的消费者
  59. channel.basicConsume(DIRECT_QUEUE_1, true, consumer);
  60. //不需要释放资源,应该保持一直监听消息
  61. // channel.close();
  62. // connection.close();
  63. }
  64. }
  • 消费者2:
  1. package com.xudaxian;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.nio.charset.StandardCharsets;
  5. import java.util.HashMap;
  6. import java.util.concurrent.TimeoutException;
  7. /**
  8. * 消费者
  9. *
  10. * @version 1.0
  11. * @since 2021-02-04 11:08
  12. */
  13. public class Consumer2 {
  14. //交换机的名称
  15. public static String DIRECT_EXCHANGE = "direct_exchange";
  16. //队列名称
  17. public static String DIRECT_QUEUE_2 = "direct_queue_2";
  18. public static void main(String[] args) throws IOException, TimeoutException {
  19. Connection connection = ConnectionUtils.getConnection();
  20. //创建信道
  21. Channel channel = connection.createChannel();
  22. //声明交换机
  23. //参数1: 交换机名称
  24. //参数2:交换机类型 fanout topic direct headers(已经不使用了)
  25. channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT);
  26. //声明创建队列
  27. //参数1:队列名称
  28. //参数2:是否持久化队列
  29. //参数3:是否独占本地连接
  30. //参数4:是否在不使用的时候自动删除队列
  31. //参数5:队列的其他参数
  32. channel.queueDeclare(DIRECT_QUEUE_2, true, false, false, new HashMap<>());
  33. //队列绑定交换机
  34. channel.queueBind(DIRECT_QUEUE_2, DIRECT_EXCHANGE, "error");
  35. channel.queueBind(DIRECT_QUEUE_2, DIRECT_EXCHANGE, "info");
  36. channel.queueBind(DIRECT_QUEUE_2, DIRECT_EXCHANGE, "warning");
  37. //监听消息的消费者
  38. DefaultConsumer consumer = new DefaultConsumer(channel) {
  39. /**
  40. * @param consumerTag 消费者标签,在channel.basicConsume的时候可以指定
  41. * @param envelope 消息包内容,可以从其中获取消息的id,消息routing key,交换机、消息和重试标记(收到消息失败后是否需要重新发送)
  42. * @param properties 消息属性
  43. * @param body 消息
  44. * @throws IOException IO异常
  45. */
  46. @Override
  47. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  48. System.out.println("路由key:" + envelope.getRoutingKey());
  49. System.out.println("交换机:" + envelope.getExchange());
  50. System.out.println("消息id:" + envelope.getDeliveryTag());
  51. System.out.println("消费者2 接收到的消息:" + new String(body, StandardCharsets.UTF_8));
  52. System.out.println();
  53. System.out.println("============================================");
  54. System.out.println();
  55. }
  56. };
  57. //监听消息
  58. //参数1:队列名称
  59. //参数2:是否自动确认,设置为true表示消息接收到消息自动向MQ回复接收到了,MQ收到消息后会删除消息,设置为false,需要手动确认
  60. //参数3:监听消息的消费者
  61. channel.basicConsume(DIRECT_QUEUE_2, true, consumer);
  62. //不需要释放资源,应该保持一直监听消息
  63. // channel.close();
  64. // connection.close();
  65. }
  66. }

4.3 测试

  • 启动所有的消费者,然后使用生产者发送消息,在消费者对应的控制台可以看到生产者发送对应routing key对应队列的消息,到达之后按照需要接受。

路由模式测试之消费者1.png

路由模式测试之消费者2.png

  • 在执行完测试代码后,可以在RabbitMQ管理后台查找Exchange选项卡,点击direct_exchange的交换机,可以查看到如下的绑定:

路由模式之RabbitMQ管理后台查看.png

4.4 小结

  • Routing模式要求队列在绑定交换机时要指定routing key,消息会转发到符合routing key的队列。

5 Topic通配符模式

5.1 模式说明

  • Topic类型和Direct相比,都是可以根据routing key将消息路由到不同的队列,只不过Exchange类型为Topic可以让队列在绑定routing key的时候使用通配符。

通配符模式.png

  • routing key 一般都是有一个或多个单词组成,多个单词之间以“.”分割,例如:item.insert
  • 通配符的规则:
    • #:匹配一个或多个词。
    • *:只能匹配一个词。
  • 举例:
    • item.#:能够匹配item.insert.abc或item.insert。
    • item.*:只能匹配item.insert。

通配符模式2.png

  • 图解:

    • 红色的Queue:绑定的是usa.#,因此凡是以usa.开头的routing key都会匹配到。
    • 黄色的Queue:绑定的是#.new,因此凡是以.new结尾的routing key都会匹配到。
  • 举例:

TopicExchange举例.png
TopicExchange举例2.png
TopicExchange举例3.png

5.2 应用示例

  • 示例:
  • 连接工具类:
  1. package com.xudaxian;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. import java.io.IOException;
  5. import java.util.concurrent.TimeoutException;
  6. /**
  7. * @version 1.0
  8. * @since 2021-02-04 14:07
  9. */
  10. public class ConnectionUtils {
  11. /**
  12. * 获取 Connection
  13. *
  14. * @return
  15. * @throws IOException
  16. * @throws TimeoutException
  17. */
  18. public static Connection getConnection() throws IOException, TimeoutException {
  19. //创建连接工厂
  20. ConnectionFactory connectionFactory = new ConnectionFactory();
  21. //主机地址,默认是localhost
  22. connectionFactory.setHost("192.168.18.120");
  23. //连接端口
  24. connectionFactory.setPort(5672);
  25. //虚拟主机的名称,默认为/
  26. connectionFactory.setVirtualHost("/xudaxian");
  27. //连接的用户名,默认为guest
  28. connectionFactory.setUsername("xudaxian");
  29. //连接的密码:默认为guest
  30. connectionFactory.setPassword("123456");
  31. //创建连接,并返回
  32. return connectionFactory.newConnection();
  33. }
  34. }
  • 生产者:
  1. package com.xudaxian;
  2. import com.rabbitmq.client.BuiltinExchangeType;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. import java.io.IOException;
  6. import java.nio.charset.StandardCharsets;
  7. import java.util.HashMap;
  8. import java.util.concurrent.TimeoutException;
  9. /**
  10. * @version 1.0
  11. * @since 2021-02-04 22:43
  12. */
  13. public class Producer {
  14. //交换机的名称
  15. public static String TOPIC_EXCHANGE = "topic_exchange";
  16. //队列名称
  17. public static String TOPIC_QUEUE_1 = "topic_queue_1";
  18. //队列名称
  19. public static String TOPIC_QUEUE_2 = "topic_queue_2";
  20. public static void main(String[] args) throws IOException, TimeoutException {
  21. Connection connection = ConnectionUtils.getConnection();
  22. //创建信道
  23. Channel channel = connection.createChannel();
  24. //声明创建队列
  25. //参数1:队列名称
  26. //参数2:是否持久化队列
  27. //参数3:是否独占本地连接
  28. //参数4:是否在不使用的时候自动删除队列
  29. //参数5:队列的其他参数
  30. channel.queueDeclare(TOPIC_QUEUE_1, true, false, false, new HashMap<>());
  31. channel.queueDeclare(TOPIC_QUEUE_2, true, false, false, new HashMap<>());
  32. //声明交换机
  33. //参数1: 交换机名称
  34. //参数2:交换机类型 fanout topic direct headers(已经不使用了)
  35. channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);
  36. channel.queueBind(TOPIC_QUEUE_1, TOPIC_EXCHANGE, "item.#");
  37. channel.queueBind(TOPIC_QUEUE_2, TOPIC_EXCHANGE, "item.*");
  38. //发送消息
  39. channel.basicPublish(TOPIC_EXCHANGE, "item.insert", null, "新增商品,item.insert".getBytes(StandardCharsets.UTF_8));
  40. channel.basicPublish(TOPIC_EXCHANGE, "item.update", null, "更新商品,item.update".getBytes(StandardCharsets.UTF_8));
  41. channel.basicPublish(TOPIC_EXCHANGE, "item.delete", null, "删除商品,item.delete".getBytes(StandardCharsets.UTF_8));
  42. channel.basicPublish(TOPIC_EXCHANGE, "item.insert.abc", null, "其他信息,item.insert.abc".getBytes(StandardCharsets.UTF_8));
  43. //释放资源
  44. channel.close();
  45. connection.close();
  46. }
  47. }
  • 消费者1:
  1. package com.xudaxian;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.nio.charset.StandardCharsets;
  5. import java.util.HashMap;
  6. import java.util.concurrent.TimeoutException;
  7. /**
  8. * 消费者 接收两种类型:item.update和item.delete
  9. *
  10. * @version 1.0
  11. * @since 2021-02-04 11:08
  12. */
  13. public class Consumer1 {
  14. //交换机的名称
  15. public static String TOPIC_EXCHANGE = "topic_exchange";
  16. //队列名称
  17. public static String TOPIC_QUEUE_1 = "topic_queue_1";
  18. public static void main(String[] args) throws IOException, TimeoutException {
  19. Connection connection = ConnectionUtils.getConnection();
  20. //创建信道
  21. Channel channel = connection.createChannel();
  22. //声明交换机
  23. //参数1: 交换机名称
  24. //参数2:交换机类型 fanout topic direct headers(已经不使用了)
  25. channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);
  26. //声明创建队列
  27. //参数1:队列名称
  28. //参数2:是否持久化队列
  29. //参数3:是否独占本地连接
  30. //参数4:是否在不使用的时候自动删除队列
  31. //参数5:队列的其他参数
  32. channel.queueDeclare(TOPIC_QUEUE_1, true, false, false, new HashMap<>());
  33. //队列绑定交换机
  34. channel.queueBind(TOPIC_QUEUE_1, TOPIC_EXCHANGE, "item.insert");
  35. channel.queueBind(TOPIC_QUEUE_1, TOPIC_EXCHANGE, "item.delete");
  36. //监听消息的消费者
  37. DefaultConsumer consumer = new DefaultConsumer(channel) {
  38. /**
  39. * @param consumerTag 消费者标签,在channel.basicConsume的时候可以指定
  40. * @param envelope 消息包内容,可以从其中获取消息的id,消息routing key,交换机、消息和重试标记(收到消息失败后是否需要重新发送)
  41. * @param properties 消息属性
  42. * @param body 消息
  43. * @throws IOException IO异常
  44. */
  45. @Override
  46. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  47. System.out.println("路由key:" + envelope.getRoutingKey());
  48. System.out.println("交换机:" + envelope.getExchange());
  49. System.out.println("消息id:" + envelope.getDeliveryTag());
  50. System.out.println("消费者1 接收到的消息:" + new String(body, StandardCharsets.UTF_8));
  51. System.out.println();
  52. System.out.println("============================================");
  53. System.out.println();
  54. }
  55. };
  56. //监听消息
  57. //参数1:队列名称
  58. //参数2:是否自动确认,设置为true表示消息接收到消息自动向MQ回复接收到了,MQ收到消息后会删除消息,设置为false,需要手动确认
  59. //参数3:监听消息的消费者
  60. channel.basicConsume(TOPIC_QUEUE_1, true, consumer);
  61. //不需要释放资源,应该保持一直监听消息
  62. // channel.close();
  63. // connection.close();
  64. }
  65. }
  • 消费者2:
  1. package com.xudaxian;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.nio.charset.StandardCharsets;
  5. import java.util.HashMap;
  6. import java.util.concurrent.TimeoutException;
  7. /**
  8. * 消费者
  9. *
  10. * @version 1.0
  11. * @since 2021-02-04 11:08
  12. */
  13. public class Consumer2 {
  14. //交换机的名称
  15. public static String TOPIC_EXCHANGE = "topic_exchange";
  16. //队列名称
  17. public static String TOPIC_QUEUE_2 = "topic_queue_2";
  18. public static void main(String[] args) throws IOException, TimeoutException {
  19. Connection connection = ConnectionUtils.getConnection();
  20. //创建信道
  21. Channel channel = connection.createChannel();
  22. //声明交换机
  23. //参数1: 交换机名称
  24. //参数2:交换机类型 fanout topic direct headers(已经不使用了)
  25. channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);
  26. //声明创建队列
  27. //参数1:队列名称
  28. //参数2:是否持久化队列
  29. //参数3:是否独占本地连接
  30. //参数4:是否在不使用的时候自动删除队列
  31. //参数5:队列的其他参数
  32. channel.queueDeclare(TOPIC_QUEUE_2, true, false, false, new HashMap<>());
  33. //队列绑定交换机
  34. channel.queueBind(TOPIC_QUEUE_2, TOPIC_EXCHANGE, "item.*");
  35. //监听消息的消费者
  36. DefaultConsumer consumer = new DefaultConsumer(channel) {
  37. /**
  38. * @param consumerTag 消费者标签,在channel.basicConsume的时候可以指定
  39. * @param envelope 消息包内容,可以从其中获取消息的id,消息routing key,交换机、消息和重试标记(收到消息失败后是否需要重新发送)
  40. * @param properties 消息属性
  41. * @param body 消息
  42. * @throws IOException IO异常
  43. */
  44. @Override
  45. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  46. System.out.println("路由key:" + envelope.getRoutingKey());
  47. System.out.println("交换机:" + envelope.getExchange());
  48. System.out.println("消息id:" + envelope.getDeliveryTag());
  49. System.out.println("消费者2 接收到的消息:" + new String(body, StandardCharsets.UTF_8));
  50. System.out.println();
  51. System.out.println("============================================");
  52. System.out.println();
  53. }
  54. };
  55. //监听消息
  56. //参数1:队列名称
  57. //参数2:是否自动确认,设置为true表示消息接收到消息自动向MQ回复接收到了,MQ收到消息后会删除消息,设置为false,需要手动确认
  58. //参数3:监听消息的消费者
  59. channel.basicConsume(TOPIC_QUEUE_2, true, consumer);
  60. //不需要释放资源,应该保持一直监听消息
  61. // channel.close();
  62. // connection.close();
  63. }
  64. }

5.3 测试

  • 启动所有消费者,然后使用生产者发送消息,在消费者对应的控制台上可以看到生产者发送对应的routing key对应队列的消息。

Topic通配符模式测试之消费者1.png

Topic通配符模式测试之消费者2.png

5.4 小结

  • Topic主题模式可以实现Publish/Subscribe发布与订阅的模式Routing路由模式的功能,只是Topic在配置routing key的时候可以使用通配符,显得更加灵活。

6 模式总结

  • RabbitMQ的工作模式 - 图29简单模式:一个生产者,一个消费者,不需要设置交换机(使用默认的交换机)。
  • RabbitMQ的工作模式 - 图30工作队列模式Work Queues:一个生产者,多个消费者(竞争的关系),不需要设置交换机(使用默认的交换机)。
  • RabbitMQ的工作模式 - 图31发布订阅模式Publish/Subscribe:需要设置类型为fanout的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机将消息发送到绑定的队列中。
  • RabbitMQ的工作模式 - 图32路由模式Routing key:需要设置类型为direct的交换机,交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机根据routing key将消息发送到对应的队列中。
  • RabbitMQ的工作模式 - 图33通配符模式Topic:需要设置类型为topic的交换机,交换机和队列进行绑定,并且指定通配符方式的routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列中。

7 模式模拟网站

**