1.1基本概念介绍
**1.消息队列中间件 (MQ中间件)**<br /> MQ,中文是消息队列(MessageQueue),字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。<br /> **2.消息队列中间件的作用**
- 异步处理
- 应用解耦
- 流量削峰
消息通知
缺点:
1. 系统可用性降低
2. 系统复杂性增加
3.相关MQ产品介绍
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
---|---|---|---|---|
公司/社区 | Rabbit | Apache | 阿里 | Apache |
开发语言 | Erlang | Java | Java | Scala&Java |
协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定义协议 | 自定义协议 |
可用性 | 高 | 一般 | 高 | 高 |
单机吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
消息可靠性 | 高 | 一般 | 高 | 一般 |
追求可用性:Kafka、 RocketMQ 、RabbitMQ
追求可靠性:RabbitMQ、RocketMQ
追求吞吐能力:RocketMQ、Kafka
追求消息低延迟:RabbitMQ、Kafka
1.2RabbitMQ的安装和启动
**1.创建**
docker run \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3.8-management
2.访问
MQ服务器的端口 5672
后台管理端web服务器 15672
1.3AMQP协议模型和相关概念
broker: 一个中间件实例,及一个RabbitMQ 服务器<br /> message: 消息,就是要传输的数据<br /> producer: 消息的生产者,就是将数据发送给RabbitMQ的客户端<br /> consumer: 消息的消费者,用户获取并处理生产者发送的数据<br /> exchange: 交换器,消息需要从交换器按照不同的规则分发到指定的目的<br /> queue: 队列, 消息最终要存放到队列,然后由对应的消费者消费队列里<br /> routing key: 路由key,交换会根据路由名称 决定消息该发往哪里
4.4RabbitMQ支持的消息模式
1.4种交换器类型
Direct Exchange 路由
Fanout Exchange 广播
Topic Exchange 主题
Headers Exchange(性能差,不常用)
2.5种工作模式
- 简单模式:
生产发送消息至指定队列,消费者监听该队列消息
- 工作队列模式:
生产发送消息至指定队列,多个消费者监听一个队列
消息只会被消费一次
默认平均消费
可设置能者多劳模式
- 发布订阅-广播
Fanout Exchange
交换机接收消息后,广播给所有绑定队列
- 发布订阅-路由
Direct Exchange
交换机接收消息后,匹配对应的路由,发送给满足条件队列
- 发布订阅-主题
Topic Exchange
交换机接收消息后,匹配对应的路由,发送给满足条件队列
路由支持通配符: # 0 或 多个单词 * 代表1个单词
1.5Spring整合rabbitmq
1.实现
SpringAMQP提供了Rabbitmq 消息队列的相关调用实现
2.生产者工程:
步骤:
1. 引入依赖
spring-boot-start-amqp
2. 配置rabbitmq连接参数
3. 注入RabbitTemplate发送消息
3.消费者工程:
步骤:
1. 引入依赖
spring-boot-start-amqp
2. 配置rabbitmq连接参数
3. 配置监听 @RabbitListener 注解
4.队列交换机绑定
@Configuration + @Bean
@Configuration
public class FanoutConfig {
/**
* 声明交换机
* @return Fanout类型交换机
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("itcast.fanout");
}
/**
* 第1个队列
*/
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
/**
* 第2个队列
*/
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
@RabbitListener中通过注解声明
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listenTopicQueue1(String msg){
System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void listenTopicQueue2(String msg){
System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
}
5.消息转换器
Spring通过MessageConverter实现消息的序列化反序列化
默认使用JDK序列化
体积大,性能差
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}