Direct exchange(直连交换机)是根据消息携带的路由键(routing key)将消息投递给对应队列的
    Fanout exchange(扇型交换机)将消息路由给绑定到它身上的所有队列
    Topic exchange(主题交换机)队列通过路由键绑定到交换机上,然后,交换机根据消息里的路由值,将消息路 由给一个或多个绑定队列
    Headers exchange(头交换机)类似主题交换机,但是头交换机使用多个消息属性来代替路由键建立路由规则。 通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。
    1、扇形交换机
    发布订阅.png
    思路解读(重点理解):
    (1)一个生产者,多个消费者
    (2)每一个消费者都有自己的一个队列
    (3)生产者没有直接发消息到队列中,而是发送到交换机
    (4)每个消费者的队列都绑定到交换机上
    (5)消息通过交换机到达每个消费者的队列
    该模式就是FanoutExchange(扇型交换机)将消息路由给绑定到它身上的所有队列
    以用户发邮件案例讲解
    注意:交换机没有存储消息功能,如果消息发送到没有绑定消费队列的交换机,消息则丢失。

    1. / 生产者 交换机类型 producerfanout类型
    2. public class FanoutProducer {
    3. // 交换机名称
    4. private static final String DESTINATION_NAME = "my_fanout_estination";
    5. public static void main(String[] args) throws IOException, TimeoutException {
    6. // 1. 建立mq连接
    7. Connection connection = MQConnectionUtils.newConnection();
    8. // 2.创建通道
    9. Channel channel = connection.createChannel();
    10. // 3.生产者绑定交换机 参数1 交换机名称 参数2 交换机类型
    11. channel.exchangeDeclare(DESTINATION_NAME, "fanout",true);
    12. // 4.创建消息
    13. String msg = "my_fanout_destination_msg";
    14. System.out.println("生产者投递消息:" + msg);
    15. // 5.发送消息 my_fanout_estination routingKey
    16. channel.basicPublish(DESTINATION_NAME, "", null, msg.getBytes());
    17. // 6.关闭通道 和连接
    18. channel.close();
    19. connection.close();
    20. }
    21. }
    // 邮件消费者
    public class ConsumerEmailFanout {
        private static final String EMAIL_QUEUE = "email_queue_fanout";
        // 交换机名称
        private static final String DESTINATION_NAME = "my_fanout_estination";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            System.out.println("邮件消费启动");
            // 1. 建立mq连接
            Connection connection = MQConnectionUtils.newConnection();
            // 2.创建通道
            Channel channel = connection.createChannel();
            // 3.消费声明队列
            channel.queueDeclare(EMAIL_QUEUE, false, false, false, null);
            // 4.消费者队列绑定交换机
            channel.queueBind(EMAIL_QUEUE, DESTINATION_NAME, "");
            // 5.消费监听消息
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
    
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                        throws IOException {
                    String msg = new String(body, "UTF-8");
                    System.out.println("邮件消费者获取生产消息:" + msg);
                }
            };
            channel.basicConsume(EMAIL_QUEUE, true, defaultConsumer);
    
        }
    
    }
    


    // 邮件消费者
    public class ConsumerSmsFanout {
        private static final String SMS_QUEUE = "sms_queue_fanout";
        // 交换机名称
        private static final String DESTINATION_NAME = "my_fanout_estination";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            System.out.println("短信消费者启动");
            // 1. 建立mq连接
            Connection connection = MQConnectionUtils.newConnection();
            // 2.创建通道
            Channel channel = connection.createChannel();
            // 3.消费声明队列
            channel.queueDeclare(SMS_QUEUE, false, false, false, null);
            // 4.消费者队列绑定交换机
            channel.queueBind(SMS_QUEUE, DESTINATION_NAME, "");
            // 5.消费监听消息
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
    
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                        throws IOException {
                    String msg = new String(body, "UTF-8");
                    System.out.println("短信消费者获取生产消息:" + msg);
                }
            };
            channel.basicConsume(SMS_QUEUE, true, defaultConsumer);
    
        }
    
    }
    

    2、直接交换机direct (routingKey)
    交换机根据routingKey进行消息与消费者队列的匹配

    路由模式2.png

    // 生产者  交换机类型 producerfanout类型
    public class RoutingProducer {
    
        // 交换机名称
        private static final String DESTINATION_NAME = "my_routing_estination";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            // 1. 建立mq连接
            Connection connection = MQConnectionUtils.newConnection();
            // 2.创建通道
            Channel channel = connection.createChannel();
            // 3.生产者绑定交换机 参数1 交换机名称 参数2 交换机类型
            channel.exchangeDeclare(DESTINATION_NAME, "direct",true);
            String routingKey = "yushengjun";
            // 4.创建消息
            String msg = "my_routing_destination_msg" + routingKey;
            System.out.println("生产者投递消息:" + msg);
            // 5.发送消息 my_fanout_estination routingKey
            channel.basicPublish(DESTINATION_NAME, routingKey, null, msg.getBytes());
            // 6.关闭通道 和连接
            channel.close();
            connection.close();
    
        }
    
    }
    
    // 邮件消费者
    public class ConsumerSmsRouting {
        private static final String SMS_QUEUE = "sms_queue_routing";
        // 交换机名称
        private static final String DESTINATION_NAME = "my_routing_estination";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            System.out.println("短信消费者启动");
            // 1. 建立mq连接
            Connection connection = MQConnectionUtils.newConnection();
            // 2.创建通道
            Channel channel = connection.createChannel();
            // 3.消费声明队列
            channel.queueDeclare(SMS_QUEUE, false, false, false, null);
            // 4.消费者队列绑定交换机 绑定路由件 路由键 绑定邮件和短信
            channel.queueBind(SMS_QUEUE, DESTINATION_NAME, "email");
            channel.queueBind(SMS_QUEUE, DESTINATION_NAME, "sms");
            // 5.消费监听消息
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
    
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                        throws IOException {
                    String msg = new String(body, "UTF-8");
                    System.out.println("短信消费者获取生产消息:" + msg);
                }
            };
            channel.basicConsume(SMS_QUEUE, true, defaultConsumer);
    
        }
    
    }
    
    // 邮件消费者
    public class ConsumerEmailRouting {
        private static final String SMS_QUEUE = "eamil_queue_routing";
        // 交换机名称
        private static final String DESTINATION_NAME = "my_routing_estination";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            System.out.println("邮件消费者启动");
            // 1. 建立mq连接
            Connection connection = MQConnectionUtils.newConnection();
            // 2.创建通道
            Channel channel = connection.createChannel();
            // 3.消费声明队列
            channel.queueDeclare(SMS_QUEUE, false, false, false, null);
            // 4.消费者队列绑定交换机 绑定路由件
            channel.queueBind(SMS_QUEUE, DESTINATION_NAME, "email");
            // 5.消费监听消息
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
    
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                        throws IOException {
                    String msg = new String(body, "UTF-8");
                    System.out.println("短信消费者获取生产消息:" + msg);
                }
            };
            channel.basicConsume(SMS_QUEUE, true, defaultConsumer);
    
        }
    
    }
    

    3、主题交换机
    zhuti.png
    只是将生产者交换机有direct替换成topic,消费者叫routingKey替换成带或带#
    符号#:匹配一个或者多个词lazy.#可以匹配lazy.irs或者lazy.irs.cor
    **符号
    :只能匹配一个词lazy.可以匹配lazy.irs或者lazy.cor*

    RabbitMq.pptxRabbitMQ环境搭建.docx上课代码.zip