前言

介绍producer的启动过程

DefaultMQProducer 启动

  1. new
    producerGroup 生产组名
    defaultMQProducerImpl new出来,创建defaultAsyncSenderExecutor 用于发送消息的消息队列

  2. set namesrvAddr

3.start 启动
1)含有namespace的,将group包裹下,设置为namespace + group 组合
2)defaultMQProducerImpl 启动

defaultMQProducerImpl 启动

1.检查config的group名称是否正确
2.非内部的goup = CLIENT_INNER_PRODUCER ,修改instanceName,默认是DEFAULT的修改为 pid (进程id)
this.instanceName = String.valueOf(UtilAll.getPid());
3.获取 MQClientInstance
1)从唯一的MQClientManager获取,先拿到MQClientManager
2)拿到唯一的clientId MQClientInstance的 clientId = ip@instanceName@unitName
3)从map中获取instance
4)没有instance时,创建instance
5)instance中的producerTable放入当前的defaultMQProducerImpl
6)topicPublishInfoTable 放入 topic路由信息,TBW102
7) 启动 MQClientInstance
8)发送心跳

MQClientInstance 创建

  1. 配置 nettyClientConfig
    2. 创建 ClientRemotingProcessor
    3. 创建 MQClientAPIImpl (API交互的,创建nettyClient,加入不同的任务processorTable)
    4. 更新 MQClientAPIImpl的namesevAddr
    5. 创建默认的defaultMQProducer

MQClientInstance 启动

  1. 启动 MQClientAPIImpl (启动了netty)
    2. 启动 scheduledTask
    3. 启动 pullMessageService
    4. 启动 rebalanceService
    5. 启动 默认的CLIENT_INNER_PRODUCER的producer

发送消息时

  1. 设置topic (namespace 包裹下)
    2. defaultMQProducerImpl 进行send
    3. 获取 topicPublishInfoTable 获取TopicPUblishInfo(里面有队列数)
    4. 根据publishInfo信息选择一个队列
    5. 开始进行发送
    6. 先根据MQClientInstance 找到brokerAddr地址
    7. 设置message的UNIQ_KEY
    8. 构建 SendMessageRequestHeader
    9. 如果是SYNC,开始使用 MQClientAPIImpl发送
    10. 构建RemotingCommand
    11. 使用 sendMessageSync 发送
    12. 获取nettyRemotingClient