整个调用流程图

image.png

Dubbo 的总体的调用过程吗?

调用过程图:

Dubbo原理 - 图2

  • 1.Proxy 持有一个 Invoker 对象,使用 Invoker 调用
    1. 之后通过Cluster 进行负载容错,失败重试
    1. 调用 Directory获取远程服务的 Invoker列表
    1. 负载均衡
  • 用户配置了路由规则,则根据路由规则过滤获取到的 Invoker 列表
  • 用户没有配置路由规则或配置路由后还有很多节点,则使用 LoadBalance 方法做负载均衡,选用一个可以调用的 Invoker
  • 5.经过一个一个过滤器链,通常是处理上下文、限流、计数等。
    1. 使用 Client 做数据传输
  • 7.私有化协议的构造(Codec)
    1. 进行序列化
    1. 服务端收到这个 Request 请求,将其分配到 ThreadPool中进行处理
  • 10.Server 来处理这些 Request
    1. 根据请求查找对应的 Exporter
    1. 之后经过一个服务提供者端的过滤器链
    1. 然后找到接口实现并真正的调用,将请求结果返回

服务提供者暴露一个服务的详细过程

如果你仔细观察dubbo的启动日志你会发现,dubbo的provider启动主要是以下几个过程

1.首先provider启动时,先把想要提供的服务暴露在本地。

2.然后再把服务暴露到远程。

3.启动netty服务,建立长连接。

4.连接到注册中心zk上。

5.然后监控zk上的消费服务。
image.png

服务消费者消费一个服务的详细过程

image.png

首先ReferenceConfig类的init方法调用Protocol的refer方法生成Invoker实例。接下来把Invoker转为客户端需要的接口。

image.png
通过上⾯的流程图可以看到,服务消费者通过代理对象?Proxy?发起远程调⽤,接着通过⽹络客⼾端?Client?将编码后的请求发送给服务提供⽅的⽹络层上,也就是Server。Server?在收到请求后,⾸先要做的事情是对数据包进⾏解码。然后将解码后的请求发送⾄分发器?Dispatcher,再由分发器将请求派发到指定的线程池上,最后由线程池调⽤具体的服务。这就是⼀个远程调⽤请求的发送与接收过程。

服务提供者接收消费者请求的过程

默认情况下Dubbo使⽤Netty作为底层的通信框架。Netty检测到有数据⼊站后,⾸先会通过解码器对数据进⾏解码,并将解码后的数据传递给下⼀个⼊站处理器的指定⽅法。

执⾏流程

(1)?执⾏ExchangeCodec类解码⽅法
public?Object?decode(Channel?channel,?ChannelBuffer?buffer)?throws?IOException
(2)?解码后得到⼀个Request对象
? (3)?解码器将数据包解析成?Request?对象后,NettyServerHandler?的
channelRead?⽅法紧接着会收到这个对象,并将这个对象继续向下传递。这期间该
对象会被依次传递给?AbstractPeer、MultiMessageHandler、HeartbeatHandler
以及?AllChannelHandler。最后由?AllChannelHandler?将该对象封装到?Runnable
实现类对象中,并将?Runnable?放⼊线程池中执⾏后续的调⽤逻辑。
? (4)?接下来会调⽤ChannelEventRunnable,ChannelEventRunnable?将会是服务
调⽤过程的新起点
//?将?channel?和?message?传给?ChannelHandler?对象,进⾏后续的调⽤//?handler是
DecodeHandler类型对象handler.received(channel,?message);
(5)?执⾏HeaderExchangeHandler类⽅法
public?void?received(Channel?channel,?Object?message)
在该⽅法内部执⾏handleRequest逻辑
handleRequest(exchangeChannel,?request)
(6)?执⾏DubboProtocol类的invoke⽅法
Result?result?=?invoker.invoke(inv);
在该⽅法内部,调⽤我们的⽬标实现类内容
return?wrapper.invokeMethod(proxy,?methodName,?parameterTypes,?arguments);
⾄此,调⽤到了⽬标实现类的⽅法
handler.received(channel,?message);

Dubbo 服务注册与发现的流程

image.png

流程说明:

 Provider(提供者)绑定指定端口并启动服务
 指供者连接注册中心,并发本机 IP、端口、应用信息和提供服务信息发送至注册中心存储
 Consumer(消费者),连接注册中心 ,并发送应用信息、所求服务信息至注册中心
 注册中心根据 消费 者所求服务信息匹配对应的提供者列表发送至Consumer 应用缓存。
 Consumer 在发起远程调用时基于缓存的消费者列表择其一发起调用。
 Provider 状态变更会实时通知注册中心、在由注册中心实时推送至Consumer

设计的原因:

 Consumer 与 Provider 解偶,双方都可以横向增减节点数。
 注册中心对本身可做对等集群,可动态增减节点,并且任意一台宕掉后,将自动切换到另一台
 去中心化,双方不直接依懒注册中心,即使注册中心全部宕机短时间内也不会影响服务的调用
 服务提供者无状态,任意一台宕掉后,不影响使用

image.png

服务暴露和引用的流程

服务导出

Dubbo服务导出过程始于Spring容器发布刷新事件,Dubbo在接收到事件后会立即执行服务导出逻辑。

整个逻辑大致分为三个部分:

第一部分是前置工作,用于检查参数,组装URL。
第二部分是创建Invoker,导出服务到本地(JVM)或远程。
第三部分是向注册中心注册服务,用于服务发现。

服务导出做了两个事情

0.读取配置
1.服务注册: 服务信息注册到注册中心
2.根据服务的参数信息,启动对应的⽹络服务器(netty、tomcat、jetty等),⽤来接收⽹络请求

服务暴露的流程是怎么样的?

Dubbo原理 - 图8

  1. 通过 ServiceConfig 解析标签,创建 dubbo 标签解析器来解析 dubbo 的标签,容器创建完成之后,触发 ContextRefreshEvent 事件回调开始暴露服务

  2. 通过 proxyFactory.getInvoker 方法,并利用javassist 或 DdkProxyFactory 来进行动态代理,将服务暴露接口封装成 invoker 对象,里面包含了需要执行的方法的对象信息和具体的 URL 地址。

  3. 再通过 DubboProtocol 的实现把包装后的invoker 转换成 exporter

  4. 然后启动服务器 server,监听端口

  5. 最后 RegistryProtocol 保存 URL 地址和 invoker 的映射关系,同时注册到服务中心

Dubbo原理 - 图9

服务引用的流程是怎么样的?

当Spring启动过程中,会去给@Reference注解标注了的属性去进⾏赋值,赋值的对象为ReferenceBean中get()⽅法所返回的对象,这个对象是⼀个代理对象。

对于ReferenceBean,它表示应⽤想要引⼊的服务的信息,在执⾏get()时会做如下⼏步:
1. 调⽤checkAndUpdateSubConfigs(),检查和更新参数,和服务提供者类似,把ReferenceBean⾥的属性的值更新为优先级最⾼的参数值
2. 调⽤init()去⽣成代理对象ref,get()⽅法会返回这个ref
3. 在⽣成代理对象ref之前,先把消费者所引⼊服务设置的参数添加到⼀个map中,等会根据这个map中的参数去从注册中⼼查找服务
4. 把消费者配置的所有注册中⼼获取出来
a. 如果只有⼀个注册中⼼,那么直接调⽤Protocol的refer(interfaceClass, urls.get(0));得到⼀个Invoker对象


b. 如果有多个注册中⼼,则遍历每个注册中⼼,分别调⽤Protocol的refer(interfaceClass, url);得到⼀个Invoker对象添加到invokers中,然后把invokers调⽤CLUSTER.join(new StaticDirectory(u, invokers));封装所有invokers得到⼀个invoker,
5. 把最终得到的invoker对象调⽤PROXY_FACTORY.getProxy(invoker);得到⼀个代理对象,并返回,这个代理对象就是ref
6. 总结:上⽂的Invoker对象,表示服务执⾏者,从注册中⼼refer下来的是⼀个服务执⾏者,合并invokers后得到的invoker也是⼀个服务执⾏者(抽象范围更⼤了)

另一个版本

  1. 首先客户端根据 config 文件信息从注册中心订阅服务,首次会全量缓存到本地,后续的更新会监听动态更新到本地。

  2. 之后 DubboProtocol根据 provider的地址和接口信息连接到服务端server,开启客户端 client,然后创建 invoker

  3. 之后通过 invoker 为服务接口生成代理对象,这个代理对象用于远程调用 provider,至此完成了服务引用

Dubbo原理 - 图10

源码

出自图灵4

服务消费端执⾏逻辑

  1. MockClusterInvoker.invoke(new RpcInvocation(method, args)):Mock逻辑
    2. AbstractClusterInvoker.invoke(invocation):把RpcContext中设置的Attachments添加到
    invocation对象上,调⽤路由链从服务⽬录上筛选出适合的服务Invoker,获得服务均衡策略
    loadbalance
    3. FailoverClusterInvoker.doInvoke(invocation, invokers, loadbalance):根据负载均衡策略选出⼀
    个invoker,然后执⾏
    4. InvokerWrapper.invoke(invocation):没做什么事情
    5. CallbackRegistrationInvoker.invoke(invocation):开始执⾏Filter链,执⾏完得到结果后,会获取
    ListenableFilter中的listener,执⾏listener的onResponse⽅法
    6. ConsumerContextFilter.invoke(invocation):设置RpcContext中LocalAddress、
    RemoteAddress、RemoteApplicationName参数
    7. FutureFilter.invoke(invocation):
    8. MonitorFilter.invoke(invocation):⽅法的执⾏次数+1
    9. ListenerInvokerWrapper.invoke(invocation):没做什么事情
    10. AsyncToSyncInvoker.invoke(invocation):异步转同步,会先⽤下层Invoker去异步执⾏,然后阻塞
    Integer.MAX_VALUE时间,直到拿到了结果
    11. AbstractInvoker.invoke(invocation):主要调⽤DubboInvoker的doInvoke⽅法,如果doInvoker⽅
    法出现了异常,会进⾏包装,包装成AsyncRpcResult
    12. DubboInvoker.doInvoke(invocation):从clients轮询出⼀个client进⾏数据发送,如果配置了不关⼼
    结果,则调⽤ReferenceCountExchangeClient的send⽅法,否则调⽤
    ReferenceCountExchangeClient的request⽅法
    13. ReferenceCountExchangeClient.request(Object request, int timeout):没做什么事情
    14. HeaderExchangeClient.request(Object request, int timeout):没做什么事情
    15. HeaderExchangeChannel.request(Object request, int timeout):构造⼀个Request对象,并且会
    构造⼀个DefaultFuture对象来阻塞timeout的时间来等待结果,在构造DefaultFuture对象时,会把
    DefaultFuture对象和req的id存⼊FUTURES中,FUTURES是⼀个Map,当
    HeaderExchangeHandler接收到结果时,会从这个Map中根据id获取到DefaultFuture对象,然后返
    回Response。
    16. AbstractPeer.send(Object message):从url中获取send参数,默认为false
    17. AbstractClient.send(Object message, boolean sent):没做什么
    18. NettyChannel.send(Object message, boolean sent):调⽤NioSocketChannel的writeAndFlush
    发送数据,然后判断send如果是true,那么则阻塞url中指定的timeout时间,因为如果send是false,
    在HeaderExchangeChannel中会阻塞timeout时间
    19. NioSocketChannel.writeAndFlush(Object msg):最底层的Netty⾮阻塞式的发送数据

总结⼀下上⾯调⽤流程:
1. 最外层是Mock逻辑,调⽤前,调⽤后进⾏Mock
2. 从服务⽬录中,根据当前调⽤的⽅法和路由链,筛选出部分服务Invoker(DubboInvoker)
3. 对服务Invoker进⾏负载均衡,选出⼀个服务Invoker
4. 执⾏Filter链
5. AsyncToSyncInvoker完成异步转同步,因为DubboInvoker的执⾏是异步⾮阻塞的,所以如果是同步调⽤,则会在此处阻塞,知道拿到响应结果
6. DubboInvoker开始异步⾮阻塞的调⽤
7. HeaderExchangeChannel中会阻塞timeout的时间来等待结果,该timeout就是⽤户在消费端所配置的timeout

服务提供端执⾏逻辑

  1. NettyServerHandler:接收数据
    2. MultiMessageHandler:判断接收到的数据是否是MultiMessage,如果是则获取MultiMessage中的
    单个Message,传递给HeartbeatHandler进⾏处理
    3. HeartbeatHandler:判断是不是⼼跳消息,如果是不是则把Message传递给AllChannelHandler
    4. AllChannelHandler:把接收到的Message封装为⼀个ChannelEventRunnable对象,扔给线程池进⾏
    处理
    5. ChannelEventRunnable:在ChannelEventRunnable的run⽅法中会调⽤DecodeHandler处理
    Message
    6. DecodeHandler:按Dubbo协议的数据格式,解析当前请求的path,versio,⽅法,⽅法参数等等,
    然后把解析好了的请求交给HeaderExchangeHandler
    7. HeaderExchangeHandler:处理Request数据,⾸先构造⼀个Response对象,然后调⽤
    ExchangeHandlerAdapter得到⼀个CompletionStage future,然后给future通过whenComplete绑
    定⼀个回调函数,当future执⾏完了之后,就可以从回调函数中得到ExchangeHandlerAdapter的执⾏
    结果,并把执⾏结果设置给Response对象,通过channel发送出去。
    8. ExchangeHandlerAdapter:从本机已经导出的Exporter中根据当前Request所对应的服务key,去寻
    找Exporter对象,从Exporter中得到Invoker,然后执⾏invoke⽅法,此Invoker为
    ProtocolFilterWrapper$CallbackRegistrationInvoker
    9. ProtocolFilterWrapper$CallbackRegistrationInvoker:负责执⾏过滤器链,并且在执⾏完了之后回
    调每个过滤器的onResponse或onError⽅法
    10. EchoFilter:判断当前请求是不是⼀个回升测试,如果是,则不继续执⾏过滤器链了(服务实现者
    Invoker也不会调⽤了)
    11. ClassLoaderFilter:设置当前线程的classloader为当前要执⾏的服务接⼝所对应的classloader
    12. GenericFilter:把泛化调⽤发送过来的信息包装为RpcInvocation对象
    13. ContextFilter:设置RpcContext.getContext()的参数
    14. TraceFilter:先执⾏下⼀个invoker的invoke⽅法,调⽤成功后录调⽤信息
    15. TimeoutFilter:调⽤时没有特别处理,只是记录了⼀下当前时间,当整个filter链都执⾏完了之后回调
    TimeoutFilter的onResponse⽅法时,会判断本次调⽤是否超过了timeout
    16. MonitorFilter:记录当前服务的执⾏次数
    17. ExceptionFilter:调⽤时没有特别处理,在回调onResponse⽅法时,对不同的异常进⾏处理,详解
    Dubbo的异常处理
    18. DelegateProviderMetaDataInvoker:过滤器链结束,调⽤下⼀个Invoker
    19. AbstractProxyInvoker:在服务导出时,根据服务接⼝,服务实现类对象⽣成的,它的invoke⽅法就
    会执⾏服务实现类对象的⽅法,得到结果

Dubbo的异常处理

当服务消费者在调⽤⼀个服务时,服务提供者在执⾏服务逻辑时可能会出现异常,对于Dubbo来说,服务
消费者需要在消费端抛出这个异常,那么这个功能是怎么做到的呢?
服务提供者在执⾏服务时,如果出现了异常,那么框架会把异常捕获,捕获异常的逻辑在
AbstractProxyInvoker中,捕获到异常后,会把异常信息包装为正常的AppResponse对象,只是
AppResponse的value属性没有值,exception属性有值。
此后,服务提供者会把这个AppResponse对象发送给服务消费端,服务消费端是在
InvokerInvocationHandler中调⽤AppResponse的recreate⽅法重新得到⼀个结果,在recreate⽅法中
会去失败AppResponse对象是否正常,也就是是否存在exception信息,如果存在,则直接throw这个
exception,从⽽做到服务执⾏时出现的异常,在服务消费端抛出。
那么这⾥存在⼀个问题,如果服务提供者抛出的异常类,在服务消费者这边不存在,那么服务消费者也就
抛不出这个异常了,那么dubbo是怎么处理的呢?
这⾥就涉及到了ExceptionFilter,它是服务提供者端的⼀个过滤器,它主要是在服务提供者执⾏完服务后
会去识别异常:
1. 如果是需要开发⼈员捕获的异常,那么忽略,直接把这个异常返回给消费者
2. 如果在当前所执⾏的⽅法签名上有声明,那么忽略,直接把这个异常返回给消费者
3. 如果抛出的异常不需要开发⼈员捕获,或者⽅法上没有申明,那么服务端或记录⼀个error⽇志
4. 异常类和接⼝类在同⼀jar包⾥,那么忽略,直接把这个异常返回给消费者
5. 如果异常类是JDK⾃带的异常,那么忽略,直接把这个异常返回给消费者
6. 如果异常类是Dubbo⾃带的异常,那么忽略,直接把这个异常返回给消费者
7. 否则,把异常信息包装成RuntimeException,并覆盖AppResponse对象中的exception属性