简介

RabbitMQ提供了6种工作模式:简单模式、work queues、Publish/Subscribe发布与订阅模式、Routing路由模式、Topics主题模式、RPC远程调用模式(远程调用,不太算MQ;暂不作介绍)。
官网对应模式介绍:https://www.rabbitmq.com/getstarted.html
image.png
RabbitMQ 基础架构:
image.png
RabbitMQ相关概念:

  • Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker
  • Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等
  • Connection:publisher/consumer 和 broker 之间的 TCP 连接
  • Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
  • Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
  • Queue:消息最终被送到这里等待 consumer 取走
  • Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据

    六种模式入门:

    简单模式

    image.png
    上图的模型中,有以下概念:

  • P:生产者,也就是要发送消息的程序

  • C:消费者:消息的接收者,会一直等待消息到来
  • queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息

maven坐标

  1. <dependencies>
  2. <!--rabbitmq java 客户端-->
  3. <dependency>
  4. <groupId>com.rabbitmq</groupId>
  5. <artifactId>amqp-client</artifactId>
  6. <version>5.6.0</version>
  7. </dependency>
  8. </dependencies>

Java代码

  1. //生产者
  2. public class Producer_HelloWord {
  3. public static void main(String[] args) throws IOException, TimeoutException {
  4. // 1.创建工厂
  5. ConnectionFactory factory = new ConnectionFactory();
  6. // 2.设置参数
  7. factory.setHost("192.168.31.128"); //默认ip localhost
  8. factory.setPort(5672); //默认端口 5672
  9. factory.setVirtualHost("/"); //虚拟机 默认是/
  10. factory.setUsername("guest"); //用户名 默认guest
  11. factory.setPassword("guest"); //密码 默认guest
  12. // 3.创建连接 Connection
  13. Connection connection = factory.newConnection();
  14. // 4.创建Channel
  15. Channel channel = connection.createChannel();
  16. // 5.创建消息队列queue
  17. /*
  18. queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
  19. 参数:
  20. 1. queue:队列名称
  21. 2. durable:是否持久化,当mq重启之后,还在
  22. 3. exclusive:
  23. * 是否独占。只能有一个消费者监听这队列
  24. * 当Connection关闭时,是否删除队列
  25. *
  26. 4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
  27. 5. arguments:参数。
  28. */
  29. //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
  30. channel.queueDeclare("hello_world",true,false,false,null);
  31. String body = "hello RabbitMQ~~";
  32. // 6.发送消息
  33. channel.basicPublish("","hello_world",null,body.getBytes());
  34. // 7.关闭
  35. channel.close();
  36. connection.close();
  37. }
  38. }
  39. -------------------------------------------------------------------------------------------------------------------
  40. //消费者
  41. public class Consumer_HelloWord {
  42. public static void main(String[] args) throws IOException, TimeoutException {
  43. //1.创建连接工厂
  44. ConnectionFactory factory = new ConnectionFactory();
  45. //2. 设置参数
  46. factory.setHost("192.168.31.128"); //默认ip localhost
  47. factory.setPort(5672); //默认端口 5672
  48. factory.setVirtualHost("/"); //虚拟机 默认是/
  49. factory.setUsername("guest"); //用户名 默认guest
  50. factory.setPassword("guest"); //密码 默认guest
  51. //3. 创建连接 Connection
  52. Connection connection = factory.newConnection();
  53. //4. 创建Channel
  54. Channel channel = connection.createChannel();
  55. //5. 创建队列Queue
  56. /*
  57. queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
  58. 参数:
  59. 1. queue:队列名称
  60. 2. durable:是否持久化,当mq重启之后,还在
  61. 3. exclusive:
  62. * 是否独占。只能有一个消费者监听这队列
  63. * 当Connection关闭时,是否删除队列
  64. *
  65. 4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
  66. 5. arguments:参数。
  67. */
  68. //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
  69. channel.queueDeclare("hello_world", true, false, false, null);
  70. /*
  71. basicConsume(String queue, boolean autoAck, Consumer callback)
  72. 参数:
  73. 1. queue:队列名称
  74. 2. autoAck:是否自动确认
  75. 3. callback:回调对象
  76. */
  77. // 接收消息
  78. Consumer consumer = new DefaultConsumer(channel) {
  79. /*
  80. 回调方法,当收到消息后,会自动执行该方法
  81. 1. consumerTag:标识
  82. 2. envelope:获取一些信息,交换机,路由key...
  83. 3. properties:配置信息
  84. 4. body:数据
  85. */
  86. @Override
  87. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  88. System.out.println("consumerTag:" + consumerTag);
  89. System.out.println("Exchange:" + envelope.getExchange());
  90. System.out.println("RoutingKey:" + envelope.getRoutingKey());
  91. System.out.println("properties:" + properties);
  92. System.out.println("body:" + new String(body));
  93. }
  94. };
  95. channel.basicConsume("hello_world", true, consumer);
  96. //关闭资源?不要
  97. }
  98. }

工作队列

image.png

  • Work Queues:与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。
  • 应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

    订阅模式

    image.png
    在订阅模型中,多了一个 Exchange 角色,而且过程略有变化:

  • P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)

  • C:消费者,消息的接收者,会一直等待消息到来
  • Queue:消息队列,接收消息、缓存消息
  • Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
    • Fanout:广播,将消息交给所有绑定到交换机的队列
    • Direct:定向,把消息交给符合指定routing key 的队列
    • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!

Java代码

  1. //生产者
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. // 1.创建连接工厂
  4. ConnectionFactory factory = new ConnectionFactory();
  5. // 2.设置参数
  6. factory.setHost("192.168.31.128"); //默认ip localhost
  7. factory.setPort(5672); //默认端口 5672
  8. factory.setVirtualHost("/"); //虚拟机 默认是/
  9. factory.setUsername("guest"); //用户名 默认guest
  10. factory.setPassword("guest"); //密码 默认guest
  11. // 3.创建连接
  12. Connection connection = factory.newConnection();
  13. // 4.创建Channel
  14. Channel channel = connection.createChannel();
  15. // 5.创建交换机
  16. /*
  17. exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
  18. 参数:
  19. 1. exchange:交换机名称
  20. 2. type:交换机类型
  21. DIRECT("direct"),:定向
  22. FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
  23. TOPIC("topic"),通配符的方式
  24. HEADERS("headers");参数匹配
  25. 3. durable:是否持久化
  26. 4. autoDelete:自动删除
  27. 5. internal:内部使用。 一般false
  28. 6. arguments:参数
  29. */
  30. String exchange="test_fanout";
  31. channel.exchangeDeclare(exchange, BuiltinExchangeType.FANOUT,true,false,false,null);
  32. // 6.创建队列
  33. String queue1Name="test_fanout_queue1";
  34. String queue2Name="test_fanout_queue2";
  35. channel.queueDeclare(queue1Name,true,false,false,null);
  36. channel.queueDeclare(queue2Name,true,false,false,null);
  37. // 7.绑定队列和交换机
  38. /*
  39. queueBind(String queue, String exchange, String routingKey)
  40. 参数:
  41. 1. queue:队列名称
  42. 2. exchange:交换机名称
  43. 3. routingKey:路由键,绑定规则
  44. 如果交换机的类型为fanout ,routingKey设置为""
  45. */
  46. channel.queueBind(queue1Name,exchange,"");
  47. channel.queueBind(queue2Name,exchange,"");
  48. String body = "日志信息:小张查询了近15天的销量";
  49. // 8.发消息
  50. channel.basicPublish(exchange,"",null,body.getBytes());
  51. // 9.释放资源
  52. channel.close();
  53. connection.close();
  54. }
  55. ----------------------------------------------------------------------------------------
  56. //消费者
  57. public static void main(String[] args) throws IOException, TimeoutException {
  58. // 1.创建连接工厂
  59. ConnectionFactory factory = new ConnectionFactory();
  60. // 2.设置参数
  61. factory.setHost("192.168.31.128"); //默认ip localhost
  62. factory.setPort(5672); //默认端口 5672
  63. factory.setVirtualHost("/"); //虚拟机 默认是/
  64. factory.setUsername("guest"); //用户名 默认guest
  65. factory.setPassword("guest"); //密码 默认guest
  66. // 3.创建连接
  67. Connection connection = factory.newConnection();
  68. // 4.创建Channel
  69. Channel channel = connection.createChannel();
  70. // 5.创建队列
  71. channel.queueDeclare("test_fanout_queue2",true,false,false,null);
  72. // 6.接收消息
  73. DefaultConsumer consumer = new DefaultConsumer(channel){
  74. @Override
  75. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  76. System.out.println("body:" + new String(body));
  77. System.out.println("我是负责存入数据库的");
  78. }
  79. };
  80. channel.basicConsume("test_fanout_queue2",true,consumer);
  81. }

Rotting路由模式

image.png

  • P:生产者,向 Exchange 发送消息,发送消息时,会指定一个routing key
  • X:Exchange(交换机),接收生产者的消息,然后把消息递交给与 routing key 完全匹配的队列
  • C1:消费者,其所在队列指定了需要 routing key 为 error 的消息
  • C2:消费者,其所在队列指定了需要 routing key 为 info、error、warning 的消息 ```java //生产者 public static void main(String[] args) throws IOException, TimeoutException { // 1.创建工厂
    1. ConnectionFactory factory = new ConnectionFactory();
    // 2.设置参数
    1. //2. 设置参数
    2. factory.setHost("172.16.98.133");//ip 默认值 localhost
    3. factory.setPort(5672); //端口 默认值 5672
    4. factory.setVirtualHost("/itcast");//虚拟机 默认值/
    5. factory.setUsername("heima");//用户名 默认 guest
    6. factory.setPassword("heima");//密码 默认值 guest
    // 3.创建连接
    1. Connection connection = factory.newConnection();
    // 4.创建channel
    1. Channel channel = connection.createChannel();
    // 5.创建交换机
    1. String exchangeName = "test_direct";
    2. channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
    3. //6. 创建队列
    4. String queue1Name = "test_direct_queue1";
    5. String queue2Name = "test_direct_queue2";
    6. channel.queueDeclare(queue1Name,true,false,false,null);
    7. channel.queueDeclare(queue2Name,true,false,false,null);
    // 7.绑定交换机
    1. channel.queueBind(queue1Name,exchangeName,"error");
    2. channel.queueBind(queue1Name,exchangeName,"warning");
    3. channel.queueBind(queue1Name,exchangeName,"info");
    4. channel.queueBind(queue1Name,exchangeName,"error");
    5. String body = "日志信息:张三调用了delete方法...出错误了。。。日志级别:error...";
    // 8.发送信息
    1. channel.basicPublish(exchangeName,"warning",null,body.getBytes());
    // 9.释放资源
    1. channel.close();
    2. connection.close();
    }

  1. //消费者
  2. public static void main(String[] args) throws IOException, TimeoutException {

// 1.创建工厂 ConnectionFactory factory = new ConnectionFactory(); // 2.设置参数 factory.setHost(“192.168.31.128”);//ip 默认值 localhost factory.setPort(5672); //端口 默认值 5672 factory.setVirtualHost(“/“);//虚拟机 默认值/ factory.setUsername(“guest”);//用户名 默认 guest factory.setPassword(“guest”);//密码 默认值 guest // 3. 创建连接 Connection Connection connection = factory.newConnection(); // 4.创建channel Channel channel = connection.createChannel(); // 5.接收消息 String queue1Name = “test_direct_queue1”; String queue2Name = “test_direct_queue2”;

  1. DefaultConsumer consumer = new DefaultConsumer(channel){
  2. @Override
  3. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  4. System.out.println("body:"+new String(body));
  5. System.out.println("将日志信息打印到控制台.....");
  6. }
  7. };
  8. channel.basicConsume(queue2Name,true,consumer);
  9. }

```

Topic通配符模式

image.png

  • 红色 Queue:绑定的是 usa.# ,因此凡是以 usa. 开头的 routing key 都会被匹配到
  • 黄色 Queue:绑定的是 #.news ,因此凡是以 .news 结尾的 routing key 都会被匹配

°通配符*代表一个单词
°通配符#代表0~多个单词