序言

rocketmq 中有 producer、consumer、nameserver、broker 4 种角色,必须通过网络进行通信,而 remoting 模块便是实现远程通信的基础模块,本文介绍 remoting 模块的实现原理、协议、架构和接口。

一、实现原理

说起远程通信便不得不说 netty,但是 netty 过于底层,只是一个框架,而非一个产品,remoting 模块便是基于 netty 实现的。
image.png

netty 的 nio server总共有三类线程。第一类是boss 线程,其用于处理 accept 事件,并将其分配给 worker 线程;第二类是 worker 线程,其用于处理 read 和 write 事件;第三类是 handler 线程,其用于处理特定的 handler (handler 线程是可选的,不设置时使用 worker 线程处理 handler)。

二、架构

url 类图如下,我们分别介绍下各个类的相关方法:
image.png

RemotingService

方法名 概述
start 启动
shutdown 关闭
registerRPCHook 注册hook,会在被执行

RemotingServer

方法名 概述
registerProcessor 注册处理器
registerDefaultProcessor 注册默认处理器
localListenPort 返回本地监听的端口号
getProcessorPair 获取一个请求类型的处理器和线程池。
invokeSync 同步发送请求。通过参数中 channel 确定给哪个 client 发消息
invokeAsync 异步发送请求
invokeOneway 发送请求不关心返回值

RemotingClient

方法名 概述
updateNameServerAddressList 太有业务语义的方法
getNameServerAddressList 注册默认处理器
registerProcessor client 端注册的处理器处理什么?
setCallbackExecutor 设置执行 callback 的线程池
getCallbackExecutor
isChannelWritable 还有不可写的 channel 吗?
invokeSync 同步调用
invokeAsync 异步调用
invokeOneway oneway

三、消息的协议设计与编码解码

请求 DTO-RemotingCommand

在Client和Server之间完成一次消息发送时,需要对发送的消息进行一个协议约定,因此就有必要自定义RocketMQ的消息协议。同时,为了高效地在网络中传输消息和对收到的消息读取,就需要对消息进行编解码。在RocketMQ中,RemotingCommand这个类在消息传输过程中对所有数据内容的封装,不但包含了所有的数据结构,还包含了编码解码操作。 RemotingCommand类的部分成员变量如下:

Header字段 类型 Request说明 Response说明
code int 请求操作码,应答方根据不同的请求码进行不同的业务处理 应答响应码。0表示成功,非0则表示各种错误
type RemotingCommandType 枚举值 Request 枚举值 Response
language LanguageCode 请求方实现的语言 应答方实现的语言
version int 请求方程序的版本 应答方程序的版本
opaque int 相当于reqeustId,在同一个连接上的不同请求标识码,与响应消息中的相对应 应答不做修改直接返回
flag int 区分是普通RPC还是onewayRPC得标志 区分是普通RPC还是onewayRPC得标志
remark String 传输自定义文本信息 传输自定义文本信息
extFields HashMap 请求自定义扩展信息 响应自定义扩展信息
body byte[]
serializeTypeCurrentRPC SerializeType 序列化方式 序列化方式
customHeader CommandCustomHeader 自定义 header header

消息协议

下面来看下RocketMQ通信协议的格式:
rocketmq-remoting - 图3
可见传输内容主要可以分为以下4部分:
(1)消息长度
:总长度,四个字节存储,占用一个int类型;
(2)序列化类型&消息头长度
:同样占用一个int类型,第一个字节表示序列化类型,后面三个字节表示消息头长度;
(3)消息头数据
:经过序列化后的消息头数据;
(4)消息主体数据

编码

  1. public ByteBuffer encodeHeader(final int bodyLength) {
  2. // 1> header length size
  3. int length = 4;
  4. // 2> header data length
  5. byte[] headerData;
  6. headerData = this.headerEncode();
  7. length += headerData.length;
  8. // 3> body data length
  9. length += bodyLength;
  10. ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);
  11. // length
  12. result.putInt(length);
  13. // header length
  14. result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
  15. // header data
  16. result.put(headerData);
  17. result.flip();
  18. return result;
  19. }

解码

  1. /**
  2. * @param byteBuffer 不包括"消息长度"在内的数据
  3. * @return 解码结果
  4. */
  5. public static RemotingCommand decode(final ByteBuffer byteBuffer) {
  6. int length = byteBuffer.limit();
  7. int oriHeaderLen = byteBuffer.getInt();
  8. int headerLength = getHeaderLength(oriHeaderLen);
  9. byte[] headerData = new byte[headerLength];
  10. byteBuffer.get(headerData);
  11. RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
  12. int bodyLength = length - 4 - headerLength;
  13. byte[] bodyData = null;
  14. if (bodyLength > 0) {
  15. bodyData = new byte[bodyLength];
  16. byteBuffer.get(bodyData);
  17. }
  18. cmd.body = bodyData;
  19. return cmd;
  20. }

四、三种通信方式

从 RemotingServer 和 RemotingClient 的接口中可以看出,一共有三种发送消息的类型:

  1. async-异步
  2. sync-同步
  3. oneway-只发送,不关心接收

异步

异步执行流程图如下:
image.png
异步执行分为 3 个步骤:

  1. 创建连接(异步操作,分为链接和等待链接完成)
  2. 发送请求(异步操作,分为发送请求和发送请求成功)
  3. 接受 response(异步接受response)

这三个步骤在 netty 框架中都是异步执行的,但是对于使用者而言只感受到了一次异步,其原因是把创建连接同步化,并且把等待请求完成和等待 response 两次异步合二为一,不等待请求完成即返回。

创建连接

  1. private Channel createChannel(final String addr) throws InterruptedException {
  2. ChannelWrapper cw = this.channelTables.get(addr);
  3. if (cw != null && cw.isOK()) {
  4. return cw.getChannel();
  5. }
  6. //同步开始建立连接
  7. if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
  8. try {
  9. boolean createNewConnection;
  10. cw = this.channelTables.get(addr);
  11. if (cw != null) {
  12. if (cw.isOK()) {
  13. return cw.getChannel();
  14. } else if (!cw.getChannelFuture().isDone()) {
  15. createNewConnection = false;
  16. } else {
  17. this.channelTables.remove(addr);
  18. createNewConnection = true;
  19. }
  20. } else {
  21. createNewConnection = true;
  22. }
  23. if (createNewConnection) {
  24. ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));
  25. log.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
  26. cw = new ChannelWrapper(channelFuture);
  27. this.channelTables.put(addr, cw);
  28. }
  29. } catch (Exception e) {
  30. log.error("createChannel: create channel exception", e);
  31. } finally {
  32. this.lockChannelTables.unlock();
  33. }
  34. } else {
  35. log.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
  36. }
  37. //同步等待连接建立完毕
  38. if (cw != null) {
  39. ChannelFuture channelFuture = cw.getChannelFuture();
  40. if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) {
  41. if (cw.isOK()) {
  42. log.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
  43. return cw.getChannel();
  44. } else {
  45. log.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString(), channelFuture.cause());
  46. }
  47. } else {
  48. log.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(),
  49. channelFuture.toString());
  50. }
  51. }
  52. return null;
  53. }

可以看出其创建连接之后,同步等待连接建立完毕。

发送消息

由于remoting 模块支持异步,而 netty 中发送消息成功并等待 response 两个操作都是异步的,所以需要一个 bean 保存请求相关信息,将两个异步操作以一个异步操作的方式展示给调用方,在 netty 中使用 ResponseFuture 完成该功能,其属性如下:

类型 名称 概述
int opaque 请求的唯一标识,用于关联请求和响应
Channel processChannel 关联的 channel
long timeoutMillis 该次请求剩余等待响应返回的超时时间,若 timeout ms 后,仍然未返回响应,认为该次请求超时
InvokeCallback invokeCallback 回调函数
long beginTimestamp ResponseFuture 创建的时间,和 timeout 比较,用于判断请求是否超时
CountDownLatch countDownLatch 和工作线程进行同步
SemaphoreReleaseOnlyOnce once 优化项,暂时忽略
AtomicBoolean executeCallbackOnlyOnce
RemotingCommand responseCommand response 的封装类
请求发送是否成功 sendRequestOK
异常 cause

本质就是自己实现了一个 future ,通过 countDownLatch 实现主线程和 worker 线程直接的同步。

同步

同步执行流程图如下:
image.png
和异步的区别有两点:

  1. 发送请求后不是直接返回,而是对 ResponseFuture wait 直到服务端返回 response
  2. response 返回后不是执行 callback ,而是设置 response 以唤醒等待的请求

oneway

image.png
oneway 是最简单的形式,只有请求,不关心 response。

五、一些细节

异常处理机制&接口定义

我理解异常处理机制和接口定义是同一件事,也即如何对调用方屏蔽实现的复杂性,比如一次请求包括连接创建、发送请求、等待响应三个步骤,但是从调用方的角度考虑,就是调用一个方法,获取返回值。其他如建立连接超时、发送请求超时调用方根本不想理解这么多概念,实现方要保证

  1. 异常场景下的稳定性,也即充分考虑所有异常,能处理就处理,不能处理视情况决定是否抛出(对于业务代码,就是一个大 try - catch 变成系统异常,拒绝考虑实现的复杂性,统一暴露为系统异常)
  2. 暴露调用方想知晓的细节给调用方(哪些是调用方想知晓的呢?)

remoting 模块的异常体系如下:
image.png

类名 含义 何处抛出? 何处处理?
RemotingCommandException
1. Processor 处理时解码失败
2. RemotingCommond 校验 field 异常
3. 自定义 header 解码失败
RemotingConnectException client 建立与server 连接异常
1. 无法连接到 nameserver 时抛出(nameserver 地址无效、连接失败等)
2. 根据addr 建立连接失败

1. sendKernelImpl 时交由 SendMessageHook 处理
2. 交由 SendCallback 处理
RemotingSendRequestException 请求发送失败异常
1. channel.writeAndFlush(request).addListener(listener); 抛出的异常会被当转换为 RemotingSendRequestException
2. listener 失败并不会抛出 RemotingSendRequestException
RemotingTimeoutException 超时 调用超时即抛出该异常
RemotingTooMuchRequestException 超时 调用超时即抛出该异常

超时机制

如何实现异步?

由于异步需要

问题

  1. remoting 是 producer 、consumer 和 broker 通信的框架吗?
  2. 如果是,对于消息体的大小有无优化?
  3. remotign 对于异常是如何处理的?
  4. 同步的超时处理?
  5. 同步的加锁处理?