前言
介绍consumer的消费者的启动
DefaultMQPushConsumer new
- consumerGroup 设置
2. namespace 设置
3. allocateMessageQueueStrategy 设置
4. defaultMQPushConsumerImpl new出来
设置namesrvAddr
subcribe订阅
1. defaultMQPushConsumerImpl 进行订阅
2. 构建 SubscriptionData (按照 || 切割tag)
3. rebalanceImpl 的 subscriptionInner(map结构) 放入topic设置信息
注册listener
1. defaultMQPushConsumerImpl 注册 messageListener
DefaultMQPushConsumer 启动
- 设置consumerGroup
2. defaultMQPushConsumerImpl 启动
defaultMQPushConsumerImpl 启动
- 检查config参数等等;
2. copySubscription 构建%RETRY%_topic 的队列,放入rebalanceImpl 的 subscriptionInner 当中
3. 集群模式,默认的instanceName 替换为 pid
4. 创建MQClientInstance 接口, 注:clientId 构造方法一样
5. rebalanceImpl 设置集群,ConsumerGroup MQClientInstance
6. 构建PullAPIWrapper
7. 构建RemoteBrokerOffsetStore
8. 构建Listener的Service,并启动
9. MQClientInstance 创建消费组,启动 MQClientInstance
10. 之后:
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.checkClientInBroker();
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
this.mQClientFactory.rebalanceImmediately();
rebalanceService.wakeup 进行wakeup操作
开始重平衡下