丙-RocketMQ - 图1

RocketMq

基础组成

丙-RocketMQ - 图2

消息生产者(Producer

  1. 同步发送 - 需要Broker返回确认信息 (重要通知消息)
  2. 异步发送 - 需要Broker返回确认信息 (链路耗时较长而对响应时间敏感)
  3. 单向发送 (可靠性要求并不高)

消息消费者(Consumer) - ACK机制是发生在Consumer端

消费模型由Consumer决定,消费维度为Topic。

  1. 集群消费(一条消息只会被同Group中的一个Consumer消费)
  2. 广播消费(消息将对一 个Consumer Group 下的各个 Consumer 实例都消费一遍。)

RocketMQ没有真正意义的push,都是pull。push采用的是长轮询机制

  1. pull(拉取式消费)主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。
  2. push(推动式消费-长轮询机制)Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高。(难以根据消费者的状态控制推送速率,适用于消息量不大、消费能力强要求实时性高的情况下。)

代理服务器(Broker Server

  1. 消息中转角色,负责存储消息、转发消息。
  2. 每个Broker节点,在启动时,都会遍历NameServer列表,与所有的NameServer建立长连接,注册自己的信息,之后定时上报

从物理结构上看 Broker 的集群部署方式有四种:
单 Master
多 Master
单台机器宕机期间,该机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受影响。
多 Master 多 Slave(异步复制)
消息采用异步复制方式,主备之间有毫秒级消息延迟。这种方式优点是消息丢失的非常少,且消息实时性不会受影响,Master 宕机后消费者可以继续从 Slave 消费,中间的过程对用户应用程序透明,不需要人工干预,性能同多 Master 方式几乎一样。缺点是 Master 宕机时在磁盘损坏情况下会丢失极少量消息。
多 Master多 Slave(异步双写
消息采用同步双写方式,主备都写成功才返回成功。优点是数据与服务都没有单点问题,Master 宕机时消息无延迟,服务与数据的可用性非常高。缺点是性能相对异步复制方式略低,发送消息的延迟会略高。

名字服务(Name Server)-(路由管理,路由注册,服务发现)

RocketMq中的NameServer是一个无状态(zookeeper有状态)的命名服务(相关组件的状态在内存中),类似于dubbo中的zookeeper。生产者或消费者能够通过NameServer查找各主题相应的Broker IP列表。多个Namesrve实例组成集群,但相互独立,没有信息交换

一般NameServer先启动,broker后启动,启动后向所有的NameServer注册,produce发送消息时,会先从NameServer获取可用的broker地址,根据负载算法选择一个并发送。

主题(Topic) - 每个主题包含若干条消息,每条消息只能属于一个主题。
Topic是个抽象的概念,每个Topic底层对应N个queue,而数据也真实存在queue上的。

消费保证

  • 发送成功后返回consume_success
  • 回溯消费

RocketMQ顺序消息

大部分逻辑依赖客户端,也就是Producer和Consumer。

  1. RocketMQ的做法是有顺序关系的消息都发送到同一个queue上,自然他们也会存到同一个broker上。根据之前讲的broker消息的存储逻辑,同一个queue的消息,先到的肯定放在前面,所以只要客户端在发送的时候使用单线程,发完一条再发另一条,消息在broker上保存的顺序自然也是按发送的顺序。
  2. Broker不知道消息的状态,那就把保证顺序这件事交给Consumer,因为第一步中有顺序关系的消息已经在同一个queue里了,consumer拿消息的时候本来也是按照存的顺序来的,所以Broker不需要做任何特殊逻辑。

RocketMQ去重

MsgId

  • ip地址决定了分布式作业环境下生产的id值唯一
  • 进程id决定了单机上多个客户端实例间生产的id值唯一
  • count作为原子Integer类型,决定了单实例运行时高并发下生产的值唯一
  • time 乃当前时间戳 - 当月开始时间戳的long值,保证应用月内重启不会重复。
    offsetMsgId
    服务端ip地址+服务端消息的物理分区偏移量来达到唯一值id。

1、极为严谨的业务必须业务幂等。
2、宽松业务可以考虑使用OffsetMsgId作为去重id。

丙-RocketMQ - 图3

Rocketmq分布式事务

分布式系统中的事务可以使用TCC(Try、Confirm、Cancel)、2pc来解决分布式系统中的消息原子性。

Prepared阶段(预备阶段)
该阶段主要发一个消息到rocketmq,但该消息只储存在commitlog中,但consumeQueue中不可见,也就是消费端(订阅端)无法看到此消息。
commit/rollback阶段(确认阶段)
该阶段主要是把prepared消息保存到consumeQueue中,即让消费端可以看到此消息,也就是可以消费此消息。

1、在扣款之前,先发送预备消息 (第一步先给 Broker 发送事务消息即半消息,并且 RocketMQ 的发送方会提供一个反查事务状态接口,如果一段时间内半消息没有收到任何操作请求,那么 Broker 会通过反查接口得知发送方事务是否执行成功,然后执行 Commit 或者 RollBack 命令)
2、发送预备消息成功后,执行本地扣款事务
3、扣款成功后,再发送确认消息
4、消息端(加钱业务)可以看到确认消息,消费此消息,进行加钱
丙-RocketMQ - 图4

  • 如果发送预备消息成功,执行本地事务成功,但发送确认消息失败;这个就有问题了,因为用户A扣款成功了,但加钱业务没有订阅到确认消息,无法加钱。这里出现了数据不一致。
    那RocketMq是怎么解决的呢?- 状态回查

因为预备消息最终肯定会变为commit消息或Rollback消息,所以遍历预备消息去回查本地业务的执行状态,如果发现本地业务没有执行成功就rollBack,如果执行成功就发送commit消息。

  1. @RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group")
  2. @RequiredArgsConstructor(onConstructor = @__(@Autowired))
  3. public class AddBonusTransactionListener implements RocketMQLocalTransactionListener {
  4. private final ShareService shareService;
  5. private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;
  6. /**
  7. * 发送预备消息成功后,执行本地事务
  8. * 执行本地事务 - 记录本地事务日志
  9. * RocketMQ 提供了事务消息的功能,我们只需要定义好事务反查接口即可。
  10. * @return
  11. */
  12. @Override
  13. public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
  14. MessageHeaders headers = msg.getHeaders();
  15. String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
  16. Integer shareId = Integer.valueOf((String) headers.get("share_id"));
  17. String dtoString = (String) headers.get("dto");
  18. ShareAuditDTO auditDTO = JSON.parseObject(dtoString, ShareAuditDTO.class);
  19. try {
  20. this.shareService.auditByIdWithRocketMqLog(shareId, auditDTO, transactionId);
  21. // -- 在这挂了的话 需要事务回查
  22. return RocketMQLocalTransactionState.COMMIT;
  23. } catch (Exception e) {
  24. return RocketMQLocalTransactionState.ROLLBACK;
  25. }
  26. }
  27. /**
  28. * 本地事务检查接口(回查)
  29. * 事务反查接口
  30. * kill -9 后会进来回查
  31. * @param msg
  32. * @return
  33. */
  34. @Override
  35. public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
  36. MessageHeaders headers = msg.getHeaders();
  37. String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
  38. // select * from xxx where transaction_id = xxx
  39. RocketmqTransactionLog transactionLog = this.rocketmqTransactionLogMapper.selectOne(
  40. RocketmqTransactionLog.builder()
  41. .transactionId(transactionId)
  42. .build()
  43. );
  44. log.info("本地事务检查接口(回查){}",transactionLog==null);
  45. if (transactionLog != null) {
  46. return RocketMQLocalTransactionState.COMMIT;
  47. } // 如果是 RollBack 那么订阅方收不到这条消息,等于事务就没执行过。
  48. return RocketMQLocalTransactionState.ROLLBACK;
  49. }
  50. }

回查判断业务是否成功

设计一张Transaction表,将业务表和Transaction绑定在同一个本地事务中。当RocketMq回查时,只需要检查对应的TransactionId的状态是否是「已完成」就好。

完整调用链

  1. producer和NameServer节点建立长连接
  2. 定期从NameServer获取Topic信息
  3. 并且像Broker Master建立链接 发送心跳
  4. 发送消息给Broker Master
  5. consumer 从 Master 和 Slave 一起订阅消息

RocketMQ可在broker.conf文件中配置Consumer端的重试次数和重试时间间隔。
重试队列在重试16次(默认次数)将消息放入死信队列。
死信队列中的数据需要通过新订阅该topic进行消费。

RocketMQ保证消息不丢失

Producer端

  • 采取send()同步发消息,发送结果是同步感知的。
  • 发送失败后可以重试,设置重试次数。默认3次。
    Broker端
  • 修改刷盘策略为同步刷盘。默认情况下是异步刷盘的。
  • 集群部署,主从模式,高可用。
    Consumer端
  • 完全消费正常后在进行手动ack确认。

RocketMq中零拷贝

服务器中文件数据通过网络传输到客户端的流程

  1. 数据读取和写入是从用户空间到内核空间来回复制,
  2. 而内核空间的数据是通过操作系统层面的 I/O 接口从磁盘读取或写入。
  3. read(file, tmp_buf, len);
  4. write(socket, tmp_buf, len);
  1. 应用程序通过read进行操作系统函数调用,此时cpu由用户态切换到内核态,此时DMA(直接内存访问)引擎直接将磁盘上数据读取到内核缓冲区
  2. 内核缓冲区中的数据进行拷贝,复制到用户空间缓冲区,在内存空间之间数据拷贝,是需要cpu来参与的,拷贝结束cpu状态由内核态转换到用户态
  3. 用户空间的数据想要发送到客户端,通过write进行操作系统函数调用,这个时候发生了cpu状态的切换,由用户态转换到内核态。然后用户缓冲区数据拷贝到内核socket发送缓冲区,这个时候的复制也需要cpu进行参与
  4. 最后socket缓冲区中的数据需要复制到网卡缓冲区中,由网卡发送到客户端,然后cpu状态切换到用户态
    丙-RocketMQ - 图5
    在这个过程中发生了2次cpu copy和2次DMA copy,以及发生了 4 次用户态与内核态的上下文切换。要想提高文件传输的性能,就需要减少「用户态与内核态的上下文切换」和「内存拷贝」的次数。
  • 要想减少上下文切换到次数,就要减少系统调用的次数。
  • 文件传输的应用场景中,在用户空间我们并不会对数据「再加工」,用户的缓冲区是没有必要存在
    因此所谓的零拷贝就是,让其中的2次cpu拷贝省略掉,因为这两次cpu拷贝的数据其实已经在内存中,没有必要再让cpu参与进来进行数据的拷贝,浪费cpu。在大量文件读写的时候,这个优化带来的收益还是比较可观的。

零拷贝的实现方式

零拷贝就是避免数据在内核空间缓存区和用户空间缓缓冲区之间的复制,避免掉2次cpu复制,释放cpu。

  • mmap+write (即使频繁调用,使用小块文件传输,效率也很高;)
    mmap() 系统调用函数会直接把内核缓冲区里的数据「映射」到用户空间,这样,操作系统内核与用户空间就不需要再进行任何的数据拷贝操作。通过使用 mmap() 来代替 read(), 可以减少一次数据拷贝的过程。
  • sendfile (可以利用 DMA 方式,消耗 CPU 较少,大块文件传输效率高,无内存安全新问题)
    第一步,通过 DMA 将磁盘上的数据拷贝到内核缓冲区里;第二步,缓冲区描述符和数据长度传到 socket 缓冲区,这样网卡的 SG-DMA 控制器就可以直接将内核缓存中的数据拷贝到网卡的缓冲区里,此过程不需要将数据从操作系统内核缓冲区拷贝到 socket 缓冲区中,这样就减少了一次数据拷贝;

无论是mmap结合write方式还是sendfile方式都只是减少了一次cpu拷贝,而后DMA引擎还具有了收集功能,可以在内核缓存区发送到socket缓冲区的时候避免掉cpu复制,只是将缓冲区地址和数据长度发送给socket缓冲区,然后DMA引擎通过收集功能直接读取收集数据发送到网卡中。这里依赖DMA引擎的收集功能省略掉了最后一次cpu拷贝,到此才是真正的零拷贝。

零拷贝( 通过一次系统调用(sendfile 方法)合并了磁盘读取与网络发送两个操作 )技术,没有在内存层面去拷贝数据,也就是说全程没有通过 CPU 来搬运数据,所有的数据都是通过 DMA 来进行传输的。
零拷贝技术的文件传输方式相比传统文件传输的方式,减少了 2 次上下文切换和数据拷贝次数,只需要 2 次上下文切换和数据拷贝次数,就可以完成文件的传输,而且 2 次的数据拷贝过程,都不需要通过 CPU,2 次都是由 DMA 来搬运。
零拷贝技术可以把文件传输的性能提高至少一倍以上。

RocketMq中采用mmap()+write()方式实现零拷贝

因为有小块数据传输的需求,效果会比 sendfile 更好。

  1. FileChannel.map(MapMode.READ_WRITE, 0, fileSize);
  1. java中还可以通过FileChannel.transferTo()来实现数据从文件描述符传输到socket中,它的底层是通过sendfile系统调用来实现。

相关问题

消息积压解决方案

由于堆积的topic里面message queue数量固定,即使我们这时增加consumer的数量,它也不能分配到message queue。这时我们可以写一个分发程序做一个临时topic队列扩充,来提高消费者能力。程序从旧的topic中读取到新的topic,只是新的topic的queue可以指定多一点(理论上可以无限扩充,正常1000以内),然后启动更多的consumer在临时新的topic里面消费

RocketMQ中的消息只会在commitLog被删除的时候才会消失,不会超时。也就是说未被消费的消息不会存在超时删除这情况。
堆积的消息并不会进死信队列,只有在消费失败后会进入重试队列18次才会进入死信队列。

RocketMQ Broker中的消息被消费后不会立即删除

不会,每条消息都会持久化到CommitLog中,当有消息消费后只是当前Consumer的消费进度(CommitLog的offset)更新了。

  • 那么消息会堆积吗?什么时候清理过期消息?
    默认48小时后会删除不再使用的CommitLog文件。(指定时间删除)

为什么要主动拉取消息而不使用事件监听方式?

如果broker主动推送消息的话有可能push速度快,消费速度慢的情况,那么就会造成消息在consumer端堆积过多,同时又不能被其他consumer消费的情况。而pull的方式可以根据当前自身情况来pull,不会造成过多的压力而造成瓶颈。所以采取了pull的方式。

RocketMQ如何做负载均衡?

通过Topic在多Broker中分布式存储实现。
producer端
发送端指定message queue发送消息到相应的broker,来达到写入时的负载均衡。
consumer端
采用的是平均分配算法来进行负载均衡。

平均分配策略(默认)(AllocateMessageQueueAveragely) 环形分配策略(AllocateMessageQueueAveragelyByCircle) 手动配置分配策略(AllocateMessageQueueByConfig) 机房分配策略(AllocateMessageQueueByMachineRoom) 一致性哈希分配策略(AllocateMessageQueueConsistentHash) 靠近机房策略(AllocateMachineRoomNearby)

  • 当消费负载均衡consumer和queue不对等的时候会发生什么?
    如果Consumer少于queue的个数,则会存在部分Consumer消费多个queue的情况。

高吞吐量下如何优化生产者和消费者的性能?

  • 同一group下,多机部署,并行消费
  • 单个Consumer提高消费线程个数
  • 批量消费(消息批量拉取,业务逻辑批量处理)
  • 网卡调优+jvm调优+多线程与cpu调优+Page Cache

消息队列之推还是拉,RocketMQ 是如何做的?

RocketMQ 和 Kafka 都选择了拉模式(利用“长轮询”),具体的做法都是通过消费者等待消息,当有消息的时候 Broker 会直接返回消息,如果没有消息都会采取延迟处理的策略,并且为了保证消息的及时性,在对应队列或者分区有新消息到来的时候都会提醒消息来了,及时返回消息。
一句话说就是消费者和 Broker 相互配合,拉取消息请求不满足条件的时候 hold 住,避免了多次频繁的拉取动作,当消息一到就提醒返回

RocketMQ 中的长轮询

RocketMQ 中的 PushConsumer 其实是披着拉模式的方法,只是看起来像推模式而已。

  1. 后台会有个 RebalanceService 线程,这个线程会根据 topic 的队列数量和当前消费组的消费者个数做负载均衡,每个队列产生的 pullRequest 放入阻塞队列 pullRequestQueue 中。然后又有个 PullMessageService 线程不断的从阻塞队列 pullRequestQueue 中获取 pullRequest,然后通过网络请求 broker,这样实现的准实时拉取消息。
  2. Broker 的 PullMessageProcessor 里面的 processRequest 方法是用来处理拉消息请求的,有消息就直接返回,如果没有消息就将请求挂起。
  3. PullRequestHoldService 这个线程会每 5 秒从 pullRequestTable 取PullRequest请求,然后看看待拉取消息请求的偏移量是否小于当前消费队列最大偏移量,如果条件成立则说明有新消息了,则会调用 notifyMessageArriving ,最终调用 PullMessageProcessor 的 executeRequestWhenWakeup() 方法重新尝试处理这个消息的请求,也就是再来一次,整个长轮询的时间默认 30 秒。
  4. 还有个 ReputMessageService 线程,这个线程用来不断地从 commitLog 中解析数据并分发请求,构建出 ConsumeQueue 和 IndexFile 两种类型的数据,并且也会有唤醒请求的操作,来弥补每 5s 一次这么慢的延迟。

消息写入并且会调用 pullRequestHoldService#notifyMessageArriving

丙-RocketMQ - 图6