服务消费端发起调用
服务消费端通过 ReferenceConfig 的 get 方法返回的是一个代理类,并且方法拦截器为 InvokerInvocationHandler。所以当消费方调用了服务的接口方法后会被 InvokerInvocationHandler 拦截,执行如下逻辑:
@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {......// 创建RpcInvocation,封装一些服务接口相关的信息// method为调用的接口方法,args为方法入参信息RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), protocolServiceKey, args);String serviceKey = invoker.getUrl().getServiceKey();rpcInvocation.setTargetServiceUniqueName(serviceKey);// 发起方法调用return invoker.invoke(rpcInvocation).recreate();}
这里实际调用的 invoker 对象就是服务消费端启动时初始化的 FailoverClusterInvoker 对象。注意,这个 invoker 对象在创建时会有一些 AOP 的增强类对其进行包装,这里不讨论增强逻辑,它最终会调用 FailoverClusterInvoker 的 invoke 方法进行远程方法调用。
1. FailoverClusterInvoker
Dubbo 提供了多种集群容错策略,FailoverClusterInvoker 是默认的集群容错策略。
FailoverClusterInvoker:失败重试。当服务消费方调用服务提供者失败后,会自动切换到其他服务提供者服务器进行重试,这通常用于读操作或具有零等的写操作。
FailfastClusterInvoker:快速失败。当服务消费方调用服务提供者失败后,立即报错,也就是只调用一次。通常,这种模式用于非幂等性的写操作。
FailsafeClusterInvoker:安全失败。当服务消费者调用服务出现异常时,直接忽略异常。这种模式通常用于写入审计日志等不重要的操作。
FailbackClusterInvoker:失败自动恢复。当服务消费端调用服务出现异常后,在后台记录失败的请求,并按照一定的策略后期再进行重试。这种模式通常用于消息通知操作。
ForkingClusterInvoker:并行调用。当消费方调用一个接口方法后,Dubbo Client 会并行调用多个服务提供者的服务,只要其中有一个成功即返回。这种模式通常用于实时性要求较高的读操作,但需要浪费更多的服务资源。
BroadcastClusterInvoker:广播调用。当消费者调用一个接口方法后,Dubbo Client 会逐个调用所有服务提供者,任意一台服务器调用异常则这次调用就标志失败,这种模式通常用于通知所有提供者更新缓存或日志等本地资源信息。
FailoverClusterInvoker 继承了 AbstractClusterInvoker 类,在该父类中提供了 invoke 方法的具体实现:
@Overridepublic Result invoke(final Invocation invocation) throws RpcException {// 获取所有远程服务调用列表List<Invoker<T>> invokers = list(invocation);// 初始化负载均衡策略LoadBalance loadbalance = initLoadBalance(invokers, invocation);return doInvoke(invocation, invokers, loadbalance);}
其中,doInvoke 方法是由具体的子类实现的,FailoverClusterInvoker 实现的 doInvoke 逻辑如下:
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {List<Invoker<T>> copyInvokers = invokers;String methodName = RpcUtils.getMethodName(invocation);// 根据 retries 参数确定重试次数int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;if (len <= 0) {len = 1;}// 进行重试循环RpcException le = null; // last exception.List<Invoker<T>> invoked = new ArrayList<>(copyInvokers.size());Set<String> providers = new HashSet<>(len);for (int i = 0; i < len; i++) {// 根据负载均衡策略选择一个调用者Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);invoked.add(invoker);RpcContext.getContext().setInvokers((List) invoked);try {// 发起远程调用Result result = invoker.invoke(invocation);return result;} catch (RpcException e) {// 根据异常类型判断是直接抛出异常还是进行重试if (e.isBiz()) {throw e;}le = e;} catch (Throwable e) {// 记录异常信息le = new RpcException(e.getMessage(), e);} finally {providers.add(invoker.getUrl().getAddress());}}// 如果执行了指定重试次数,仍未执行成功,则抛出异常throw new RpcException(le.getCode(), "Failed to invoke the method "+ methodName + " in the service " + getInterface().getName()+ ". Tried " + len + " times of the providers " + providers+ " (" + providers.size() + "/" + copyInvokers.size()+ ") from the registry " + directory.getUrl().getAddress()+ " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "+ Version.getVersion() + ". Last error is: "+ le.getMessage(), le.getCause() != null ? le.getCause() : le);}
默认的集群容错策略 FailoverClusterInvoker,其内部首先根据设置的负载均衡策略 LoadBalance 的扩展实现,从 Directory 返回的 Invoker 列表中选择一个 Invoker 作为 FailoverClusterInvoker 具体的远程调用者,如果调用发生异常,则重新选择一个 Invoker 进行调用。当达到最大重试次数后,不再重试,抛出调用异常。
2. LoadBalance
当服务提供方是集群时,为了避免大量请求一直集中在一个或者几个服务提供方机器上,从而使这些机器负载很高,其至导致服务不可用,就需要做一定的负载均衡策略。Dubbo 提供了多种负载均衡策略,默认为 random,也就是每次随机调用一台服务提供者的服务。Dubbo 提供的负载均衡策略有如下几种:
- RandomLoadBalance:随机策略。按照概率设置权重,比较均匀,并且可以动态调节提供者的权重。
- RoundRobinLoadBalance:轮训策略。按公约后的权重设置轮训比例,会存在执行比较慢的服务提供者堆积请求的情況,比如一个机器执行得非常慢,但是机器没有宕机,当很多新的请求到达该机器后,由于之前的请求还没处理完,会导致新的请求被堆积,久而久之,消费者调用这台机器上的所有请求都会被阻塞。
- LeastActiveLoadBalance:最少活跃调用数。如果每个提供者的活跃数相同,则随机选择一个。在每个服务提供者里维护着一个活跃数计数器,用来记录当前同时处理请求的个数,也就是并发处理任务的个数。这个值越小,说明当前服务提供者处理的速度越快或者当前机器的负载此较低,所以路由选择时就选择该活跃度最低的机器。如果一个服务提供者处理速度很慢,由于堆积,那么同时处理的请求就比较多,也就是活跃调用数较大(活跃度较高),这时,处理速度慢的提供者将收到更少的请求。
- ConsistentHashLoadBalance:一致性 Hash 策略。一致性 Hash 可以保证相同参数的请求总是发到同一提供者,当某一台提供者机器宕机时,原本发往该提供者的请求,将基于虚拟节点平推给其他提供者,这样就不会引起剧烈变动。
AbstractLoadBalance 实现了 LoadBalance 接口,上面的各种负载均衡策略实际上就是继承了这个抽象类,但重写了其 doSelect 方法,下面我们看下默认的 RandomLoadBalance 策略的 doSelect 方法实现:
@Overrideprotected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {// 服务提供者的总个数int length = invokers.size();// 所有服务提供者的权重是否都是一样的boolean sameWeight = true;// 存放所有服务提供者设置的权重int[] weights = new int[length];// 总权重,用来进行单个Invoker的权重比例计算int totalWeight = 0;for (int i = 0; i < length; i++) {// 累加所有服务提供者设置的权重int weight = getWeight(invokers.get(i), invocation);totalWeight += weight;weights[i] = totalWeight;if (sameWeight && totalWeight != weight * (i + 1)) {sameWeight = false;}}// 如果所有服务提供者的权重并不都是一样的,并且至少有一个提供者的权重大于0,则基于总权重随机选择一个if (totalWeight > 0 && !sameWeight) {// 基于随机值返回一个Invoker,判断这个随机值落在哪个权重区间内就选择哪个Invokerint offset = ThreadLocalRandom.current().nextInt(totalWeight);for (int i = 0; i < length; i++) {if (offset < weights[i]) {return invokers.get(i);}}}// 随机选择return invokers.get(ThreadLocalRandom.current().nextInt(length));}
这里生成随机数采用的是 ThreadLocalRandom 类,这是出于性能上的考虑,因为 Random 在高并发下会导致大量线程竞争同一个原子变量,导致大量线程原地自旋,从而浪费 CPU 资源。
3. DubboInvoker
在上面代码第 20 行进行远程调用时使用的 invoker 对象,也是一个经过层层增强的包装类,但最终调用到的是原生的 DubboInvoker 中的 invoke 方法,其使用 NettyClient 与服务提供者进行交互,其核心逻辑主要在 DubboInvoker 中的 doInvoke 方法中,具体逻辑如下:
@Overrideprotected Result doInvoke(final Invocation invocation) throws Throwable {// 设置Invocation附加属性RpcInvocation inv = (RpcInvocation) invocation;final String methodName = RpcUtils.getMethodName(invocation);inv.setAttachment(PATH_KEY, getUrl().getPath());inv.setAttachment(VERSION_KEY, version);// 获取用于远程调用的NettyClientExchangeClient currentClient;if (clients.length == 1) {currentClient = clients[0];} else {currentClient = clients[index.getAndIncrement() % clients.length];}// 执行远程调用try {boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);// 超时等待时间int timeout = calculateTimeout(invocation, methodName);invocation.put(TIMEOUT_KEY, timeout);// 是否为Oneway,也就是不需要响应结果的请求if (isOneway) {// 不需要响应的请求boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);currentClient.send(inv, isSent);return AsyncRpcResult.newDefaultAsyncResult(invocation);} else {// 获取消费端线程池,用来执行CompletableFuture的回调逻辑ExecutorService executor = getCallbackExecutor(getUrl(), inv);// request方法用来通过NettyClient与服务提供者进行请求,这里不会阻塞获取响应结果CompletableFuture<AppResponse> appResponseFuture =currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);return result;}} catch (TimeoutException e) {throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);} catch (RemotingException e) {throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);}}
request 方法的核心逻辑如下:
@Overridepublic CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {// 创建请求信息Request req = new Request();req.setVersion(Version.getProtocolVersion());req.setTwoWay(true);req.setData(request);// 将channel封装到Future中,后续接收响应信息都在这个Future中DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);try {// 发送出去后,主线程就直接返回了channel.send(req);} catch (RemotingException e) {future.cancel();throw e;}return future;}
可以看到,服务消费者并不是直接阻塞等待提供者的响应的,而是通过回调建立了一个异步逻辑,避免了主线程一直阻塞。并且,doInvoke 方法最终返回的是一个 AsyncRpcResult 对象,这个对象只有在真正获取结果信息时才会去尝试阻塞获取 Future 的值。当服务消费端需要将调用结果返回给调用方时,会执行 Result 接口的 getValue 方法,而 AsyncRpcResult 重写了该方法:
public class AsyncRpcResult implements Result {@Overridepublic Object getValue() {return getAppResponse().getValue();}}public Result getAppResponse() {try {if (responseFuture.isDone()) {// 调用Future的get方法阻塞主线程直到获取了响应结果return responseFuture.get();}} catch (Exception e) {throw new RpcException(e);}return createDefaultValue(invocation);}
服务提供端处理请求
服务提供端在暴露服务时,会启动一个 NettyServer 服务器监听指定端口的连接,当接收到服务消费端发来的请求时,服务提供方的 NettyServer 的 connect 方法会被激活,该方法的执行是在 Netty 的 I/O 线程上执行的,为了可以及时释放 I/O 线程,Netty 默认的线程模型为 ALL,即所有的消息都派发到 Dubbo 内部的业务线程池,这些消息包括请求事件、响应事件、连接事件、断开事件、心跳事件等。
1. Dubbo 线程模型
Dubbo 默认的底层网络通信使用的是 Netty,服务提供方 Netty Server 使用两级线程池,其中 EventLoopGroup(boss) 主要用来接收客户端的连接请求,并把完成 TCP 三次握手的连接分发给 EventLoopGroup(worker) 来处理,我们把 boss 和 worker 线程组称为 IO 线程。
如果服务提供方的逻辑处理能迅速完成,并且不会发起新的 IO 请求,那么直接在 IO 线程上处理会更快,因为这样减少了线程池调度与上下文切换的开销。但如果处理逻辑较慢,或者需要发起新的 IO 请求,比如需要查询数据库,则 IO 线程必须派发请求到新的线程池进行处理,否则 IO 线程会被阻寒,导致不能接收其他请求,根据请求的消息类是被 IO 线程处理还是被业务线程池处理,Dubbo 提供了下面几种线程模型:
all(AllDispatcher 类):所有的消息都派发到 Dubbo 内部的业务线程池,这些消息包括请求、响应、连接事件、断开事件、心跳事件等。它是默认的线程模型。
direct(DirectDispatcher 类):所有的消息都不派发到 Dubbo 内部的业务线程池,全部在 IO 线程上直接执行。
message(MessageOnlyDispatcher 类):只有请求响应消息派发到 Dubbo 内部的业务线程池,其他消息如连接事件、断开事件、心跳事件等,直接在 IO 线程上执行。
execution(ExecutionDispatcher 类):只把请求类消息派发到 Dubbo 内部的业务线程池,其他消息如响应、连接事件、断开事件、心跳事件等,直接在 IO 线程上执行。
connection(ConnectionOrderedDispatcher 类):在 IO 线程上将连接事件、断开事件放入队列,有序地逐个执行,其他消息派发到 Dubbo 内部的业务线程池。
2. AllChannelHandler
AllDispatcher 类的具体实现在 AllChannelHandler 中。所以,NettyServer 的 connect 方法被激活后,最终执行的是 AllChannelHandler 中的 connect 方法。AllChannelHandler 类会把 I/O 线程接收到的所有消息包装为 ChannelEventRunnable 任务并投递到线程池。
@Overridepublic void connected(Channel channel) throws RemotingException {ExecutorService executor = getExecutorService();try {// 交给Dubbo内部的业务线程池处理,不阻塞IO线程executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));} catch (Throwable t) {throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);}}
线程池里的任务被执行后,最终会调用到 DubboProtocol 的 connected 方法:
@Overridepublic void connected(Channel channel) throws RemotingException {invoke(channel, ON_CONNECT_KEY);}
3. DubboProtocol
DubboProtocol 的 invoke 方法是一个通用方法,其逻辑如下:
private void invoke(Channel channel, String methodKey) {// 创建Invocation对象Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);if (invocation != null) {try {// 接收服务消费端传过来的数据received(channel, invocation);} catch (Throwable t) {logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);}}}@Overridepublic void received(Channel channel, Object message) throws RemotingException {// 可以看到,入参message的类型为Invocation,所以这里会调用reply方法if (message instanceof Invocation) {reply((ExchangeChannel) channel, message);} else {super.received(channel, message);}}
reply 方法逻辑如下:
@Overridepublic CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {Invocation inv = (Invocation) message;// 获取该调用服务对应的 Invoker 对象Invoker<?> invoker = getInvoker(channel, inv);// 判断调用的方法是否存在......// 将对端地址放到上下文对象中RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());// 执行invoke调用链Result result = invoker.invoke(inv);return result.thenApply(Function.identity());}
在 getInvoker 方法中通过 DubboExporter 对象来获取对应 Invoker 对象:
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {String serviceKey = serviceKey(port, path,(String) inv.getObjectAttachments().get(VERSION_KEY),(String) inv.getObjectAttachments().get(GROUP_KEY));// 在服务提供方暴露服务时,会将DubboExporter缓存在exporterMap中,所以这里直接从缓冲中获取DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);return exporter.getInvoker();}
这里导出的 Invoker 内部有个调用链,经过调用链后最终调用了服务提供方启动时 AbstractProxyInvoker 代理类的 invoke 方法,其代码如下:
public Result invoke(Invocation invocation) throws RpcException {try {// 具体执行本地服务调用Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());CompletableFuture<Object> future = wrapWithFuture(value);// 将结果封装成AsyncRpcResult,用以异步获取方法调用结果CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> {AppResponse result = new AppResponse(invocation);if (t != null) {if (t instanceof CompletionException) {result.setException(t.getCause());} else {result.setException(t);}} else {result.setValue(obj);}return result;});return new AsyncRpcResult(appResponseFuture, invocation);}......}
4. 执行 Invoker
在服务提供者暴露服务时,创建的 Invoker 逻辑如下:
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);return new AbstractProxyInvoker<T>(proxy, type, url) {@Overrideprotected Object doInvoke(T proxy, String methodName,Class<?>[] parameterTypes,Object[] arguments) throws Throwable {return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);}};}
所以,上面调用的 doInvoke 方法实际调用的是 Wrapper 包装类的 invokeMethod 方法。这个 Wrapper 实例是通过 Javassist 动态代理生成的类,具体逻辑就是调用服务提供方接口的实现类来执行本地服务。
