在 RabbitMQ 在设计的时候,也就是消息的发布和消息的消费之间是解耦的。

不做任何配置的情况下,生产者是不知道消息是否真正到达RabbitMQ,也就是说消息发布操作不返回任何消息给生产者,也就是生产者发送消息了就发送了,中间有任何报错,生产者是不知道的.除非生产者抛出异常了,我对异常进行对应的捕获,当然这种情况不一定是消息丢失造成的,可能是服务器挂了等等之类的.

怎么确保消息的可靠性,会采取投递机制.

在 RabbitMQ 中,有不同的投递机制(生产者),但是每一种机制都对性能有一定的影响。一般来讲速度快的可靠性低,可靠性好的性能差,具体怎么使用需要根据你的应用程序来定,所以说没有最好的方式,只有最合适的方式。只有把你的项目和技术相结合,才能找到适合你的平衡。


生产者投递机制 - 图1
生产者进行消息发布权衡的时候其实是鱼和熊掌不可兼得,当你速度越快的时候,你的可靠性越低,当你可靠性越高的时候,你的速度越慢.
1. 无保障是速度最快的,发送完了就发了,也不去管数据到底是否被消费,速度最快,但是最不可靠.
2. 失败者通知,速度性能会消耗一点,但是没有关系,失败者通知至少可以确认你没有路由的情况,我的消息发送失败了.
3. 更好的方式当然使用发布者确认方式
4. 当然不能给我一个nck就行了,我还是希望我的消息一定要发送三到RabbitMQ上,所以我们会使用备用交换器
5. 如果你觉得还不保险,可以使用高可用队列,高可用队列是属于RabbitMQ的集群方案了.
6. 事务的可靠性其实是非常高的,因为事务基本上可以确认没有丢失,但是事务的性能是很低的
7. 如果还是不行就事务加高可用队列方式
8. 一般消息也好,交换器也好,队列也好,一般情况下都是非持久化的,如果你要确保最高可靠性,那么你就要把消息,交换器和队列都要做成持久化的
消息持久化是可靠性最高的,我像数据库一样写入到磁盘里面,当然这些消息是可靠性最高的,但是它的速度最慢

我们一般会使用失败者通知+发布者确认两个一起用,或者再加个备用交换器,三个一起用,其实这样就会完成一个可靠性非常高的方案,这个方案的速度还是比较快的.

生产者投递机制 - 图2

在 RabbitMQ 中实际项目中,生产者和消费者都是客户端,它们都可以完成申明交换器、申明队列和绑定关系,但是在我们的实战过程中,我们在生产者代码中申明交换器,在消费者代码中申明队列和绑定关系。
另外还要申明的就是,生产者发布消息时不一定非得需要消费者,对于 RabbitMQ 来说,如果是单纯的生产者你只需要生产者客户端、申明交换器、申明队列、确定绑定关系,数据就能从生产者发送至 RabbitMQ。

无保障


无保障的方式,通过 basicPublish 发布你的消息并使用正确的交换器和路由信息,你的消息会被接收并发送到合适的队列中。但是如果有网络问题,或者消息不可路由(就是路由键不匹配),或者 RabbitMQ 自身有问题的话,这种方式就有风险。所以无保证的消息发送一般情况下不推荐。

失败确认

案例:ZJJ_RabbitMQ_2019/11/03_17:25:11_flckv



消息发送失败的时候给通知

在发送消息时设置 mandatory 标志,告诉 RabbitMQ,如果消息不可路由,应该将消息返回给发送者,并通知失败。可以这样认为,开启 mandatory是开启故障检测模式。

如果你进行了这个设置的话,我的生产者发布消息到交换器里面,交换器里面只要不绑定广播路由(因为广播路由是不看你路由键,全盘接收的),如果路由键和队列找不到匹配关系,这个状态是消息不可路由,这个时候就会启动失败通知机制.

注意:它只会让 RabbitMQ 向你通知失败,而不会通知成功。如果消息正确路由到队列,则发布者不会受到任何通知。带来的问题是无法确保发布消息一定是成功的,因为通知失败的消息可能会丢失。

在失败通知handleReturn代码块儿里面可以进行业务处理代码,比如说重发,或者记录日志等等.

生产者投递机制 - 图3

监听器的小甜点
在信道关闭和连接关闭时,还有两个监听器可以使用
生产者投递机制 - 图4

事务(了解)

案例:ZJJ_RabbitMQ_2019/11/03_18:41:17_ewfrd


加入事务的的情况下比无保障权衡性能至少下降2到10倍,同时使生产者应用程序产生同步,这样的话和使用RPC远程调用没啥区别了.所以,事务权衡是比较鸡肋的功能


事务的实现主要是对信道(Channel)的设置,主要的方法有三个:
1. channel.txSelect()声明启动事务模式;
2. channel.txComment()提交事务;
3. channel.txRollback()回滚事务;

在发送消息之前,需要声明 channel 为事务模式,提交或者回滚事务即可。
开启事务后,客户端和 RabbitMQ 之间的通讯交互流程:
 客户端发送给服务器 Tx.Select(开启事务模式)
 服务器端返回 Tx.Select-Ok(开启事务模式 ok)
 推送消息
 客户端发送给事务提交 Tx.Commit
 服务器端返回 Tx.Commit-Ok
以上就完成了事务的交互流程,如果其中任意一个环节出现问题,就会抛出 IoException 移除,这样用户就可以拦截异常进行事务回滚,或决定要不要重复消息
生产者投递机制 - 图5
那么,既然已经有事务了,为何还要使用发送方确认模式呢,原因是因为事务的性能是非常差的。根据相关资料,事务会降低 2~10 倍的性能。

发送方确认模式(confirm)


案例:ZJJ_RabbitMQ_2019/11/03_18:57:07_k47r4


如何确保消息正确地发送至RabbitMQ?

我发布的消息要经过我生产者的确认,确认消费者收到了.
这种权衡既可以保证我的效率

确认的三种实现方式:
1. 一般确认(同步方式)
2. 批量确认(同步方式)
3. 异步监听确认

所以确认方式是多样的.

比如说我生产消息,如果你的消息没有绑定路由键或者是说你的路由键跟队列绑定的路由键是不能匹配的,那么就会发送失败者通知.
如果RabbitMQ内部发生了错误问题消息无法投入队列,这个时候会有返回标志,就是发送方确认的话,会返回一个nack,没有确认, 如果是发送成功了,就会返回ack,也就是说向你确认一下我这个消息已经成功的经过了交换器,然后经过这些路由判断,已经成功的到了我们的队列里面来了.
如果你没有成功,也就是说你经过交换器了,也有路由键,但是在投递过程中,内部发生了一些未知的异常,在这个时候就会返回一个nack 告诉生产者我发送失败了.

假如我发送了A数据过去到RabbitMQ,我怎么知道这条消息怎么个确认?肯定有个这样的机制,在发送的时候有一个id号唯一标识,这个id号是从1开始,也就是说我们每一条发给RabbitMQ的消息,它就会有一个编号,在发送消息的时候会指派 假如A数据id号是1,由这个id号为1和我们的RabbitMQ来进行相互的确认,这个标号为1的消息发送的时候是否有ACK, 如果有ACK就是发送成功了.或者是Nack发送失败了,就是由这个id来确认,不然的话判断起来会非常的麻烦(判断这个消息到底是什么东西).
这个id唯一标识是基于信道的


生产者投递机制 - 图6

Confirm 的三种实现方式:
方式一:channel.waitForConfirms()普通发送方确认模式;消息到达交换器,就会返回 true。
方式二:channel.waitForConfirmsOrDie()批量确认模式;使用同步方式等所有的消息发送之后才会执行后面代码,只要有一个消息未到达交换器就会抛出 IOException 异常。 注意,批量确认模式容易发生问题,你单个确认方式,比如说在for循环里面,如果一条有问题了你可以定位到,如果是批量确认模式,你来了一堆,你不知道是哪条有问题(因为是批量确认的). 如果你每十个每十个的批量确认,你效率可能要低一点,如果出现问题,你在10个里面确认要总比好几百个方便些.


方式三:channel.addConfirmListener()异步监听发送方确认模式;异步监听机制不需要生产者把消息全部发送完,我直接就启动一个发送者确认监听器,一旦有问题了,我再来进行对应的处理,
handleAck是消息成功的进入到队列里面的回调,handleNack是消息失败,没有进入到队列里面的回调


备用交换器

案例:ZJJ_RabbitMQ_2019/11/04_11:24:54_d9o3h



在第一次声明交换器时被指定,用来提供一种预先存在的交换器,如果主交换器无法路由消息(无法路由消息的话生产者发送的消息就丢失了,生产者发送数据我希望直接发送成功,不希望被失败者通知等等功能经常打回去),那么消息将被路由到这个新的备用交换器。

备用交换器就是处理如果我中间主交换器无法路由消息,消息又希望放到RabbitMQ中间,所以我会在主交换器中间绑定一个备用交换器,如果主交换器挂了,就会用备用交换器,把数据发送到备用交换器绑定的队列里面来.这种情况永远都是保证我们消息队列的高可用.

生产者投递机制 - 图7

如果发布消息时同时设置了 mandatory =true 失败者通知会发生什么?

如果主交换器无法路由消息,RabbitMQ 并不会通知发布者,因为,向备用交换器发送消息,表示消息已经被路由了。注意,新的备用交换器就是普通的交换器,没有任何特殊的地方。
失败者通知是我的消息没地方去我才返回给生产者的,而绑定备用交换器的话,消息是有地方去的.

使用备用交换器,向往常一样,声明 Queue 和备用交换器,把 Queue 绑定到备用交换器上。然后在声明主交换器时,通过交换器的参数,alternate-exchange,,将备用交换器设置给主交换器。
建议备用交换器设置为 faout 类型,Queue 绑定时的路由键设置为“#”

(二)使用场景总结


发送方式 发送TPS 发送结果反馈 可靠性 适用场景
可靠+同步 不丢失 1. 重要通知邮件,
2. 报名短信通知,
3. 营销短信系统等等
可靠+异步 不丢失 1. 用户视频上传后通知启动转码服务
2. 转码完成后通知推送转码结果等等
无保障 最快 可能丢失 适用于某些耗时非常短,但对可靠性要求并不高的场景,列如日志收集.不关键的东西收集等等.