交换机

  1. 当消息提供者向RabbitMQ Server发布消息时,消息先过经过交换机,再根据不同的交换机以不同的规则,
  2. 按照RoutingKey规则绑定到队列,将消息放入队列中。
  3. NoName Exchange (无名交换机):
  4. 如果我们使用""标识交换机,默认交换机。在无名交换机中RountingKey是可以指定队列名称的。
  5. Direct Exchange(直连交换机):
  6. 将一个名为Q的消息队列与某个名为D的直连交换机通过值为R的路由键绑定在一起,当一个消息和路由键R发送
  7. 到直连交换机D上时,直连交换机D会把消息根据路由键R分发到Q队列,这种模式类似于一对一。
  8. Fanout Exchange(扇型交换机):
  9. 当一个消息发送到扇形交换机F上时,则扇形交换机F会将消息分别发送给所有绑定到F上的消息队列。
  10. 扇形交换机将消息路由给绑定到自身的所有消息队列,也就是说路由键在扇形交换机里没有作用,
  11. 故消息队列绑定扇形交换机时,路由键可为空。这个模式类似于广播。
  12. Topic Exchange(主题交换机):
  13. 主题交换机是一种发布/订阅的模式,结合了直连交换机与扇形交换机的特点,消息队列与主题交换机的绑定
  14. 也是通过路由键的。当一个Msg和路由键规则发送到一个主题交换机T时,T会根据路由键规则来筛选出符合
  15. 规则的绑定到自身消息队列的路由键(可能是1个,也可能是N个,也可能是0个),根据符合的路由键,
  16. 将消息发送到其对应的消息队列里。这个模式类似于多播,当消息的路由规则只匹配到一个路由键时,
  17. 此时主题交换机可以看作是直连交换机,当路由规则匹配了主题交换机上所有绑定的队列的路由键时,
  18. 此时主题交换机可以看作是扇形交换机。

消息模型

1.简单消息模型

简单模式.png

2.工作队列模型

工作队列模式.png

3.发布订阅模型

发布订阅模式.png

4.路由模型

路由模式.png

5.主题模型

主题模式.png

绑定

  1. 交换机和队列之间的联系,是通过RoutingKey路由规则绑定。

直接交换机

  1. 直接交换机一般使用路由模式绑定队列,指定路由规则选择性分发。

Consumer

  1. //路由名称
  2. private static final String EXCHANGE = "direct_routing";
  3. public static void main(String[] args) {
  4. //获取通道
  5. Channel channel = RabbitMqUtils.getChannel();
  6. try {
  7. //声明交换机
  8. channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT);
  9. //声明临时队列
  10. String queue = channel.queueDeclare().getQueue();
  11. //队列,交换机,路由Key
  12. channel.queueBind(queue, EXCHANGE, "google");
  13. //消费消息
  14. DeliverCallback deliverCallback = (consumerTag, message) -> {
  15. System.out.println(new String(message.getBody()));
  16. };
  17. CancelCallback cancelCallback = consumerTag -> {
  18. System.out.println("消息消费异常");
  19. };
  20. System.out.println("ConsumerA准备就绪");
  21. channel.basicConsume(queue, true, deliverCallback, cancelCallback);
  22. } catch (IOException e) {
  23. e.printStackTrace();
  24. }
  25. }

Provider

  1. //路由名称
  2. private static final String EXCHANGE = "direct_routing";
  3. public static void main(String[] args) throws IOException {
  4. Connection connection = RabbitMqUtils.getConnection();
  5. Channel channel = connection.createChannel();
  6. try {
  7. for (int i = 0; i < 10; i++) {
  8. //发送消息
  9. channel.basicPublish(EXCHANGE, "google", null, ("msg" + i).getBytes());
  10. }
  11. } catch (IOException e) {
  12. e.printStackTrace();
  13. } finally {
  14. RabbitMqUtils.close(connection, channel);
  15. }
  16. }

扇出交换机

Provider

  1. //交换机名称
  2. public static final String EXCHANGE = "my.fanout";
  3. public static void main(String[] args) {
  4. Channel channel = RabbitMqUtils.getChannel();
  5. try {
  6. for (int i = 0; i < 100; i++) {
  7. channel.basicPublish(EXCHANGE, "", null, ("message" + i).getBytes());
  8. }
  9. } catch (IOException e) {
  10. e.printStackTrace();
  11. }
  12. }

ConsumerA

  1. //交换机名称
  2. public static final String EXCHANGE = "my.fanout";
  3. public static void main(String[] args) {
  4. //获取队列
  5. Channel channel = RabbitMqUtils.getChannel();
  6. try {
  7. //声明交换机(交换机名称,交换机类型)
  8. channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT);
  9. //声明一个临时队列(断开TCP连接即删除队列)
  10. String queue = channel.queueDeclare().getQueue();
  11. //绑定交换机和队列
  12. channel.queueBind(queue, EXCHANGE, "");
  13. System.out.println("ConsumerA准备就绪...");
  14. //消费消息
  15. DeliverCallback deliverCallback = (consumerTag, message) -> {
  16. System.out.println(new String(message.getBody()));
  17. };
  18. CancelCallback cancelCallback = consumerTag -> {
  19. System.out.println("消费错误...");
  20. };
  21. channel.basicConsume(queue, true, deliverCallback, cancelCallback);
  22. } catch (IOException e) {
  23. e.printStackTrace();
  24. }
  25. }

ConsumerB

  1. //交换机名称
  2. public static final String EXCHANGE = "my.fanout";
  3. public static void main(String[] args) {
  4. //获取队列
  5. Channel channel = RabbitMqUtils.getChannel();
  6. try {
  7. //声明交换机(交换机名称,交换机类型)
  8. channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT);
  9. //声明一个临时队列(断开TCP连接即删除队列)
  10. String queue = channel.queueDeclare().getQueue();
  11. //绑定交换机和队列
  12. channel.queueBind(queue, EXCHANGE, "");
  13. System.out.println("ConsumerB准备就绪...");
  14. //消费消息
  15. DeliverCallback deliverCallback = (consumerTag, message) -> {
  16. System.out.println(new String(message.getBody()));
  17. };
  18. CancelCallback cancelCallback = consumerTag -> {
  19. System.out.println("消费错误...");
  20. };
  21. channel.basicConsume(queue, true, deliverCallback, cancelCallback);
  22. } catch (IOException e) {
  23. e.printStackTrace();
  24. }
  25. }

主题交换机

  1. Topic交换机的路由需要满足一定的条件,必须是一个单词列表。#表示匹配0个或多个单词,*表示一个单词。
  2. *.user.* 中间带user的三个单词组合
  3. user.# 多个单词,第一个是user

Provider

  1. //交换机名称
  2. private static final String EXCHANGE = "my.topic";
  3. public static void main(String[] args) {
  4. Connection connection = RabbitMqUtils.getConnection();
  5. Channel channel = null;
  6. try {
  7. channel = connection.createChannel();
  8. for (int i = 0; i < 20; i++) {
  9. channel.basicPublish(EXCHANGE, "v.e.rabbit", null, ("msg" + i).getBytes());
  10. }
  11. } catch (IOException e) {
  12. e.printStackTrace();
  13. } finally {
  14. RabbitMqUtils.close(connection, channel);
  15. }
  16. }

ConsumerA

  1. //交换机名称
  2. private static final String EXCHANGE = "my.topic";
  3. public static void main(String[] args) throws IOException {
  4. //获取通道
  5. Channel channel = RabbitMqUtils.getChannel();
  6. //声明交换机
  7. channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC);
  8. //创建队列
  9. channel.queueDeclare("Q1", false, false, false, null);
  10. //交换机绑定队列
  11. channel.queueBind("Q1", EXCHANGE, "*.orange.*");
  12. //消费消息
  13. DeliverCallback deliverCallback = (consumerTag, message) ->
  14. System.out.println("NodeA:" + new String(message.getBody()));
  15. CancelCallback cancelCallback = consumerTag -> System.out.println("消息消费异常");
  16. channel.basicConsume("Q1", true, deliverCallback, cancelCallback);
  17. }

ConsumerB

  1. //交换机名称
  2. private static final String EXCHANGE = "my.topic";
  3. public static void main(String[] args) throws IOException {
  4. //获取通道
  5. Channel channel = RabbitMqUtils.getChannel();
  6. //声明交换机
  7. channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC);
  8. //创建队列
  9. channel.queueDeclare("Q2", false, false, false, null);
  10. //交换机绑定队列
  11. channel.queueBind("Q2", EXCHANGE, "*.*.rabbit");
  12. channel.queueBind("Q2", EXCHANGE, "lazy.#");
  13. //消费消息
  14. DeliverCallback deliverCallback = (consumerTag, message)
  15. -> System.out.println("NodeB:" + new String(message.getBody()));
  16. CancelCallback cancelCallback = consumerTag -> System.out.println("消息消费异常");
  17. channel.basicConsume("Q2", true, deliverCallback, cancelCallback);
  18. }