一 老系统的痛点
1.1 订单支付前
如果用户量大 ,还会出现大量的超时取消支付,如果用定时任务批处理,那么效率就会特别低
1.2 订单支付中
1.3 订单支付后
1.4 大促秒杀活动
数据库没发抗住很大的,访问请求。没办法,数据的一般访问量一般就是2000左右好一点的机器大概也就是 4000左右
1.5 大sql 关联查询 出报表
大 sql 直接访问业务数据库,关联查询数据,出报表的话,很容易影响线上的业务,一般不允许超过三张表关联查询
二 基础概念

2.1 消息中间件是什么?
- 异步提高接口的响应时间
- 系统之间解耦,不会强依赖
- 应对突发流量,削峰填谷
2.2 mq 技术选型
rabbitmq 因为它属于较老的mq ,所以它不具备分布式的中间件特性 ,比如数据分片 但是功能比较完善,
kafka 比较新一点 有分布式特性 数据分片 和高可用性 ,但是作为mq 功能很简陋
rocketmq 就是抄袭kafka的 不过,给我们实现了很多mq高级特性。方便好用些2.3 rocketmq 的基本原理
2.3.1 支持集群化部署
很简单 可以集群部署,分流压力
2.3.2 海量数据分片存储
分片存储数据
2.3.3 支持高可用
Broker 主从架构以及多副本策略
2.3.4 数据路由
nameServer 存储路由信息
2.4 路由中心(NameServer)原理
2.4.1 nameServer 集群原理
路由中心集群化,保证高可用
2.4.2 Broker 注册到 nameServer上
每个broker 启动 都得向 所有的NameServer 进行注册
2.4.3 系统获取 Broker 信息
每个业务系统主动去 nameServer 拉取Broker 信息
2.4.4 nameServer 感知 broker 存活
Broker 与 NameServer之间存在心跳机制,Broker 每隔30秒给所有的NameServer发送心跳,告诉NameServer 自己目前还活着
每次 NameServer 收到 Broker 一个心跳,则更新最新的心跳时间。
NameServer 每间隔10s 回去check 最近的心跳时间,如果某个 Broker 超过120s 没有发送心跳,则可以认为这个Broker 挂掉了。
2.4.5 业务系统 感知broker 存活
2.5 Broker 原理
2.5.1 Broker 主从同步原理
为了保证 mq的数据不丢失 和具备一定的高可用,一般都会将broker 部署成 Master-Slave 的模式,也就是一主一从。
Master-Slave 采取的是Pull 模式 拉取主节点信息。
2.5.2 Broker 读写分离
读:主从皆可读
- 如何主节点负载过高,他会建议业务系统下次去从节点读取
- 如何数据不同步,只能去主节点拉取数据
2.5.3 Master Broker 宕机
4.5 之前 是不能自动切换,只能手工处理,即Master-Slave 不是彻底的高可用模式,它无法实现自动Slave 切换为Master
2.5.4 Slave Broker 宕机
2.5.5 一主多从 自动切换
4.5 之后 引入了Dledger 机制 它是基于Raft 协议实现的。他可以 让 一个Master 对应多个Slave 也就是有多个副本,然后如何主节点宕机了,可以通过选举机制,选出新的Master Broker
三 集群部署
3.1 部署
3.1.1 配置说明
- nameServer :3台机器 8*cpu +16G +500G +千兆网卡
- broker:6台机器 24*cpu(两颗x86_64 cpu 、每颗 12核) +48G +1TG +千兆网卡
- 生产者:2台机器 4*cpu +8G +500G +千兆网卡
- 消费者:2台机器 4*cpu +8G +500G +千兆网卡
3.1.2 单机快速部署
3.1.3 集群部署
3.1.4 demo 测试
3.2 可视化页面
3.2.1 rocketmq-external
3.2.2 如何监控
3.2.3 机器监控
zabbix 监控
命令监控
3.3 参数优化调整
3.3.1 os 参数优化
3.3.1.1 vm.overcommot_memory
它是 内存分配策略
可选值:0、1、2。
0:表示内核将检查是否有足够的可用内存供应用进程使用;如果有足够的可用内存,内存申请允许;否则,内存申请失败,并把错误返回给应用进程。
1:表示内核允许分配所有的物理内存,而不管当前的内存状态如何。
2:表示内核允许分配超过所有物理内存和交换空间总和的内存。
我记得以前redis,就是因为这个参数为0, 导致在save数据快照到磁盘文件的时候,需要申请大量的内存被拒绝,然后报错了
推荐 参数为1
修改命令: echo ‘ vm.overcommot_memory=1’ >> /etc/sysctl.conf
3.3.1.2 vm.max_map_count
这个参数影响中间件可以开启的线程数量,非常的重要
如果这个参数太小,有时候会导致有些中间件无法启动足够的线程,进而导致报错,甚至中间件系统挂掉
它的默认值是 65536
我记得 生产环境部署的kafka 集群就因为无法开启足够多的线程,直接导致kafka 宕机
修改命令:echo ‘vm.max_map_count=655360’ >> /etc/sysctl.conf
3.3.1.3 vm.swappiness
swappiness,Linux内核参数,控制换出运行时内存的相对权重。swappiness参数值可设置范围在0到100之间。 低参数值会让内核尽量少用交换,更高参数值会使内核更多的去使用交换空间。默认值为60(参考网络资料:当剩余物理内存低于40%(40=100-60)时,开始使用交换空间)。对于大多数操作系统,设置为100可能会影响整体性能,而设置为更低值(甚至为0)则可能减少响应延迟。
默认值 60 对我们生产环境来说偏高了,可能会导致我们的中间件运行不活跃的时刻被迫腾出内存空间然后放到swap区域去
因此我们一般建议设置为10 尽量用物理内存,别放磁盘swap区域去
修改命令:echo ‘vm.swappiness=10’ >> /etc/sysctl.conf
3.3.1.4 ulimit
它是用来控制 linux 的最大文件连接数的,默认值1024,因为咱们中间件会有大量的频繁的读写磁盘文件的时候,或者进行网络通信的时候,都会受到这个参数的限制
我们经常在linux 看到报错, error:too many open files
修改命令:echo ‘ulimit -n 1000000 ‘ >> /etc/profile
3.3.1.5 总结
其实我们调整的参数无非都是 磁盘文件 IO 、 网络通信、内存管理、线程数量有关系的,因为我们的中间件系统无非就是和这些打交道。
- 肯定需要开启大量的线程
- 肯定需要大量的网络通信和磁盘IO
- 肯定需要大量的使用内存
3.3.2 jvm 参数优化
-server 一般都是服务器模式
-Xms8g -Xmx8g -Xmn4g 虽然默认堆大小8g 新生代是4g ,如果我们机器很大可以设置多些
-XX:+UseG1GC -XX:G1HeapRegionSize=16m 使用G1垃圾回收器
如果内存够大 可以把region 设置为16M ,如果使用默认的2m 就太小了,导致数量很多
-XX:G1ReservePercent=25:它表示,在G1 管理的老年代理预留25%的空闲内存,保证新生代,进入到老年代,避免老年代内存满了,
默认值10% 太少了。我们一般都是 20-25
-XX:InitiatingHeapOccupancyPercent=30 表示当堆内存使用率超过30% 就会启动G1的并发垃圾回收,开始尝试回收一些垃圾对象
默认值45%,也就是提供gc 频率,避免垃圾对象过多,一次垃圾回收耗时过长的问题。
-XX:SoftrefLRUPolicyMSPerMB=0: 默认是0 ,可以设置1000 避免频繁回收一些软引用的class 对象
-XX:-OmitstackTraceInfastThrow: 它表示jvm 要把完整的异常堆栈信息打印出来,默认的时候jvm 会丢弃部分信息
-XX:+AlwaysPreTouch: jvm 开始的时候不会分配指定的内存,会在实际使用的时候再分配
这里设置了强制分配内存
—XX:MaxDirectMemorySize=15g :直接内存 Mq 大量使用了NIO中的direct buffer ,我们可以设置大些
总结 rocketMq 默认使用了G1 垃圾回收器 默认堆内存8G
3.3.3 rocketMQ 核心参数优化
rocketmq/distribution/target/apache-rocketmq/conf/dledger
sendMessageThreadPoolNums=16 默认线程数量是16 我们可以调整为 24cpu
3.4 机器压测
压测获取的结果是尽可能的高,并且其他资源负载不能太高,保证机器不能宕机,不是要最大值。即 rocketmq 的Tps 和机器的资源使用率和负载取得一个平衡。
3.4.1 压测背景。
- nameServer :3台机器 8*cpu +16G +500G +千兆网卡
- broker:6台机器 24*cpu(两颗x86_64 cpu 、每颗 12核) +48G +1TG +千兆网卡
- 生产者:2台机器 4*cpu +8G +500G +千兆网卡
- 消费者:2台机器 4*cpu +8G +500G +千兆网卡

- 2 个生产者 80个线程*2 并发写入消息
- 每个消息 500字节
-
3.4.2 tps 和消息延时
在控制台可以看到 tps(每秒消息处理的数量) 稳定在7万左右
- 消息从生产到消费在1秒以内的延迟
3.4.3 cpu负载
top命令 查看负载情况: load avg 12.03 12.05 12.08
表示 cpu 在1分钟 5分钟 15分钟的cpu负载情况
因为我们的机器是24cpu 说明负载不到50%,问题不大3.4.4 内存使用率
free 查看内存命令 发现内存 稳定在 16G左右,还有很大内存没有使用3.4.5 jvm gc频率
使用jstat 查看 mq 的jvm gc频率 ,基本上 几十秒 垃圾回收一次,每次回收的后,存活的对象很小,几乎不进入老年代3.4.6 磁盘IO负载
top 查看 io负载情况
Cpu(s): 0.3% us, 0.3% sy, 0.0% ni, 76.7% id, 13.2% wa, 0.0% hi, 0.0% si
这里的13.2% 表示 磁盘IO 等待CPU 执行时间的百分比,比例越大,说明CPU执行的时候大部分时间都在等待执行IO ,说明IO 负载很高,一般不要超过40%
3.4.7 网卡流量
sar -n DEV 1 2 查看网卡流量
理论上 500 字节 7万 = 35兆 (单次) 3(同步到—2个从节点 消费)=115兆
千兆网卡 理论上限 每秒传输128兆数据 ,所以网卡几乎被打满了。
3.4.8 总结
3.4.9 建议
- 如何压测
- MQ的TPS &机器的负载、内存使用率、jvm GC频率、磁盘IO负载、网络流量负载取一个平衡。
- 尽量的让TPS 提高
- 同时机器的各项资源负载不要太高
- 压测过程
- 业务机器开启最大的线程 进行并发读写消息
- 观察 TPS cpu load 、内存使用率、jvm gc 频率、磁盘IO负载、网卡流量负载、
- 不断增加机器&线程、让TPS 不断提升
- 同时观察各项负载是否过高
-
3.5 生产规划
根据QPS 来定适当冗余一些机器
- 生产机器使用高配物理机
四 改造系统
4.1 订单非核心功能异步化
我们可以把 送积分 发券 推送消息 发货异步化,我们可以新建一个订单支付成功topic

4.2 三方系统异步化
我可以吧 物流系统 和推送系统异步化。
4.3 cannal+mq 同步数据库

4.4 大促秒杀优化
4.4.1 并发读优化
4.4.1.1 页面静态化
提前把商品的数据从数据库取出来,然后封装成一个大json串。方便前端直接渲染。
4.4.1.2 多级缓存 CDN+Nginx+Redis
cdn 就近节点加速,花钱买各个云厂商的即可
nginx 结合lua脚本实现本地缓存
redis 缓存json数据
4.4.2 并发写优化
4.4.2.1 秒杀答题
让用户手动做一个答题的操作:
4.4.2.3 基于redis 实现下单精准扣除库存
一般我们会将秒杀商品的库存提前写入redis中,然后当收到请求之后,可以直接对redis的库存进行扣减
redis 可以轻松的单机可以抗几万高并发,因此这里可以抗下高并发的库存扣减
4.4.2.4 抢购之前过滤无效请求
当redis 扣除完库存之后,在zk 写入一个秒杀完毕的标志位,然后zk会反向通知nginx 中我们自己写的lua脚本 ,通过lua 脚本后续请求过来的时候直接过滤,不做转发了。
4.4.2.5 瞬时并发下单请求使用mq 削峰
mq 可以抗下几万并发的下单请求,我们利用mq异步生成订单,慢慢的处理,也就是延迟个几十秒会处理完毕。
五 核心原理

5.1 MessageQueue
5.1.1 创建topic 必须指定队列
我们在控制台创建topic的时候 必须要指定MessageQueue,本质上 MessageQueue就是一个数据分片的机制。它将一个topic 的数据拆分了很多个数据分片,然后每个broker 机器上 都存储一些MessageQueue。
5.1.2 生产者均匀写入队列
5.1.3 broker 故障 生产者自动容错
当 broker 临时出现故障,比如 master Broker 挂了,此时正在等待其他slave broker 自动热切换为 master broker ,那么这个时候一组broker 就没有 master broker 可以写入了。
如果是均匀把数据写入各个broker 上的 队列,那么会导致一段时间内,每次访问这个挂掉的 master broker 都会访问失败,因此我们会在生产端,开启一个参数 sendLatencyFaultEnable, 它具有一个容错机制,例如 某次访问 一个 broker 发现网络延迟有500ms 然后还是无法访问,那么就自动回避返回这个 broker 一段时间,比如接下来3秒内,就不会访问这个broker
5.2 Broker 数据存储机制
5.2.1 commitLog 概念
commitLog 消息顺序写入机制 ,当生产者发送消息给mq的时候,是把消息直接写到磁盘的日志文件中commitLog
- 每个文件最多限定1GB
- 文件尾部追加写
5.2.2 MessageQueue & commitLog
文件格式
$Home/store/consumequeue/{topic}/{queueId}/{fileName}
{topic} 指的就是某个topic
{queueId} 指的就是某个MessageQueue
{fileName} 后面的就是存在在队列中许多的文件,这个文件里存储是的一条消息对应着commitlog的offset的偏移量
举例说明:一个topic 两个分片队列
$Home/store/consumequeue/TopicOrderPaySuccess/Messagequeue0/consumequeue0磁盘文件
$Home/store/consumequeue/TopicOrderPaySuccess/Messagequeue1/consumequeue1磁盘文件

consumequeue 包含信息
- 消息在日志文件(commitLog)中的偏移量
- 消息的长度
- 一条数据是20字节
- tag hashcode
- 每个consumequeue文件可以保存30条记录
-
5.2.3 commitLog 写优化
磁盘顺序写
- OS pageCache
- OS 异步刷盘策略
5.2.4 同步刷盘与异步刷盘

同步刷盘:吞吐量低,不会丢数据
- 首先它是强制把消息刷入物理磁盘文件中的,然后才返回ack给生产端,
- 否则报发送失败,客户端继续重试
异步刷盘:吞吐量高,会丢失数据

由图得知,宏观上看, 一组broker 有三份数据, 如果master 节点 宕机 那么 follower 自动更新为leader,继续接受客户端的数据写入。
5.3.1 基于 Dledger 技术 替换Broker 的commitLog
其实就是用 dledger 来管理 commitLog , 然后 Broker 还是基于 dledger管理的commitLog 去构建出来机器上的各个consumeQueue 磁盘文件。
5.3.2 Dledger 基于 Raft协议 选举 Leader Broker
简单讲就是利用随机休眠机制,先苏醒的节点会投票给自己,其他节点苏醒发现自动选票,则会跟投,这样会确保经过几轮投票,可以快速的选出一个leader。
5.3.3 Dledger 基于 Raft协议 多副本同步
简单来说,数据同步会分为两个阶段,一个uncommitted 阶段、一个是committed阶段。
- 当 leader broker 上的Dledger 收到一条数据之后,会标记为uncommitted 状态
- 然后它通过自己的dledgerServer组件把这个uncommitted 数据发送给 follower 的 dledgerServer.
- 然后 follower broker dledgerServer 收到 uncommitted 消息后 ,必须返回一个ack 给 leader
- 如果 leader 收到 过半数的 follower 返回 ack ,那么将消息标记为 committed状态
- 最后 leader 发给commited 消息给 follow 让他们把消息标记为 commit 状态
5.3.4 leader broker 崩溃
当 leader broker 挂了,那么剩下的两个follower 重新选举 一个 leader 出来。然后同步数据,保证数据不丢失。
5.4 consumer 原理
5.4.1 consumer group
消费组,就是一组消费者的意思。
5.4.2 集群消费 vs 广播消费
一般我们都是默认集群模式
集群模式:一个消费组获取一条消息,只会给组内的一台机器处理。
广播模式:一个消费组获取一条消息,组内的每个机器都可以处理
5.4.3 messageQueue 与消费者的关系
一个 messageQueue 只能被一个消费机器去处理的,即 只能被一个消费组对应
但是 一台消费者机器可以出来多个messageQueue。

5.4.4 push pull 消费
我们一把使用push 模式,时效性会更好些;
push:当消费者发起拉取请求的时候,如果发现可以消费的信息,立马返回一批消息到消费机器去处理。处理完之后立刻发送broker 请求拉取下一批机器。所以就是拉取一批,立马拉取下一批,看起来就像是 broker 一直不停的推送消息到消费机器一样。
push 请求挂起和长轮询机制:当拉取的时候没有发现新的消息,则会请求线程挂起,默认是15秒,然后有一个后台线程不停的check 是否有新的消息进来,如果有则唤醒。
5.4.5 broker 吐出消息给消费者
- 加入消费者第一次请求 messageQueue0中的消息,那就从第一条开始拉取。
- broker 就根据messageQueue0 找到 对应的 messageQueue0 找到 第一条消息的offset
- 更据这个地址 读取数据返回给消费者机器

5.4.6 消费者处理消息,并返回ack
- 消费者处理完数据后,提交消费进度 ,即就会在 offset=1 的位置上记录消费进度
- 下次消费组再拉取consumeQueue消息时,就从 broker 记录消费位置后面拉取
5.4.7 消费者重平衡
- 如果消费组宕机 或则扩容
- 重新给各个消费机器分配消息队列messageQueue
假设
机器01=messageQueue0/message1
机器02=messageQueue2/message3
如果机器02宕机,则机器01 接管
则
机器01=messageQueue0/message1
机器01=messageQueue2/message3
5.4.8 消费者何时在主节点拉取,何时在从节点拉取
我们都知道当主节点负载太高的时候,要去从节点拉取,这个判定标准是啥呢。
5.4.8.1 consumerQueue 也是基于os cache 优化的。
我们都知道 利用 消息 利用 os cache 写入很快。那么读取也是利用 os cache 的。 os cache 优化机制 ,会把我们的consumerQueue文件(它主要存放消息的元信息 offset 30万条消息每个文件都很小才5.72m) 几乎全部放到os cache 中,这样 消费者机器既可以读取信息的地址,比直接访问磁盘快太多。
5.4.8.2 commitLog 基于 os cache +磁盘一起读取
因为commitLog文件存储的是消息信息,所以内容是很大的,要1GB ,所以整体要有几个TB ,因此 是远远大于内存的,一般 内存中缓存的消息大概就是10-20G 。
如果你的机器负载很低 ,即 积压的消息很少,那么此时消息都是刚刚写到 commitLog 消费者就来消费。那么这个效率很高的。如果负载很高,机器内存放不下,或者消息已经写到磁盘里了,那么消费者机器读取性能就大大下降。
5.4.9 master broker 负载过高,从 slave 拉取数据
当主节点发现 mq里有10万条数据,你才拉取2万条,剩下的8万条数据 ,内存放不下,还得磁盘给你,他就觉得自己的负载太高,希望消费者从其他节点拉取。
PS 没有拉取的数量大小 远远大于 os cache 可以存放的消息大小,那么说明要借助磁盘加载数据,这个时候会建议下次拉取去slave 节点拉取。
5.5 rocketmq 高性能的通信框架

- Reactor 主线程在端口上监听producer 建立连接的请求,建立长连接
- Reactor 线程池并发的监听多个连接的请求是否到达
- Worker 请求并发的对多个请求进行预处理
- 业务线程池并发的对多个请求进行磁盘读写业务操作
PS 这些事情全部是利用各个线程池分工合作,所以cpu 越多 性能就高。
5.6 mmap 技术 减少文件拷贝次数
5.6.1 传统文件 IO操作
5.6.2 缓存映射
jdk NIO 包下的 MappedByteBuffer.map() 缓存映射 一般大小 1.5GB-2GB
所以 commitLog 单文件在1GB ,方便做内存映射。
- 写数据:是写入page cache缓存中的,然后由异步 os 线程 刷盘(只有一次文件拷贝 从 pageCache 到 磁盘文件里)
- 读数据:是从pagecache 中加载,如果没有从 磁盘里加载,并且会把临近的其他数据一块加载。(这里也是一次文件拷贝)
预映射机制:提前把 commitLog 和 consumerQueue 分配到虚拟内存中。
文件预热机制:提前把数据加载到内存里
六 实战案例
6.1 消息丢失的情况
生产者发送失败
mq 宕机 os cache 丢失
消费者消费失败
6.2 生产者防丢失
6.2.1 事务消息防丢失
6.2.1.1 事务消息使用流程
- 先发送half 消息 到mq 试探 mq是否正常,如果是则失败,则支付失败,走退款操作,则继续执行,订单更新操作
- 如果本地事务更新失败(假设数据库挂了),直接发送回滚请求 rollback 给mq,意思删除 half消息
- 如果本地更新成功了,这个时候发送一个half commit 操作给mq ,这样红包系统就能可见。

ps 如果 发送half 消息 没有响应 mq 会有补偿机制,定时扫描这个没有执行 commit/rollback的操作,我们可以根据具体的业务判断是 commit 或者是rollback
6.2.1.2 事务消息 可见性分析
它会把发送的half消息 写入 内部的topic 中,所以 业务topic 看不到。
- 当消息进入 内部的topic 文件里 则响应half消息成功
1 rollback
如果长时间没有操作 half 那么就会被逻辑删除,也就是记录到 op_topic 中。

commit
就是把 half 提交到 业务topic 中 ,这就意味着业务可见了。
6.2.1.3 保证业务一致性
- 同步发送消息+反复多次重试
- 问题一:订单事务执行成功,消息为未发送出去
- 问题二:订单事务执行成功,消息多次发送 接口严重耗时
- 所以正在保证消息一定投递到mq ,同时保证事务数据完全一致,还是上事务消息。
6.2.2 同步发送消息+反复多次重试 防丢失
6.3 broker 消息丢失
6.3.1 异步刷盘可能丢失数据
改成同步刷盘
6.3.2 磁盘坏了丢失数据
6.4 消费者-消息丢失
- 一定要处理完业务逻辑,再返回 consumeConcurrentlyStatus.CONSUME_SUCCESS 。去手动提交offset
不能在代码里采用异步处理。否则如果业务失败了,但是这边已经返回成功了。
6.5 消息幂等
因为一起原因 必要系统重启 ,超时,重试 ,会出现消息重复很正常,我们只需要回调的时候判断下,就好,如果处理了,就跳过即可。
6.6 死信队列
我们消费消息因为宕机啥的,导致消费失败,即消息处理异常,可以返回 reconsume_later,让mq 稍后重试 16次之后,就进入私信队列

我可以使用一个程序把私信队列处理掉。”%DLQ%VoucherConsymerGroup”6.6 顺序消息
因为消息进入不同的消息队列, 不同的机器执行导致消息乱序

我们可以统一订单消息 取模 路由到指定的messageQueue
- 一个MessageQueue 只能交给一个Consumer 进行处理
Consumer 处理消息异常的时候。返回 暂定重试SUSPEND_CURRENT_QUEUE_Amoent,而不是放回去重试。
6.7 tag 机制
6.8 延迟消息
6.8 mq 经验
6.8.1 灵活运用 tag 过滤数据
6.8.2 基于key 定位消息是否丢失
6.8.3 消息零丢失的补充
6.8.4 提高吞吐量
增加消费者机器,就得增加 MessageQueue
- 开启批量处理
- 增加线程数
6.8.5 要不要历史消息
也就是说,重启后,我们一般是从上一条信息开始消费
consume_from_first_offset6.9 消息积压



