整体架构

参考了kafka:
image.png

  • NameServer: 类似于kafka的ZK

    ZK本身存在主从和数据同步的。nameServer是一个去中心化的结构,不进行通信,NameServer可用性比较高,主要为了维护消息路由信息。 路由信息包括了:BrokerServer Topic ConsumeQueueId

  • Broker:节点, 和每一个nameserver都有一个基于Netty的长连接,定期注册topic.

  • 三个日志文件:CommitLog 消息内容,不区分topic, 1G 1个。ConsumeQueueLog commitlog基于topic的索引文件,IndexFile是通过key或者时间区间的索引。
  • 负载均衡

    事务消息原理

    依赖于TransactionListener接口

  • executeLocalTransaction 方法会在发送消息后调用,用于执行本地事务,如果本地事务执行成功,rocketmq再提交消息

  • checkLockTransaction用于本地事务的检查,rocketmq依赖此方法做补偿

通过两个内部的topic来实现消息的两阶段支持。
prepare: 将消息(消息上带有事务的标识)投递到名为RMC_SYS_TRANS_HALF_TOPIC的topic中,而不是直接投递到真正的topic。
commit/rollback: producer再通过TransactionListener的executeLocalTransaction方法来执行本地事务,当producer的localTransation处理成功或者失败后,producer会向broker发送commit或者rollback命令,如果是COMMIT,则broker会将投递到RMC_SYS_TRANS_HALF_TOPIC中的消息投递到真实的topic中,然后再投递一个删除的消息到RMC_SYS_TRANS_HALF_TOPIC中,表示当前事务已完成。如果是rollback,则没有投递到真实topic的过程,只需要投递删除的消息到RMC_SYS_TRANS_HALF_TOPIC。最后,消息者和消息普通的消息一样消费消息。

  • 第一阶段(prepare失败):给应用返回发送消息失败
  • 事务失败:发送回滚命令给broker,由broker执行消息的回滚
  • Commit或者rollback失败,由broker定时向producer发起事务回滚,如果本地事务成功,则提交消息事务,否则回滚消息事务。

事务检查由两种情况:

  • commit/rollback: broker会执行相应的commit/rollback操作
  • 如果type是TRANSACTION_NOT_TYPE,则一段时间后会检查,当检查次数超过上限(默认15次)则丢弃消息。

    怎么实现顺序消息?

    默认是不能保证的,需要程序去保证发送和消息的是同一个queue,多线程消费也无法保证
    发送顺序:发送端自己业务逻辑保证先后,发往一个固定的Queue, 生产者可以在消息体上设置消息的顺序
    发送者实现MessageQueueSelector, 选择一个queue进行发送,也可使用rocketmq提供的默认实现:

  • SelectMessageQueueByHash: 按照参数的hashCode与可选择的队列进行求余操作

  • SelectMessageQueueByRandom: 随机选择

mq: queue本身就是顺序追加写,只需要保证一个队列同一时间只有一个comsumer再消费,通过加锁实现,consumer上的顺序消息有一个定时任务,每隔一定的时间向broker发送请求演唱锁定。
pull模式:消费者需要自己维护需要拉取的queue, 一次拉取的消息都是顺序的,需要消费者自己保证顺序消费
pull模式:消费实例实现自MQPushConsumer接口,提供注册监听的方法消费消息,registerMessageListener、重载方法

  • MessageListenerConcurrently : 并行消费
  • MessageListenerOrderly: 串行消费,consumer会把消息放入本地队列并加锁,定时任务保证锁的同步。

    持久化机制

    CommitLog

    日志数据文件,被所有的queue共享,大小为1G,写满之后重新生成,顺序写

    ConsumeQueue

    逻辑queue, 消息先到达commitlog,然后异步转发到consumeQueue,包含queue在commitLog中的物理位置偏移量Offset,消息实体内容大小和message tag 的hash值,大小约为600W个字节,写满之后重新生成,顺序写

    index file

    通过时间或者区间来查找commitlog中的消息,文件名以创建的时间戳命名,固定的单个index file大小为400M,可以保存2000W个索引

所有队列公用一个日志数据文件,避免了kafka分区数过多,日志文件过多导致的磁盘IO读写压力较大造成的性能瓶颈,rocketmq的queue只存储少量数据,更加轻量化,对磁盘的访问是串行化避免磁盘竞争,缺点在于,写入的顺序写,读写是随机读,写读ConsumeQueue,再读CommitLog,会降低消息读的效率。

消息发送到broker之后,会被写入commit log,写之前加锁,保证顺序写入,然后转发到ConsumeQueue

消息消费时先从ConsumeQueue读取在CommitLog中的起始位置偏移量Offset, 消息大小和消息tag的hashCode值,在从Commit Log中读取消息的内容。

  • 同步刷盘:消息持久化到磁盘才会给生产者返回ack,可以保证消息可靠,但是会影响性能
  • 异步刷盘:消息写入page Cache就返回ack给生产者,刷盘采用异步线程,降低读写延迟提高性能和吞吐。消息存在丢失的 风险。

如何保证不丢消息?

生产者

  • 同步阻塞的方式发送消息,加上失败重试机制,可能broker存储失败,而可以通过查询确认
  • 异步发送需要重写回调方法,检查发送结果
  • ack机制,可能存储Commit Log,存储ConsumeQueue失败,此时对消费者不可见

同步发送、同步刷盘

生产者

  • offset手动提交,消息消费保证幂等等。

    底层实现原理

    RocketMq由nameserver集群、broker集群、Consumer集群组成,消息生产和消费大致原理如下:
  1. Broker在启动的时候会向所有的nameservier注册,并保持长连接,每30秒发送一次心跳
  2. Produdcer在发送消息的时候从NameServer获取Broker服务器地址,根据负载均衡算法选择一台服务器来发发送消息
  3. Consumer消费消息的时候同样从NameServer获取Borker的地址,然后主动拉取消息来消费。

    使用mq如何保证分布式事务的最终一致性?

    分布式事务:业务相关的多个操作要保证他们同时成功同时失败。
    事务的最终一致性:和强一致性对应。
    需要保证两点:

  4. 生产者需要保证消息能够100%投递,事务消息机制

  5. 消费者一端需要保证幂等消费

    保证消费的幂等性

    防止消费者重复消费的问题,所有的mq产品都没有主动解决幂等性的问题,需要由消费者自行控制。
  • 最好就是自己带一个业务标识的id,来进行幂等判断

    MQ怎么保证消息的高效读写

    零拷贝计数来优化文件的读写。
    传统文件的拷贝方式:
    硬件拷贝 -> 内核拷贝 -> 用户空间
    零拷贝:
    省略用户空间的拷贝,用户空间支持拿到一次虚拟的映射,映射修改,整个文件是在内存空间完成读写,减少了两次拷贝。这就是mmap. Mmap方式通过MappedByteBuffer对象进行操作,transfile通过FileChannel来进行操作。
    在底层使用DirectMemory技术。
    mmap适合操作比较小的文件,通常文件大小不超过1.5G到2G之间。

  • RocketMq当中使用Mmap方式来进行文件的读写。先初始化1G的文件,然后再往里面去写数据。

    如何保证消息的顺序

    全局有序,局部有序;MQ只需要保证局部有序,不需要保证全局有序。
    MessageQueneSelector。把消息发送到一个固定的队列上去。
    消费者端注册一个监听,MessageListenerOrderly
    MQ的结构就是先进先出。
    核心:生产者把一组有序的消息放到同一个队列中,而消费者一次消费整个队列当中的消息。

如何进行产品选型

MQ 产品 优点 缺点 使用场景
Kafka
1. 吞吐量非常大
1. 性能非常好
1. 集群高可用

1. 会丢失数据
1. 功能比较单一
日志分析、大数据采集
Rabbit Mq
1. 消息可靠性高
1. 功能全面

1. 吞吐量比较低,并发性能不高
1. 消息积压会严重影响性能
1. erlang语言不好定制
适合小规模场景
Rocket mq
1. 高吞吐、高性能、高可用
1. 功能比较全面

1. 开源版本功能不如云上商业版
1. 官方文档和周边生态不太成熟,客户端只支持java
几乎是全场景。