服务消费端发起调用
服务消费端通过 ReferenceConfig 的 get 方法返回的是一个代理类,并且方法拦截器为 InvokerInvocationHandler。所以当消费方调用了服务的接口方法后会被 InvokerInvocationHandler 拦截,执行如下逻辑:
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
......
// 创建RpcInvocation,封装一些服务接口相关的信息
// method为调用的接口方法,args为方法入参信息
RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), protocolServiceKey, args);
String serviceKey = invoker.getUrl().getServiceKey();
rpcInvocation.setTargetServiceUniqueName(serviceKey);
// 发起方法调用
return invoker.invoke(rpcInvocation).recreate();
}
这里实际调用的 invoker 对象就是服务消费端启动时初始化的 FailoverClusterInvoker 对象。注意,这个 invoker 对象在创建时会有一些 AOP 的增强类对其进行包装,这里不讨论增强逻辑,它最终会调用 FailoverClusterInvoker 的 invoke 方法进行远程方法调用。
1. FailoverClusterInvoker
Dubbo 提供了多种集群容错策略,FailoverClusterInvoker 是默认的集群容错策略。
FailoverClusterInvoker:失败重试。当服务消费方调用服务提供者失败后,会自动切换到其他服务提供者服务器进行重试,这通常用于读操作或具有零等的写操作。
FailfastClusterInvoker:快速失败。当服务消费方调用服务提供者失败后,立即报错,也就是只调用一次。通常,这种模式用于非幂等性的写操作。
FailsafeClusterInvoker:安全失败。当服务消费者调用服务出现异常时,直接忽略异常。这种模式通常用于写入审计日志等不重要的操作。
FailbackClusterInvoker:失败自动恢复。当服务消费端调用服务出现异常后,在后台记录失败的请求,并按照一定的策略后期再进行重试。这种模式通常用于消息通知操作。
ForkingClusterInvoker:并行调用。当消费方调用一个接口方法后,Dubbo Client 会并行调用多个服务提供者的服务,只要其中有一个成功即返回。这种模式通常用于实时性要求较高的读操作,但需要浪费更多的服务资源。
BroadcastClusterInvoker:广播调用。当消费者调用一个接口方法后,Dubbo Client 会逐个调用所有服务提供者,任意一台服务器调用异常则这次调用就标志失败,这种模式通常用于通知所有提供者更新缓存或日志等本地资源信息。
FailoverClusterInvoker 继承了 AbstractClusterInvoker 类,在该父类中提供了 invoke 方法的具体实现:
@Override
public Result invoke(final Invocation invocation) throws RpcException {
// 获取所有远程服务调用列表
List<Invoker<T>> invokers = list(invocation);
// 初始化负载均衡策略
LoadBalance loadbalance = initLoadBalance(invokers, invocation);
return doInvoke(invocation, invokers, loadbalance);
}
其中,doInvoke 方法是由具体的子类实现的,FailoverClusterInvoker 实现的 doInvoke 逻辑如下:
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyInvokers = invokers;
String methodName = RpcUtils.getMethodName(invocation);
// 根据 retries 参数确定重试次数
int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
// 进行重试循环
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<>(copyInvokers.size());
Set<String> providers = new HashSet<>(len);
for (int i = 0; i < len; i++) {
// 根据负载均衡策略选择一个调用者
Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {
// 发起远程调用
Result result = invoker.invoke(invocation);
return result;
} catch (RpcException e) {
// 根据异常类型判断是直接抛出异常还是进行重试
if (e.isBiz()) {
throw e;
}
le = e;
} catch (Throwable e) {
// 记录异常信息
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
// 如果执行了指定重试次数,仍未执行成功,则抛出异常
throw new RpcException(le.getCode(), "Failed to invoke the method "
+ methodName + " in the service " + getInterface().getName()
+ ". Tried " + len + " times of the providers " + providers
+ " (" + providers.size() + "/" + copyInvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
+ Version.getVersion() + ". Last error is: "
+ le.getMessage(), le.getCause() != null ? le.getCause() : le);
}
默认的集群容错策略 FailoverClusterInvoker,其内部首先根据设置的负载均衡策略 LoadBalance 的扩展实现,从 Directory 返回的 Invoker 列表中选择一个 Invoker 作为 FailoverClusterInvoker 具体的远程调用者,如果调用发生异常,则重新选择一个 Invoker 进行调用。当达到最大重试次数后,不再重试,抛出调用异常。
2. LoadBalance
当服务提供方是集群时,为了避免大量请求一直集中在一个或者几个服务提供方机器上,从而使这些机器负载很高,其至导致服务不可用,就需要做一定的负载均衡策略。Dubbo 提供了多种负载均衡策略,默认为 random,也就是每次随机调用一台服务提供者的服务。Dubbo 提供的负载均衡策略有如下几种:
- RandomLoadBalance:随机策略。按照概率设置权重,比较均匀,并且可以动态调节提供者的权重。
- RoundRobinLoadBalance:轮训策略。按公约后的权重设置轮训比例,会存在执行比较慢的服务提供者堆积请求的情況,比如一个机器执行得非常慢,但是机器没有宕机,当很多新的请求到达该机器后,由于之前的请求还没处理完,会导致新的请求被堆积,久而久之,消费者调用这台机器上的所有请求都会被阻塞。
- LeastActiveLoadBalance:最少活跃调用数。如果每个提供者的活跃数相同,则随机选择一个。在每个服务提供者里维护着一个活跃数计数器,用来记录当前同时处理请求的个数,也就是并发处理任务的个数。这个值越小,说明当前服务提供者处理的速度越快或者当前机器的负载此较低,所以路由选择时就选择该活跃度最低的机器。如果一个服务提供者处理速度很慢,由于堆积,那么同时处理的请求就比较多,也就是活跃调用数较大(活跃度较高),这时,处理速度慢的提供者将收到更少的请求。
- ConsistentHashLoadBalance:一致性 Hash 策略。一致性 Hash 可以保证相同参数的请求总是发到同一提供者,当某一台提供者机器宕机时,原本发往该提供者的请求,将基于虚拟节点平推给其他提供者,这样就不会引起剧烈变动。
AbstractLoadBalance 实现了 LoadBalance 接口,上面的各种负载均衡策略实际上就是继承了这个抽象类,但重写了其 doSelect 方法,下面我们看下默认的 RandomLoadBalance 策略的 doSelect 方法实现:
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// 服务提供者的总个数
int length = invokers.size();
// 所有服务提供者的权重是否都是一样的
boolean sameWeight = true;
// 存放所有服务提供者设置的权重
int[] weights = new int[length];
// 总权重,用来进行单个Invoker的权重比例计算
int totalWeight = 0;
for (int i = 0; i < length; i++) {
// 累加所有服务提供者设置的权重
int weight = getWeight(invokers.get(i), invocation);
totalWeight += weight;
weights[i] = totalWeight;
if (sameWeight && totalWeight != weight * (i + 1)) {
sameWeight = false;
}
}
// 如果所有服务提供者的权重并不都是一样的,并且至少有一个提供者的权重大于0,则基于总权重随机选择一个
if (totalWeight > 0 && !sameWeight) {
// 基于随机值返回一个Invoker,判断这个随机值落在哪个权重区间内就选择哪个Invoker
int offset = ThreadLocalRandom.current().nextInt(totalWeight);
for (int i = 0; i < length; i++) {
if (offset < weights[i]) {
return invokers.get(i);
}
}
}
// 随机选择
return invokers.get(ThreadLocalRandom.current().nextInt(length));
}
这里生成随机数采用的是 ThreadLocalRandom 类,这是出于性能上的考虑,因为 Random 在高并发下会导致大量线程竞争同一个原子变量,导致大量线程原地自旋,从而浪费 CPU 资源。
3. DubboInvoker
在上面代码第 20 行进行远程调用时使用的 invoker 对象,也是一个经过层层增强的包装类,但最终调用到的是原生的 DubboInvoker 中的 invoke 方法,其使用 NettyClient 与服务提供者进行交互,其核心逻辑主要在 DubboInvoker 中的 doInvoke 方法中,具体逻辑如下:
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
// 设置Invocation附加属性
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(PATH_KEY, getUrl().getPath());
inv.setAttachment(VERSION_KEY, version);
// 获取用于远程调用的NettyClient
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
// 执行远程调用
try {
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
// 超时等待时间
int timeout = calculateTimeout(invocation, methodName);
invocation.put(TIMEOUT_KEY, timeout);
// 是否为Oneway,也就是不需要响应结果的请求
if (isOneway) {
// 不需要响应的请求
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
// 获取消费端线程池,用来执行CompletableFuture的回调逻辑
ExecutorService executor = getCallbackExecutor(getUrl(), inv);
// request方法用来通过NettyClient与服务提供者进行请求,这里不会阻塞获取响应结果
CompletableFuture<AppResponse> appResponseFuture =
currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
return result;
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
request 方法的核心逻辑如下:
@Override
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
// 创建请求信息
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
// 将channel封装到Future中,后续接收响应信息都在这个Future中
DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
try {
// 发送出去后,主线程就直接返回了
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
可以看到,服务消费者并不是直接阻塞等待提供者的响应的,而是通过回调建立了一个异步逻辑,避免了主线程一直阻塞。并且,doInvoke 方法最终返回的是一个 AsyncRpcResult 对象,这个对象只有在真正获取结果信息时才会去尝试阻塞获取 Future 的值。当服务消费端需要将调用结果返回给调用方时,会执行 Result 接口的 getValue 方法,而 AsyncRpcResult 重写了该方法:
public class AsyncRpcResult implements Result {
@Override
public Object getValue() {
return getAppResponse().getValue();
}
}
public Result getAppResponse() {
try {
if (responseFuture.isDone()) {
// 调用Future的get方法阻塞主线程直到获取了响应结果
return responseFuture.get();
}
} catch (Exception e) {
throw new RpcException(e);
}
return createDefaultValue(invocation);
}
服务提供端处理请求
服务提供端在暴露服务时,会启动一个 NettyServer 服务器监听指定端口的连接,当接收到服务消费端发来的请求时,服务提供方的 NettyServer 的 connect 方法会被激活,该方法的执行是在 Netty 的 I/O 线程上执行的,为了可以及时释放 I/O 线程,Netty 默认的线程模型为 ALL,即所有的消息都派发到 Dubbo 内部的业务线程池,这些消息包括请求事件、响应事件、连接事件、断开事件、心跳事件等。
1. Dubbo 线程模型
Dubbo 默认的底层网络通信使用的是 Netty,服务提供方 Netty Server 使用两级线程池,其中 EventLoopGroup(boss) 主要用来接收客户端的连接请求,并把完成 TCP 三次握手的连接分发给 EventLoopGroup(worker) 来处理,我们把 boss 和 worker 线程组称为 IO 线程。
如果服务提供方的逻辑处理能迅速完成,并且不会发起新的 IO 请求,那么直接在 IO 线程上处理会更快,因为这样减少了线程池调度与上下文切换的开销。但如果处理逻辑较慢,或者需要发起新的 IO 请求,比如需要查询数据库,则 IO 线程必须派发请求到新的线程池进行处理,否则 IO 线程会被阻寒,导致不能接收其他请求,根据请求的消息类是被 IO 线程处理还是被业务线程池处理,Dubbo 提供了下面几种线程模型:
all(AllDispatcher 类):所有的消息都派发到 Dubbo 内部的业务线程池,这些消息包括请求、响应、连接事件、断开事件、心跳事件等。它是默认的线程模型。
direct(DirectDispatcher 类):所有的消息都不派发到 Dubbo 内部的业务线程池,全部在 IO 线程上直接执行。
message(MessageOnlyDispatcher 类):只有请求响应消息派发到 Dubbo 内部的业务线程池,其他消息如连接事件、断开事件、心跳事件等,直接在 IO 线程上执行。
execution(ExecutionDispatcher 类):只把请求类消息派发到 Dubbo 内部的业务线程池,其他消息如响应、连接事件、断开事件、心跳事件等,直接在 IO 线程上执行。
connection(ConnectionOrderedDispatcher 类):在 IO 线程上将连接事件、断开事件放入队列,有序地逐个执行,其他消息派发到 Dubbo 内部的业务线程池。
2. AllChannelHandler
AllDispatcher 类的具体实现在 AllChannelHandler 中。所以,NettyServer 的 connect 方法被激活后,最终执行的是 AllChannelHandler 中的 connect 方法。AllChannelHandler 类会把 I/O 线程接收到的所有消息包装为 ChannelEventRunnable 任务并投递到线程池。
@Override
public void connected(Channel channel) throws RemotingException {
ExecutorService executor = getExecutorService();
try {
// 交给Dubbo内部的业务线程池处理,不阻塞IO线程
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
} catch (Throwable t) {
throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
}
}
线程池里的任务被执行后,最终会调用到 DubboProtocol 的 connected 方法:
@Override
public void connected(Channel channel) throws RemotingException {
invoke(channel, ON_CONNECT_KEY);
}
3. DubboProtocol
DubboProtocol 的 invoke 方法是一个通用方法,其逻辑如下:
private void invoke(Channel channel, String methodKey) {
// 创建Invocation对象
Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
if (invocation != null) {
try {
// 接收服务消费端传过来的数据
received(channel, invocation);
} catch (Throwable t) {
logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
}
}
}
@Override
public void received(Channel channel, Object message) throws RemotingException {
// 可以看到,入参message的类型为Invocation,所以这里会调用reply方法
if (message instanceof Invocation) {
reply((ExchangeChannel) channel, message);
} else {
super.received(channel, message);
}
}
reply 方法逻辑如下:
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
Invocation inv = (Invocation) message;
// 获取该调用服务对应的 Invoker 对象
Invoker<?> invoker = getInvoker(channel, inv);
// 判断调用的方法是否存在
......
// 将对端地址放到上下文对象中
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
// 执行invoke调用链
Result result = invoker.invoke(inv);
return result.thenApply(Function.identity());
}
在 getInvoker 方法中通过 DubboExporter 对象来获取对应 Invoker 对象:
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
String serviceKey = serviceKey(
port, path,
(String) inv.getObjectAttachments().get(VERSION_KEY),
(String) inv.getObjectAttachments().get(GROUP_KEY)
);
// 在服务提供方暴露服务时,会将DubboExporter缓存在exporterMap中,所以这里直接从缓冲中获取
DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
return exporter.getInvoker();
}
这里导出的 Invoker 内部有个调用链,经过调用链后最终调用了服务提供方启动时 AbstractProxyInvoker 代理类的 invoke 方法,其代码如下:
public Result invoke(Invocation invocation) throws RpcException {
try {
// 具体执行本地服务调用
Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
CompletableFuture<Object> future = wrapWithFuture(value);
// 将结果封装成AsyncRpcResult,用以异步获取方法调用结果
CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> {
AppResponse result = new AppResponse(invocation);
if (t != null) {
if (t instanceof CompletionException) {
result.setException(t.getCause());
} else {
result.setException(t);
}
} else {
result.setValue(obj);
}
return result;
});
return new AsyncRpcResult(appResponseFuture, invocation);
}
......
}
4. 执行 Invoker
在服务提供者暴露服务时,创建的 Invoker 逻辑如下:
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
所以,上面调用的 doInvoke 方法实际调用的是 Wrapper 包装类的 invokeMethod 方法。这个 Wrapper 实例是通过 Javassist 动态代理生成的类,具体逻辑就是调用服务提供方接口的实现类来执行本地服务。