1、Routing之订阅模型-Direct

    在广播模式(fanout)中,一条消息会被所有订阅的队列消费,但是某些场景我们希望不同的消息被不同的队列消费,这时候就需要Direct类型的Exchange。

    在Direct模型下:

    队列与交换机的绑定,不是任意绑定了,需要指定一个RoutingKey

    消息的发送方向Exchange发送消息时,也必须指定消息的RoutingKey

    Exchange不再把消息交给每一个绑定的队列,要根据消息的RoutingKey进行判断,只有队列的RoutingKey与消息的RoutingKey完全一致,队列才会接受到消息

    官网的流程图:
    1.png

    p:生产者

    x:交换机(type为direct)

    c1:消费者1,其所在队列指定了需要RoutingKey为error的消息

    c2:消费者2,其所在队列指定了需要RoutingKey为info、error、warning的消息

    上代码:

    1. //生产者
    2. package com.alex.blog.rabbitmq.direct;
    3. import com.alex.blog.util.RabbitMQUtils;
    4. import com.rabbitmq.client.Channel;
    5. import com.rabbitmq.client.Connection;
    6. import java.io.IOException;
    7. public class Provider {
    8. public static void main(String[] args) throws IOException {
    9. //获取连接
    10. Connection connection = RabbitMQUtils.getConnection();
    11. //创建通道
    12. Channel channel = connection.createChannel();
    13. //将通道声明指定的交换机,若不存在该交换机则会自己生成 参数1:交换机的名称 参数2:交换机的类型
    14. channel.exchangeDeclare("logs_direct","direct");
    15. //发送消息
    16. channel.basicPublish("logs_direct","error",null,"hello,error".getBytes());
    17. channel.basicPublish("logs_direct","info",null,"hello,info".getBytes());
    18. channel.basicPublish("logs_direct","warning",null,"hello,warning".getBytes());
    19. //关闭资源
    20. RabbitMQUtils.closeConnectionAndChanel(channel,connection);
    21. }
    22. }

    消费者1:

    1. package com.alex.blog.rabbitmq.direct;
    2. import com.alex.blog.util.RabbitMQUtils;
    3. import com.rabbitmq.client.*;
    4. import java.io.IOException;
    5. public class Customer1 {
    6. public static void main(String[] args) throws IOException {
    7. Connection connection = RabbitMQUtils.getConnection();
    8. Channel channel = connection.createChannel();
    9. //通道绑定交换机
    10. channel.exchangeDeclare("logs_direct","direct");
    11. //交换机绑定队列(临时队列)
    12. String queueName1 = channel.queueDeclare().getQueue();
    13. //绑定交换机和队列
    14. channel.queueBind(queueName1,"logs_direct","error");
    15. //消费消息
    16. channel.basicConsume(queueName1,true,new DefaultConsumer(channel){
    17. @Override
    18. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    19. System.out.println("消费者1:"+new String(body));
    20. }
    21. });
    22. }
    23. }

    消费者2:

    1. package com.alex.blog.rabbitmq.direct;
    2. import com.alex.blog.util.RabbitMQUtils;
    3. import com.rabbitmq.client.*;
    4. import java.io.IOException;
    5. public class Customer2 {
    6. public static void main(String[] args) throws IOException {
    7. Connection connection = RabbitMQUtils.getConnection();
    8. Channel channel = connection.createChannel();
    9. //通道绑定交换机
    10. channel.exchangeDeclare("logs_direct","direct");
    11. //交换机绑定队列(临时队列)
    12. String queueName2 = channel.queueDeclare().getQueue();
    13. //绑定交换机和队列
    14. channel.queueBind(queueName2,"logs_direct","info");
    15. //消费消息
    16. channel.basicConsume(queueName2,true,new DefaultConsumer(channel){
    17. @Override
    18. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    19. System.out.println("消费者2:"+new String(body));
    20. }
    21. });
    22. }
    23. }

    消费者3:

    1. package com.alex.blog.rabbitmq.direct;
    2. import com.alex.blog.util.RabbitMQUtils;
    3. import com.rabbitmq.client.*;
    4. import java.io.IOException;
    5. public class Customer3 {
    6. public static void main(String[] args) throws IOException {
    7. Connection connection = RabbitMQUtils.getConnection();
    8. Channel channel = connection.createChannel();
    9. //通道绑定交换机
    10. channel.exchangeDeclare("logs_direct","direct");
    11. //交换机绑定队列(临时队列)
    12. String queueName3 = channel.queueDeclare().getQueue();
    13. //绑定交换机和队列
    14. channel.queueBind(queueName3,"logs_direct","warning");
    15. //消费消息
    16. channel.basicConsume(queueName3,true,new DefaultConsumer(channel){
    17. @Override
    18. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    19. System.out.println("消费者3:"+new String(body));
    20. }
    21. });
    22. }
    23. }

    然后我们按顺序启动消费者1、2、3,再启动生产者,后台页面如下:

    消费者1:
    2.png

    消费者2:
    3.png

    消费者3:
    4.png

    可以看到,消息的发送是根据交换机的Routingkey与消息的RoutingKey来对应发送的,当然一个消费者也可以绑定多个RoutingKey,此处我们修改消费者3来演示,让消费者3也可以接受到RoutingKey为info的消息,新增代码:

    1. //绑定交换机和队列
    2. channel.queueBind(queueName3,"logs_direct","info");

    结果如下:
    5.png

    2、Routing之订阅模型-Topic

    Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列,只不过Topic类型的Exchange可以让队列在绑定RoutingKey的时候使用通配符!这种RoutingKey一般都是由一个或者多个单词组成,多个单词之间用“.”分割。
    6.png

    支持的通配符:

    1. # 匹配一个或多个词,比如 goods.addgoods.add.sub都能匹配到。
    2. * 匹配一个,比如goods.delete能匹配到,goods.delete.haha就匹配不到。

    上代码:

    1. //生产者
    2. package com.alex.blog.rabbitmq.topic;
    3. import com.alex.blog.util.RabbitMQUtils;
    4. import com.rabbitmq.client.Channel;
    5. import com.rabbitmq.client.Connection;
    6. import java.io.IOException;
    7. public class Provider {
    8. public static void main(String[] args) throws IOException {
    9. //获取连接
    10. Connection connection = RabbitMQUtils.getConnection();
    11. //创建通道
    12. Channel channel = connection.createChannel();
    13. //将通道声明指定的交换机,若不存在该交换机则会自己生成 参数1:交换机的名称 参数2:交换机的类型
    14. channel.exchangeDeclare("logs_topic","topic");
    15. String routingKey = "user.save.info";
    16. //发送消息
    17. channel.basicPublish("logs_topic",routingKey,null,("动态路由发送消息,RoutingKey为:"+routingKey).getBytes());
    18. RabbitMQUtils.closeConnectionAndChanel(channel,connection);
    19. }
    20. }

    这里,发送消息的RoutingKey为:user.save.info

    消费者1、2、3、4其中的区别只有RoutingKey不同,所以只贴一份代码:

    消费者:

    1. package com.alex.blog.rabbitmq.topic;
    2. import com.alex.blog.util.RabbitMQUtils;
    3. import com.rabbitmq.client.*;
    4. import java.io.IOException;
    5. public class Customer1 {
    6. public static void main(String[] args) throws IOException {
    7. Connection connection = RabbitMQUtils.getConnection();
    8. Channel channel = connection.createChannel();
    9. //通道绑定交换机
    10. channel.exchangeDeclare("logs_topic","topic");
    11. //交换机绑定队列(临时队列)
    12. String queueName1 = channel.queueDeclare().getQueue();
    13. //绑定交换机和队列
    14. channel.queueBind(queueName1,"logs_topic","user.*.*");
    15. //消费消息
    16. channel.basicConsume(queueName1,true,new DefaultConsumer(channel){
    17. @Override
    18. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    19. System.out.println("消费者1:"+new String(body));
    20. }
    21. });
    22. }
    23. }
    1. 消费者1RoutingKey为:user.*.*
    2. 消费者2RoutingKey为:*.save.*
    3. 消费者3RoutingKey为:*.*.info
    4. 消费者4RoutingKey为:#.info

    结果为:

    消费者1:
    7.png

    消费者2:
    8.png

    消费者3:
    9.png

    消费者4:
    10.png