Consumer端:
RocketMQ提供了两种消费模式:PUSH(pull进行监听)和PULL(长轮训)
1. Push 方式:rocketmq 已经提供了很全面的实现,consumer 通过长轮询拉取消息后回调
MessageListener 接口实现完成消费, 应用系统只要 MessageListener 完成业务逻辑即可
2. Pull 方式:完全由业务系统去控制,定时拉取消息,指定队列消费等等,当然这里需要业务系统
根据自己的业务需求去实现。
这两种模式分别对应的是DefaultMQPushConsumer类和DefaultMQPullConsumer类
设计:简化程序员使用rocketmq消息中间件,pull
调用broker
org.apache.rocketmq.client.impl.consumer.PullMessageService#run
消费模型:
org.apache.rocketmq.common.protocol.heartbeat.MessageModel#BROADCASTING
org.apache.rocketmq.common.protocol.heartbeat.MessageModel#CLUSTERING
消费选择:
org.apache.rocketmq.common.consumer.ConsumeFromWhere#CONSUME_FROM_LAST_OFFSET
第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费
org.apache.rocketmq.common.consumer.ConsumeFromWhere#CONSUME_FROM_FIRST_OFFSET
第一次启动从队列初始位置消费,后续再启动接着上次消费的进度开始消费
org.apache.rocketmq.common.consumer.ConsumeFromWhere#CONSUME_FROM_TIMESTAMP
第一次启动从指定时间点位置消费,后续再启动接着上次消费的进度开始消费
以上所说的第一次启动是指从来没有消费过的消费者,如果该消费者消费过,那么会在broker端记录该消费者的消费位置,如果该消费者挂了再启动,那么自动从上次消费的进度开始
消息重复幂等:
RocketMQ无法避免消息重复,所以如果业务对消费重复非常敏感,务必要在业务层面去重
Ps:见开发文档 接口幂等性处理 redis incr
消息过滤:
enablePropertyFilter=true
Status=1 消费 status=2不需要
为什么要有组group:?
sh mqshutdown broker
sh mqshutdown namesrv
Namesrv端:
Namesrv入口:org.apache.rocketmq.namesrv.NamesrvStartup
Namesrv 名称服务,是没有状态可集群横向扩展。可以理解为一个注册中心, 整个Namesrv的代码非常简单,主要包含两块功能:
1、管理一些 KV 的配置
2、管理一些 Topic、Broker的注册信息
大致提供服务为:
1. 每个 broker 启动的时候会向 namesrv 注册
2. Producer 发送消息的时候根据 topic 获取路由到 broker 的信息
3. Consumer 根据 topic 到 namesrv 获取 topic 的路由到 broker 的信息
Broker端:
程序入口:org.apache.rocketmq.broker.BrokerStartup
存储&索引:
存储文件结构:
org.apache.rocketmq.store.index.IndexFile
索引文件由索引文件头(IndexHeader)+(槽位 Slot )+(消息的索引内容)三部分构成。
beginTimestamp 8 位 long 类型,索引文件构建第一个索引的消息落在 broker 的时间
endTimestamp 8 位 long 类型,索引文件构建最后一个索引消息落 broker 时间
beginPhyOffset 8 位 long 类型,索引文件构建第一个索引的消息 commitLog 偏移量endPhyOffset 8 位 long 类型,索引文件构建最后一个索引消息
commitLog 偏移量hashSlotCount 4 位 int 类型,构建索引占用的槽位数(这个值貌似没有具体作用)
indexCount 4 位 int 类型,索引文件中构建的索引个数
槽位 slot
默认每个文件配置的 slot 个数为 500 万个,每个 slot 是 4 位的 int 类型数据计算消息的对应的 slotPos=Math.abs(keyHash)%hashSlotNum
消息在 IndexFile 中的偏移量,Slot 存储的值为消息个数索引
消息的索引内容是 20 位定长内容的数据
4 位 int 值, 存储的是 key 的 hash 值
8 位 long 值 存储的是消息在 commitlog 的物理偏移量phyOffset
4 位 int 值 存储了当前消息跟索引文件中第一个消息在 broker落地的时间差
4 位 int 值 如果存在 hash 冲突,存储的是上一个消息的索引地址
序列化
在RocketMQ中,RemotingCommand这个类在消息传输过程中对所有数据内容的封装,不但包含了所有的数据结构,还包含了编码解码操作
org.apache.rocketmq.remoting.protocol.RemotingCommand
本地启动集群:
1、持久化 producer发送数据了写磁盘
2、集群完好