本文所涉及的 RocketMQ 源码版本为 4.9.3。

RocketMQ 生产者启动流程

入口:

org.apache.rocketmq.client.producer.DefaultMQProducer#start

  // Producer startup entry point: wraps the producer group with the namespace,
  // starts the underlying DefaultMQProducerImpl, and then starts the trace
  // dispatcher if one has been configured.
  1. @Override
  2. public void start() throws MQClientException {
  3. this.setProducerGroup(withNamespace(this.producerGroup));
  4. this.defaultMQProducerImpl.start();
  5. if (null != traceDispatcher) {
  6. try {
  7. traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
  8. } catch (MQClientException e) {
  // A trace dispatcher failure is only logged; it is not rethrown from start().
  9. log.warn("trace dispatcher start failed ", e);
  10. }
  11. }
  12. }

第一步、检查 producerGroup

  // Validates the producer group before startup. Throws MQClientException when
  // the group is null or equals the reserved MixAll.DEFAULT_PRODUCER_GROUP.
  1. private void checkConfig() throws MQClientException {
  // NOTE(review): checkGroup runs before the explicit null check below; whether it
  // already rejects a null group is not visible in this listing - confirm.
  2. Validators.checkGroup(this.defaultMQProducer.getProducerGroup());
  3. if (null == this.defaultMQProducer.getProducerGroup()) {
  4. throw new MQClientException("producerGroup is null", null);
  5. }
  // The default group name is reserved, so callers must pick their own.
  6. if (this.defaultMQProducer.getProducerGroup().equals(MixAll.DEFAULT_PRODUCER_GROUP)) {
  7. throw new MQClientException("producerGroup can not equal " + MixAll.DEFAULT_PRODUCER_GROUP + ", please specify another one.",null);
  8. }
  9. }

第二步、设置 instanceName

  // Replaces the instance name with "<pid>#<System.nanoTime()>" when it is still
  // the literal "DEFAULT"; any other instance name is left unchanged.
  // Presumably this keeps clients on the same host from sharing an instance
  // name - TODO confirm against buildMQClientId.
  1. public void changeInstanceNameToPID() {
  2. if (this.instanceName.equals("DEFAULT")) {
  3. this.instanceName = UtilAll.getPid() + "#" + System.nanoTime();
  4. }
  5. }

第三步、创建 mqClientInstance,它是与 nameserver 和 broker 通信的中介

  // Returns the MQClientInstance registered in factoryTable under the client id
  // built from clientConfig, creating and registering a new one if none exists.
  1. public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
  2. String clientId = clientConfig.buildMQClientId();
  3. MQClientInstance instance = this.factoryTable.get(clientId);
  4. if (null == instance) {
  // The new instance is built from a clone of the config, not the caller's object.
  5. instance =
  6. new MQClientInstance(clientConfig.cloneClientConfig(),
  7. this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
  // putIfAbsent returns the existing entry if one appeared after the get() above;
  // in that case the freshly created instance is dropped in favour of it.
  8. MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
  9. if (prev != null) {
  10. instance = prev;
  11. log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
  12. } else {
  13. log.info("Created new MQClientInstance for clientId:[{}]", clientId);
  14. }
  15. }
  16. return instance;
  17. }

第四步、将生产者加入 mqClientInstance 管理

  // Registers a producer under its group name in producerTable.
  // Returns false when either argument is null or when the group is already
  // registered (the existing entry is kept); returns true on a new registration.
  1. public synchronized boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
  2. if (null == group || null == producer) {
  3. return false;
  4. }
  5. MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
  6. if (prev != null) {
  7. log.warn("the producer group[{}] exist already.", group);
  8. return false;
  9. }
  10. return true;
  11. }

第五步、启动 MQClientInstance(有一些关于消费者的任务,会在消费者启动流程中讲解)

  1. 启动 netty 客户端,创建与 nameserver、broker 通信的 channel
  // Starts the Netty remoting client: creates the worker executor group,
  // configures the Bootstrap and its channel pipeline, applies optional socket
  // options, schedules the periodic response-table scan, and starts the event
  // executor when a channel event listener is present.
  1. public void start() {
  // Worker threads are named "NettyClientWorkerThread_<n>", numbered from 1.
  2. this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
  3. nettyClientConfig.getClientWorkerThreads(),
  4. new ThreadFactory() {
  5. private AtomicInteger threadIndex = new AtomicInteger(0);
  6. @Override
  7. public Thread newThread(Runnable r) {
  8. return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
  9. }
  10. });
  // Base options: TCP_NODELAY on, SO_KEEPALIVE off, connect timeout from config.
  11. Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
  12. .option(ChannelOption.TCP_NODELAY, true)
  13. .option(ChannelOption.SO_KEEPALIVE, false)
  14. .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
  15. .handler(new ChannelInitializer<SocketChannel>() {
  16. @Override
  17. public void initChannel(SocketChannel ch) throws Exception {
  18. ChannelPipeline pipeline = ch.pipeline();
  // With TLS enabled, the SSL handler goes first in the pipeline when an SSLContext
  // exists; when it is null only a warning is logged and the pipeline is still built.
  19. if (nettyClientConfig.isUseTLS()) {
  20. if (null != sslContext) {
  21. pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
  22. log.info("Prepend SSL handler");
  23. } else {
  24. log.warn("Connections are insecure as SSLContext is null!");
  25. }
  26. }
  // Handlers are appended in this order: encoder, decoder, idle-state detection,
  // connection management, then the client request/response handler.
  27. pipeline.addLast(
  28. defaultEventExecutorGroup,
  29. new NettyEncoder(),
  30. new NettyDecoder(),
  31. new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
  32. new NettyConnectManageHandler(),
  33. new NettyClientHandler());
  34. }
  35. });
  // Socket buffer sizes are applied only when configured to a positive value.
  36. if (nettyClientConfig.getClientSocketSndBufSize() > 0) {
  37. log.info("client set SO_SNDBUF to {}", nettyClientConfig.getClientSocketSndBufSize());
  38. handler.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize());
  39. }
  40. if (nettyClientConfig.getClientSocketRcvBufSize() > 0) {
  41. log.info("client set SO_RCVBUF to {}", nettyClientConfig.getClientSocketRcvBufSize());
  42. handler.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize());
  43. }
  // The write-buffer water mark is applied only when both low and high are positive.
  44. if (nettyClientConfig.getWriteBufferLowWaterMark() > 0 && nettyClientConfig.getWriteBufferHighWaterMark() > 0) {
  45. log.info("client set netty WRITE_BUFFER_WATER_MARK to {},{}",nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark());
  46. handler.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
  47. nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark()));
  48. }
  // Scans the response table every 1000 ms after an initial 3000 ms delay.
  // Throwable is caught and logged so a failing scan does not escape the TimerTask.
  49. this.timer.scheduleAtFixedRate(new TimerTask() {
  50. @Override
  51. public void run() {
  52. try {
  53. NettyRemotingClient.this.scanResponseTable();
  54. } catch (Throwable e) {
  55. log.error("scanResponseTable exception", e);
  56. }
  57. }
  58. }, 1000 * 3, 1000);
  // The event executor is started only when a channel event listener is set.
  59. if (this.channelEventListener != null) {
  60. this.nettyEventExecutor.start();
  61. }
  62. }
  2. 启动一些周期性的任务:

更新 nameserver 地址的任务(仅当未显式配置 namesrvAddr 时才会注册):

  // Registered only when no name server address is configured on the client.
  // Calls fetchNameServerAddr() every 2 minutes after an initial 10-second delay;
  // exceptions are caught and logged inside the task.
  1. if (null == this.clientConfig.getNamesrvAddr()) {
  2. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  3. @Override
  4. public void run() {
  5. try {
  6. MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
  7. } catch (Exception e) {
  8. log.error("ScheduledTask fetchNameServerAddr exception", e);
  9. }
  10. }
  11. }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
  12. }

更新 topic 路由信息的任务:

  // Refreshes topic route info from the name server. The first run is 10 ms after
  // scheduling; the period is clientConfig.getPollNameServerInterval() milliseconds.
  // Exceptions are caught and logged inside the task.
  1. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  2. @Override
  3. public void run() {
  4. try {
  5. MQClientInstance.this.updateTopicRouteInfoFromNameServer();
  6. } catch (Exception e) {
  7. log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
  8. }
  9. }
  10. }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

清理已下线 broker 并向所有 broker 发送心跳的任务:

  // Each run calls cleanOfflineBroker() and then sendHeartbeatToAllBrokerWithLock().
  // The first run is 1000 ms after scheduling; the period is
  // clientConfig.getHeartbeatBrokerInterval() milliseconds.
  // Exceptions are caught and logged inside the task.
  1. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  2. @Override
  3. public void run() {
  4. try {
  5. MQClientInstance.this.cleanOfflineBroker();
  6. MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
  7. } catch (Exception e) {
  8. log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
  9. }
  10. }
  11. }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

启动拉取消息线程:

// Starts the pull-message service. Presumably start() launches a thread that
// executes the service's run() loop - TODO confirm against the service base class.
this.pullMessageService.start();

  // Main loop of the pull-message service: until the service is stopped, takes
  // the next PullRequest from pullRequestQueue and passes it to pullMessage().
  // Logs one line when the loop starts and one when it ends.
  1. public void run() {
  2. log.info(this.getServiceName() + " service started");
  3. while (!this.isStopped()) {
  4. try {
  // take() blocks while the queue is empty.
  5. PullRequest pullRequest = this.pullRequestQueue.take();
  6. this.pullMessage(pullRequest);
  // An interrupt is deliberately ignored here; the loop then re-checks isStopped().
  7. } catch (InterruptedException ignored) {
  8. } catch (Exception e) {
  // Any other failure is logged and the loop continues with the next request.
  9. log.error("Pull Message Service Run Method exception", e);
  10. }
  11. }
  12. log.info(this.getServiceName() + " service end");
  13. }