1. 批量发送指的是将同一个Topic的多条消息一起打包发送给Broker,减少网络的开销,提高网络传输效率。<br />批量发送跟单次发送消息的行为大致相同,所不同的是批量发送需要批量检查消息,并把批量消息内容编码转成单次发送的格式。因为单次发送的编码格式已经固定,所以只需要在body字节数组后面做拼接即可。

批量发送

1):批量发送执行代码

首先依旧是DefaultMQProducer#send,输入内容是一个消息集合体

  1. public SendResult send(
  2. Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
  3. return this.defaultMQProducerImpl.send(batch(msgs));
  4. }

DefaultMQProducer#batch

  1. try {
  2. //消息集合转成批量消息子对象
  3. msgBatch = MessageBatch.generateFromList(msgs);
  4. //检查消息,设置唯一ID,设置Topic
  5. for (Message message : msgBatch) {
  6. Validators.checkMessage(message, this);
  7. MessageClientIDSetter.setUniqID(message);
  8. message.setTopic(withNamespace(message.getTopic()));
  9. }
  10. //转码设置body
  11. msgBatch.setBody(msgBatch.encode());
  12. } catch (Exception e) {
  13. throw new MQClientException("Failed to initiate the MessageBatch", e);
  14. }

MessageDecoder#encodeMessages

  1. List<byte[]> encodedMessages = new ArrayList<byte[]>(messages.size());
  2. int allSize = 0;
  3. for (Message message : messages) {
  4. //单个message编码
  5. byte[] tmp = encodeMessage(message);
  6. encodedMessages.add(tmp);
  7. allSize += tmp.length;
  8. }
  9. byte[] allBytes = new byte[allSize];
  10. int pos = 0;
  11. for (byte[] bytes : encodedMessages) {
  12. //字节数组对批量消息进行拼接
  13. System.arraycopy(bytes, 0, allBytes, pos, bytes.length);
  14. pos += bytes.length;
  15. }
  16. return allBytes;

MessageDecoder#encodeMessage

  1. //消息体
  2. byte[] body = message.getBody();
  3. //消息体长度
  4. int bodyLen = body.length;
  5. //扩展属性
  6. String properties = messageProperties2String(message.getProperties());
  7. //扩展属性转码
  8. byte[] propertiesBytes = properties.getBytes(CHARSET_UTF8);
  9. //note properties length must not more than Short.MAX
  10. //扩展属性长度
  11. short propertiesLength = (short) propertiesBytes.length;
  12. //系统标识
  13. int sysFlag = message.getFlag();
  14. //存储长度=4位总长度+4位魔术+4位消息体crc+4位flag+4位消息长度+2位扩展长度=22位
  15. int storeSize = 4 // 1 TOTALSIZE
  16. + 4 // 2 MAGICCOD
  17. + 4 // 3 BODYCRC
  18. + 4 // 4 FLAG
  19. + 4 + bodyLen // 4 BODY
  20. + 2 + propertiesLength;
  21. //根据存储长度创建字节缓存
  22. ByteBuffer byteBuffer = ByteBuffer.allocate(storeSize);
  23. // 1 TOTALSIZE
  24. byteBuffer.putInt(storeSize);
  25. // 2 MAGICCODE
  26. byteBuffer.putInt(0);
  27. // 3 BODYCRC
  28. byteBuffer.putInt(0);
  29. // 4 FLAG
  30. int flag = message.getFlag();
  31. byteBuffer.putInt(flag);
  32. // 5 BODY
  33. byteBuffer.putInt(bodyLen);
  34. byteBuffer.put(body);
  35. // 6 properties
  36. byteBuffer.putShort(propertiesLength);
  37. byteBuffer.put(propertiesBytes);
  38. return byteBuffer.array();

2):批量发送逻辑总结

  1. 将消息集合体单个校验
  2. 消息集合体单个进行编码然后拼接

body=总长+魔术+消息体crc+flag+body长度+消息体(N个字节)+扩展属性长度+扩展属性(N个长度)