架构

image.png

网络编程要素

io 模型 + 协议 + 线程模型
1) 传输:用什么样的通道将数据发送给对方,BIO、NIO或者AIO,IO模型在很大程度上决定了框架的性能。

2) 协议:采用什么样的通信协议,HTTP或者内部私有协议。协议的选择不同,性能模型也不同。相比于公有协议,内部私有协议的性能通常可以被设计的更优。

3) 线程:数据报如何读取?读取之后的编解码在哪个线程进行,编解码后的消息如何派发,Reactor线程模型的不同,对性能的影响也非常大。

网络部分

mq网络部分.png

通讯部分类图

mq 通讯部分类图.png

线程模型

mq-thread.png

rocketMq 的 reactor 模型

rocketmq 使用了reactor 主从模型,但是在 compute 部分,由于业务的操作比较耗时,直接使用 io 线程会影响吞吐量。所以使用了单独的线程池;

image.png

Rocket 通讯协议

消息类

**

  1. // rocketMq 消息定义
  2. public class RemotingCommand {
  3. // 最终构成 message header部分
  4. private int code; // 标识请求类型
  5. private LanguageCode language = LanguageCode.JAVA;
  6. private int version = 0;
  7. private int opaque = requestId.getAndIncrement(); // 请求id
  8. private int flag = 0;
  9. private String remark;
  10. private HashMap<String, String> extFields; // 扩展字段
  11. // 最终构成 message body部分
  12. private transient byte[] body;
  13. }

字段说明

Header字段 类型 Request说明 Response说明
code int 请求操作码,应答方根据不同的请求码进行不同的业务处理 应答响应码。0表示成功,非0则表示各种错误
language LanguageCode 请求方实现的语言 应答方实现的语言
version int 请求方程序的版本 应答方程序的版本
opaque int 相当于reqeustId,在同一个连接上的不同请求标识码,与响应消息中的相对应 应答不做修改直接返回
flag int 区分是普通RPC还是onewayRPC得标志 区分是普通RPC还是onewayRPC得标志
remark String 传输自定义文本信息 传输自定义文本信息
extFields HashMap 请求自定义扩展信息 响应自定义扩展信息

extFields 说明


每个业务自定义的 header 数据会通过实现 CommandCustomHeader 接口自定义对象实现;但是在网络中传输时,会通过 extFields 进行传输;

在网络传输前,自定义 header 转化 为 extFields 中hashMap 结构方法:

public void makeCustomHeaderToNet() {
        if (this.customHeader != null) {
            // 反射拿到CommandCustomHeader 所有字段
            Field[] fields = getClazzFields(customHeader.getClass());
            if (null == this.extFields) {
                this.extFields = new HashMap<String, String>();
            }

            for (Field field : fields) {
                if (!Modifier.isStatic(field.getModifiers())) {
                    String name = field.getName();
                    if (!name.startsWith("this")) {
                        Object value = null;
                        try {
                            field.setAccessible(true);
                            value = field.get(this.customHeader);
                        } catch (Exception e) {
                            log.error("Failed to access field [{}]", name, e);
                        }

                        if (value != null) {
                            // 放到 hashMap
                            this.extFields.put(name, value.toString());
                        }
                    }
                }
            }
        }
    }

在接收到网络传输的字节流后,NettyDecoder 会把字节流解码为 RemotingCommand 对象。在业务处理时,会通过反射创建 CommandCustomHeader对象, 并把 extFields 中对象的字段设置到 CommandCustomHeader对象。方法如下:

public CommandCustomHeader decodeCommandCustomHeader(
        Class<? extends CommandCustomHeader> classHeader)
        throws RemotingCommandException {

        CommandCustomHeader objectHeader;
        try {
            objectHeader = classHeader.newInstance();
        } catch (InstantiationException e) {
            return null;
        } catch (IllegalAccessException e) {
            return null;
        }

        if (this.extFields != null) {

            Field[] fields = getClazzFields(classHeader);
            for (Field field : fields) {
                if (!Modifier.isStatic(field.getModifiers())) {
                    String fieldName = field.getName();
                    if (!fieldName.startsWith("this")) {
                        try {
                            String value = this.extFields.get(fieldName);
                            if (null == value) {
                                if (!isFieldNullable(field)) {
                                    throw new RemotingCommandException(
                                        "the custom field <" + fieldName + "> is null");
                                }
                                continue;
                            }

                            field.setAccessible(true);
                            String type = getCanonicalName(field.getType());
                            Object valueParsed;

                            if (type.equals(STRING_CANONICAL_NAME)) {

                                valueParsed = value;
                            } else if (type.equals(INTEGER_CANONICAL_NAME_1) 
                                       || type.equals(INTEGER_CANONICAL_NAME_2)) {

                                valueParsed = Integer.parseInt(value);
                            } else if (type.equals(LONG_CANONICAL_NAME_1) 
                                       || type.equals(LONG_CANONICAL_NAME_2)) {

                                valueParsed = Long.parseLong(value);
                            } else if (type.equals(BOOLEAN_CANONICAL_NAME_1)
                                       || type.equals(BOOLEAN_CANONICAL_NAME_2)) {

                                valueParsed = Boolean.parseBoolean(value);
                            } else if (type.equals(DOUBLE_CANONICAL_NAME_1)
                                       || type.equals(DOUBLE_CANONICAL_NAME_2)) {

                                valueParsed = Double.parseDouble(value);
                            } else {
                                throw new RemotingCommandException(
                                    "the custom field <" + fieldName + "> type is not supported");
                            }

                            field.set(objectHeader, valueParsed);

                        } catch (Throwable e) {
                            log.error("Failed field [{}] decoding", fieldName, e);
                        }
                    }
                }
            }

            objectHeader.checkFields();
        }

        return objectHeader;
    }

报文

case: broker向nameserver注册自己

[
    code=103,//这里的103对应的code就是broker向nameserver注册自己的消息
    language=JAVA,
    version=137,
    opaque=58,//这个就是requestId
    flag(B)=0,
    remark=null,
    extFields={
        brokerId=0,
        clusterName=DefaultCluster,
        brokerAddr=ip1: 10911,
        haServerAddr=ip1: 10912,
        brokerName=LAPTOP-SMF2CKDN
        }
]

网络传输包

mq-protocol.png

编解码源码


/** 编码部分 java bean --> 字节流 **/

public ByteBuffer encode() {
    // 1> header length size   header 长度
    int length = 4;

    // 2> header data length    header编码,计算长度
    byte[] headerData = this.headerEncode();
    length += headerData.length;

    // 3> body data length     消息体长度
    if (this.body != null) {
        length += body.length;
    }

    // 分配buffer 写数据   4是消息总长度,上面未计算进去
    ByteBuffer result = ByteBuffer.allocate(4 + length);

    // length  写总长度  
    result.putInt(length);

    // header length   1byte 序列化方式 3byte header长度
    result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));

    // header data    写 header data
    result.put(headerData);

    // body data;
    if (this.body != null) {
        result.put(this.body); // 写data 部分
    }

    // 重置 position 到开始写入的位置
    result.flip();

    return result;
}

// header 部分编码 
private byte[] headerEncode() {
    this.makeCustomHeaderToNet();
    if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
        return RocketMQSerializable.rocketMQProtocolEncode(this);
    } else {
        return RemotingSerializable.encode(this);
    }
}

// header 部分编码 
public static byte[] rocketMQProtocolEncode(RemotingCommand cmd) {
    // String remark
    byte[] remarkBytes = null;
    int remarkLen = 0;
    if (cmd.getRemark() != null && cmd.getRemark().length() > 0) {
        remarkBytes = cmd.getRemark().getBytes(CHARSET_UTF8);
        remarkLen = remarkBytes.length;
    }

    // HashMap<String, String> extFields
    byte[] extFieldsBytes = null;
    int extLen = 0;
    if (cmd.getExtFields() != null && !cmd.getExtFields().isEmpty()) {
        extFieldsBytes = mapSerialize(cmd.getExtFields());
        extLen = extFieldsBytes.length;
    }

    int totalLen = calTotalLen(remarkLen, extLen);

    ByteBuffer headerBuffer = ByteBuffer.allocate(totalLen);
    // int code(~32767)
    headerBuffer.putShort((short) cmd.getCode());
    // LanguageCode language
    headerBuffer.put(cmd.getLanguage().getCode());
    // int version(~32767)
    headerBuffer.putShort((short) cmd.getVersion());
    // int opaque
    headerBuffer.putInt(cmd.getOpaque());
    // int flag
    headerBuffer.putInt(cmd.getFlag());
    // String remark
    if (remarkBytes != null) {
        headerBuffer.putInt(remarkBytes.length);
        headerBuffer.put(remarkBytes);
    } else {
        headerBuffer.putInt(0);
    }
    // HashMap<String, String> extFields;
    if (extFieldsBytes != null) {
        headerBuffer.putInt(extFieldsBytes.length);
        headerBuffer.put(extFieldsBytes);
    } else {
        headerBuffer.putInt(0);
    }

    return headerBuffer.array();
}


/** 解码 字节流 --> java bean **/
 public static RemotingCommand decode(final ByteBuffer byteBuffer) {
     int length = byteBuffer.limit();
     int oriHeaderLen = byteBuffer.getInt();
     int headerLength = getHeaderLength(oriHeaderLen);

     byte[] headerData = new byte[headerLength];
     byteBuffer.get(headerData);

     RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));

     int bodyLength = length - 4 - headerLength;
     byte[] bodyData = null;
     if (bodyLength > 0) {
         bodyData = new byte[bodyLength];
         byteBuffer.get(bodyData);
     }
     cmd.body = bodyData;

     return cmd;
 }

public static RemotingCommand rocketMQProtocolDecode(final byte[] headerArray) {
    RemotingCommand cmd = new RemotingCommand();
    ByteBuffer headerBuffer = ByteBuffer.wrap(headerArray);
    // int code(~32767)
    cmd.setCode(headerBuffer.getShort());
    // LanguageCode language
    cmd.setLanguage(LanguageCode.valueOf(headerBuffer.get()));
    // int version(~32767)
    cmd.setVersion(headerBuffer.getShort());
    // int opaque
    cmd.setOpaque(headerBuffer.getInt());
    // int flag
    cmd.setFlag(headerBuffer.getInt());
    // String remark
    int remarkLength = headerBuffer.getInt();
    if (remarkLength > 0) {
        byte[] remarkContent = new byte[remarkLength];
        headerBuffer.get(remarkContent);
        cmd.setRemark(new String(remarkContent, CHARSET_UTF8));
    }

    // HashMap<String, String> extFields
    int extFieldsLength = headerBuffer.getInt();
    if (extFieldsLength > 0) {
        byte[] extFieldsBytes = new byte[extFieldsLength];
        headerBuffer.get(extFieldsBytes);
        cmd.setExtFields(mapDeserialize(extFieldsBytes));
    }
    return cmd;
}

参考: https://jaskey.github.io/blog/2016/12/19/rocketmq-network-protocol/
https://cloud.tencent.com/developer/article/1329047
https://github.com/apache/rocketmq/blob/master/docs/cn/design.md