序言

producer 负责消息的发送,主要分为 3 个步骤,分别为:

  1. 启动
  2. 发消息
    1. 普通消息
    2. 顺序消息
    3. 广播消息
    4. 定时消息
    5. 批量消息
    6. 过滤消息
    7. 事务消息
  3. 关闭

在开始本节之前,请先思考如下问题?

  1. producer 和 namesrv 、broker 如何交互?
    1. 本地是否会存放所有 broker 的列表?
    2. 发送一条消息有几次交互?
  2. 消息发送到哪台 broker 上?如何保证顺序消息?
  3. 如果发消息失败了如何处理?
    1. 直接返回失败
    2. 重试(重试策略),重试保证幂等吗?如何保证?
    3. 加入本地队列后续重试
  4. 发送无序消息、顺序消息、事务消息、延时消息有何区别?
  5. 启动之后有几个线程?分别做什么操作?
  6. 有哪些配置项?在哪个环节起作用?

  7. message trace 是什么功能?

    一、启动

1. 时序图

启动过程包括如下几个步骤:

  1. 启动线程
  2. 和 ns 同学获取 topic 对应的 broker 地址是启动过程还是发送过程?

启动

不论是producer还是consumer使用同一个MQClientInstance与server通信.

  1. 如果没有指定namesrv地址,将会自动寻址
  2. 生成MQClientInstance类,该类关联一个NettyRemotingClient
  3. 启动MQClientInstance
    1. 启动NettyRemotingClient
    2. 启动定时任务:
      1. 更新namesrv地址
      2. 从namsrv更新topic路由信息
      3. 清理已经挂掉的broker、向所有broker发送心跳
      4. 持久化consumer的offset
      5. 调整DefaultMQPushConsumerImpl的线程池
    3. 启动PullMessageService
    4. 启动RebalanceService
  4. 启动CLIENT_INNER_PRODUCER
  5. 向所有broker发送心跳信息

1. 整体流程

producer 概述 - 图1

注:一共两个重点:

  1. 设置 Topic 发布信息
  2. 创建 MQClientInstance 并启动

2. 配置项&如何起作用

3. 定时任务

定时任务线程池:

  1. 线程数 - 1 个
  2. 线程名 - MQClientFactoryScheduledThread
任务名 作用 周期
fetchNameServerAddr 获取 namesrv 地址 初始延迟 10s,120s 执行一次
updateTopicRouteInfoFromNameServer 更新 consumer 和 producer 的 topic 路由信息 初始化延迟 10ms,pollNameServerInterval 执行一次,默认30s
cleanOfflineBroker&sendHeartbeatToAllBrokerWithLock
1. 清理离线 broker
2. 向 broker 发送心跳
初始化延迟 1s,heartbeatBrokerInterval 执行一次,默认30s
persistAllConsumerOffset 持久化消费者的 offset 初始化延迟 10s,persistConsumerOffsetInterval 执行一次,默认5s
adjustThreadPool 自适应调节消费者线程数 初始化延迟 1min,1min 执行一次

4. 启动线程

分类 名字 线程个数 代码 功能
通信
业务 业务定时任务线程 1 MQClientInstance#startScheduledTask 执行定时任务
pullMessageService 拉取消息
RebalanceService 重平衡服务
  1. 发消息线程
  2. 远程通信线程(netty)
  3. 业务线程

二、发送消息

三、关闭

producer启动

producer相关属性

属性名 是否必须 说明
producerGroup 同组producer可在事务消息中相互合作;对于非事务消息,每个JVM进程中的producer有不同的组名
namesrvAddr namesrv地址,若未直接设置,则通过 TopAddressing 发起http请求获取namesrv地址

发送消息

下面以 SYNC 方式为例,看下整个消息的发送过程,其他方式略有差异,总体流程类似。

根据 Topic 找到指定的 TopicPublishInfo

  1. 先去本地 map 找
  2. 如果没有,就去 Namesrv fetch
  3. 如果 Namesrv 里也没有,则用默认的 Topic 再去 fetch TopicRouteData.
    获取TopicRouteData之后,将其解析为pub使用的TopicPublishInfo和sub使用的MessageQueue分别更新对应table
  1. public class TopicPublishInfo {
  2. private boolean orderTopic = false;
  3. private boolean haveTopicRouterInfo = false;
  4. private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
  5. private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
  6. private TopicRouteData topicRouteData;
  7. }
  8. public class TopicRouteData {
  9. private String orderTopicConf;
  10. private List<QueueData> queueDatas;
  11. private List<BrokerData> brokerDatas;
  12. private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
  13. }
  1. QueueData 定义了这个 read 和 write 的 queue的数量,Client 在拿到 TopicRouteData 后,会根据这里配的数量去构建响应数目的messageQueue,即 messageQueueList. brokerDatas 保存了各个 broker 的相关信息。

  2. 从 messageQueueList 中选择一个 MessageQueue

    如果没有 enable latencyFaultTolerance,就用递增取模的方式选择。如果 enable 了,在递增取模的基础上,再过滤掉 not available 的。这里所谓的 latencyFaultTolerance, 是指对之前失败的,按一定的时间做退避:
  1. long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
  2. long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
  1. 举个例子,如果上次请求的 latency 超过 550L ms, 就退避 3000L ms;超过 1000L,就退避 60000L.
  2. 以上就是 Producer 到 Broker 的简单的负载均衡。

发送消息

到这一步,我们已经拿到了这些关键数据:

  • Message, 要发送的消息
  • MessageQueue,这里面包括 topic/brokerName/queueId
  • CommunicationMode, 发送方式, SYNC/ASYNC/ONEWAY
  • TopicPublishInfo

有了这些数据,就可以构建 RequestHeader 了,大部分字段意思都很明显(当然,前提是对RocketMQ的源码有所熟悉),个别字段见注释。

  1. requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
  2. requestHeader.setTopic(msg.getTopic());
  3. requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
  4. requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
  5. requestHeader.setQueueId(mq.getQueueId());
  6. //系统Flag, 用于判断走什么逻辑。标识是否压缩,事务的不同TYPE(prepare/rollback/commit/not transaction) 等
  7. requestHeader.setSysFlag(sysFlag);
  8. requestHeader.setBornTimestamp(System.currentTimeMillis());
  9. //消息Flag, 最终会落地
  10. requestHeader.setFlag(msg.getFlag());
  11. requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
  12. requestHeader.setReconsumeTimes(0);
  13. //TODO,暂不知道这个字段是干嘛用的
  14. requestHeader.setUnitMode(this.isUnitMode());
  15. requestHeader.setBatch(msg instanceof MessageBatch);

最后用这些 header 字段,以及 message body 构建 RemotingCommand,通过 remoting 模块发给 broker.

处理结果

  • 发送成功:直接返回发送结果
  • 发送失败:如果 enable retryAnotherBrokerWhenNotStoreOK,就会重试,默认重试两次(retryTimesWhenSendFailed)。否则直接返回结果
  • 发送异常:Producer 对异常做了很好的区分,如果是 Remoting 和 Client 模块的异常,就重试,如果是 Broker 模块的异常,根据不同的 response code 做不同的处理,有的重试,有的抛出异常,有的返回结果。

注意事项

  1. 状态转移做幂等,比如 start
  2. 初始化延迟 10ms, 任务启动

    参考

  3. 【RocketMQ源码学习】4-消息发送