前言

介绍consumer的消费者的启动

DefaultMQPushConsumer new

  1. consumerGroup 设置
    2. namespace 设置
    3. allocateMessageQueueStrategy 设置
    4. defaultMQPushConsumerImpl new出来

设置namesrvAddr

subcribe订阅
1. defaultMQPushConsumerImpl 进行订阅
2. 构建 SubscriptionData (按照 || 切割tag)
3. rebalanceImpl 的 subscriptionInner(map结构) 放入topic设置信息

注册listener
1. defaultMQPushConsumerImpl 注册 messageListener

DefaultMQPushConsumer 启动

  1. 设置consumerGroup
    2. defaultMQPushConsumerImpl 启动

defaultMQPushConsumerImpl 启动

  1. 检查config参数等等;
    2. copySubscription 构建%RETRY%_topic 的队列,放入rebalanceImpl 的 subscriptionInner 当中
    3. 集群模式,默认的instanceName 替换为 pid
    4. 创建MQClientInstance 接口, 注:clientId 构造方法一样
    5. rebalanceImpl 设置集群,ConsumerGroup MQClientInstance
    6. 构建PullAPIWrapper
    7. 构建RemoteBrokerOffsetStore
    8. 构建Listener的Service,并启动
    9. MQClientInstance 创建消费组,启动 MQClientInstance
    10. 之后:
    this.updateTopicSubscribeInfoWhenSubscriptionChanged();
    this.mQClientFactory.checkClientInBroker();
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    this.mQClientFactory.rebalanceImmediately();

rebalanceService.wakeup 进行wakeup操作

开始重平衡下