序言
producer 负责消息的发送,主要分为 3 个步骤,分别为:
- 启动
- 发消息
- 普通消息
- 顺序消息
- 广播消息
- 定时消息
- 批量消息
- 过滤消息
- 事务消息
- 关闭
在开始本节之前,请先思考如下问题?
- producer 和 namesrv 、broker 如何交互?
- 本地是否会存放所有 broker 的列表?
- 发送一条消息有几次交互?
- 消息发送到哪台 broker 上?如何保证顺序消息?
- 如果发消息失败了如何处理?
- 直接返回失败
- 重试(重试策略),重试保证幂等吗?如何保证?
- 加入本地队列后续重试
- 发送无序消息、顺序消息、事务消息、延时消息有何区别?
- 启动之后有几个线程?分别做什么操作?
有哪些配置项?在哪个环节起作用?
-
一、启动
1. 时序图
启动过程包括如下几个步骤:
- 启动线程
- 和 ns 同学获取 topic 对应的 broker 地址是启动过程还是发送过程?
启动
不论是producer还是consumer使用同一个MQClientInstance与server通信.
- 如果没有指定
namesrv地址,将会自动寻址 - 生成MQClientInstance类,该类关联一个NettyRemotingClient
- 启动MQClientInstance
- 启动NettyRemotingClient
- 启动定时任务:
- 更新namesrv地址
- 从namsrv更新topic路由信息
- 清理已经挂掉的broker、向所有broker发送心跳
- 持久化consumer的offset
- 调整DefaultMQPushConsumerImpl的线程池
- 启动PullMessageService
- 启动RebalanceService
- 启动CLIENT_INNER_PRODUCER
- 向所有broker发送心跳信息
1. 整体流程
注:一共两个重点:
- 设置 Topic 发布信息
- 创建 MQClientInstance 并启动
2. 配置项&如何起作用
3. 定时任务
定时任务线程池:
- 线程数 - 1 个
- 线程名 - MQClientFactoryScheduledThread
| 任务名 | 作用 | 周期 |
|---|---|---|
| fetchNameServerAddr | 获取 namesrv 地址 | 初始延迟 10s,120s 执行一次 |
| updateTopicRouteInfoFromNameServer | 更新 consumer 和 producer 的 topic 路由信息 | 初始化延迟 10ms,pollNameServerInterval 执行一次,默认30s |
| cleanOfflineBroker&sendHeartbeatToAllBrokerWithLock | 1. 清理离线 broker 2. 向 broker 发送心跳 |
初始化延迟 1s,heartbeatBrokerInterval 执行一次,默认30s |
| persistAllConsumerOffset | 持久化消费者的 offset | 初始化延迟 10s,persistConsumerOffsetInterval 执行一次,默认5s |
| adjustThreadPool | 自适应调节消费者线程数 | 初始化延迟 1min,1min 执行一次 |
4. 启动线程
| 分类 | 名字 | 线程个数 | 代码 | 功能 |
|---|---|---|---|---|
| 通信 | ||||
| 业务 | 业务定时任务线程 | 1 | MQClientInstance#startScheduledTask | 执行定时任务 |
| pullMessageService | 拉取消息 | |||
| RebalanceService | 重平衡服务 |
- 发消息线程
- 远程通信线程(netty)
- 业务线程
二、发送消息
三、关闭
producer启动
producer相关属性
| 属性名 | 是否必须 | 说明 |
|---|---|---|
| producerGroup | 是 | 同组producer可在事务消息中相互合作;对于非事务消息,每个JVM进程中的producer有不同的组名 |
| namesrvAddr | 否 | namesrv地址,若未直接设置,则通过 TopAddressing 发起http请求获取namesrv地址 |
发送消息
下面以 SYNC 方式为例,看下整个消息的发送过程,其他方式略有差异,总体流程类似。
根据 Topic 找到指定的 TopicPublishInfo
- 先去本地 map 找
- 如果没有,就去 Namesrv fetch
- 如果 Namesrv 里也没有,则用默认的 Topic 再去 fetch TopicRouteData.
获取TopicRouteData之后,将其解析为pub使用的TopicPublishInfo和sub使用的MessageQueue分别更新对应table
public class TopicPublishInfo {private boolean orderTopic = false;private boolean haveTopicRouterInfo = false;private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();private TopicRouteData topicRouteData;}public class TopicRouteData {private String orderTopicConf;private List<QueueData> queueDatas;private List<BrokerData> brokerDatas;private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;}
- QueueData 定义了这个 read 和 write 的 queue的数量,Client 在拿到 TopicRouteData 后,会根据这里配的数量去构建响应数目的messageQueue,即 messageQueueList. brokerDatas 保存了各个 broker 的相关信息。
从 messageQueueList 中选择一个 MessageQueue
如果没有 enable latencyFaultTolerance,就用递增取模的方式选择。如果 enable 了,在递增取模的基础上,再过滤掉 not available 的。这里所谓的 latencyFaultTolerance, 是指对之前失败的,按一定的时间做退避:
long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
- 举个例子,如果上次请求的 latency 超过 550L ms, 就退避 3000L ms;超过 1000L,就退避 60000L.
- 以上就是 Producer 到 Broker 的简单的负载均衡。
发送消息
到这一步,我们已经拿到了这些关键数据:
- Message, 要发送的消息
- MessageQueue,这里面包括 topic/brokerName/queueId
- CommunicationMode, 发送方式, SYNC/ASYNC/ONEWAY
- TopicPublishInfo
有了这些数据,就可以构建 RequestHeader 了,大部分字段意思都很明显(当然,前提是对RocketMQ的源码有所熟悉),个别字段见注释。
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());requestHeader.setTopic(msg.getTopic());requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());requestHeader.setQueueId(mq.getQueueId());//系统Flag, 用于判断走什么逻辑。标识是否压缩,事务的不同TYPE(prepare/rollback/commit/not transaction) 等requestHeader.setSysFlag(sysFlag);requestHeader.setBornTimestamp(System.currentTimeMillis());//消息Flag, 最终会落地requestHeader.setFlag(msg.getFlag());requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));requestHeader.setReconsumeTimes(0);//TODO,暂不知道这个字段是干嘛用的requestHeader.setUnitMode(this.isUnitMode());requestHeader.setBatch(msg instanceof MessageBatch);
最后用这些 header 字段,以及 message body 构建 RemotingCommand,通过 remoting 模块发给 broker.
处理结果
- 发送成功:直接返回发送结果
- 发送失败:如果 enable retryAnotherBrokerWhenNotStoreOK,就会重试,默认重试两次(retryTimesWhenSendFailed)。否则直接返回结果
- 发送异常:Producer 对异常做了很好的区分,如果是 Remoting 和 Client 模块的异常,就重试,如果是 Broker 模块的异常,根据不同的 response code 做不同的处理,有的重试,有的抛出异常,有的返回结果。
