MQ
消息队列,先进先出,是一种可以跨进程的通信机制,在上下游传递消息,用于逻辑解耦+物理解耦的消息通信服务。
三大功能:
- 流量削峰。高峰时期的流量,超过系统处理能力的部分暂存在队列中,达到削弱峰值的作用。
- 应用解耦。微服务系统下,其中一个服务宕机,对于其的请求可以存储在消息队列中,短时间上线后可以正常处理,不至于一个服务宕机其余服务一起宕机。
- 异步处理。对于需要长时间处理的请求,请求方可以不用一直处于等待结果状态,处理完后返回给消息队列,消息队列自动通知请求方。
消息队列的选择:
- kafka:大数据,日志采集功能,高吞吐量
- rocketMQ:可靠性要求高,应对高峰冲击,保障稳定
- rabbit MQ:数据量不是很大,中小型公司,时效性微秒级
RabbitMQ
不处理消息,只是接收,存储和转发消息。基于erlang语言。
四大核心角色:
- 生产者:生产消息
- 交换机:绑定多个队列,负责接受消息,推送消息到指定队列
- 队列:存储消息,本质是大的消息缓冲区
- 消费者:消费消息
六种核心模式:
- 简单模式(Hello world)
- 工作模式(work queues)
- 发布订阅模式(publish/subscribe)
- 路由模式(routing)
- 主题模式(topics)
- 发布确认模式(publish confirms)
todo:六种模式表格
名词解释:
- broker:接收和分发消息的应用,rmq的服务器,消息实体
- exchange:交换机,消息到达broker的第一站,根据分发规则查询routingkey,确定分发队列。
- produce:生产者
- consumer:消费者
- queue:队列
- channel:信道,每个连接中的逻辑连接,可以存在多个信道,轻量级的connection
- connection:生产者,消费者于broker之间的tcp连接
- virtual host:多租户情况下的虚拟分组。一个borker中存在多个vhost,一个vhost中存在多个交换机及其对应的队列。
- binding:交换机和队列之间的虚拟连接,包含routing key,绑定信息存放到交换机的查询表中。
安装
略
基于erlang语言
查看rmq状态status rabbitmq-server.service
web后台管理插件
端口:15672
初始用户:guest
初始密码:guest
创建账号
todo p12
代码测试
略
todo:helloword模式,回调函数
工作队列模式
主要思想是避免立即执行资源密集型任务,后台分配多个线程轮询接受消息。【一条消息只能被处理一次,不能被处理多次】
存在多个消费者消费同一个队列中的消息,多个消费者采用轮询的模式进行消费。
消息应答机制
rmq一旦向消费者发送了一条消息,那么消息就会被标记删除。
并且消费者完成消费需要一段时间,如果其中一个消费者处理任务过程中挂掉了,此时消息会丢失。
为了保证消息发送过程中不丢失,rmq引入了消息应答机制,消费者在接收到消息并处理该消息后,告诉rmq已经处理完了,rmq可以删除消息了
- 自动应答:消息发送后立即被认为已经传送成功,【弊端,消费者接收到消息之前宕机,会造成数据丢失】,这种模式仅适用于在消费者可以高效并以某种速率处理这些消息的情况下使用。、
- 手动应答
- 【channel.basicAck】:肯定确认,成功接受并处理消息
- 【channel.basicNack】:否定确认
- 【channel.basicReject】:否定确认,拒绝消息,可以直接丢弃
- 【multiple】:批量应答。true会将信道中当前消息之前未应答的消息进行批量应答。false则之后应答自己。
消息自动重新入队
如果由于消费者失去连接,导致消息未发送ack确认,rmq会将消息进行重新排队。
如果此时其他消费者可以处理,他将很快重新分发给另一个消费者。这样即使存在消费者丢失连接的情况,数据也不会丢失。
设置了手动应答之后,消息不会丢失。
持久化
消息持久化能保证在mq服务停止后发来的消息不丢失。默认情况下,mq停止时,忽视队列和消息不自动持久化。
持久化要做两件事:队列持久化和消息持久化
队列持久化
队列的持久化需要在声明队列的时候进行持久化。
如果队列声明时没有持久化,需要持久化需要删除队列重新创建。
持久化的对象会在管理web中对列表中的features中进行标明。【D】
未经持久化的队列是临时队列,断开了消费者的连接将会被删除。
boolean durable = true;
channel.queueDeclare("queue_name",durable, false ,false, null);
// 声明临时队列
消息持久化
生产者发布消息时需要声明持久化。
消息持久化并不能完全保证消息不丢失。如,当消息准备保存到磁盘还未存储完时,宕机会造成丢失。对于简单任务队列可以,更强力度的保证间发布确认。
// 消息持久化需要传入参数:【MessageProperties.PERSISTENT_TEXT_PLAIN】
channel.basicpublish("交换机名称","队列名", MessageProperties.PERSISTENT_TEXT_PLAIN,
null, message.getbytes("utf-8"));
不公平分发
轮询分发的模式并不合理。容易造成数据延迟。
不公平分发遵循能者多劳原则。充分利用机器性能。
默认是轮询分发(0)。【需要在消费方进行修改,将参数修改为1】
// 不公平分发参数
int prefetchCount = 1;
channel.basicQos(prefetchCount);
预取值
prefetch:预取值,指定消费者被分配的消息数量。【定义信道中允许存在的未确认消息的最大数量】
一旦达到配置的数量,rmq将停止在通道上传递更多的消息。
优点:可以根据情况配置处理数量,提高向消费者传递数据的速度。
存在的原因:由于消息的发送是异步的,消息消费的手动确认也是异步的。所以,任何时候信道中消息的数量不止一个,就存在一个未确认消息的缓冲区,如果对缓冲区不加限制可能大批量的未确认消息涌入信道,造成不必要的RAM消耗,通过自定义缓冲区中消息数量的方式来解决。
【channel.basicQos】
- 0:轮询分发模式
- 1:不公平分发模式
- 2-n:预取值
进入预取值缓冲区的数据不会被其他消费者再消费。如果消费者掉线,可能会丢失数据。
todo:完善工作队列模式所有的信息
发布确认模式
保证数据不丢失的要求:
- 队列必须设置持久化
- 消息设置持久化
- 发布确认
发布确认的原理:
所有进入信道的消息会被指派一个id(从1开始),
消息进入队列之后会发送一个确认消息给生产者,如果消息是需要持久化的,确认消息在持久化完成后发送
生产者接收到确认消息,
发布确认方法
开启:channel.confirmSelect();
- 单个确认发布
- 批量确认发布
- 异步确认发布
单个确认发布
同步确认发布,只有上一条消息被确认了才能发送下一条消息,指定时间内没有被确认将抛出异常。
缺点:发布速度特别慢,没有确认会阻塞后续消息的发布。最多每秒数百条消息的吞吐量。
批量确认发布
同步确认发布,先发布一批消息,然后一起确认。速度较单个发布快。
缺点:发生故障时,无法确定出现问题的消息。
异步确认发布
异步确认发布,利用回调函数达到消息可靠性传递。即使出现消息确认失败,也可以再次发布。
todo:回调函数
未确认消息重新发布
未确认的消息放到一个基于内存的能被发布线程访问的队列中。比如ConcurrentLinkedQueue队列在确认回调与发布线程之间进行消息传递。
通过回调函数能够得到未确认的消息列表
交换机
作用:通过交换机把同一个消息发送给多个消费者。交换机绑定多个队列,每个队列里的消息还是只能被消费一次。
所有的消息不能直接发送给队列,只能发送给交换机(helloworld,任务队列模式都是使用的默认交换机)。
交换机一方面接受生产者的消息,另一方面绑定队列,将消息放到队列。
交换机通过不同的routingkey绑定不同的队列。发送消息时根据不同的routingkey将消息放到不同的队列,由不同的消费者进行消费。
交换机类型
- 直接direct
- 主题topic
- 标题headers
- 扇出fanout
todo:默认交换机类型,无名类型。“”
发布订阅模式fanout
fanout扇出(广播),交换机将锁接受的所有消息广播到绑定的所有队列中。
扇出交换机:交换机和队列绑定的routingkey都为空相同(多重绑定),消息发送到所有队列
直接交换机:交换机和队列绑定的routingkey不同,消息发送到不同的队列。
routingkey的作用:
绑定交换机和队列
todo:会存在两个路由对应同一队列么
主题模式
topic交换机的routingkey格式:
- 一个单词列表,以点分隔
- 单词列表不能超过255。如abc.def
- 【】代表一个个单词。如:abc.,会匹配所有abc后跟一个单词的routingkey
- 【#】代表零个或多个单词。如abc.#,会匹配abc开头的所有routingkey。
- 只有【#】号时类似fanout交换机,没有【#】【*】时类似直接交换机
todo:各个交换机对比,topic和direct和fanout的区别
交换机 | 特点 | 实例 |
---|---|---|
- 示例步骤:
- 消息TTL过期
- 队列已满,队列达到最大长度,无法添加数据
- 消息被拒绝,消费方拒绝接受并且不重新放回队列