一、概述
Dubbo 服务的底层通信默认基于 Netty 实现。Netty 的通道接收到数据之后,还要经过层层包装的处理器(handler)处理,这些处理器串成一条调用链,最终才会调用到具体的 Invoker,再由 Invoker 通过反射调用对应接口的实现。这条调用链是怎么形成的呢?DubboProtocol 在 export 发布服务时会开启通信服务,调用链就是在这个过程中构建的。
二、时序图

三、响应过程源码解析
1、NettyServerHandler
public class NettyServerHandler extends ChannelDuplexHandler {public void channelActive(ChannelHandlerContext ctx) throws Exception {NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);try {if (channel != null) {channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()), channel);}handler.connected(channel);} finally {NettyChannel.removeChannelIfDisconnected(ctx.channel());}}public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);try {//handler指向NettyServerhandler.received(channel, msg);} finally {NettyChannel.removeChannelIfDisconnected(ctx.channel());}}}
2、NettyServer
public class NettyServer extends AbstractServer implements Server {...}public abstract class AbstractPeer implements Endpoint, ChannelHandler {public void connected(Channel ch) throws RemotingException {if (closed) {return;}handler.connected(ch);}public void received(Channel ch, Object msg) throws RemotingException {if (closed) {return;}//handler指向MultiMessageHandlerhandler.received(ch, msg);}}

3、MultiMessageHandler
public class MultiMessageHandler extends AbstractChannelHandlerDelegate {public void received(Channel channel, Object message) throws RemotingException {if (message instanceof MultiMessage) {MultiMessage list = (MultiMessage) message;for (Object obj : list) {handler.received(channel, obj);}} else {//handler指向HeartbeatHandlerhandler.received(channel, message);}}}
4、HeartbeatHandler
public class HeartbeatHandler extends AbstractChannelHandlerDelegate {public void received(Channel channel, Object message) throws RemotingException {setReadTimestamp(channel);//心跳检测if (isHeartbeatRequest(message)) {Request req = (Request) message;if (req.isTwoWay()) {Response res = new Response(req.getId(), req.getVersion());res.setEvent(Response.HEARTBEAT_EVENT);channel.send(res);if (logger.isInfoEnabled()) {int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);if (logger.isDebugEnabled()) {logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress()+ ", cause: The channel has no data-transmission exceeds a heartbeat period"+ (heartbeat > 0 ? ": " + heartbeat + "ms" : ""));}}}return;}if (isHeartbeatResponse(message)) {if (logger.isDebugEnabled()) {logger.debug("Receive heartbeat response in thread " + Thread.currentThread().getName());}return;}//handler指向的是AllChannelHandlerhandler.received(channel, message);}}
5、AllChannelHandler
public class AllChannelHandler extends WrappedChannelHandler {public void connected(Channel channel) throws RemotingException {ExecutorService executor = getExecutorService();try {executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));} catch (Throwable t) {throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);}}public void received(Channel channel, Object message) throws RemotingException {ExecutorService executor = getExecutorService();try {//提交到线程池执行executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));} catch (Throwable t) {//TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring//fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time outif(message instanceof Request && t instanceof RejectedExecutionException){Request request = (Request)message;if(request.isTwoWay()){String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();Response response = new Response(request.getId(), request.getVersion());response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);response.setErrorMessage(msg);channel.send(response);return;}}throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);}}}
6、ChannelEventRunnable
public class ChannelEventRunnable implements Runnable {public void run() {if (state == ChannelState.RECEIVED) {try {//handler指向的是DecodeHandlerhandler.received(channel, message);} catch (Exception e) {logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel+ ", message is " + message, e);}} else {switch (state) {case CONNECTED:try {handler.connected(channel);} catch (Exception e) {logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);}break;case DISCONNECTED:try {handler.disconnected(channel);} catch (Exception e) {logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);}break;case SENT:try {handler.sent(channel, message);} catch (Exception e) {logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel+ ", message is " + message, e);}break;case CAUGHT:try {handler.caught(channel, exception);} catch (Exception e) {logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel+ ", message is: " + message + ", exception is " + exception, e);}break;default:logger.warn("unknown state: " + state + ", message is " + message);}}}}
7、DecodeHandler
public class DecodeHandler extends AbstractChannelHandlerDelegate {public void received(Channel channel, Object message) throws RemotingException {if (message instanceof Decodeable) {decode(message);}if (message instanceof Request) {decode(((Request) message).getData());}if (message instanceof Response) {decode(((Response) message).getResult());}//handler指向HeaderExchangeHandlerhandler.received(channel, message);}}
8、HeaderExchangeHandler
public class HeaderExchangeHandler implements ChannelHandlerDelegate {public void received(Channel channel, Object message) throws RemotingException {channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);try {if (message instanceof Request) {// handle request.Request request = (Request) message;if (request.isEvent()) {handlerEvent(channel, request);} else {if (request.isTwoWay()) {//代码会走到这一步handleRequest(exchangeChannel, request);} else {handler.received(exchangeChannel, request.getData());}}} else if (message instanceof Response) {handleResponse(channel, (Response) message);} else if (message instanceof String) {if (isClientSide(channel)) {Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());logger.error(e.getMessage(), e);} else {String echo = handler.telnet(channel, (String) message);if (echo != null && echo.length() > 0) {channel.send(echo);}}} else {handler.received(exchangeChannel, message);}} finally {HeaderExchangeChannel.removeChannelIfDisconnected(channel);}}void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {Response res = new Response(req.getId(), req.getVersion());if (req.isBroken()) {Object data = req.getData();String msg;if (data == null) {msg = null;} else if (data instanceof Throwable) {msg = StringUtils.toString((Throwable) data);} else {msg = data.toString();}res.setErrorMessage("Fail to decode request due to: " + msg);res.setStatus(Response.BAD_REQUEST);channel.send(res);return;}// find handler by message class.Object msg = req.getData();try {//handler指向的是ExchangeHandlerAdapter的实例,但ExchangeHandlerAdapter是抽象类,//它的具体实现在DubboProtocol类中CompletionStage<Object> future = handler.reply(channel, msg);future.whenComplete((appResult, t) -> {try {if (t == null) {res.setStatus(Response.OK);res.setResult(appResult);} else 
{res.setStatus(Response.SERVICE_ERROR);res.setErrorMessage(StringUtils.toString(t));}channel.send(res);} catch (RemotingException e) {logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);} finally {// HeaderExchangeChannel.removeChannelIfDisconnected(channel);}});} catch (Throwable e) {res.setStatus(Response.SERVICE_ERROR);res.setErrorMessage(StringUtils.toString(e));channel.send(res);}}}
9、ExchangeHandlerAdapter
public class DubboProtocol extends AbstractProtocol {private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {if (!(message instanceof Invocation)) {throw new RemotingException(channel, "Unsupported request: "+ (message == null ? null : (message.getClass().getName() + ": " + message))+ ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());}Invocation inv = (Invocation) message;//获取invoker实例,这个实例并不是接口的实现类,而是之前服务发布过程中被层层包装的//Invoker实例,要经过包装类和filter责任链的层层调用才能最终调用到服务的实现类Invoker<?> invoker = getInvoker(channel, inv);// need to consider backward-compatibility if it's a callbackif (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {String methodsStr = invoker.getUrl().getParameters().get("methods");boolean hasMethod = false;if (methodsStr == null || !methodsStr.contains(",")) {hasMethod = inv.getMethodName().equals(methodsStr);} else {String[] methods = methodsStr.split(",");for (String method : methods) {if (inv.getMethodName().equals(method)) {hasMethod = true;break;}}}if (!hasMethod) {logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()+ " not found in callback service interface ,invoke will be ignored."+ " please update the api interface. 
url is:"+ invoker.getUrl()) + " ,invocation is :" + inv);return null;}}RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());Result result = invoker.invoke(inv);return result.completionFuture().thenApply(Function.identity());}public void received(Channel channel, Object message) throws RemotingException {if (message instanceof Invocation) {reply((ExchangeChannel) channel, message);} else {super.received(channel, message);}}public void connected(Channel channel) throws RemotingException {invoke(channel, ON_CONNECT_KEY);}public void disconnected(Channel channel) throws RemotingException {if (logger.isDebugEnabled()) {logger.debug("disconnected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl());}invoke(channel, ON_DISCONNECT_KEY);}private void invoke(Channel channel, String methodKey) {Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);if (invocation != null) {try {received(channel, invocation);} catch (Throwable t) {logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);}}}private Invocation createInvocation(Channel channel, URL url, String methodKey) {String method = url.getParameter(methodKey);if (method == null || method.length() == 0) {return null;}RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);invocation.setAttachment(PATH_KEY, url.getPath());invocation.setAttachment(GROUP_KEY, url.getParameter(GROUP_KEY));invocation.setAttachment(INTERFACE_KEY, url.getParameter(INTERFACE_KEY));invocation.setAttachment(VERSION_KEY, url.getParameter(VERSION_KEY));if (url.getParameter(STUB_EVENT_KEY, false)) {invocation.setAttachment(STUB_EVENT_KEY, Boolean.TRUE.toString());}return invocation;}};}
