前言

rocketMQ的instanceName 可以在生产端设置,也可以在消费端设置。平时不用设置,但是当一个JVM中,含有多个生产者消费者时,必须设置下。

原因

如果一个JVM有多个生产者和消费者时,那么他们会共用MQClientInstance ,这样一些异步线程,定时拉取topic,心跳,nettyclient等线程都是一份,可以节省资源。

共用的实例MQClientInstance是从MQClientManager 获取的,而它这个类是个单例。它将所有的类缓存到了factoryTable 这个map当中。

  1. public class MQClientManager {
  2. private static MQClientManager instance = new MQClientManager();
  3. private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable =
  4. new ConcurrentHashMap<String, MQClientInstance>();
  5. private MQClientManager() {
  6. }
  7. }

他们的唯一区分就是通过clientId来区分的,clientId是构造出来,如下:

  1. public String buildMQClientId() {
  2. StringBuilder sb = new StringBuilder();
  3. sb.append(this.getClientIP());
  4. sb.append("@");
  5. sb.append(this.getInstanceName());
  6. if (!UtilAll.isBlank(this.unitName)) {
  7. sb.append("@");
  8. sb.append(this.unitName);
  9. }
  10. return sb.toString();
  11. }

如果是集群模式,这里的instanceName会进行替换为当前线程pid,如下:

  1. public void changeInstanceNameToPID() {
  2. if (this.instanceName.equals("DEFAULT")) {
  3. this.instanceName = String.valueOf(UtilAll.getPid());
  4. }
  5. }

所以,最后的clientId就是:

  1. ip@instanceName@unitName
  2. 即:
  3. ip@pid@unitName

当消息发送时,拿到MQClientInstance 获取brokerAddr地址,然后进行发送。
多个生产者消费者,如果instanceName一致。那么设置了namesrvAddr没用,最后拿到的还是相同的MQClientInstance,发送的还是instance对应的broker。

解决办法

  1. 设定不同的instanceName
  2. 设定不同的unitName

疑问

问题见:容器化后 部署RocketMQ consumer instanceName重复?
这里有待考虑,容器化后,如果是host模式下,那么多个项目在容器内ip是一样的,而pid也会有可能一样,最后造成instanceName的一样。但是还有疑问?即使一样了,那也是在不同的JVM当中,应该不会影响到发送?这里还需要了解下,重复对发送造成的影响。

参考