批量发送指的是将同一个Topic的多条消息一起打包发送给Broker,减少网络的开销,提高网络传输效率。<br />批量发送跟单次发送消息的行为大致相同,所不同的是批量发送需要批量检查消息,并把批量消息内容编码转成单次发送的格式。因为单次发送的编码格式已经固定,所以只需要在body字节数组后面做拼接即可。
批量发送
1):批量发送执行代码
首先依旧是DefaultMQProducer#send,输入内容是一个消息集合体
public SendResult send(
Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(batch(msgs));
}
DefaultMQProducer#batch
try {
//消息集合转成批量消息子对象
msgBatch = MessageBatch.generateFromList(msgs);
//检查消息,设置唯一ID,设置Topic
for (Message message : msgBatch) {
Validators.checkMessage(message, this);
MessageClientIDSetter.setUniqID(message);
message.setTopic(withNamespace(message.getTopic()));
}
//转码设置body
msgBatch.setBody(msgBatch.encode());
} catch (Exception e) {
throw new MQClientException("Failed to initiate the MessageBatch", e);
}
MessageDecoder#encodeMessages
List<byte[]> encodedMessages = new ArrayList<byte[]>(messages.size());
int allSize = 0;
for (Message message : messages) {
//单个message编码
byte[] tmp = encodeMessage(message);
encodedMessages.add(tmp);
allSize += tmp.length;
}
byte[] allBytes = new byte[allSize];
int pos = 0;
for (byte[] bytes : encodedMessages) {
//字节数组对批量消息进行拼接
System.arraycopy(bytes, 0, allBytes, pos, bytes.length);
pos += bytes.length;
}
return allBytes;
MessageDecoder#encodeMessage
//消息体
byte[] body = message.getBody();
//消息体长度
int bodyLen = body.length;
//扩展属性
String properties = messageProperties2String(message.getProperties());
//扩展属性转码
byte[] propertiesBytes = properties.getBytes(CHARSET_UTF8);
//note properties length must not more than Short.MAX
//扩展属性长度
short propertiesLength = (short) propertiesBytes.length;
//系统标识
int sysFlag = message.getFlag();
//存储长度=4位总长度+4位魔术+4位消息体crc+4位flag+4位消息长度+2位扩展长度=22位
int storeSize = 4 // 1 TOTALSIZE
+ 4 // 2 MAGICCOD
+ 4 // 3 BODYCRC
+ 4 // 4 FLAG
+ 4 + bodyLen // 4 BODY
+ 2 + propertiesLength;
//根据存储长度创建字节缓存
ByteBuffer byteBuffer = ByteBuffer.allocate(storeSize);
// 1 TOTALSIZE
byteBuffer.putInt(storeSize);
// 2 MAGICCODE
byteBuffer.putInt(0);
// 3 BODYCRC
byteBuffer.putInt(0);
// 4 FLAG
int flag = message.getFlag();
byteBuffer.putInt(flag);
// 5 BODY
byteBuffer.putInt(bodyLen);
byteBuffer.put(body);
// 6 properties
byteBuffer.putShort(propertiesLength);
byteBuffer.put(propertiesBytes);
return byteBuffer.array();
2):批量发送逻辑总结
- 将消息集合体单个校验
- 消息集合体单个进行编码然后拼接
body=总长+魔术+消息体crc+flag+body长度+消息体(N个字节)+扩展属性长度+扩展属性(N个长度)