批量发送指的是将同一个Topic的多条消息一起打包发送给Broker,减少网络的开销,提高网络传输效率。<br />批量发送跟单次发送消息的行为大致相同,所不同的是批量发送需要批量检查消息,并把批量消息内容编码转成单次发送的格式。因为单次发送的编码格式已经固定,所以只需要在body字节数组后面做拼接即可。
批量发送
1):批量发送执行代码
首先依旧是DefaultMQProducer#send,输入内容是一个消息集合体
public SendResult send(Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return this.defaultMQProducerImpl.send(batch(msgs));}
DefaultMQProducer#batch
try {//消息集合转成批量消息子对象msgBatch = MessageBatch.generateFromList(msgs);//检查消息,设置唯一ID,设置Topicfor (Message message : msgBatch) {Validators.checkMessage(message, this);MessageClientIDSetter.setUniqID(message);message.setTopic(withNamespace(message.getTopic()));}//转码设置bodymsgBatch.setBody(msgBatch.encode());} catch (Exception e) {throw new MQClientException("Failed to initiate the MessageBatch", e);}
MessageDecoder#encodeMessages
List<byte[]> encodedMessages = new ArrayList<byte[]>(messages.size());int allSize = 0;for (Message message : messages) {//单个message编码byte[] tmp = encodeMessage(message);encodedMessages.add(tmp);allSize += tmp.length;}byte[] allBytes = new byte[allSize];int pos = 0;for (byte[] bytes : encodedMessages) {//字节数组对批量消息进行拼接System.arraycopy(bytes, 0, allBytes, pos, bytes.length);pos += bytes.length;}return allBytes;
MessageDecoder#encodeMessage
//消息体byte[] body = message.getBody();//消息体长度int bodyLen = body.length;//扩展属性String properties = messageProperties2String(message.getProperties());//扩展属性转码byte[] propertiesBytes = properties.getBytes(CHARSET_UTF8);//note properties length must not more than Short.MAX//扩展属性长度short propertiesLength = (short) propertiesBytes.length;//系统标识int sysFlag = message.getFlag();//存储长度=4位总长度+4位魔术+4位消息体crc+4位flag+4位消息长度+2位扩展长度=22位int storeSize = 4 // 1 TOTALSIZE+ 4 // 2 MAGICCOD+ 4 // 3 BODYCRC+ 4 // 4 FLAG+ 4 + bodyLen // 4 BODY+ 2 + propertiesLength;//根据存储长度创建字节缓存ByteBuffer byteBuffer = ByteBuffer.allocate(storeSize);// 1 TOTALSIZEbyteBuffer.putInt(storeSize);// 2 MAGICCODEbyteBuffer.putInt(0);// 3 BODYCRCbyteBuffer.putInt(0);// 4 FLAGint flag = message.getFlag();byteBuffer.putInt(flag);// 5 BODYbyteBuffer.putInt(bodyLen);byteBuffer.put(body);// 6 propertiesbyteBuffer.putShort(propertiesLength);byteBuffer.put(propertiesBytes);return byteBuffer.array();
2):批量发送逻辑总结
- 将消息集合体单个校验
- 消息集合体单个进行编码然后拼接
body=总长+魔术+消息体crc+flag+body长度+消息体(N个字节)+扩展属性长度+扩展属性(N个长度)
