5.Netty核心源码剖析
5.1 Netty源码构建
- 下载源码
https://github.com/netty/netty
2. 导入项目工程
3. 将入门案例demo代码example模块下
5.2 EventLoopGroup事件循环组(线程组)源码
EventLoopGroup 是一组 EventLoop 的抽象,Netty 为了更好的利用多核 CPU 资源,一般会有多个 EventLoop 同时工作,每个 EventLoop 维护着一个 Selector 实例。5.2.1 线程组源码流程分析
5.2.2 线程组源码主要源码跟踪
- NioEventLoopGroup线程组的创建
```java
static {
//默认线程数量为处理器数*2
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
); if (logger.isDebugEnabled()) {"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)
} } /**logger.debug("-Dio.netty.eventLoopThreads: {}",DEFAULT_EVENT_LOOP_THREADS);
- @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object…) */ protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object… args) { super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); } ```
- NioEventLoop的创建
newChild方法protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {if (nThreads <= 0) {throw new IllegalArgumentException(String.format("nThreads: %d(expected: > 0)", nThreads));}if (executor == null) {executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());}//根据线程数量创建children = new EventExecutor[nThreads];for (int i = 0; i < nThreads; i ++) {boolean success = false;try {//循环创建线程NioEventLoopchildren[i] = newChild(executor, args);success = true;} catch (Exception e) {// TODO: Think about if this is a good exception typethrow new IllegalStateException("failed to create a child event loop", e);} finally {...}}......}
NioEventLoop@Overrideprotected EventLoop newChild(Executor executor, Object... args) throws Exception {EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;//创建NioEventLoopreturn new NioEventLoop(this, executor,(SelectorProvider) args[0],((SelectStrategyFactory) args[1]).newSelectStrategy(),(RejectedExecutionHandler) args[2],queueFactory);}
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,SelectStrategy strategy,RejectedExecutionHandler rejectedExecutionHandler,EventLoopTaskQueueFactory queueFactory) {super(parent, executor, false, newTaskQueue(queueFactory),newTaskQueue(queueFactory),rejectedExecutionHandler);if (selectorProvider == null) {throw new NullPointerException("selectorProvider");}if (strategy == null) {throw new NullPointerException("selectStrategy");}provider = selectorProvider;//创建选择器final SelectorTuple selectorTuple = openSelector();selector = selectorTuple.selector;unwrappedSelector = selectorTuple.unwrappedSelector;selectStrategy = strategy;}
5.3 Netty启动源码
5.3.1 启动流程分析
5.3.2 主要源码跟踪
- initAndRegister方法
final ChannelFuture initAndRegister() {Channel channel = null;try {//创建通道channel = channelFactory.newChannel();//通道初始化init(channel);} catch (Throwable t) {if (channel != null) {// channel can be null if newChannel crashed (eg SocketException("too many open files"))channel.unsafe().closeForcibly();// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutorreturn new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);}// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutorreturn new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);}//注册通道ChannelFuture regFuture = config().group().register(channel);if (regFuture.cause() != null) {if (channel.isRegistered()) {channel.close();} else {channel.unsafe().closeForcibly();}}return regFuture;}
- init方法
void init(Channel channel) {setChannelOptions(channel, options0().entrySet().toArray(newOptionArray(0)), logger);setAttributes(channel, attrs0().entrySet().toArray(newAttrArray(0)));//得到通道pipelineChannelPipeline p = channel.pipeline();//赋值workGroup与服务端handlerfinal EventLoopGroup currentChildGroup = childGroup;final ChannelHandler currentChildHandler = childHandler;final Entry<ChannelOption<?>, Object>[] currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));//添加通道初始化handlerp.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(final Channel ch) {final ChannelPipeline pipeline = ch.pipeline();ChannelHandler handler = config.handler();if (handler != null) {pipeline.addLast(handler);}ch.eventLoop().execute(new Runnable() {@Overridepublic void run() {//在initChannel方法中添加ServerBootstrapAcceptor的handlerpipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler,currentChildOptions, currentChildAttrs));}});}});}
- register方法
public final void register(EventLoop eventLoop, final ChannelPromise promise) {if (eventLoop == null) {throw new NullPointerException("eventLoop");}if (isRegistered()) {promise.setFailure(new IllegalStateException("registered to an event loop already"));return;}if (!isCompatible(eventLoop)) {promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));return;}AbstractChannel.this.eventLoop = eventLoop;if (eventLoop.inEventLoop()) {register0(promise);} else {try {//执行NioEventLoopeventLoop.execute(new Runnable() {@Overridepublic void run() {//注册通道register0(promise);}});} catch (Throwable t) {logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}",AbstractChannel.this, t);closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}}}
- execute方法
public void execute(Runnable task) {if (task == null) {throw new NullPointerException("task");}boolean inEventLoop = inEventLoop();//添加到任务队列addTask(task);if (!inEventLoop) {//启动线程startThread();if (isShutdown()) {boolean reject = false;try {if (removeTask(task)) {reject = true;}} catch (UnsupportedOperationException e) {// The task queue does not support removal so the best thing we can do is to just move on and// hope we will be able to pick-up the task before its completely terminated.// In worst case we will log on termination.}if (reject) {reject();}}}if (!addTaskWakesUp && wakesUpForTask(task)) {wakeup(inEventLoop);}}
- startThread—>run方法
protected void run() {for (;;) {try {try {switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {case SelectStrategy.CONTINUE:continue;case SelectStrategy.BUSY_WAIT:// fall-through to SELECT since the busy-wait is not supported with NIOcase SelectStrategy.SELECT:select(wakenUp.getAndSet(false));if (wakenUp.get()) {selector.wakeup();}// fall throughdefault:}} catch (IOException e) {// If we receive an IOException here its because the Selector is messed up. Let's rebuild// the selector and retry. https://github.com/netty/netty/issues/8566rebuildSelector0();handleLoopException(e);continue;}cancelledKeys = 0;needsToSelectAgain = false;final int ioRatio = this.ioRatio;if (ioRatio == 100) {try {processSelectedKeys();} finally {// Ensure we always run tasks.runAllTasks();}} else {final long ioStartTime = System.nanoTime();try {//处理SelectedKeyprocessSelectedKeys();} finally {// Ensure we always run tasks.final long ioTime = System.nanoTime() - ioStartTime;runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}}} catch (Throwable t) {handleLoopException(t);}// Always handle shutdown even if the loop processing threw an exception.try {if (isShuttingDown()) {closeAll();if (confirmShutdown()) {return;}}} catch (Throwable t) {handleLoopException(t);}}}
- runAllTasks方法
protected boolean runAllTasks(long timeoutNanos) {fetchFromScheduledTaskQueue();//轮询任务Runnable task = pollTask();if (task == null) {afterRunningAllTasks();return false;}final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;long runTasks = 0;long lastExecutionTime;for (;;) {//执行任务safeExecute(task);runTasks ++;// Check timeout every 64 tasks because nanoTime() is relatively expensive.// XXX: Hard-coded value - will make it configurable if it is really a problem.if ((runTasks & 0x3F) == 0) {lastExecutionTime = ScheduledFutureTask.nanoTime();if (lastExecutionTime >= deadline) {break;}}task = pollTask();if (task == null) {lastExecutionTime = ScheduledFutureTask.nanoTime();break;}}afterRunningAllTasks();this.lastExecutionTime = lastExecutionTime;return true;}
- register0方法
doRegister()private void register0(ChannelPromise promise) {try {// check if the channel is still open as it could be closed in the mean time when the register// call was outside of the eventLoopif (!promise.setUncancellable() || !ensureOpen(promise)) {return;}boolean firstRegistration = neverRegistered;//注册通道doRegister();neverRegistered = false;registered = true;// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the// 向pipeline中添加handler主要是针对initChannel方法中pipeline.invokeHandlerAddedIfNeeded();safeSetSuccess(promise);pipeline.fireChannelRegistered();// Only fire a channelActive if the channel has never been registered. This prevents firing// multiple channel actives if the channel is deregistered and reregistered.if (isActive()) {if (firstRegistration) {pipeline.fireChannelActive();} else if (config().isAutoRead()) {// This channel was registered before and autoRead() is set. This means we need to begin read// again so that we process inbound data.//// See https://github.com/netty/netty/issues/4805beginRead();}}} catch (Throwable t) {// Close the channel directly to avoid FD leak.closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}}
invokeHandlerAddedIfNeeded—>initChannel方法protected void doRegister() throws Exception {boolean selected = false;for (;;) {try {//将channel注册到Selector上selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);return;} catch (CancelledKeyException e) {if (!selected) {// Force the Selector to select now as the "canceled" SelectionKey may still be// cached and not removed because no Select.select(..)operation was called yet.eventLoop().selectNow();selected = true;} else {// We forced a select operation on the selector before but the SelectionKey is still cached// for whatever reason. JDK bug ?throw e;}}}}
p.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(final Channel ch) {final ChannelPipeline pipeline = ch.pipeline();ChannelHandler handler = config.handler();if (handler != null) {pipeline.addLast(handler);}ch.eventLoop().execute(new Runnable() {@Overridepublic void run() {//在initChannel方法中添加ServerBootstrapAcceptor的handlerpipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler,currentChildOptions, currentChildAttrs));}});}});
5.4 BossGroup/WorkGroup/消息入站源码
BossGroup主要负责监听. workGroup负责消息处理. 主要看下BossGroup如何将通道交给workGroup的,和如何处理消息读取的.即入站5.4.1 BossGroup/WorkGroup源码分析
5.4.2 主要流程源码
- processSelectedKeysOptimized
private void processSelectedKeysOptimized() {for (int i = 0; i < selectedKeys.size; ++i) {final SelectionKey k = selectedKeys.keys[i];// null out entry in the array to allow to have it GC'ed once the Channel close// See https://github.com/netty/netty/issues/2363selectedKeys.keys[i] = null;final Object a = k.attachment();if (a instanceof AbstractNioChannel) {//处理SelectedKeyprocessSelectedKey(k, (AbstractNioChannel) a);} else {@SuppressWarnings("unchecked")NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;processSelectedKey(k, task);}if (needsToSelectAgain) {// null out entries in the array to allow to have it GC'ed once the Channel close// See https://github.com/netty/netty/issues/2363selectedKeys.reset(i + 1);selectAgain();i = -1;}}}
- processSelectedKey
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();if (!k.isValid()) {final EventLoop eventLoop;try {eventLoop = ch.eventLoop();} catch (Throwable ignored) {// If the channel implementation throws an exception because there is no event loop, we ignore this// because we are only trying to determine if ch is registered to this event loop and thus has authority// to close ch.return;}// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is// still healthy and should not be closed.// See https://github.com/netty/netty/issues/5125if (eventLoop != this || eventLoop == null) {return;}// close the channel if the key is not valid anymoreunsafe.close(unsafe.voidPromise());return;}try {int readyOps = k.readyOps();// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise// the NIO JDK channel implementation may throw a NotYetConnectedException.if ((readyOps & SelectionKey.OP_CONNECT) != 0) {// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking// See https://github.com/netty/netty/issues/924int ops = k.interestOps();ops &= ~SelectionKey.OP_CONNECT;k.interestOps(ops);unsafe.finishConnect();}// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.if ((readyOps & SelectionKey.OP_WRITE) != 0) {// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to writech.unsafe().forceFlush();}// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead// to a spin loopif ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {//读取数据unsafe.read();}} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());}}
unsafe.read()
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {@Override@SuppressWarnings("unchecked")public void channelRead(ChannelHandlerContext ctx, Object msg) {//获得通道信息final Channel child = (Channel) msg;//将服务端处理器添加到pipeline中child.pipeline().addLast(childHandler);setChannelOptions(child, childOptions, logger);setAttributes(child, childAttrs);try {//将通道注册到workGroup线程组上childGroup.register(child).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throwsException {if (!future.isSuccess()) {forceClose(child, future.cause());}}});} catch (Throwable t) {forceClose(child, t);}}}
5.5 消息出站源码
private void write(Object msg, boolean flush, ChannelPromise promise) {ObjectUtil.checkNotNull(msg, "msg");try {if (isNotValidPromise(promise, true)) {ReferenceCountUtil.release(msg);// cancelledreturn;}} catch (RuntimeException e) {ReferenceCountUtil.release(msg);throw e;}//得到上下文出站handler-从后往前查找final AbstractChannelHandlerContext next = findContextOutbound(flush ? (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);final Object m = pipeline.touch(msg, next);EventExecutor executor = next.executor();if (executor.inEventLoop()) {if (flush) {//调用写入和刷新方法next.invokeWriteAndFlush(m, promise);} else {next.invokeWrite(m, promise);}} else {final AbstractWriteTask task;if (flush) {task = WriteAndFlushTask.newInstance(next, m, promise);} else {task = WriteTask.newInstance(next, m, promise);}if (!safeExecute(executor, task, promise, m)) {// We failed to submit the AbstractWriteTask. We need to cancel it so we decrement the pending bytes// and put it back in the Recycler for re-use later.//// See https://github.com/netty/netty/issues/8343.task.cancel();}}}
