一、概述

Dubbo 服务的底层通信默认基于 Netty 实现。Netty 的通道接收到数据之后,还要经过层层包装的 handler 处理,这些 handler 构成一条调用链,最终才会调用到具体的 Invoker,并通过反射调用对应的接口实现。这条调用链是怎么形成的呢?DubboProtocol 类在 export 发布服务时会开启通信服务,调用链就是在这个过程中构建的。

二、时序图

image.png

三、响应过程源码解析

1、NettyServerHandler

  1. public class NettyServerHandler extends ChannelDuplexHandler { // Netty handler that forwards channel events to the Dubbo-side `handler` (fields url/handler/channels are not shown in this excerpt)
  2. public void channelActive(ChannelHandlerContext ctx) throws Exception { // registers the wrapped channel in `channels`, then calls handler.connected
  3. NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
  4. try {
  5. if (channel != null) {
  6. channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()), channel); // keyed by the remote address rendered as a string
  7. }
  8. handler.connected(channel);
  9. } finally {
  10. NettyChannel.removeChannelIfDisconnected(ctx.channel()); // runs even when connected() throws
  11. }
  12. }
  13. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // passes each inbound message to handler.received
  14. NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
  15. try {
  16. // handler points to NettyServer
  17. handler.received(channel, msg);
  18. } finally {
  19. NettyChannel.removeChannelIfDisconnected(ctx.channel()); // runs even when received() throws
  20. }
  21. }
  22. }

2、NettyServer

  1. public class NettyServer extends AbstractServer implements Server { // class body is elided in this excerpt
  2. ...
  3. }
  4. public abstract class AbstractPeer implements Endpoint, ChannelHandler { // delegates channel events to the wrapped `handler` unless `closed` is set (fields not shown in this excerpt)
  5. public void connected(Channel ch) throws RemotingException {
  6. if (closed) {
  7. return; // event is dropped silently once closed
  8. }
  9. handler.connected(ch);
  10. }
  11. public void received(Channel ch, Object msg) throws RemotingException {
  12. if (closed) {
  13. return; // message is dropped silently once closed
  14. }
  15. // handler points to MultiMessageHandler
  16. handler.received(ch, msg);
  17. }
  18. }

image.png

3、MultiMessageHandler

  1. public class MultiMessageHandler extends AbstractChannelHandlerDelegate { // unpacks a MultiMessage so the next handler only ever sees single messages
  2. public void received(Channel channel, Object message) throws RemotingException {
  3. if (message instanceof MultiMessage) {
  4. MultiMessage list = (MultiMessage) message;
  5. for (Object obj : list) { // each contained message is forwarded separately, in iteration order
  6. handler.received(channel, obj);
  7. }
  8. } else {
  9. // handler points to HeartbeatHandler
  10. handler.received(channel, message);
  11. }
  12. }
  13. }

4、HeartbeatHandler

  1. public class HeartbeatHandler extends AbstractChannelHandlerDelegate { // handles heartbeat requests/responses itself and forwards everything else down the chain
  2. public void received(Channel channel, Object message) throws RemotingException {
  3. setReadTimestamp(channel); // called for every message, heartbeat or not
  4. // heartbeat detection
  5. if (isHeartbeatRequest(message)) {
  6. Request req = (Request) message;
  7. if (req.isTwoWay()) { // only two-way heartbeat requests get a reply
  8. Response res = new Response(req.getId(), req.getVersion());
  9. res.setEvent(Response.HEARTBEAT_EVENT);
  10. channel.send(res);
  11. if (logger.isInfoEnabled()) { // NOTE(review): the debug log below is nested inside an info-level guard — confirm this is intended
  12. int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
  13. if (logger.isDebugEnabled()) {
  14. logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress()
  15. + ", cause: The channel has no data-transmission exceeds a heartbeat period"
  16. + (heartbeat > 0 ? ": " + heartbeat + "ms" : ""));
  17. }
  18. }
  19. }
  20. return; // heartbeat requests are never forwarded to the next handler
  21. }
  22. if (isHeartbeatResponse(message)) {
  23. if (logger.isDebugEnabled()) {
  24. logger.debug("Receive heartbeat response in thread " + Thread.currentThread().getName());
  25. }
  26. return; // heartbeat responses are never forwarded to the next handler
  27. }
  28. // handler points to AllChannelHandler
  29. handler.received(channel, message);
  30. }
  31. }

5、AllChannelHandler

  1. public class AllChannelHandler extends WrappedChannelHandler { // hands connected/received events to an executor instead of running them on the calling thread
  2. public void connected(Channel channel) throws RemotingException {
  3. ExecutorService executor = getExecutorService();
  4. try {
  5. executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
  6. } catch (Throwable t) {
  7. throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t); // any submission failure is wrapped, keeping t as the cause
  8. }
  9. }
  10. public void received(Channel channel, Object message) throws RemotingException {
  11. ExecutorService executor = getExecutorService();
  12. try {
  13. // submit to the thread pool for execution
  14. executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
  15. } catch (Throwable t) {
  16. //TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring
  17. //fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out
  18. if(message instanceof Request && t instanceof RejectedExecutionException){
  19. Request request = (Request)message;
  20. if(request.isTwoWay()){ // only two-way requests get an error response; other rejections fall through to the throw below
  21. String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
  22. Response response = new Response(request.getId(), request.getVersion());
  23. response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
  24. response.setErrorMessage(msg);
  25. channel.send(response);
  26. return; // the rejection has been reported to the peer, so no exception is thrown
  27. }
  28. }
  29. throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
  30. }
  31. }
  32. }

6、ChannelEventRunnable

  1. public class ChannelEventRunnable implements Runnable { // task that dispatches one channel event to `handler` according to `state` (fields are not shown in this excerpt)
  2. public void run() { // every handler exception is caught and logged at warn level; nothing propagates out of run()
  3. if (state == ChannelState.RECEIVED) { // RECEIVED is tested first, outside the switch
  4. try {
  5. // handler points to DecodeHandler
  6. handler.received(channel, message);
  7. } catch (Exception e) {
  8. logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
  9. + ", message is " + message, e);
  10. }
  11. } else {
  12. switch (state) {
  13. case CONNECTED:
  14. try {
  15. handler.connected(channel);
  16. } catch (Exception e) {
  17. logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
  18. }
  19. break;
  20. case DISCONNECTED:
  21. try {
  22. handler.disconnected(channel);
  23. } catch (Exception e) {
  24. logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
  25. }
  26. break;
  27. case SENT:
  28. try {
  29. handler.sent(channel, message);
  30. } catch (Exception e) {
  31. logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
  32. + ", message is " + message, e);
  33. }
  34. break;
  35. case CAUGHT:
  36. try {
  37. handler.caught(channel, exception);
  38. } catch (Exception e) {
  39. logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
  40. + ", message is: " + message + ", exception is " + exception, e);
  41. }
  42. break;
  43. default:
  44. logger.warn("unknown state: " + state + ", message is " + message); // unrecognised states are only logged
  45. }
  46. }
  47. }
  48. }

7、DecodeHandler

  1. public class DecodeHandler extends AbstractChannelHandlerDelegate { // calls decode(...) on the message and/or its payload before forwarding it
  2. public void received(Channel channel, Object message) throws RemotingException {
  3. if (message instanceof Decodeable) { // the three checks are independent ifs, not an else-if chain
  4. decode(message);
  5. }
  6. if (message instanceof Request) {
  7. decode(((Request) message).getData()); // decode the request payload
  8. }
  9. if (message instanceof Response) {
  10. decode(((Response) message).getResult()); // decode the response result
  11. }
  12. // handler points to HeaderExchangeHandler
  13. handler.received(channel, message);
  14. }
  15. }

8、HeaderExchangeHandler

  1. public class HeaderExchangeHandler implements ChannelHandlerDelegate { // routes a received message by type: Request, Response, String (telnet) or anything else
  2. public void received(Channel channel, Object message) throws RemotingException {
  3. channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
  4. final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
  5. try {
  6. if (message instanceof Request) {
  7. // handle request.
  8. Request request = (Request) message;
  9. if (request.isEvent()) {
  10. handlerEvent(channel, request);
  11. } else {
  12. if (request.isTwoWay()) {
  13. // execution reaches this branch
  14. handleRequest(exchangeChannel, request);
  15. } else {
  16. handler.received(exchangeChannel, request.getData()); // one-way request: only the payload is forwarded and no Response is built here
  17. }
  18. }
  19. } else if (message instanceof Response) {
  20. handleResponse(channel, (Response) message);
  21. } else if (message instanceof String) {
  22. if (isClientSide(channel)) {
  23. Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl()); // created only to be logged; it is not thrown
  24. logger.error(e.getMessage(), e);
  25. } else {
  26. String echo = handler.telnet(channel, (String) message);
  27. if (echo != null && echo.length() > 0) { // only a non-empty telnet result is sent back
  28. channel.send(echo);
  29. }
  30. }
  31. } else {
  32. handler.received(exchangeChannel, message);
  33. }
  34. } finally {
  35. HeaderExchangeChannel.removeChannelIfDisconnected(channel);
  36. }
  37. }
  38. void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException { // builds the Response for a two-way request and sends it on the channel
  39. Response res = new Response(req.getId(), req.getVersion());
  40. if (req.isBroken()) { // broken request: reply BAD_REQUEST without calling handler.reply
  41. Object data = req.getData();
  42. String msg;
  43. if (data == null) {
  44. msg = null;
  45. } else if (data instanceof Throwable) {
  46. msg = StringUtils.toString((Throwable) data);
  47. } else {
  48. msg = data.toString();
  49. }
  50. res.setErrorMessage("Fail to decode request due to: " + msg);
  51. res.setStatus(Response.BAD_REQUEST);
  52. channel.send(res);
  53. return;
  54. }
  55. // find handler by message class.
  56. Object msg = req.getData();
  57. try {
  58. // handler points to an ExchangeHandlerAdapter instance; ExchangeHandlerAdapter is an abstract class,
  59. // and its concrete implementation lives in the DubboProtocol class
  60. CompletionStage<Object> future = handler.reply(channel, msg);
  61. future.whenComplete((appResult, t) -> { // the response is sent from this callback, once the reply future completes
  62. try {
  63. if (t == null) {
  64. res.setStatus(Response.OK);
  65. res.setResult(appResult);
  66. } else {
  67. res.setStatus(Response.SERVICE_ERROR);
  68. res.setErrorMessage(StringUtils.toString(t));
  69. }
  70. channel.send(res);
  71. } catch (RemotingException e) {
  72. logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e); // a send failure is only logged here
  73. } finally {
  74. // HeaderExchangeChannel.removeChannelIfDisconnected(channel);
  75. }
  76. });
  77. } catch (Throwable e) { // anything thrown inside the try block above is reported to the peer as SERVICE_ERROR
  78. res.setStatus(Response.SERVICE_ERROR);
  79. res.setErrorMessage(StringUtils.toString(e));
  80. channel.send(res);
  81. }
  82. }
  83. }

9、ExchangeHandlerAdapter

  1. public class DubboProtocol extends AbstractProtocol {
  2. private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() { // anonymous handler that turns a received Invocation into an Invoker call
  3. public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
  4. if (!(message instanceof Invocation)) { // only Invocation messages are accepted
  5. throw new RemotingException(channel, "Unsupported request: "
  6. + (message == null ? null : (message.getClass().getName() + ": " + message))
  7. + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
  8. }
  9. Invocation inv = (Invocation) message;
  10. // Get the invoker instance. It is not the interface implementation itself, but the Invoker that was wrapped
  11. // layer by layer during service export; the call passes through the wrappers and the filter chain before reaching the service implementation
  12. Invoker<?> invoker = getInvoker(channel, inv);
  13. // need to consider backward-compatibility if it's a callback
  14. if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
  15. String methodsStr = invoker.getUrl().getParameters().get("methods");
  16. boolean hasMethod = false;
  17. if (methodsStr == null || !methodsStr.contains(",")) { // absent or single-entry value: compare directly
  18. hasMethod = inv.getMethodName().equals(methodsStr);
  19. } else {
  20. String[] methods = methodsStr.split(",");
  21. for (String method : methods) {
  22. if (inv.getMethodName().equals(method)) {
  23. hasMethod = true;
  24. break;
  25. }
  26. }
  27. }
  28. if (!hasMethod) {
  29. logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
  30. + " not found in callback service interface ,invoke will be ignored."
  31. + " please update the api interface. url is:"
  32. + invoker.getUrl()) + " ,invocation is :" + inv);
  33. return null; // NOTE(review): returns null rather than a future; HeaderExchangeHandler.handleRequest (section 8) calls whenComplete on this value — confirm how the null case is meant to be handled
  34. }
  35. }
  36. RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
  37. Result result = invoker.invoke(inv);
  38. return result.completionFuture().thenApply(Function.identity());
  39. }
  40. public void received(Channel channel, Object message) throws RemotingException {
  41. if (message instanceof Invocation) {
  42. reply((ExchangeChannel) channel, message); // the returned future is discarded on this path
  43. } else {
  44. super.received(channel, message);
  45. }
  46. }
  47. public void connected(Channel channel) throws RemotingException {
  48. invoke(channel, ON_CONNECT_KEY);
  49. }
  50. public void disconnected(Channel channel) throws RemotingException {
  51. if (logger.isDebugEnabled()) {
  52. logger.debug("disconnected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl());
  53. }
  54. invoke(channel, ON_DISCONNECT_KEY);
  55. }
  56. private void invoke(Channel channel, String methodKey) { // calls the method named by URL parameter `methodKey`, if one is configured
  57. Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
  58. if (invocation != null) {
  59. try {
  60. received(channel, invocation);
  61. } catch (Throwable t) {
  62. logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t); // failures are logged and not rethrown
  63. }
  64. }
  65. }
  66. private Invocation createInvocation(Channel channel, URL url, String methodKey) { // builds a no-argument invocation from URL parameters; returns null when the URL names no method for methodKey
  67. String method = url.getParameter(methodKey);
  68. if (method == null || method.length() == 0) {
  69. return null;
  70. }
  71. RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);
  72. invocation.setAttachment(PATH_KEY, url.getPath());
  73. invocation.setAttachment(GROUP_KEY, url.getParameter(GROUP_KEY));
  74. invocation.setAttachment(INTERFACE_KEY, url.getParameter(INTERFACE_KEY));
  75. invocation.setAttachment(VERSION_KEY, url.getParameter(VERSION_KEY));
  76. if (url.getParameter(STUB_EVENT_KEY, false)) {
  77. invocation.setAttachment(STUB_EVENT_KEY, Boolean.TRUE.toString());
  78. }
  79. return invocation;
  80. }
  81. };
  82. }

四、时序图中的调用链是如何构建的