架构
网络编程要素
io 模型 + 协议 + 线程模型
1) 传输:用什么样的通道将数据发送给对方,BIO、NIO或者AIO,IO模型在很大程度上决定了框架的性能。
2) 协议:采用什么样的通信协议,HTTP或者内部私有协议。协议的选择不同,性能模型也不同。相比于公有协议,内部私有协议的性能通常可以被设计的更优。
3) 线程:数据报如何读取?读取之后的编解码在哪个线程进行,编解码后的消息如何派发,Reactor线程模型的不同,对性能的影响也非常大。
网络部分
通讯部分类图
线程模型
rocketMq 的 reactor 模型
rocketmq 使用了reactor 主从模型,但是在 compute 部分,由于业务的操作比较耗时,直接使用 io 线程会影响吞吐量。所以使用了单独的线程池;
Rocket 通讯协议
消息类
**
// rocketMq 消息定义
public class RemotingCommand {
// 最终构成 message header部分
private int code; // 标识请求类型
private LanguageCode language = LanguageCode.JAVA;
private int version = 0;
private int opaque = requestId.getAndIncrement(); // 请求id
private int flag = 0;
private String remark;
private HashMap<String, String> extFields; // 扩展字段
// 最终构成 message body部分
private transient byte[] body;
}
字段说明
Header字段 | 类型 | Request说明 | Response说明 |
---|---|---|---|
code | int | 请求操作码,应答方根据不同的请求码进行不同的业务处理 | 应答响应码。0表示成功,非0则表示各种错误 |
language | LanguageCode | 请求方实现的语言 | 应答方实现的语言 |
version | int | 请求方程序的版本 | 应答方程序的版本 |
opaque | int | 相当于reqeustId,在同一个连接上的不同请求标识码,与响应消息中的相对应 | 应答不做修改直接返回 |
flag | int | 区分是普通RPC还是onewayRPC得标志 | 区分是普通RPC还是onewayRPC得标志 |
remark | String | 传输自定义文本信息 | 传输自定义文本信息 |
extFields | HashMap |
请求自定义扩展信息 | 响应自定义扩展信息 |
extFields 说明
每个业务自定义的 header 数据会通过实现 CommandCustomHeader 接口自定义对象实现;但是在网络中传输时,会通过 extFields 进行传输;
在网络传输前,自定义 header 转化 为 extFields 中hashMap 结构方法:
public void makeCustomHeaderToNet() {
if (this.customHeader != null) {
// 反射拿到CommandCustomHeader 所有字段
Field[] fields = getClazzFields(customHeader.getClass());
if (null == this.extFields) {
this.extFields = new HashMap<String, String>();
}
for (Field field : fields) {
if (!Modifier.isStatic(field.getModifiers())) {
String name = field.getName();
if (!name.startsWith("this")) {
Object value = null;
try {
field.setAccessible(true);
value = field.get(this.customHeader);
} catch (Exception e) {
log.error("Failed to access field [{}]", name, e);
}
if (value != null) {
// 放到 hashMap
this.extFields.put(name, value.toString());
}
}
}
}
}
}
在接收到网络传输的字节流后,NettyDecoder 会把字节流解码为 RemotingCommand 对象。在业务处理时,会通过反射创建 CommandCustomHeader对象, 并把 extFields 中对象的字段设置到 CommandCustomHeader对象。方法如下:
public CommandCustomHeader decodeCommandCustomHeader(
Class<? extends CommandCustomHeader> classHeader)
throws RemotingCommandException {
CommandCustomHeader objectHeader;
try {
objectHeader = classHeader.newInstance();
} catch (InstantiationException e) {
return null;
} catch (IllegalAccessException e) {
return null;
}
if (this.extFields != null) {
Field[] fields = getClazzFields(classHeader);
for (Field field : fields) {
if (!Modifier.isStatic(field.getModifiers())) {
String fieldName = field.getName();
if (!fieldName.startsWith("this")) {
try {
String value = this.extFields.get(fieldName);
if (null == value) {
if (!isFieldNullable(field)) {
throw new RemotingCommandException(
"the custom field <" + fieldName + "> is null");
}
continue;
}
field.setAccessible(true);
String type = getCanonicalName(field.getType());
Object valueParsed;
if (type.equals(STRING_CANONICAL_NAME)) {
valueParsed = value;
} else if (type.equals(INTEGER_CANONICAL_NAME_1)
|| type.equals(INTEGER_CANONICAL_NAME_2)) {
valueParsed = Integer.parseInt(value);
} else if (type.equals(LONG_CANONICAL_NAME_1)
|| type.equals(LONG_CANONICAL_NAME_2)) {
valueParsed = Long.parseLong(value);
} else if (type.equals(BOOLEAN_CANONICAL_NAME_1)
|| type.equals(BOOLEAN_CANONICAL_NAME_2)) {
valueParsed = Boolean.parseBoolean(value);
} else if (type.equals(DOUBLE_CANONICAL_NAME_1)
|| type.equals(DOUBLE_CANONICAL_NAME_2)) {
valueParsed = Double.parseDouble(value);
} else {
throw new RemotingCommandException(
"the custom field <" + fieldName + "> type is not supported");
}
field.set(objectHeader, valueParsed);
} catch (Throwable e) {
log.error("Failed field [{}] decoding", fieldName, e);
}
}
}
}
objectHeader.checkFields();
}
return objectHeader;
}
报文
case: broker向nameserver注册自己
[
code=103,//这里的103对应的code就是broker向nameserver注册自己的消息
language=JAVA,
version=137,
opaque=58,//这个就是requestId
flag(B)=0,
remark=null,
extFields={
brokerId=0,
clusterName=DefaultCluster,
brokerAddr=ip1: 10911,
haServerAddr=ip1: 10912,
brokerName=LAPTOP-SMF2CKDN
}
]
网络传输包
编解码源码
/** 编码部分 java bean --> 字节流 **/
public ByteBuffer encode() {
// 1> header length size header 长度
int length = 4;
// 2> header data length header编码,计算长度
byte[] headerData = this.headerEncode();
length += headerData.length;
// 3> body data length 消息体长度
if (this.body != null) {
length += body.length;
}
// 分配buffer 写数据 4是消息总长度,上面未计算进去
ByteBuffer result = ByteBuffer.allocate(4 + length);
// length 写总长度
result.putInt(length);
// header length 1byte 序列化方式 3byte header长度
result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
// header data 写 header data
result.put(headerData);
// body data;
if (this.body != null) {
result.put(this.body); // 写data 部分
}
// 重置 position 到开始写入的位置
result.flip();
return result;
}
// header 部分编码
private byte[] headerEncode() {
this.makeCustomHeaderToNet();
if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
return RocketMQSerializable.rocketMQProtocolEncode(this);
} else {
return RemotingSerializable.encode(this);
}
}
// header 部分编码
public static byte[] rocketMQProtocolEncode(RemotingCommand cmd) {
// String remark
byte[] remarkBytes = null;
int remarkLen = 0;
if (cmd.getRemark() != null && cmd.getRemark().length() > 0) {
remarkBytes = cmd.getRemark().getBytes(CHARSET_UTF8);
remarkLen = remarkBytes.length;
}
// HashMap<String, String> extFields
byte[] extFieldsBytes = null;
int extLen = 0;
if (cmd.getExtFields() != null && !cmd.getExtFields().isEmpty()) {
extFieldsBytes = mapSerialize(cmd.getExtFields());
extLen = extFieldsBytes.length;
}
int totalLen = calTotalLen(remarkLen, extLen);
ByteBuffer headerBuffer = ByteBuffer.allocate(totalLen);
// int code(~32767)
headerBuffer.putShort((short) cmd.getCode());
// LanguageCode language
headerBuffer.put(cmd.getLanguage().getCode());
// int version(~32767)
headerBuffer.putShort((short) cmd.getVersion());
// int opaque
headerBuffer.putInt(cmd.getOpaque());
// int flag
headerBuffer.putInt(cmd.getFlag());
// String remark
if (remarkBytes != null) {
headerBuffer.putInt(remarkBytes.length);
headerBuffer.put(remarkBytes);
} else {
headerBuffer.putInt(0);
}
// HashMap<String, String> extFields;
if (extFieldsBytes != null) {
headerBuffer.putInt(extFieldsBytes.length);
headerBuffer.put(extFieldsBytes);
} else {
headerBuffer.putInt(0);
}
return headerBuffer.array();
}
/** 解码 字节流 --> java bean **/
public static RemotingCommand decode(final ByteBuffer byteBuffer) {
int length = byteBuffer.limit();
int oriHeaderLen = byteBuffer.getInt();
int headerLength = getHeaderLength(oriHeaderLen);
byte[] headerData = new byte[headerLength];
byteBuffer.get(headerData);
RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
int bodyLength = length - 4 - headerLength;
byte[] bodyData = null;
if (bodyLength > 0) {
bodyData = new byte[bodyLength];
byteBuffer.get(bodyData);
}
cmd.body = bodyData;
return cmd;
}
public static RemotingCommand rocketMQProtocolDecode(final byte[] headerArray) {
RemotingCommand cmd = new RemotingCommand();
ByteBuffer headerBuffer = ByteBuffer.wrap(headerArray);
// int code(~32767)
cmd.setCode(headerBuffer.getShort());
// LanguageCode language
cmd.setLanguage(LanguageCode.valueOf(headerBuffer.get()));
// int version(~32767)
cmd.setVersion(headerBuffer.getShort());
// int opaque
cmd.setOpaque(headerBuffer.getInt());
// int flag
cmd.setFlag(headerBuffer.getInt());
// String remark
int remarkLength = headerBuffer.getInt();
if (remarkLength > 0) {
byte[] remarkContent = new byte[remarkLength];
headerBuffer.get(remarkContent);
cmd.setRemark(new String(remarkContent, CHARSET_UTF8));
}
// HashMap<String, String> extFields
int extFieldsLength = headerBuffer.getInt();
if (extFieldsLength > 0) {
byte[] extFieldsBytes = new byte[extFieldsLength];
headerBuffer.get(extFieldsBytes);
cmd.setExtFields(mapDeserialize(extFieldsBytes));
}
return cmd;
}
参考: https://jaskey.github.io/blog/2016/12/19/rocketmq-network-protocol/
https://cloud.tencent.com/developer/article/1329047
https://github.com/apache/rocketmq/blob/master/docs/cn/design.md