RocketMQ 是如何使用Netty的?
在RocketMQ中,Netty 作为通信框架,负责NameServer、Broker 、客户端之间的通信。
其中的入口类是服务端 NettyRemotingServer 和客户端 NettyRemotingClient
Netty原理
先了解Netty的原理,核心类是线程组EventLoopGroup和线程EventLoop,业务实现ChannelHandler
在Netty的服务端内,一般有两个EventLoopGroup类型的线程组,一个是负责新连接的建立的 bossGroup,一个是负责新连接后续的IO事件的 workerGroup,其中EventLoop是单个线程的抽象。
面试中经常问到的主从Reactor模型,在Netty中就是服务端的 bossGroup和workerGroup
主从Reactor模型解决了两点,单Reactor单线程的阻塞和单Reactor多线程的线程数量瓶颈。
bossGroup的EventLoop是主Reactor,监听等待连接事件(OP_ACCEPT),该EventLoop在监听成功后创建子Channel,从workerGroup选择EventLoop和其绑定。
workerGroup线程组的eventLoop是从Reactor ,负责处理新连接后续的IO事件,一般我们的业务会在ChannelHandler 实现,比如RocketMQ就实现SimpleChannelInboundHandler
Netty 实现 Reactor 的核心类NioEventLoop ,其实现则是死循环等待 I/O 事件产生,然后处理事件,以及处理提交至线程中的任务,包括提交的异步任务、定时任务、尾部任务。
亮点:Netty通过反射优化SelectedSelectionKeySet内的processSelectedKeys,将原来的Set替换为数组。
RokcetMQ的Netty应用
回到RocketMQ这里,以 4.9.0 版本的代码为例,在 rocketmq-remoting 模块下可以找到Netty的封装
在RocketMQ中,NettyRemotingServer初始化时创建Netty的服务端,和NettyRemotingClient初始化时创建Netty的客户端,
初始化时会带一个ChannelEventListener(实现类是ClientHousekeepingService或BrokerHousekeepingService),用于处理broker到客户端的心跳,broker到无效的name server,监听connect和close、idle、exception事件
RocketMQ从 SimpleChannelInboundHandler
如何创建服务端
服务端类是NettyRemotingServer ,创建的逻辑如下,
- 通过group将主从线程组关联
- 通过channel关联channel的实现
- 通过option初始化channel的配置
- 通过handler关联ChannelHandler,其中RocketMQ实际的业务逻辑就在NettyServerHandler serverHandler(这里用到的pipeline 是责任链模式)

创建线程组
线程组的创建,Reactor支持epoll则使用EpollEventLoopGroup,否则使用NioEventLoopGroup,而处理耗时业务的线程池则时使用DefaultEventExecutorGroup
如何创建客户端
客户端代码是NettyRemotingClient,创建的逻辑和服务端类似,但少了线程组
如何创建服务端到客户端的连接
使用 bootstrap.connect 将客户端连接到服务端
如何接收消息并实现业务
服务端和客户端都是在 ChannelInitializer 内添加的,通过 ChannelPipeline.addLast 添加
在服务端是NettyServerHandler,在客户端是NettyClientHandler,两者实际上都是调用NettyRemotingAbstract.processMessageReceived
而NettyRemotingAbstract.processMessageReceived 内则是按请求和响应分类处理
processMessageReceived 调用的是NettyRemotingAbstract.processRequestCommand 由,this.processorTable.get(cmd.getCode()) 决定具体的NettyRequestProcessor
其中的业务逻辑AsyncNettyRequestProcessor则是调用具体的NettyRequestProcessor实现
如何回写消息
看回 NettyServerHandler ,其中有上下文 ChannelHandlerContext 
处理完业务后,通过 ChannelHandlerContext.writeAndFlush(response)或者 Channel.writeAndFlush(response) 回写消息即可
同时也支持回调方法,在执行结束后回调
