前言
介绍producer的启动过程
DefaultMQProducer 启动
new
producerGroup 生产组名
defaultMQProducerImpl new出来,创建defaultAsyncSenderExecutor 用于发送消息的消息队列set namesrvAddr
3.start 启动
1)含有namespace的,将group包裹下,设置为namespace + group 组合
2)defaultMQProducerImpl 启动
defaultMQProducerImpl 启动
1.检查config的group名称是否正确
2.非内部的goup = CLIENT_INNER_PRODUCER ,修改instanceName,默认是DEFAULT的修改为 pid (进程id)
this.instanceName = String.valueOf(UtilAll.getPid());
3.获取 MQClientInstance
1)从唯一的MQClientManager获取,先拿到MQClientManager
2)拿到唯一的clientId MQClientInstance的 clientId = ip@instanceName@unitName
3)从map中获取instance
4)没有instance时,创建instance
5)instance中的producerTable放入当前的defaultMQProducerImpl
6)topicPublishInfoTable 放入 topic路由信息,TBW102
7) 启动 MQClientInstance
8)发送心跳
MQClientInstance 创建
- 配置 nettyClientConfig
2. 创建 ClientRemotingProcessor
3. 创建 MQClientAPIImpl (API交互的,创建nettyClient,加入不同的任务processorTable)
4. 更新 MQClientAPIImpl的namesevAddr
5. 创建默认的defaultMQProducer
MQClientInstance 启动
- 启动 MQClientAPIImpl (启动了netty)
2. 启动 scheduledTask
3. 启动 pullMessageService
4. 启动 rebalanceService
5. 启动 默认的CLIENT_INNER_PRODUCER的producer
发送消息时
- 设置topic (namespace 包裹下)
2. defaultMQProducerImpl 进行send
3. 获取 topicPublishInfoTable 获取TopicPUblishInfo(里面有队列数)
4. 根据publishInfo信息选择一个队列
5. 开始进行发送
6. 先根据MQClientInstance 找到brokerAddr地址
7. 设置message的UNIQ_KEY
8. 构建 SendMessageRequestHeader
9. 如果是SYNC,开始使用 MQClientAPIImpl发送
10. 构建RemotingCommand
11. 使用 sendMessageSync 发送
12. 获取nettyRemotingClient