1.1基本概念介绍

  1. **1.消息队列中间件 MQ中间件)**<br /> MQ,中文是消息队列(MessageQueue),字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。<br /> **2.消息队列中间件的作用**
  • 异步处理
  • 应用解耦
  • 流量削峰
  • 消息通知

    缺点:
    1. 系统可用性降低
    2. 系统复杂性增加
    3.相关MQ产品介绍

RabbitMQ ActiveMQ RocketMQ Kafka
公司/社区 Rabbit Apache 阿里 Apache
开发语言 Erlang Java Java Scala&Java
协议支持 AMQP,XMPP,SMTP,STOMP OpenWire,STOMP,REST,XMPP,AMQP 自定义协议 自定义协议
可用性 一般
单机吞吐量 一般 非常高
消息延迟 微秒级 毫秒级 毫秒级 毫秒以内
消息可靠性 一般 一般

追求可用性:Kafka、 RocketMQ 、RabbitMQ

追求可靠性:RabbitMQ、RocketMQ

追求吞吐能力:RocketMQ、Kafka

追求消息低延迟:RabbitMQ、Kafka

1.2RabbitMQ的安装和启动

**1.创建**
docker run \

 -e RABBITMQ_DEFAULT_USER=itcast \

 -e RABBITMQ_DEFAULT_PASS=123321 \

 -v mq-plugins:/plugins \

 --name mq \

 --hostname mq \

 -p 15672:15672 \

 -p 5672:5672 \

 -d \

 rabbitmq:3.8-management

2.访问
MQ服务器的端口 5672
后台管理端web服务器 15672

1.3AMQP协议模型和相关概念

broker: 一个中间件实例,及一个RabbitMQ 服务器<br />    message: 消息,就是要传输的数据<br />    producer: 消息的生产者,就是将数据发送给RabbitMQ的客户端<br />    consumer: 消息的消费者,用户获取并处理生产者发送的数据<br />    exchange: 交换器,消息需要从交换器按照不同的规则分发到指定的目的<br />    queue: 队列, 消息最终要存放到队列,然后由对应的消费者消费队列里<br />    routing key: 路由key,交换会根据路由名称 决定消息该发往哪里

4.4RabbitMQ支持的消息模式

1.4种交换器类型
Direct Exchange 路由
Fanout Exchange 广播
Topic Exchange 主题
Headers Exchange(性能差,不常用)
2.5种工作模式
- 简单模式:
生产发送消息至指定队列,消费者监听该队列消息
- 工作队列模式:
生产发送消息至指定队列,多个消费者监听一个队列
消息只会被消费一次
默认平均消费
可设置能者多劳模式
- 发布订阅-广播
Fanout Exchange
交换机接收消息后,广播给所有绑定队列
- 发布订阅-路由
Direct Exchange
交换机接收消息后,匹配对应的路由,发送给满足条件队列
- 发布订阅-主题
Topic Exchange
交换机接收消息后,匹配对应的路由,发送给满足条件队列
路由支持通配符: # 0 或 多个单词 * 代表1个单词

1.5Spring整合rabbitmq

1.实现
SpringAMQP提供了Rabbitmq 消息队列的相关调用实现
2.生产者工程:
步骤:
1. 引入依赖
spring-boot-start-amqp
2. 配置rabbitmq连接参数
3. 注入RabbitTemplate发送消息
3.消费者工程:
步骤:
1. 引入依赖
spring-boot-start-amqp
2. 配置rabbitmq连接参数
3. 配置监听 @RabbitListener 注解
4.队列交换机绑定
@Configuration + @Bean

@Configuration

public class FanoutConfig {

    /**

     * 声明交换机

     * @return Fanout类型交换机

     */

    @Bean

    public FanoutExchange fanoutExchange(){

        return new FanoutExchange("itcast.fanout");

    }

    /**

     * 第1个队列

     */

    @Bean

    public Queue fanoutQueue1(){

        return new Queue("fanout.queue1");

    }

    /**

     * 绑定队列和交换机

     */

    @Bean

    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){

        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);

    }

    /**

     * 第2个队列

     */

    @Bean

    public Queue fanoutQueue2(){

        return new Queue("fanout.queue2");

    }

    /**

     * 绑定队列和交换机

     */

    @Bean

    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){

        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);

    }

}
    @RabbitListener中通过注解声明
@RabbitListener(bindings = @QueueBinding(

            value = @Queue(name = "topic.queue1"),

            exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),

            key = "china.#"

    ))

    public void listenTopicQueue1(String msg){

        System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");

    }



    @RabbitListener(bindings = @QueueBinding(

            value = @Queue(name = "topic.queue2"),

            exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),

            key = "#.news"

    ))

    public void listenTopicQueue2(String msg){

        System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");

    }

5.消息转换器
Spring通过MessageConverter实现消息的序列化反序列化
默认使用JDK序列化
体积大,性能差

   @Bean
    public MessageConverter jsonMessageConverter(){

        return new Jackson2JsonMessageConverter();

    }