消息中间件

3.1 点对点
PTP点对点:使用queue作为通信载体
说明:
消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息。
消息被消费以后,queue中不再存储,所以消息消费者不可能消费到已经被消费的消息。 Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。
消息中间件 - 图1
3.2 发布/订阅
Pub/Sub发布订阅(广播):使用topic作为通信载体
说明:
消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有订阅者消费。
queue实现了负载均衡,将producer生产的消息发送到消息队列中,由多个消费者消费。但一个消息只能被一个消费者接受,当没有消费者可用时,这个消息会被保存直到有一个可用的消费者。
topic实现了发布和订阅,当你发布一个消息,所有订阅这个topic的服务都能得到这个消息,所以从1到N个订阅者都能得到一个消息的拷贝。
消息中间件 - 图2

应用场景

5.1 **异步通信
有些业务不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
送快递要等到送到哪才能进行下一步,现在生产者送完就走,不用等着,放入队列里就好,解耦合,且异步
5.2 **解耦
降低工程间的强依赖程度,针对异构系统进行适配。在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。通过消息系统在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口,当应用发生变化时,可以独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
5.3 **冗余
有些情况下,处理数据的过程会失败。除非数据被持久化,否则将造成丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的”插入-获取-删除”范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。
5.4 **扩展性
因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。不需要改变代码、不需要调节参数。便于分布式扩容。
5.5 **过载保护
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量无法提取预知;如果以为了能处理这类瞬间峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
5.6 **可恢复性
系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
5.7 **顺序保证
在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。
5.8 **缓冲
在任何重要的系统中,都会有需要不同的处理时间的元素。消息队列通过一个缓冲层来帮助任务最高效率的执行,该缓冲有助于控制和优化数据流经过系统的速度。以调节系统响应时间。
5.9 **数据流处理**
分布式系统产生的海量数据流,如:业务日志、监控数据、用户行为等,针对这些数据流进行实时或批量采集汇总,然后进行大数据分析是当前互联网的必备技术,通过消息队列完成此类数据收集是最好的选择。

同步和异步区别?

同步可以理解为在执行完一个函数或方法之后,一直等待系统返回值或消息,这时程序是出于阻塞的,只有接收到返回的值或消息后才往下执行其他的命令
异步执行完函数或方法后,不必阻塞性地等待返回值或消息,只需要向系统委托一个异步过程,那么当系统接收到返回值或消息时,系统会自动触发委托的异步过程,从而完成一个完整的流程。

同步,就是实时处理(如打电话),比如服务器一接收客户端请求,马上响应,这样客户端可以在最短的时间内得到结果,但是如果多个客户端,或者一个客户端发出的请求很频繁,服务器无法同步处理,就会造成涌塞。
同步如打电话,通信双方不能断(我们是同时进行,同步),你一句我一句,这样的好处是,对方想表达的信息我马上能收到,但是,我在打着电话,我无法做别的事情。
异步,就是分时处理(如收发短信),服务器接收到客户端请求后并不是立即处理,而是等待服务器比较空闲的时候加以处理,可以避免涌塞。
异步如收发收短信,对比打电话,打电话我一定要在电话的旁边听着,保证双方都在线,而收发短信,对方不用保证此刻我一定在手机旁,同时,我也不用时刻留意手机有没有来短信。这样的话,我看着视频,然后来了短信,我就处理短信(也可以不处理),接着再看视频。
需要注意的是, 由于数据写入消息队列后立即返回给用户, 数据在后续的业务校验、写数据库等操作可能失败, 因此在使用消息队列进行业务异步处理后,需要适当修改业务流程进行配合,如订单提交后, 订单数据写入消息队列, 不能立即返回用户订单提交成功,需要在消息队列的订单消费者进程真正处理完该订单,甚至商品出库后, 再通过电子邮件或SMS 消息通知用户订单成功,以免交易纠纷。
在不使用消息队列的情况下,用户的请求数据直接写入数据库,在高并发的情况下,会对数据库造成巨大的压力, 同时也使得响应延迟加剧。在使用消息队列后,用户请求的数据发送给消息队列后立即返回,再由消息队列的消费者进程(通常情况下, 该进程
通常独立部署在专门的服务器集群上)从消息队列中获取数据, 异步写入数据库。由于消息队列服务器处理速度远快于数据库(消息队列服务器也比数据库具有更好的伸缩性),因此用户的响应延迟可得到有效改善。
消息队列具有很好的削峰作用一一即通过异步处理, 将短时间高并发产生的事务消息存储在消息队列中,从而削平高峰期的并发事务。在电子商务网站促销活动中,合理使用消息队列,可有效抵御促销活动刚开始大量涌入的订单对系统造成的冲击。

7.1 RocketMQ(为何选用做秒杀?)

阿里系下开源的一款分布式、队列模型的消息中间件,原名Metaq,3.0版本名称改为RocketMQ,是阿里参照kafka设计思想使用java实现的一套mq。同时将阿里系内部多款mq产品(Notify、metaq)进行整合,只维护核心功能,去除了所有其他运行时依赖,保证核心功能最简化,在此基础上配合阿里上述其他开源产品实现不同场景下mq的架构,目前主要多用于订单交易系统。
具有以下特点:
· 能够保证严格的消息顺序
· 提供针对消息的过滤功能
· 提供丰富的消息拉取模式
· 高效的订阅者水平扩展能力 为了提升Broker的可用性,以及提升服务器性能,通常会做集群的部署。每个Broker保存总数据的一部分,因此可以实现横向扩展。
· 实时的消息订阅机制
· 亿级消息堆积能力
RocketMQ是纯java编写 开源
· 既实现了系统之间的解耦,又可以保证最终的数据一致性
RocketMQ单机大约能承受10万QPS的请求。
综合以上对比,可以得出,RocketMQ 和 Kafka 的吞吐量最高,在高吞吐量的业务要求下,首选这两个mq。
Kafka 在topic 达到一定量以上时,吞吐量急剧下降;而RocketMQ影响较小。
NameServer是一个功能齐全的服务器,其角色类似Dubbo中的Zookeeper,但NameServer与Zookeeper相比更轻量。主要是因为每个NameServer节点互相之间是独立的,没有任何信息交互
消息中间件 - 图3
Message
Message(消息)就是要传输的信息。
一条消息必须有一个主题(Topic),主题可以看做是你的信件要邮寄的地址。
一条消息也可以拥有一个可选的标签(Tag)和额处的键值对,它们可以用于设置一个业务 Key 并在 Broker 上查找此消息以便在开发期间查找问题。
Topic
Topic(主题)可以看做消息的规类,它是消息的第一级类型。比如一个电商系统可以分为:交易消息、物流消息等,一条消息必须有一个 Topic 。
Topic 与生产者和消费者的关系非常松散,一个 Topic 可以有0个、1个、多个生产者向其发送消息,一个生产者也可以同时向不同的 Topic 发送消息。
一个 Topic 也可以被 0个、1个、多个消费者订阅。
Tag
Tag(标签)可以看作子主题,它是消息的第二级类型,用于为用户提供额外的灵活性。使用标签,同一业务模块不同目的的消息就可以用相同 Topic 而不同的 Tag 来标识。比如交易消息又可以分为:交易创建消息、交易完成消息等,一条消息可以没有 Tag 。
标签有助于保持您的代码干净和连贯,并且还可以为 RocketMQ 提供的查询系统提供帮助。
Group
分组,一个组可以订阅多个Topic。
分为ProducerGroup,ConsumerGroup,代表某一类的生产者和消费者,一般来说同一个服务可以作为Group,同一个Group一般来说发送和消费的消息都是一样的
Queue
在Kafka中叫Partition,每个Queue内部是有序的,在RocketMQ中分为读和写两种队列,一般来说读写队列数量一致,如果不一致就会出现很多问题。
Message Queue
Message Queue(消息队列),主题被划分为一个或多个子主题,即消息队列。
一个 Topic 下可以设置多个消息队列,发送消息时执行该消息的 Topic ,RocketMQ 会轮询该 Topic 下的所有队列将消息发出去。
消息的物理管理单位。一个Topic下可以有多个Queue,Queue的引入使得消息的存储可以分布式集群化,具有了水平扩展能力。
Offset
在RocketMQ 中,所有消息队列都是持久化,长度无限的数据结构,所谓长度无限是指队列中的每个存储单元都是定长,访问其中的存储单元使用Offset 来访问,Offset 为 java long 类型,64 位,理论上在 100年内不会溢出,所以认为是长度无限。
也可以认为 Message Queue 是一个长度无限的数组,Offset 就是下标。
消息消费模式
消息消费模式有两种:Clustering(集群消费)和Broadcasting(广播消费)。
默认情况下就是集群消费,该模式下一个消费者集群共同消费一个主题的多个队列,一个队列只会被一个消费者消费,如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费。
而广播消费消息会发给消费者组中的每一个消费者进行消费。
Message Order
Message Order(消息顺序)有两种:Orderly(顺序消费)和Concurrently(并行消费)。
顺序消费表示消息消费的顺序同生产者为每个消息队列发送的顺序一致,所以如果正在处理全局顺序是强制性的场景,需要确保使用的主题只有一个消息队列。
并行消费不再保证消息顺序,消费的最大并行数量受每个消费者客户端指定的线程池限制。
还支持各种高级的功能,比如说延迟消息、事务消息、消息回溯、死信队列、消息积压,RocketMQ的官方文档相对简单一些,但是Kafka和RabbitMQ的官方文档就非常的全面和详细,这可能是RocketMQ目前唯一的缺点。

一次完整的通信流程是怎样的?

Producer 与 NameServer集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Broker Master 建立长连接,且定时向 Broker 发送心跳。
Producer 只能将消息发送到 Broker master,但是 Consumer 则不一样,它同时和提供 Topic 服务的 Master 和 Slave建立长连接,既可以从 Broker Master 订阅消息,也可以从 Broker Slave 订阅消息。
消息中间件 - 图4

消息去重

同步默认去重试两次,异步只重试一次。也可设置重试次数为0,保证效率

单向发送(无重试)保证效率
单向发送是指只负责发送消息而不等待服务器回应且没有回调函数触发,适用于某些耗时非常短但对可靠性要求并不高的场景,例如日志收集。
并不是所有异常都会去重试,只有生产者客户端异常,broker,消费端等发生异常才回去重试,比如超时异常就直接返回失败
应用场景:适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。
通过这段源码很明显可以看出以下几点
1. 如果是异步发送 那么重试次数只有1次
2. 对于同步而言,超时异常也是不会再去重试
3.如果发生重试是在一个for 循环里去重试,所以它是立即重试而不是隔一段时间去重试。
注重消费端重试
一般设置成这样的代码这里的代码意思很明显: 主动抛出一个异常,然后如果超过3次,那么就不继续重试下去,而是将该条记录保存到数据库由人工来兜底。
Timeout
说明 这里的超时异常并非真正意义上的超时,它指的是指获取消息后,因为某种原因没有给RocketMQ返回消费的状态,那么 RocketMQ**会认为该消息没有发送,会一直发送。因为它会认为该消息根本就没有发送给消费者,所以肯定没消费。
及时关掉进程,再次重启之后还是会消费的,当获得
当前消费重试次数为 = 0 后 , 关掉该进程。再重新启动该进程,那么依然能够获取该条消息
消费者默认是重试16次**,16次之后就不再重试。
并且重试时间间隔逐步增加1s,5s,10s,30s,1m,2m,3m,4m,5m,6m,7m,8m,9m,10m,20m,30m,1h,2h
消息中间件 - 图5

消息订阅:

过滤,根据tag标签过滤
* 1.匹配所有tag,
2. 原值相等匹配 “add”
3. ||或者匹配 add||update

消息过滤

消息中间件 - 图6
bysql方法
消息中间件 - 图7
注意过滤器要在配置文件中开启,否则报错
消息中间件 - 图8

去重原则:使用业务端逻辑保持幂等性
幂等性:就是用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用,数据库的结果都是唯一的,不可变的。
只要保持幂等性,不管来多少条重复消息,最后处理的结果都一样,需要业务端来实现。
去重策略:
保证每条消息都有唯一编号(比如唯一流水号),且保证消息处理成功与去重表的日志同时出现。
建立一个消息表,拿到这个消息做数据库的insert操作。给这个消息做一个唯一主键(primary key)或者唯一约束,那么就算出现重复消费的情况,就会导致主键冲突,那么就不再处理这条消息。
消息重复
消息领域有一个对消息投递的QoS定义,分为:
最多一次(At most once)
至少一次(At least once)
仅一次( Exactly once)
RocketMQ没有内置消息去重的解决方案,最新版本是否支持还需确认。
不同的消息队列发送的确认信息形式不同,例如RabbitMQ是发送一个ACK确认消息,RocketMQ是返回一个CONSUME_SUCCESS成功标志,Kafka实际上有个offset的概念。

消息的可用性

RocketMQ对消息的刷盘提供了同步和异步的策略来满足我们的,当我们选择同步刷盘之后,如果刷盘超时会给返回FLUSH_DISK_TIMEOUT,如果是异步刷盘不会返回刷盘相关信息,选择同步刷盘可以尽最大程度满足我们的消息不会丢失。
RocketMQ采用的是混合型的存储结构,即为Broker单个实例下所有的队列共用一个日志数据文件(即为CommitLog)来存储。
而Kafka采用的是独立型的存储结构,每个队列一个文件。
这里帅丙认为,RocketMQ采用混合型存储结构的缺点在于,会存在较多的随机读操作,因此读的效率偏低。同时消费消息需要依赖ConsumeQueue,构建该逻辑消费队列需要一定开销。

RocketMQ 刷盘实现

Broker 在消息的存取时直接操作的是内存(内存映射文件)

顺序消息:

在某些业务中,consumer在消费消息时,是需要按照生产者发送消息的顺序进行消费的,比如在电商系统中,订单的消息,会有创建订单、订单支付、订单完成,如果消息的顺序发生改变,那么这样的消息就没有意义了。 生产者,消费者都要保证,从同一个队列
消息中间件 - 图9

这玩意是阿里开源的,生产者消费者一般需要保证顺序消息的话,可能就是一个业务场景下的,比如订单的创建、支付、发货、收货。
那这些东西是不是一个订单号呢?一个订单的肯定是一个订单号的说,那简单了呀。
一个topic下有多个队列,为了保证发送有序,RocketMQ提供了MessageQueueSelector队列选择机制,他有三种实现: 用唯一id进行hash取模,hash值不唯一
我们可使用Hash取模法,让同一个订单发送到同一个队列中,再使用同步发送,只有同个订单的创建消息发送成功,再发送支付消息。这样,我们保证了发送有序。
RocketMQ的topic内的队列机制,可以保证存储满足FIFO(First Input First Output 简单说就是指先进先出),剩下的只需要消费者顺序消费即可。
RocketMQ仅保证顺序发送,顺序消费由消费者业务保证!!!
这里很好理解,一个订单你发送的时候放到一个队列里面去,你同一个的订单号Hash一下是不是还是一样的结果,那肯定是一个消费者消费,那顺序是不是就保证了?

具体实现

生产者 每次根据唯一id hash取模队列个数,传入选择队列中
消息中间件 - 图10
消费者 重写接口的listenOrder方法就可
消息中间件 - 图11

分布式事务:

Half Message(半消息)
是指暂不能被Consumer消费的消息。Producer 已经把消息成功发送到了 Broker 端,但此消息被标记为暂不能投递状态,处于该种状态下的消息称为半消息。需要 Producer
对消息的二次确认后,Consumer才能去消费它。
消息回查
由于网络闪段,生产者应用重启等原因。导致 Producer 端一直没有对 Half Message(半消息) 进行 二次确认。这是Brock服务器会定时扫描长期处于半消息的消息,会
主动询问 Producer端 该消息的最终状态(Commit或者Rollback),该消息即为 消息回查。
1.A服务先发送个Half Message给Brock端,消息中携带 B服务 即将要+100元的信息。
2.当A服务知道Half Message发送成功后,那么开始第3步执行本地事务。
3.执行本地事务(会有三种情况1、执行成功。2、执行失败。3、网络等原因导致没有响应)
4.如果本地事务成功,那么Product像Brock服务器发送Commit,这样B服务就可以消费该message。
5.如果本地事务失败,那么Product像Brock服务器发送Rollback,那么就会直接删除上面这条半消息。
6.如果因为网络等原因迟迟没有返回失败还是成功,那么会执行RocketMQ的回调接口,来进行事务的回查。

消息过滤 利用tag字段过滤

Broker端消息过滤  
在Broker中,按照Consumer的要求做过滤,优点是减少了对于Consumer无用消息的网络传输。缺点是增加了Broker的负担,实现相对复杂。
Consumer端消息过滤
这种过滤方式可由应用完全自定义实现,但是缺点是很多无用的消息要传输到Consumer端。
另外,RocketMQ没有内存Buffer概念,RocketMQ的队列都是持久化磁盘,数据定期清除。
7.4 Redis
使用C语言开发的一个Key-Value的NoSQL数据库,开发维护很活跃,虽然它是一个Key-Value数据库存储系统,但它本身支持MQ功能,所以完全可以当做一个轻量级的队列服务来使用。对于RabbitMQ和Redis的入队和出队操作,各执行100万次,每10万次记录一次执行时间。测试数据分为128Bytes、512Bytes、1K和10K四个不同大小的数据。实验表明:入队时,当数据比较小时Redis的性能要高于RabbitMQ,而如果数据大小超过了10K,Redis则慢的无法忍受;出队时,无论数据大小,Redis都表现出非常好的性能,而RabbitMQ的出队性能则远低于Redis。

消息堆积(场景)

我们可以通过jstack打印出线程的堆栈信息(可连续打印多次观察变化)。重点搜索
如果发现大量的消费线程处于WAITING(parking)状态,说明消费线程在等待待消费的消息。如果仍然存在消息堆积,则极有可能是拉取能力不足,重点应该加强rocketmq拉取消息的能力。
Kafka分区只能增加不能减少,利用临时队列,防止处理完之后浪费资源
消息中间件的主要功能是异步解耦,还有个重要功能是挡住前端的数据洪峰,保证后端系统的稳定性,这就要求消息中间件具有一定的消息堆积能力,消息堆积分以下两种情况:
消息堆积在内存Buffer,一旦超过内存Buffer,可以根据一定的丢弃策略来丢弃消息,如CORBA Notification规范中描述。适合能容忍丢弃消息的业务,这种情况消息的堆积能力主要在于内存Buffer大小,而且消息堆积后,性能下降不会太大,因为内存中数据多少对于对外提供的访问能力影响有限。
消息堆积到持久化存储系统中,例如DB,KV存储,文件记录形式。 当消息不能在内存Cache命中时,要不可避免的访问磁盘,会产生大量读IO,读IO的吞吐量直接决定了消息堆积后的访问能力

解决

线上故障了,怎么处理
消息堆积了10小时,有几千万条消息待处理,现在怎么办?
修复consumer, 然后慢慢消费?也需要几小时才可以消费完成,新的消息怎么办
核心思想:紧急临时扩容,更快的速度去消费数据
- 修复Consumer不消费问题,使其恢复正常消费,根据业务需要看是否要暂停
- 临时topic队列扩容,并提高消费者能力,但是如果增加Consumer数量,但是堆积的topic里面的message queue数量固定,过多的consumer不能分配到message queue

- 编写临时处理分发程序,从旧topic快速读取到临时新topic中,新topic的queue数量扩容多倍,然后再启动更多consumer进行在临时新的topic里消费

- 直到堆积的消息处理完成,再还原到正常的机器数量,删除这个临时队列。

定时消息(特性)

定时消息是指消息发到Broker后,不能立刻被Consumer消费,要到特定的时间点或者等待特定的时间后才能被消费。
如果要支持任意的时间精度,在Broker层面,必须要做消息排序,如果再涉及到持久化,那么消息排序要不可避免的产生巨大性能开销。
RocketMQ支持定时消息,但是不支持任意时间精度,支持特定的level,例如定时5s,10s,1m等

死信队列(最后的后手)

死信队列指的是种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。
在RocketMQ中消息重试超过一定次数后(默认16次)就会被放到死信队列中,在消息队列。可以在控制台Topic列表中看到“DLQ”相关的Topic,默认命名是:%RETRY%消费组名称(重试Topic)%DLQ%消费组名称(死信Topic)死信队列也可以被订阅和消费,并且也会过期RocketMQ 中其中包括重试之后也无法消费的消息也会
死信队列应用场景
如我们平时下单后未在指定时间内付款,过来这个时间,我们的订单会被放入死信队列中。当我们再去付款时候,会发现订单已经被取消,此时我们只需要去死信队列中查该订单是否存在。比如一般设定都是24小时,如当一些消息出现异常迟迟未被消费(或者最大重试次数后也未成功消费),这时候就会将消息存放到死信队列中。

解决方法

其实这个就看你具体的需求了,比如我们可以专门开一个后台线程,订阅“%DLQ%WMSConsumerGroup”这个死信队列,对死信队列中的消息进行不停的重试。

Kafka与RocektMq

从上面的对比来看,Kafka 在性能上综合表现确实要比 RocketMQ 更加的优秀,但在消息选型过程中,我们不仅仅要参考其性能,还有从功能性上来考虑,例如 RocketMQ 提供了丰富的消息检索功能、事务消息、消息消费重试、定时消息等,RocketMQ 提供了丰富的消息检索功能、事务消息、消息消费重试、定时消息等。

为什么kafka比RocketMQ吞吐量更高

kafka性吞吐量更高主要是由于Producer端将多个小消息合并,批量发向Broker。kafka采用异步发送的机制,当发送一条消息时,消息并没有发送到broker而是缓存起来,然后直接向业务返回成功,当缓存的消息达到一定数量时再批量发送。
此时减少了网络io,从而提高了消息发送的性能,但是如果消息发送者宕机,会导致消息丢失,业务出错,所以理论上kafka利用此机制提高了io性能却降低了可靠性。

RocketMQ为何无法使用同样的方式

· RocketMQ通常使用的Java语言,缓存过多消息会导致频繁GC。
· Producer调用发送消息接口,消息未发送到Broker,向业务返回成功,此时Producer宕机,会导致消息丢失,业务出错。
· Producer通常为分布式系统,且每台机器都是多线程发送,我们认为线上的系统单个Producer每秒产生的数据量有限,不可能上万。
· 缓存的功能完全可以由上层业务完成。

为什么选择RocketMQ,与kafka存储区别

当broker里面的topic的partition数量过多时,kafka的性能却不如rocketMq。
kafka和rocketMq都使用文件存储,但是kafka是一个分区一个文件,当topic过多,分区的总量也会增加,kafka中存在过多的文件,当对消息刷盘时,就会出现文件竞争磁盘,出现性能的下降。一个partition(分区)一个文件,顺序读写。一个分区只能被一个消费组中的一个 消费线程进行消费,因此可以同时消费的消费端也比较少。
rocketMq所有的队列都存储在一个文件中,每个队列的存储的消息量也比较小,因此topic的增加对rocketMq的性能的影响较小。rocketMq可以存在的topic比较多,可以适应比较复杂的业务

RabbitMq死信队列

什么是死信
在 RabbitMQ 中充当主角的就是消息,在不同场景下,消息会有不同地表现。
死信就是消息在特定场景下的一种表现形式,这些场景包括:
消息被拒绝访问,即 RabbitMQ返回 nack 的信号时
消息的 TTL 过期时
消息队列达到最大长度
消息不能入队时。
上述场景经常产生死信,即消息在这些场景中时,被称为死信。

RabbitMQ 死信队列基本使用

在 RabbitMQ 中,死信队列的标识为 x-dead-letter-exchange ,通过观察死信队列的标识,我们不难发现,其标识最后为 exchange ,即 RabbitMQ 中的交换机,RabbitMQ 中的死信队列就是由死信交换机而得出的。要想使用死信队列,我们需要首先声明一个普通的消息队列,并将死信队列的标识绑定到这个普通的消息队列上, 这个过程需要我们在生产端进行配置
7.5 Kafka
Apache下的一个子项目,使用scala实现的一个高性能分布式Publish/Subscribe消息队列系统,具有以下特性:
· 快速持久化:通过磁盘顺序读写与零拷贝机制,可以在O(1)的系统开销下进行消息持久化;
· 高吞吐:在一台普通的服务器上既可以达到10W/s的吞吐速率;
· 高堆积:支持topic下消费者较长时间离线,消息堆积量大;
· 完全的分布式系统:Broker、Producer、Consumer都原生自动支持分布式,依赖zookeeper自动实现复杂均衡;
· 支持Hadoop数据并行加载:对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。

Kafka与RocketMQ长轮询

我们来想一下推模式有什么好处?

消息实时性高, Broker 接受完消息之后可以立马推送给 Consumer。
对于消费者使用来说更简单,简单啊就等着,反正有消息来了就会推过来。

推模式有什么缺点?

推送速率难以适应消费速率,推模式的目标就是以最快的速度推送消息,当生产者往 Broker 发送消息 的速率大于消费者消费消息的速率时,随着时间的增长消费者那边可能就“爆仓”了,因为根本消费不过 来啊。当推送速率过快就像 DDos 攻击一样消费者就傻了。并且不同的消费者的消费速率还不一样,身为 Broker 很难平衡每个消费者的推送速率,如果要实现自 适应的推送速率那就需要在推送的时候消费者告诉 Broker ,我不行了你推慢点吧,然后 Broker 需要维护每个消费者的状态进行推送速率的变更。 这其实就增加了 Broker 自身的复杂度。所以说推模式难以根据消费者的状态控制推送速率,适用于消息量不大、消费能力强要求实时性高的情况下

我们来想一下拉模式有什么好处?

拉模式主动权就在消费者身上了,消费者可以根据自身的情况来发起拉取消息的请求。假设当前消费者
觉得自己消费不过来了,它可以根据一定的策略停止拉取,或者间隔拉取都行。
拉模式下 Broker 就相对轻松了,它只管存生产者发来的消息,至于消费的时候自然由消费者主动发 起,来一个请求就给它消息呗,从哪开始拿消息,拿多少消费者都告诉它,它就是一个没有感情的工具人,消费者要是没来取也不关它的事。
拉模式可以更合适的进行消息的批量发送,基于推模式可以来一个消息就推送,也可以缓存一些消息之后再推送,但是推送的时候其实不知道消费者到底能不能一次性处理这么多消息。而拉模式就更加合理,它可以参考消费者请求的信息来决定缓存多少消息之后批量发送。

拉模式有什么缺点?

消息延迟,毕竟是消费者去拉取消息,但是消费者怎么知道消息到了呢?所以它只能不断地拉取,但是又不能很频繁地请求,太频繁了就变成消费者在攻击 Broker 了。因此需要降低请求的频率,比如隔个2秒请求一次,你看着消息就很有可能延迟 2 秒了。
消息忙请求,忙请求就是比如消息隔了几个小时才有,那么在几个小时之内消费者的请求都是无效的,在做无用功。
activeMQ吞吐量最低,性能最低,但是最安全,kafka吞吐量最高,最不安全,
RabbitMQ居中

二者都是拉的模式

那么竟然 RocketMQ 和 Kafka 都选择了拉模式,它们就不怕拉模式的缺点么? 怕,所以它们操作了一 波,减轻了拉模式的缺点。

Kafka 中的长轮询

像 Kafka 在拉请求中有参数,可以使得消费者请求在 “长轮询” 中阻塞等待。 简单的说就是消费者去 Broker 拉消息,定义了一个超时时间,也就是说消费者去请求消息,如果有的 话马上返回消息,如果没有的话消费者等着直到超时,然后再次发起拉消息请求。并且 Broker 也得配合,如果消费者请求过来,有消息肯定马上返回,没有消息那就建立一个延迟操作,等条件满足了再返回。

原理图:

消息中间件 - 图12

RocketMQ 中的长轮询

RocketMQ 中的 PushConsumer 其实是披着拉模式的方法,只是看起来像推模式而已。
因为 RocketMQ 在被背后偷偷的帮我们去 Broker 请求数据了。后台会有个 RebalanceService 线程,这个线程会根据 topic 的队列数量和当前消费组的消费者个数做负载均衡,每个队列产生的 pullRequest 放入阻塞队列 pullRequestQueue 中。然后又有个
PullMessageService 线程不断的从阻塞队列 pullRequestQueue 中获取 pullRequest,然后通过网络请求 broker,这样实现的准实时拉取消息。

原理图:


消息中间件 - 图13
RabbitMq
语法
消息中间件 - 图14
作用
消息中间件 - 图15

2-6-密码加密与微服务鉴权JWT的反馈很重要,体会docker的作用

什么时候选同步,什么时候选异步

一般来讲当一些业务是由第三方公司去开发做出的,我们无法控制他的错误,但是错误肯定会解决,不管多长时间。的用异步(比如阿里云短信,可能中途出bug,但是你给他打个电话就OK了。微服务调用只要发到rabbitMq就不用管了,就等他解决错误就可以了),这样发到rabbitMq就不用管了,但是如果我们一般自己开发的业务,自己的错误可控的,为了保证高可用性,和准确率就还是选同步,当然也可以异步!
异步发送的时候一定要注意,不要立马关闭生产者,否则之后的消息发不了,一般是自己设置时间在结束!

RocketMq

消息模式

集群消费模式 1000条平均分333每人
消息中间件 - 图16
广播消费模式 三人都是1000条
消息中间件 - 图17

默认是集群模式

push和pull模式

在RocketMQ中,消费者有两种模式,一种是push模式,另一种是pull模式。
push模式:客户端与服务端建立连接后,当服务端有消息时,将消息推送到客户端。
一般不用push方式,因为push要建立长链接,而一个发送者可能供多个消费者消费,每个都建立长链接,太耗费资源了,承受不了。原因2:当生产者往 Broker 发送消息 的速率大于消费者消费消息的速率时,随着时间的增长消费者那边可能就“爆仓”了,因为根本消费不过 来啊。
pull模式:客户端不断的轮询请求服务端,来获取新的消息。
但在具体实现时,Push和Pull模式都是采用消费端主动拉取的方式,即consumer轮询从broker拉取消息。
区别:
Push方式里,consumer把轮询过程封装了,并注册MessageListener监听器,取到消息后,唤醒
MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送过来的。
Pull方式里,取消息的过程需要用户自己写,首先通过打算消费的Topic拿到MessageQueue的集合,遍历 MessageQueue集合,然后针对每个MessageQueue批量取消息,一次取完后,记录该队列下一次要取的开 始offffset,直到取完了,再换另一个MessageQueue。
疑问:既然是采用pull方式实现,RocketMQ如何保证消息的实时性呢? 轮询有时间差。
而且有好多都是无效请求,假如半个小时都一直没有发过来消息,你每一秒一轮询,特别耗费资源

4.1.1、长轮询 (长链接+轮询)

RocketMQ中采用了长轮询的方式实现,什么是长轮询呢? ,采取的是折中的方案,
长轮询即是在请求的过程中,若是服务器端数据并没有更新,那么则将这个连接挂起,直到服务器推送新的 数据,再返回,然后进入循环周期。客户端像传统轮询一样从服务端请求数据,服务端会阻塞请求不会立刻返回,直到有数据或超时才返回给客 户端,然后关闭连接,客户端处理完响应信息后再向服务器发送新的请求。

RocketMQ性能高原因

消息中间件 - 图18

分布式事务问题

每一个小系统模块部署在不同服务器中,各个服务调用,如果按顺序调用中每一个服务出现问题,那之前的服务调用应该也不算成功,回滚,保证事务性
消息中间件 - 图19
消息中间件 - 图20
具体实现实现这个接口
消息中间件 - 图21
两个方法,一个执行事务,一个check回查
三个状态 成功,回滚,未知
消息中间件 - 图22
回查机制:定义一个Map(key是事务id,value是事务状态(三个))
生产者事务完成之后记录map里。
重写check方法,通过事务id查到事务状态,返回给broker
从而解决了网络延迟,生产者发送的信息丢失的情况(可以设置定时回查时间)
消息中间件 - 图23
同步刷盘和异步刷盘
通过修改broker文件可以修改
同步更可靠,异步是到内存立马返回成功然后等积入一堆才开始刷盘,按正常情况下异步应该是没有问题的,但是如果宕机或者意外情况的话,你返回成功了,最后没刷成功就造成消息丢失了,所以如果可靠性,可以采用同步,效率的话可以异步
消息中间件 - 图24
消息中间件 - 图25
消息中间件 - 图26

RocketMQ架构

RocketMQ(1)-架构原理 - 雨点的名字 - 博客园 (cnblogs.com)
不采取广播模式
(1)广播消费模式下不支持顺序消息。
(2)广播消费模式下不支持重置消费位点。
(3)每条消息都需要被相同逻辑的多台机器处理。
(4)广播模式下, 消息队列 RocketMQ 保证每条消息至少被每台客户端消费一次, 但是并不会对消费失败的消息进行失败重投, 因此业务方需要关注消费失败的情况。
(5)广播模式下, 客户端每一次重启都会从最新消息消费。 客户端在被停止期间发送至服务端的消息将会被自 动跳过, 请谨慎选择。
(6)广播模式下, 每条消息都会被大量的客户端重复处理, 因此推荐尽可能使用集群模式。
消息中间件 - 图27

常用消息中间件对比

Kafka先写内存缓冲区,再刷磁盘
Rocket重试,延时,
消息中间件 - 图28
参考博文:(79条消息) Kafka、RabbitMQ、RocketMQ等消息中间件的对比_码农博士的博客-CSDN博客_kafka rabbitmq rocketmq

RocketMq的文件存储系统有两点优化以保证性能:

消息存储(顺序写):RocketMQ的消息用顺序写,保证了消息存储的速度。目前的高性能磁盘,顺序写速度可以达到600MB/s, 超过了一般网卡的传输速度,但是磁盘随机写的速度只有大概100KB/s
消息发送(零拷贝):将本机磁盘文件的内容发送到客户端需要进行多次复制,比如从磁盘复制数据到内核态内存;从内核态内存复制到用户态内存;从用户态内存复制到网络驱动,最后从网络驱动复制到网卡中。RocketMq采用Java中零拷贝的技术,让从内核态内存复制到用户态内存这一步省略,直接赋值到网络驱动中

Kafka高性能原因

1. 顺序写磁盘

(相比磁盘的随机写快很多)。如果你是追加文件末尾按照顺序的方式来写数据的话,那么这种磁盘顺序写的性能基本上可以跟写内存的性能本身也是差不多的。

2. 利用Page Cache(RocketMq没有)

回顾前面说道文件传输过程,其中第一步都是先需要先把磁盘文件数据拷贝「内核缓冲区」里,这个「内核缓冲区」实际上是磁盘高速缓存(PageCache)。
空中接力的方式来实现高效读写,操作系统本身有一层缓存,叫做page cache,是在内存里的缓存,(性能比RocketMq高的原因)我们也可以称之为os cache,意思就是操作系统自己管理的缓存。原理就是Page Cache可以把磁盘中的数据缓存到内存中,把对磁盘的访问改为对内存的访问。
我们都知道程序运行的时候,具有「局部性」,所以通常,刚被访问的数据在短时间内再次被访问的概率很高,于是我们可以用 PageCache 来缓存最近被访问的数据,当空间不足时淘汰最久未被访问的缓存。

PageCache 使用了「预读功能」。


比如,假设 read 方法每次只会读 32 KB 的字节,虽然 read 刚开始只会读 0 ~ 32 KB 的字节,但内核会把其后面的 32~64 KB 也读取到 PageCache,这样后面读取 32~64 KB 的成本就很低,如果在 32~64 KB 淘汰出 PageCache 前,进程读取到它了,收益就非常大。

PageCache 的优点主要是三个:

缓存最近被访问的数据;
把小请求放在pagecache,等满了再同一请求
预读功能;

PageCache优势

PageCache,就无法享受内核的这两点的优化:
·
内核的 I/O 调度算法会缓存尽可能多的 I/O 请求在 PageCache 中,最后「合并」成一个更大的 I/O 请求再发给磁盘,这样做是为了减少磁盘的寻址操作;
·
·
内核也会「预读」后续的 I/O 请求放在 PageCache 中,一样是为了减少对磁盘的操作;

3.零拷贝方式

假如不用零拷贝方式,kafka从磁盘读数据发送给下游的消费者大概的过程为:kafka首先看看要读的数据在不在os cache里,如果不在的话就从磁盘文件里读取数据后放入os cache,接着再到应用程序进程的缓存里,再到操作系统层面的Socket缓存里,最后从Socket缓存里提取数据后发送到网卡,最后发送出去给消费者。
零拷贝:直接让操作系统的cache中的数据发送到网卡后传输给下游的消费者,直接跳过了两次拷贝数据的步骤,Socket缓存中仅仅会拷贝一个描述符过去,不会拷贝数据到Socket缓存。

传统的的拷贝需要经历四个:jvm切换到内核态缓冲区读取->操作系统将数据拷贝用户缓冲区—>-再次切换到内核态并将用户缓存区数据拷贝进来->将内核态缓冲区写入socket buffer(cpu参与两次)

零拷贝经历两个:用户->内核态缓冲区(cpu不参与)

其实被拷贝了两次为什么叫零拷贝呢?由于站在系统的角度来待!

零拷贝的好处?

减少上下文切换,避免CPU参与拷贝带来的负载;

减少cpu和带宽的开销;

(79条消息) kafka高性能的原因_龍的天空的博客-CSDN博客_kafka高性能的原因

Kafka架构

消息中间件 - 图29

利用死信队列去补数据

消息变成死信有以下几种情况
· 消息被拒绝(basic.reject / basic.nack),并且requeue = false
· 消息TTL过期
· 队列达到最大长度(队列满了,无法再添加数据到mq中)
DLX 队列

消息中间件的缺点

使用消息队列缺点:
系统可用性降低:系统引入的外部依赖越多,越容易挂掉,万一MQ挂了,整套系统崩溃了。
系统复杂性提高:加MQ进来,怎么保证消息没有重复消费?怎么处理消息丢失的情况?怎么保证消息传递的顺序性?
分布式事务问题一致性问题:A系统处理完了直接返回成功了,后面的如果失败了,这数据就不一致了。

4.批量发送数据(RocketMq没有)

生产者发送多个消息到同一个分区的时候,为了减少网络带来的系能开销,kafka会对消息进行批量发送
batch.size
通过这个参数来设置批量提交的数据大小,默认是16k,当积压的消息达到这个值的时候就会统一发送(发往同一分区的消息)
linger.ms
这个设置是为发送设置一定是延迟来收集更多的消息,默认大小是0ms(就是有消息就立即发送)
当这两个参数同时设置的时候,只要两个条件中满足一个就会发送。比如说batch.size设置16kb,linger.ms设置50ms,那么当消息积压达到16kb就会发送,如果没有到达16kb,那么在第一个消息到来之后的50ms之后消息将会发送。

RabbitMq为何那么可靠

1. 持久化存储机制

持久化是提高RabbitMQ可靠性的基础,否则当RabbitMQ遇到异常时(如:重启、断电、停机 等)数据将会丢失。主要从以下几个方面来保障消息的持久性:

Exchange的持久化。通过定义时设置durable 参数为ture来保证Exchange相关的元数据不不 丢失。
Queue的持久化。也是通过定义时设置durable 参数为ture来保证Queue相关的元数据不不 丢失。
消息的持久化。通过将消息的投递模式 (BasicProperties 中的 deliveryMode 属性)设置为 2 即可实现消息的持久化,保证消息自身不丢失。

2. AMQP/RabbitMQ的事务机制

没有捕获到异常并不能代表消息就一定投递成功了。
一直到事务提交后都没有异常,确实就说明消息是投递成功了。但是,这种方式在性能方面的开销比较大,一般也不推荐使用

3. 发送端确认机制

RabbitMQ后来引入了一种轻量量级的方式,叫发送方确认(publisher confirm)机制。生产者将信 道设置成confirm(确认)模式,一旦信道进入confirm 模式,所有在该信道上面面发布的消息都会被指派 一个唯一的ID(从1 开始),一旦消息被投递到所有匹配的队列之后(如果消息和队列是持久化的,那么 确认消息会在消息持久化后发出),RabbitMQ 就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一 ID),这样生产者就知道消息已经正确送达了。

4. Consumer ACK

如何保证消息被消费者成功消费?

前面我们讲了生产者发送确认机制和消息的持久化存储机制,然而这依然无法完全保证整个过程的可靠性,因为如果消息被消费过程中业务处理失败了但是消息却已经出列了(被标记为已消费了),我们又没有任何重试,那结果跟消息丢失没什么分别。
RabbitMQ在消费端会有Ack机制

5. 消费端限流

在电商的秒杀活动中,活动一开始会有大量并发写请求到达服务端,需要对消息进行削峰处理,如何削峰?
当消息投递速度远快于消费速度时,随着时间积累就会出现“消息积压”。消息中间件本身是具备一 定的缓冲能力的,但这个能力是有容量限制的,如果长期运行并没有任何处理,最终会导致Broker崩溃,而分布式系统的故障往往会发生上下游传递,连锁反应那就会很悲剧… 下面我将从多个角度介绍QoS与限流,防止上面的悲剧发生。
RabbitMQ 可以对内存和磁盘使用量设置阈值,当达到阈值后,生产者将被阻塞(block),直 到对应项指标恢复正常。全局上可以防止超大流量、消息积压等导致的Broker被压垮。当内 存受限或磁盘可用空间受限的时候,服务器都会暂时阻止连接,服务器将暂停从发布消息的已 连接客户端的套接字读取数据。连接心跳监视也将被禁用。所有网络连接将在rabbitmqctl和 管理插件中显示为“已阻止”,这意味着它们尚未尝试发布,因此可以继续或被阻止,这意味着 它们已发布,现在已暂停。兼容的客户端被阻止时将收到通知。

重点:RocketMq可视化工具

重点博客 : (79条消息) RocketMQ可视化工具RocketMQ-Console搭建实践_pharos的博客-CSDN博客_rocketmq可视化工具

RocketMQ索引

put——加入新消息

前几步都是经典的hash取模操作
根据消息的key计算hash值
hash对500w取模,找到对应的hash-slot
拿到找到的hash-slot的已有的值a(指的是最近在这个slot发生hash碰撞的index-item的地址,默认是0),顺序从index-item-list找一个位置b,将消息的hash值、物理偏移量、时间戳和刚刚拿到的最新发生hash碰撞的index-item的地址a组成一个新的index-item放到位置b。所以这里可以看到rmq在消息索引这里解决hash冲突的方式也是一种拉链法,而且个人觉得这个拉链法也是使用头插法处理新来的entry的

get——根据key查询消息

先根据查询的时间戳范围,过滤不在此范围内的文件,然后在过滤后的每个文件里执行查找操作
计算查询的key的hash值
hash值取模,找到对应的hash-slot
从hash-slot拿到的第一个index-item开始遍历这个链表,遇到匹配的消息就收集他们的偏移量,最后统一去commitLog文件取出消息。注意,因为这里我们的index-item没有存储具体的key值,只存了key值的hash值,所以没有办法像Java的hashMap那样在发生hash冲突的链表上使用key值的具体内容一个一个的equals寻找真正需要的值,只能按照时间范围和已有的hash值对比拿到一系列的msg的offset,然后去commitLog里去查,查完之后再进行过滤。
消息中间件 - 图30

RocketMq事务消息和kafka事务消息

Kafka 的事务消息和 RocketMQ 的事务消息又不一样了,RocketMQ 解决的是本地事务的执行和发消息
这两个动作满足事务的约束。
而 Kafka 事务消息则是用在一次事务中需要发送多个消息的情况,保证多个消息之间的事务约束,即多条消息要么都发送成功,要么都发送失败,
Kafka 的事务基本上是配合其幂等机制来实现 Exactly Once 语义的,所以说 Kafka 的事务消息不是我们想的那种事务消息,RocketMQ 的才是。
我们知道消息可靠性有三种,分别是最多一次、恰好一次、最少一次,之前在消息队列连环问的文章我
已经提到了基本上我们都是用最少一次然后配合消费者端的幂等来实现恰好一次。
那他是如何实现恰好一次的?就是通过幂等,和我们在业务上实现的一样通过一个唯一 Id, 然后记录下来,如果已经记录过了就不写入,这样来保证恰好一次。
所以说 Kafka 实现的是在特定场景下的恰好一次,不是我们所想的利用 Kafka 来发送消息,那么这条消息只会恰巧被消费一次。
在开始事务的时候,生产者会向事务协调者发起请求表示事务开启,事务协调者会将这个消息记录到特殊的日志-事务日志中,然后生产者再发送真正想要发送的消息,这里 Kafka 和 RocketMQ 处理不一样,Kafka 会像对待正常消息一样处理这些事务消息,而不是半消息,由消费端来过滤这个消息。
然后发送完毕之后生产者会向事务协调者发送提交或者回滚请求,由事务协调者来进行两阶段提交,如果是提交那么会先执行预提交,即把事务的状态置为预提交然后写入事务日志,然后再向所有事务有关的分区写入一条类似事务结束的消息,这样消费端消费到这个消息的时候就知道事务好了,可以把消息放出来了。
最后协调者会向事务日志中再记一条事务结束信息,至此 Kafka 事务就完成了,我拿 confluent.io 上的
图来总结一下这个流程。
当然你要是用的流式计算那么 Kakfa 的事务消息也是你想要的。

RabbitMQ高可用的模式

普通集群模式,意思就是在多台机器上启动多个 RabbitMQ 实例,每个机器启动一个。你创建的queue,只会放在一个 RabbitMQ 实例上,但是每个实例都同步 queue 的元数据(元数据可以认为是queue 的一些配置信息,通过元数据,可以找到 queue 所在实例)。你消费的时候,实际上如果连接到了另外一个实例,那么那个实例会从 queue 所在实例上拉取数据过来。这方案主要是提高吞吐量的,就是说让集群中多个节点来服务某个 queue 的读写操作。
镜像集群模式:这种模式,才是所谓的 RabbitMQ 的高可用模式。跟普通集群模式不一样的是,在镜像集群模式下,你创建的 queue,无论元数据还是 queue 里的消息都会存在于多个实例上,就是说,每个 RabbitMQ 节点都有这个 queue 的一个完整镜像,包含 queue 的全部数据的意思。然后每次你写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue 上。RabbitMQ 有很好的管理控制台,就是在后台新增一个策略,这个策略是镜像集群模式的策略,指定的时候是可以要求数据同步到所有节点的,也可以要求同步到指定数量的节点,再次创建 queue 的时候,应用这个策略,就会自动将数据
同步到其他的节点上去了。这样的话,好处在于,你任何一个机器宕机了,没事儿,其它机器(节点)还包含了这个 queue 的完整数据,别的 consumer 都可以到其它节点上去消费数据。坏处在于,第一,这个性能开销也太大了吧,消息需要同步到所有机器上,导致网络带宽压力和消耗很重!RabbitMQ 一个 queue 的数据都是放在一个节点里的,镜像集群下,也是每个节点都放这个 queue 的完整数据

RabbitMQ如何保证消息不丢失


消息中间件 - 图31
数据的丢失问题,可能出现在生产者、MQ、消费者中
生产者丢失:生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。此时可以选择用 RabbitMQ 提供的事务功能,就是生产者发送数据之前开启 RabbitMQ事务channel.txSelect,然后发送消息,如果消息没有成功被 RabbitMQ 中 broker 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit。吞吐量会下来,因为太耗性能。所以一般来说,如果你要确保说写RabbitMQ 的消息别丢,可以开启confirm模式,在生产者那里设置开启confirm模式之后,你每次写的消息都会分配一个唯一的 id,然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个ack消息,告诉你说这个消息 ok 了。如果 RabbitMQ 没能处理这个消息,会回调你一个nack接口,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。事务机制和cnofirm机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是confirm机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息RabbitMQ 接收了之后会异步回调你一个接口通知你这个消息接收到了。所以一般在生产者这块避免数据丢失,都是用confirm机制的。
MQ中丢失:就是 RabbitMQ 自己弄丢了数据,这个你必须开启 RabbitMQ 的持久化,就是消息写入之后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。设置持久化有两个步骤:创建 queue 的时候将其设置为持久化,这样就可以保证 RabbitMQ 持久化 queue 的元数据,但是不会持久化 queue 里的数据。第二个是发送消息的时候将消息的deliveryMode 设置为 2,就是将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。必须要同时设置这两个持久化才行,RabbitMQ 哪怕是挂了,再次重启,也会从磁盘上重启恢复queue,恢复这个 queue 里的数据。持久化可以跟生产者那边的confirm机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者ack了,所以哪怕是在持久化到磁盘之前,RabbitMQ 挂了,数据丢了,生产者收不到ack,你也是可以自己重发的。注意,哪怕是你给 RabbitMQ 开启了持久化机制,也有一种可能,就是这个消息写到了 RabbitMQ 中,但是还没来得及持久化到磁盘上,结果不巧,此时RabbitMQ 挂了,就会导致内存里的一点点数据丢失

消费端丢失:你消费的时候,刚消费到,还没处理,结果进程挂了,比如重启了,那么就尴尬了,RabbitMQ 认为你都消费了,这数据就丢了。这个时候得用 RabbitMQ 提供的ack机制,简单来说,就是你关闭 RabbitMQ 的自动ack,可以通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里ack一把。这样的话,如果你还没处理完,不就没有ack?那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。
当然不一定,假如消费者先进行确认应答,然后执行业务逻辑。那么在执行业务之前宕机,则此条消息既不在 Broker 中,也不在消费者端,消息就丢失了。
所以,消费者收到消息后,可以先存储到本地数据库,然后确认应答,最后执行业务逻辑。这样就可以保证消费者端的可靠性

RabbitMQ事务

5.1 RabbitMQ解决分布式事务原理
采用最终一致性原理。
需要保证以下三要素:
1.确认生产者一定要将数据投递到MQ服务器中(采用MQ消息确认机制)
2.MQ消费者消息能够正确消费消息,采用手动ACK模式(注意重试幂等性问题)
3.如何保证第一个事务先执行,采用补偿机制,在创建一个补单消费者进行监听,如果订单没有创建成功,进行补单。(如果第一个事务中出错,补单消费者会在重新执行一次第一个事务,例如第一个事务是添加订单表,如果失败在补单的时候重新生成订单记录,由于订单号唯一,所以不会重复)


当然不一定,假如消费者先进行确认应答,然后执行业务逻辑。那么在执行业务之前宕机,则此条消息既不在 Broker 中,也不在消费者端,消息就丢失了。
所以,消费者收到消息后,可以先存储到本地数据库,然后确认应答,最后执行业务逻辑。这样就可以保证消费者端的可靠性

分布式事务问题

每一步都有可能异常或者出现意外,比如在插入到数据库中出现异常、发送到MQ中途异常、MQ接受消息后返回确认时异常、MQ向消费者投递消息异常、消费者消费消息后向MQ进行ACK确认时异常等。每一步的异常都要进行相应的处理,不然就有可能导致脏数据。
插入到数据库中出现异常:当product插入到数据库异常,这一步还没到MQ,直接回滚。本地消息表的作用在于不过度依赖MQ,MQ中途也有可能挂掉。
发送到MQ中途异常:轮训本地消息表对没有发送到MQ的消息进行重发
MQ接受消息后返回确认时异常:和上步一样,重发到MQ,等MQ给确认收到更改本地消息
MQ向消费者投递消息异常:MQ会进行重试向消费者投递消息,只要消费者没有给MQ ACK确认,MQ就会进行重试,重试配置可根据实际业务配置
消费者消费消息后向MQ进行ACK确认时异常:和上步相同,MQ没收到ACK确认还是会重发,MQ重发时,在消费者需要进行幂等设计处理,防止脏数据
对于本地消息表,可以抽离出来做成消息服务系统,毕竟不只是一个服务在使用,抽离出来可以作为共用

7、 如何解决消息队列的延时以及过期失效问题?消息队列满了以后该怎么处理?有几百万消息持续积压几小时,说说怎么解决?

消息积压处理办法:临时紧急扩容:
先修复 consumer 的问题,确保其恢复消费速度,然后将现有 cnosumer 都停掉。
新建一个 topic,partition 是原来的 10 倍,临时建立好原先 10 倍的 queue 数量。
然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue。
接着临时征用 10 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据。这种做法相当于是临时将 queue 资源和 consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据。
等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的 consumer 机器来消费消息。
MQ中消息失效:假设你用的是 RabbitMQ,RabbtiMQ 是可以设置过期时间的,也就是 TTL。如果消息在 queue 中积压超过一定的时间就会被 RabbitMQ 给清理掉,这个数据就没了。那这就是第二个坑了。这就不是说数据会大量积压在 mq 里,而是大量的数据会直接搞丢。我们可以采取一个方案,就是批量重导,这个我们之前线上也有类似的场景干过。就是大量积压的时候,我们当时就直接丢弃数据了,然后等过了高峰期以后,比如大家一起喝咖啡熬夜到晚上12点以后,用户都睡觉了。这个时候我们就开始写程序,将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入 mq 里面去,把白天丢的数据给他补回来。也只能是这样了。假设 1 万个订单积压在 mq 里面,没有处理,其中 1000个订单都丢了,你只能手动写程序把那 1000 个订单给查出来,手动发到 mq 里去再补一次。
mq消息队列块满了:如果消息积压在 mq 里,你很长时间都没有处理掉,此时导致 mq 都快写满了,咋办?这个还有别的办法吗?没有,谁让你第一个方案执行的太慢了,你临时写程序,接入数据来消费,消费一个丢弃一个,都不要了,快速消费掉所有的消息。然后走第二个方案,到了晚上再补数据吧。

参考博客去解决分布式事务问题

(79条消息) Spring Cloud分布式事务终极解决方案探讨_persistenceヾ(◍°∇°◍)ノ的博客-CSDN博客

建筑电气复习范围


电气192尹溢伟
二、三章相对基础
电气192-王芸博2.6 第三章例1必考
计算负荷 无功补偿应该也要考嘛
电气191颜湘
前四章计算负荷,无功补偿,有功功率和无功功率损耗、导线截面积、熔断器的选择这几类计算题,第四章导线截面积会考一题
电气192尹溢伟4-2 必考