mq队列模型:
Basic Queue 简单队列模式 使用默认交换机
work Queue 工作队列模式 使用默认交换机
发布/订阅 模式
fanout: 广播 每个交换机的队列都会收到消息
Direct:路由 使用 指定的key 指定交换机的队列会收到消息
Topic: 话题 话题 用 ”.“ 链接 通配符 # 多个 * 一个 符合统配条件的交换机队列会收到消息
交换机的作用:
- 接收publisher发送的消息
- 将消息按照规则路由到与之绑定的队列
- 不能缓存消息,路由失败,消息丢失
- FanoutExchange的会将消息路由到每个绑定的队列
mq的优势
- 吞吐量提升:无需等待订阅者处理完成,响应更快速
- 故障隔离:服务没有直接调用,不存在级联失败问题
- 调用间没有阻塞,不会造成无效的资源占用
- 耦合度极低,每个服务都可以灵活插拔,可替换
- 流量削峰:不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理事件
声明队列、交换机、绑定关系的Bean是什么?
- Queue 队列
- FanoutExchange 交换机
- Binding 绑定队列和交换机关系
fanout 广播模式下 消息队列发送方式
- 可以有多个队列
- 每个队列都要绑定到Exchenge(交换机)
- 生产者发送消息,只能发送到交换机,交换机决定发送到那个队列,生产者无法决定
- 交换机把消息发送给绑定过的 所有队列
- 订阅队列的消费者都能收到消息
Direct 模式下消息队列发送方式
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey
(路由key) - 消息的发送方在 向 Exchange发送消息时,也必须指定消息的
RoutingKey
。 - Exchange不再把消息交给每一个绑定的队列,而是根据消息的
Routing Key
进行判断,只有队列的Routingkey与消息的
Routing key`完全一致,才会接收到消息
Topic模式下消息队列发送方式
- Topic
类型的
Exchange与
Direct相比,都是可以根据
RoutingKey把消息路由到不同的队列。只不过
Topic类型
Exchange可以让队列在绑定
Routing key` 的时候使用通配符!
Routingkey
一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
- 通配符规则:
#
:匹配一个或多个词*
:匹配不多不少恰好1个词
springAMQP
发送消息 和 接收消息代码
1 .引入 spring-amqp 依赖 2 . 注入RabitTemplate 3 编写逻辑,绑定队列 发送消息
配置文件
spring:
rabbitmq:
host: 192.168.200.130 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: root # 用户名
password: root # 密码
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
//发送消息
@Autowired
private RabbitTemplate rabbitTemplate;
public void testFanoutExchange() {
// 队列名称
String exchangeName = "itcast.fanout";
// 消息
String message = "hello, everyone!";
//中间冒号声明队列绑定交换机的 key 不写默认都可以收到
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
//接收消息
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}
发布订阅模式
基于@Bean 的声明队列和交换机
需要绑定 交换机和消息队列
fanout 广播 创建队列和创建交换机方式
//创建交换机
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("itcast.fanout");
}
/**
* 第1个队列 这个是创建队列
*/
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
基于注解声明队列和交换机
@RabbitListener(bindings = @QueueBinding(
//队列名称
value = @Queue(name = "direct.queue1"),
//交换机名称
exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
}