系统分层

- config 配置层:对外配置接口,以 ServiceConfig, ReferenceConfig 为中心,可以直接初始化配置类,也可以通过 spring 解析配置生成配置类
- proxy 服务代理层:服务接口透明代理,生成服务的客户端 Stub 和服务器端 Skeleton, 以 ServiceProxy 为中心,扩展接口为 ProxyFactory
- registry 注册中心层:封装服务地址的注册与发现,以服务 URL 为中心,扩展接口为 RegistryFactory, Registry, RegistryService
- cluster 路由层:封装多个提供者的路由及负载均衡,并桥接注册中心,以 Invoker 为中心,扩展接口为 Cluster, Directory, Router, LoadBalance
- monitor 监控层:RPC 调用次数和调用时间监控,以 Statistics 为中心,扩展接口为 MonitorFactory, Monitor, MonitorService
- protocol 远程调用层:封装 RPC 调用,以 Invocation, Result 为中心,扩展接口为 Protocol, Invoker, Exporter
- exchange 信息交换层:封装请求响应模式,同步转异步,以 Request, Response 为中心,扩展接口为 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer
- transport 网络传输层:抽象 mina 和 netty 为统一接口,以 Message 为中心,扩展接口为 Channel, Transporter, Client, Server, Codec
serialize 数据序列化层:可复用的一些工具,扩展接口为 Serialization, ObjectInput, ObjectOutput, ThreadPool
调用示例
dubbo服务
public class MyDemoServiceImpl implements MyDemoService {@Overridepublic String sayHello(String name) {return name;}@Overridepublic String sayHello(int i) {return String.valueOf(i);}}
provider启动
public static void main(String[] args) throws Exception {ServiceConfig<MyDemoServiceImpl> service = new ServiceConfig<>();service.setInterface(MyDemoService.class);service.setRef(new MyDemoServiceImpl());DubboBootstrap bootstrap = DubboBootstrap.getInstance();bootstrap.application(new ApplicationConfig("dubbo-demo-api-provider")).registry(new RegistryConfig("zookeeper://127.0.0.1:2181")).service(service).start().await();}
消费者调用服务
public static void main(String[] args) {ReferenceConfig<MyDemoService> reference = new ReferenceConfig<>();reference.setInterface(MyDemoService.class);DubboBootstrap bootstrap = DubboBootstrap.getInstance();bootstrap.application(new ApplicationConfig("dubbo-demo-api-consumer")).registry(new RegistryConfig("zookeeper://127.0.0.1:2181")).reference(reference).start();MyDemoService demoService = ReferenceConfigCache.getCache().get(reference);String message = demoService.sayHello("dubbo");String result = demoService.sayHello(1);System.out.println(message);}
服务导出

- 通过JavassistProxyFactory为服务生成代理对象Invoker。其中会调用Wrapper的getWrapper方法生成代理类。可以通过参数调用真正的服务方法
- 通过InJvmProtocol暴露本地服务。当provider与consumer在同一工程中使用本地服务。将invoker存入InJvmProtocol的exporterMap中。可通过@Reference注解的injvm参数进行关闭
- 向DubboProtocol中的exporterMap中存储provider代理对象。key:分组名+接口全路径名+版本
- 启动NettyServer并绑定端口号
- 通过ZookeeperRegistry(继承自FailbackRegistry)向zk中创建服务节点。
完整路径为:
zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-api-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F172.19.20.86%3A20880%2Forg.apache.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddubbo-demo-api-provider%26bind.ip%3D172.19.20.86%26bind.port%3D20880%26default%3Dtrue%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dorg.apache.dubbo.demo.DemoService%26methods%3DsayHello%2CsayHelloAsync%26pid%3D1693%26release%3D%26side%3Dprovider%26timestamp%3D1656563296673&pid=1693×tamp=1656563296662
其中向注册中心注册的路径为/dubbo/org.apache.dubbo.demo.DemoService/provider,127.0.0.1
上述案例中存在方法重载,但是dubbo服务暴露时只会注册一个方法,在真正调用时通过参数类型调用具体的重载方法
-
服务引用

先查询本进程内是否有该服务,如果有,直接从InJvmProtocol中获取
- 获取注册中心配置,获取所有注册中心信息
- 循环注册中心,获取对应的服务信息。上述案例中通过ZookeeperRegistry从注册中心拉取对应服务的地址。即查询/dubbo/org.apache.dubbo.demo.DemoService/provider路径下的服务有哪些
- 创建/dubbo/org.apache.dubbo.demo.DemoService/consumer节点,并set消费者的ip地址
- 订阅providers、configurators、routers等节点信息,监听提供者或配置信息的变化
- 初始化NettyClient
创建InvokerInvocationHandler代理对象并将invoker设置到InvokerInvocationHandler的invoker成员变量中
服务调用

当我们在调用DemoService#sayHello方法时其实是调用InvokerInvocationHandler的invoke方法,而InvokerInvocationHandler真正的业务逻辑,是由其持有的invoker开始执行的(基于责任链模式)
- MockClusterInvoker:用于服务降级
- FailoverClusterInvoker:集群容错。默认调用1次,重试2次
- ProtocolFilterWrapper:包装了各个过滤器,每个filter包装成invoker,然后执行各filter逻辑
- AsyncToSyncInvoker:异步转同步。dubbo底层都是异步调用,返回future。如果是同步调用的话,就在这里使用future#get()进行阻塞,等待返回结果
- 调用netty发送消息
- 服务端接收到消息后根据调用的分组+接口全路径名+版本从DubboProtocol的exporterMap中查询对应的invoker
- 执行invoker的过滤器链
- 通过Wrapper代理的invokeMethod来调用目标方法
生成代理对象
dubbo调用真正的对象的逻辑是由Wrapper的makeWrapper动态生成的,其中生成之后的类如下:
根据传入方法名+参数(可能重载)判断调用实际类的方法public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException{org.apache.dubbo.demo.provider.MyDemoServiceImpl w;try{w = ((org.apache.dubbo.demo.provider.MyDemoServiceImpl)$1);}catch(Throwable e){throw new IllegalArgumentException(e);}try{if( "sayHello".equals( $2 ) && $3.length == 1 && $3[0].getName().equals("java.lang.String") ) {return ($w)w.sayHello((java.lang.String)$4[0]);}if( "sayHello".equals( $2 ) && $3.length == 1 && $3[0].getName().equals("int") ) {return ($w)w.sayHello(((Number)$4[0]).intValue());}} catch(Throwable e) {throw new java.lang.reflect.InvocationTargetException(e);}throw new org.apache.dubbo.common.bytecode.NoSuchMethodException("Not found method \""+$2+"\" in class org.apache.dubbo.demo.provider.MyDemoServiceImpl.");}
序列化
在服务消费者通过netty调用前会将数据先序列化,服务提供者在收到消息后也需要先反序列化。dubbo数据包分为head和body两个部分,具体参考官网截图
```java
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
if (msg instanceof Request) {
} else if (msg instanceof Response) {encodeRequest(channel, buffer, (Request) msg);
} else {encodeResponse(channel, buffer, (Response) msg);
} }super.encode(channel, buffer, msg);
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException { Serialization serialization = getSerialization(channel); // header.16位长度的header byte[] header = new byte[HEADER_LENGTH]; // set magic number.魔数 Bytes.short2bytes(MAGIC, header);
// set request and serialization flag.
// 数据包类型 request/response 和序列化器编号
header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
// 通信方向 单向或双向
if (req.isTwoWay()) {
header[2] |= FLAG_TWOWAY;
}
// 设置事件标识
if (req.isEvent()) {
header[2] |= FLAG_EVENT;
}
// set request id.
// 设置请求编号,8个字节,从第4个字节开始设置
Bytes.long2bytes(req.getId(), header, 4);
// encode request data.
int savedWriteIndex = buffer.writerIndex();
// 为消息头预留16个字节
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
if (req.isEvent()) {
encodeEventData(channel, out, req.getData());
} else {
encodeRequestData(channel, out, req.getData(), req.getVersion());
}
out.flushBuffer();
if (out instanceof Cleanable) {
((Cleanable) out).cleanup();
}
bos.flush();
bos.close();
int len = bos.writtenBytes();
checkPayload(channel, len);
Bytes.int2bytes(len, header, 12);
// write
buffer.writerIndex(savedWriteIndex);
buffer.writeBytes(header); // write header.
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
}
protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response res) throws IOException { int savedWriteIndex = buffer.writerIndex(); try { Serialization serialization = getSerialization(channel); // header. byte[] header = new byte[HEADER_LENGTH]; // set magic number. Bytes.short2bytes(MAGIC, header); // set request and serialization flag. header[2] = serialization.getContentTypeId(); if (res.isHeartbeat()) { header[2] |= FLAG_EVENT; } // set response status. byte status = res.getStatus(); header[3] = status; // set request id. Bytes.long2bytes(res.getId(), header, 4);
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
// encode response data or error message.
if (status == Response.OK) {
if (res.isHeartbeat()) {
encodeEventData(channel, out, res.getResult());
} else {
encodeResponseData(channel, out, res.getResult(), res.getVersion());
}
} else {
out.writeUTF(res.getErrorMessage());
}
out.flushBuffer();
if (out instanceof Cleanable) {
((Cleanable) out).cleanup();
}
bos.flush();
bos.close();
int len = bos.writtenBytes();
checkPayload(channel, len);
Bytes.int2bytes(len, header, 12);
// write
buffer.writerIndex(savedWriteIndex);
buffer.writeBytes(header); // write header.
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
} catch (Throwable t) {
// clear buffer
buffer.writerIndex(savedWriteIndex);
// send error message to Consumer, otherwise, Consumer will wait till timeout.
if (!res.isEvent() && res.getStatus() != Response.BAD_RESPONSE) {
Response r = new Response(res.getId(), res.getVersion());
r.setStatus(Response.BAD_RESPONSE);
if (t instanceof ExceedPayloadLimitException) {
logger.warn(t.getMessage(), t);
try {
r.setErrorMessage(t.getMessage());
channel.send(r);
return;
} catch (RemotingException e) {
logger.warn("Failed to send bad_response info back: " + t.getMessage() + ", cause: " + e.getMessage(), e);
}
} else {
// FIXME log error message in Codec and handle in caught() of IoHanndler?
logger.warn("Fail to encode response: " + res + ", send bad_response info instead, cause: " + t.getMessage(), t);
try {
r.setErrorMessage("Failed to send response: " + res + ", cause: " + StringUtils.toString(t));
channel.send(r);
return;
} catch (RemotingException e) {
logger.warn("Failed to send bad_response info back: " + res + ", cause: " + e.getMessage(), e);
}
}
}
// Rethrow exception
if (t instanceof IOException) {
throw (IOException) t;
} else if (t instanceof RuntimeException) {
throw (RuntimeException) t;
} else if (t instanceof Error) {
throw (Error) t;
} else {
throw new RuntimeException(t.getMessage(), t);
}
}
}
<a name="e96t4"></a>
## 集群容错
集群容错相关的类都继承自AbstractClusterInvoker,其使用了模板方法模式invoke方法中获取了所有服务invoker和负载均衡策略后,把数据提交给doInvoker(抽象方法)由子类实现
<a name="vSUXY"></a>
### 
```java
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();
// binding attachments into invocation.
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation) invocation).addAttachments(contextAttachments);
}
// 获取所有服务
List<Invoker<T>> invokers = list(invocation);
// 获取负载均衡策略
LoadBalance loadbalance = initLoadBalance(invokers, invocation);
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
// 模板方法,由子类实现
return doInvoke(invocation, invokers, loadbalance);
}
FailoverClusterInvoker
� 该策略由@Reference注解的retries(默认为2)属性控制,即调用一次,如果失败了,重试2次,如果还失败,那么抛出异常。
当第一次调用时会记录之前调用过的invoker,重试时会重新负载均衡并尽量避开之前调用过的invoker
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyInvokers = invokers;
checkInvokers(copyInvokers, invocation);
String methodName = RpcUtils.getMethodName(invocation);
int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
// retry loop.
// 记录上一次失败的exception
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
// 记录上次调用的provider的地址
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
//Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
if (i > 0) {
checkWhetherDestroyed();
copyInvokers = list(invocation);
// check again
checkInvokers(copyInvokers, invocation);
}
Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {
Result result = invoker.invoke(invocation);
if (le != null && logger.isWarnEnabled()) {
logger.warn("Although retry the method " + methodName
+ " in the service " + getInterface().getName()
+ " was successful by the provider " + invoker.getUrl().getAddress()
+ ", but there have been failed providers " + providers
+ " (" + providers.size() + "/" + copyInvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost()
+ " using the dubbo version " + Version.getVersion() + ". Last error is: "
+ le.getMessage(), le);
}
return result;
} catch (RpcException e) {
if (e.isBiz()) { // biz exception.
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
throw new RpcException(le.getCode(), "Failed to invoke the method "
+ methodName + " in the service " + getInterface().getName()
+ ". Tried " + len + " times of the providers " + providers
+ " (" + providers.size() + "/" + copyInvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
+ Version.getVersion() + ". Last error is: "
+ le.getMessage(), le.getCause() != null ? le.getCause() : le);
}
FailfastClusterInvoker
� 快速失败,当调用失败后直接抛出异常,不进行重试
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
checkInvokers(invokers, invocation);
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
try {
return invoker.invoke(invocation);
} catch (Throwable e) {
if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.
throw (RpcException) e;
}
throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0,
"Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName()
+ " select from all providers " + invokers + " for service " + getInterface().getName()
+ " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost()
+ " use dubbo version " + Version.getVersion()
+ ", but no luck to perform the invocation. Last error is: " + e.getMessage(),
e.getCause() != null ? e.getCause() : e);
}
}
FailsafeClusterInvoker
� 如果调用失败,捕获异常,返回空对象
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
try {
checkInvokers(invokers, invocation);
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
return invoker.invoke(invocation);
} catch (Throwable e) {
logger.error("Failsafe ignore exception: " + e.getMessage(), e);
return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation); // ignore
}
}
FailbackClusterInvoker
� 如果调用失败,返回空对象给调用者,然后启动定时任务进行重试。适用于消息通知类的场景
protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
Invoker<T> invoker = null;
try {
checkInvokers(invokers, invocation);
invoker = select(loadbalance, invocation, invokers, null);
return invoker.invoke(invocation);
} catch (Throwable e) {
logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "
+ e.getMessage() + ", ", e);
addFailed(loadbalance, invocation, invokers, invoker);
return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation); // ignore
}
}
private void addFailed(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, Invoker<T> lastInvoker) {
if (failTimer == null) {
synchronized (this) {
if (failTimer == null) {
failTimer = new HashedWheelTimer(
new NamedThreadFactory("failback-cluster-timer", true),
1,
TimeUnit.SECONDS, 32, failbackTasks);
}
}
}
RetryTimerTask retryTimerTask = new RetryTimerTask(loadbalance, invocation, invokers, lastInvoker, retries, RETRY_FAILED_PERIOD);
try {
failTimer.newTimeout(retryTimerTask, RETRY_FAILED_PERIOD, TimeUnit.SECONDS);
} catch (Throwable e) {
logger.error("Failback background works error,invocation->" + invocation + ", exception: " + e.getMessage());
}
}
BroadcastClusterInvoker
� 遍历所有的invoker,循环调用。如果有多个异常,只抛出最后一次的异常
public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
checkInvokers(invokers, invocation);
RpcContext.getContext().setInvokers((List) invokers);
RpcException exception = null;
Result result = null;
for (Invoker<T> invoker : invokers) {
try {
result = invoker.invoke(invocation);
} catch (RpcException e) {
exception = e;
logger.warn(e.getMessage(), e);
} catch (Throwable e) {
exception = new RpcException(e.getMessage(), e);
logger.warn(e.getMessage(), e);
}
}
if (exception != null) {
throw exception;
}
return result;
}
ForkingClusterInvoker
� 运行时通过线程池并行调用多个服务,只要有一个提供者返回了结果,就立即结束运行
public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
try {
checkInvokers(invokers, invocation);
final List<Invoker<T>> selected;
final int forks = getUrl().getParameter(FORKS_KEY, DEFAULT_FORKS);
final int timeout = getUrl().getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
if (forks <= 0 || forks >= invokers.size()) {
selected = invokers;
} else {
selected = new ArrayList<>();
for (int i = 0; i < forks; i++) {
Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
if (!selected.contains(invoker)) {
//Avoid add the same invoker several times.
selected.add(invoker);
}
}
}
RpcContext.getContext().setInvokers((List) selected);
final AtomicInteger count = new AtomicInteger();
final BlockingQueue<Object> ref = new LinkedBlockingQueue<>();
for (final Invoker<T> invoker : selected) {
executor.execute(() -> {
try {
Result result = invoker.invoke(invocation);
ref.offer(result);
} catch (Throwable e) {
int value = count.incrementAndGet();
if (value >= selected.size()) {
ref.offer(e);
}
}
});
}
try {
Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
if (ret instanceof Throwable) {
Throwable e = (Throwable) ret;
throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
}
return (Result) ret;
} catch (InterruptedException e) {
throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
} finally {
// clear attachments which is binding to current thread.
RpcContext.getContext().clearAttachments();
}
}
负载均衡
负载均衡的通用逻辑封装在AbstractLoadBalance类的select方法中。如果invoker只有1个,那么就返回这个invoker即可。如果invoker的数量大于1,调用子类实现的doSelect方法进行扩展
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
if (CollectionUtils.isEmpty(invokers)) {
return null;
}
if (invokers.size() == 1) {
return invokers.get(0);
}
return doSelect(invokers, url, invocation);
}
RandomLoadBalance
随机负载均衡。随机负载均衡也是要基于权重进行实现的。�例如:invoker权重[5, 3, 2],totalWeight=10,取0-9的随机数offset,假设offset=9,循环invoker列表,用offset减去当前invoker的权重值,直到小于0,返回invoker。具体执行过程 9-5>0不是当前invoker,9-5-4>0 不是当前invoker, 9-5-3-2<0是当前invoker
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// Number of invokers
int length = invokers.size();
// Every invoker has the same weight?
boolean sameWeight = true;
// the weight of every invokers
int[] weights = new int[length];
// the first invoker's weight
int firstWeight = getWeight(invokers.get(0), invocation);
weights[0] = firstWeight;
// The sum of weights
int totalWeight = firstWeight;
for (int i = 1; i < length; i++) {
int weight = getWeight(invokers.get(i), invocation);
// save for later use
weights[i] = weight;
// Sum
totalWeight += weight;
if (sameWeight && weight != firstWeight) {
sameWeight = false;
}
}
//例如:invoker权重[5, 3, 2]totalWeight=10,取0-9的随机数offset,假设offset=9,循环invoker列表,用offset减去当前invoker的权重值,直到小于0,返回invoker
// 9-5>0不是当前invoker,9-5-4>0 不是当前invoker, 9-5-3-2<0是当前invoker
if (totalWeight > 0 && !sameWeight) {
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
int offset = ThreadLocalRandom.current().nextInt(totalWeight);
// Return a invoker based on the random value.
for (int i = 0; i < length; i++) {
offset -= weights[i];
if (offset < 0) {
return invokers.get(i);
}
}
}
// If all invokers have the same weight value or totalWeight=0, return evenly.
// 都没配置权重或所有invoker的权重一致,随机取
return invokers.get(ThreadLocalRandom.current().nextInt(length));
}
LeastActiveLoadBalance
� 最少活跃数负载均衡。dubbo中的实现也是基于权重进行实现的,如果有多个服务的活跃数一致,那么通过权重获取具体invoker。
在消费者端会通过ActiveLimitFilter来记录接口方法的调用次数,调用的时候对RpcStatus#active(AtomicInteger)进行+1,调用完成后对其-1
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// Number of invokers
int length = invokers.size();
// The least active value of all invokers
int leastActive = -1;
// The number of invokers having the same least active value (leastActive)
int leastCount = 0;
// The index of invokers having the same least active value (leastActive)
int[] leastIndexes = new int[length];
// the weight of every invokers
int[] weights = new int[length];
// The sum of the warmup weights of all the least active invokers
int totalWeight = 0;
// The weight of the first least active invoker
int firstWeight = 0;
// Every least active invoker has the same weight value?
boolean sameWeight = true;
// Filter out all the least active invokers
for (int i = 0; i < length; i++) {
Invoker<T> invoker = invokers.get(i);
// Get the active number of the invoker
int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
// Get the weight of the invoker's configuration. The default value is 100.
int afterWarmup = getWeight(invoker, invocation);
// save for later use
weights[i] = afterWarmup;
// If it is the first invoker or the active number of the invoker is less than the current least active number
// 第一个invoker或当前活跃数比最小活跃数小,则重置计数器,并将leastIndexes数组的第一个元素设置为invoker下标
if (leastActive == -1 || active < leastActive) {
// Reset the active number of the current invoker to the least active number
leastActive = active;
// Reset the number of least active invokers
leastCount = 1;
// Put the first least active invoker first in leastIndexes
leastIndexes[0] = i;
// Reset totalWeight
totalWeight = afterWarmup;
// Record the weight the first least active invoker
firstWeight = afterWarmup;
// Each invoke has the same weight (only one invoker here)
sameWeight = true;
// If current invoker's active value equals with leaseActive, then accumulating.
// 如果当前活跃数和最小活跃数一致,则往leastIndexes数组中添加元素,并累加leastCount计数器
} else if (active == leastActive) {
// Record the index of the least active invoker in leastIndexes order
leastIndexes[leastCount++] = i;
// Accumulate the total weight of the least active invoker
totalWeight += afterWarmup;
// If every invoker has the same weight?
if (sameWeight && i > 0
&& afterWarmup != firstWeight) {
sameWeight = false;
}
}
}
// Choose an invoker from all the least active invokers
if (leastCount == 1) {
// If we got exactly one invoker having the least active value, return this invoker directly.
return invokers.get(leastIndexes[0]);
}
if (!sameWeight && totalWeight > 0) {
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on
// totalWeight.
int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
// Return a invoker based on the random value.
for (int i = 0; i < leastCount; i++) {
int leastIndex = leastIndexes[i];
offsetWeight -= weights[leastIndex];
if (offsetWeight < 0) {
return invokers.get(leastIndex);
}
}
}
// If all invokers have the same weight value or totalWeight=0, return evenly.
return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);
}
RoundRobinLoadBalance 待补充
� 加权轮询负载均衡。待补充
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
if (map == null) {
methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<String, WeightedRoundRobin>());
map = methodWeightMap.get(key);
}
int totalWeight = 0;
long maxCurrent = Long.MIN_VALUE;
long now = System.currentTimeMillis();
Invoker<T> selectedInvoker = null;
WeightedRoundRobin selectedWRR = null;
for (Invoker<T> invoker : invokers) {
String identifyString = invoker.getUrl().toIdentityString();
WeightedRoundRobin weightedRoundRobin = map.get(identifyString);
int weight = getWeight(invoker, invocation);
if (weightedRoundRobin == null) {
weightedRoundRobin = new WeightedRoundRobin();
weightedRoundRobin.setWeight(weight);
map.putIfAbsent(identifyString, weightedRoundRobin);
}
if (weight != weightedRoundRobin.getWeight()) {
//weight changed
weightedRoundRobin.setWeight(weight);
}
long cur = weightedRoundRobin.increaseCurrent();
weightedRoundRobin.setLastUpdate(now);
if (cur > maxCurrent) {
maxCurrent = cur;
selectedInvoker = invoker;
selectedWRR = weightedRoundRobin;
}
totalWeight += weight;
}
if (!updateLock.get() && invokers.size() != map.size()) {
if (updateLock.compareAndSet(false, true)) {
try {
// copy -> modify -> update reference
ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<>(map);
newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD);
methodWeightMap.put(key, newMap);
} finally {
updateLock.set(false);
}
}
}
if (selectedInvoker != null) {
selectedWRR.sel(totalWeight);
return selectedInvoker;
}
// should not happen here
return invokers.get(0);
}
ConsistentHashLoadBalance 待补充
� 一致性哈希负载均衡。
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
String methodName = RpcUtils.getMethodName(invocation);
String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;
int identityHashCode = System.identityHashCode(invokers);
ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
if (selector == null || selector.identityHashCode != identityHashCode) {
selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, identityHashCode));
selector = (ConsistentHashSelector<T>) selectors.get(key);
}
return selector.select(invocation);
}
private static final class ConsistentHashSelector<T> {
private final TreeMap<Long, Invoker<T>> virtualInvokers;
private final int replicaNumber;
private final int identityHashCode;
private final int[] argumentIndex;
ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
this.identityHashCode = identityHashCode;
URL url = invokers.get(0).getUrl();
this.replicaNumber = url.getMethodParameter(methodName, HASH_NODES, 160);
String[] index = COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, HASH_ARGUMENTS, "0"));
argumentIndex = new int[index.length];
for (int i = 0; i < index.length; i++) {
argumentIndex[i] = Integer.parseInt(index[i]);
}
for (Invoker<T> invoker : invokers) {
String address = invoker.getUrl().getAddress();
for (int i = 0; i < replicaNumber / 4; i++) {
byte[] digest = md5(address + i);
for (int h = 0; h < 4; h++) {
long m = hash(digest, h);
virtualInvokers.put(m, invoker);
}
}
}
}
public Invoker<T> select(Invocation invocation) {
String key = toKey(invocation.getArguments());
byte[] digest = md5(key);
return selectForKey(hash(digest, 0));
}
private String toKey(Object[] args) {
StringBuilder buf = new StringBuilder();
for (int i : argumentIndex) {
if (i >= 0 && i < args.length) {
buf.append(args[i]);
}
}
return buf.toString();
}
private Invoker<T> selectForKey(long hash) {
Map.Entry<Long, Invoker<T>> entry = virtualInvokers.ceilingEntry(hash);
if (entry == null) {
entry = virtualInvokers.firstEntry();
}
return entry.getValue();
}
private long hash(byte[] digest, int number) {
return (((long) (digest[3 + number * 4] & 0xFF) << 24)
| ((long) (digest[2 + number * 4] & 0xFF) << 16)
| ((long) (digest[1 + number * 4] & 0xFF) << 8)
| (digest[number * 4] & 0xFF))
& 0xFFFFFFFFL;
}
private byte[] md5(String value) {
MessageDigest md5;
try {
md5 = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
throw new IllegalStateException(e.getMessage(), e);
}
md5.reset();
byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
md5.update(bytes);
return md5.digest();
}
}
SPI机制
dubbo的spi机制会扫描META-INF/services、META-INF/dubbo/、META-INF/dubbo/internal包路径下的配置文件,文件名为接口,文件内容配置为kv形式,配置名=扩展实现类全限定名。其改进了jdk的扩展机制:
- jdk的标准spi会一次性加载所有扩展点实现,即时没用上。dubbo实现了按需加载
- jdk的spi扩展点如果加载失败,连扩展点的名称都拿不到
-
spring整合dubbo
当我们在spring项目中整合dubbo时,只需要使用@EnableDubbo注解即可自动装配dubbo的服务和消费者
@EnableDubbo
� 该注解是一个复合注解,主要是引入了@EnableDubboConfig、@DubboComponentScan、@EnableDubboLifecycle这3个注解 ```java @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Inherited @Documented @EnableDubboConfig @DubboComponentScan @EnableDubboLifecycle public @interface EnableDubbo {
/**
- Base packages to scan for annotated @Service classes.
- Use {@link #scanBasePackageClasses()} for a type-safe alternative to String-based
- package names. *
- @return the base packages to scan
@see DubboComponentScan#basePackages() */ @AliasFor(annotation = DubboComponentScan.class, attribute = “basePackages”) String[] scanBasePackages() default {};
/**
- Type-safe alternative to {@link #scanBasePackages()} for specifying the packages to
- scan for annotated @Service classes. The package of each class specified will be
- scanned. *
- @return classes from the base packages to scan
- @see DubboComponentScan#basePackageClasses */ @AliasFor(annotation = DubboComponentScan.class, attribute = “basePackageClasses”) Class<?>[] scanBasePackageClasses() default {};
/**
* It indicates whether {@link AbstractConfig} binding to multiple Spring Beans.
*
* @return the default value is <code>false</code>
* @see EnableDubboConfig#multiple()
*/
@AliasFor(annotation = EnableDubboConfig.class, attribute = "multiple")
boolean multipleConfig() default true;
}
<a name="zkmfC"></a>
### @EnableDubboConfig
该注解引入了DubboConfigConfigurationRegistrar,其中的registerBeanDefinitions方法为dubbo开启了自动配置,会从配置信息中读取相关配置并向spring容器注册对应的配置bean<br />
<a name="Clloo"></a>
### @DubboComponentScan
<br />该注解导入了DubboComponentScanRegistrar类,其registerBeanDefinitions方法导入了2个spring的后置处理器ServiceAnnotationBeanPostProcessor和ReferenceAnnotationBeanPostProcessor
<a name="oIwkl"></a>
#### 注册Provider服务
ServiceAnnotationBeanPostProcessor会向spring容器中注册ServiceBean和本身类型的bean(用于@Autowired注入).ServiceBean的名称为ServiceBean + 接口名 + 版本 + 分组名
```java
private void registerServiceBeans(Set<String> packagesToScan, BeanDefinitionRegistry registry) {
DubboClassPathBeanDefinitionScanner scanner =
new DubboClassPathBeanDefinitionScanner(registry, environment, resourceLoader);
BeanNameGenerator beanNameGenerator = resolveBeanNameGenerator(registry);
scanner.setBeanNameGenerator(beanNameGenerator);
scanner.addIncludeFilter(new AnnotationTypeFilter(Service.class));
/**
* Add the compatibility for legacy Dubbo's @Service
*
* The issue : https://github.com/apache/dubbo/issues/4330
* @since 2.7.3
*/
scanner.addIncludeFilter(new AnnotationTypeFilter(com.alibaba.dubbo.config.annotation.Service.class));
for (String packageToScan : packagesToScan) {
// Registers @Service Bean first
// 先在spring容器中注册bean
scanner.scan(packageToScan);
// Finds all BeanDefinitionHolders of @Service whether @ComponentScan scans or not.
Set<BeanDefinitionHolder> beanDefinitionHolders =
findServiceBeanDefinitionHolders(scanner, packageToScan, registry, beanNameGenerator);
if (!CollectionUtils.isEmpty(beanDefinitionHolders)) {
for (BeanDefinitionHolder beanDefinitionHolder : beanDefinitionHolders) {
// 注册ServcieBean
registerServiceBean(beanDefinitionHolder, registry, scanner);
}
if (logger.isInfoEnabled()) {
logger.info(beanDefinitionHolders.size() + " annotated Dubbo's @Service Components { " +
beanDefinitionHolders +
" } were scanned under package[" + packageToScan + "]");
}
} else {
if (logger.isWarnEnabled()) {
logger.warn("No Spring Bean annotating Dubbo's @Service was found under package["
+ packageToScan + "]");
}
}
}
}
注册consumer服务
通过扫描配置的路径向spring容器注册�ReferenceBean
protected Object doGetInjectedBean(AnnotationAttributes attributes, Object bean, String beanName, Class<?> injectedType,
InjectionMetadata.InjectedElement injectedElement) throws Exception {
/**
* The name of bean that annotated Dubbo's {@link Service @Service} in local Spring {@link ApplicationContext}
*/
String referencedBeanName = buildReferencedBeanName(attributes, injectedType);
/**
* The name of bean that is declared by {@link Reference @Reference} annotation injection
*/
String referenceBeanName = getReferenceBeanName(attributes, injectedType);
ReferenceBean referenceBean = buildReferenceBeanIfAbsent(referenceBeanName, attributes, injectedType);
registerReferenceBean(referencedBeanName, referenceBean, attributes, injectedType);
cacheInjectedReferenceBean(referenceBean, injectedElement);
return getOrCreateProxy(referencedBeanName, referenceBeanName, referenceBean, injectedType);
}
@EnableDubboLifecycle
该注解导入了DubboLifecycleComponentRegistrar,由DubboLifecycleComponentRegistrar向spring容器注册DubboBootstrapApplicationListener监听器。该监听器处理spring容器刷新完成事件和销毁事件。
public void onApplicationContextEvent(ApplicationContextEvent event) {
if (event instanceof ContextRefreshedEvent) {
onContextRefreshedEvent((ContextRefreshedEvent) event);
} else if (event instanceof ContextClosedEvent) {
onContextClosedEvent((ContextClosedEvent) event);
}
}
private void onContextRefreshedEvent(ContextRefreshedEvent event) {
dubboBootstrap.start();
}
private void onContextClosedEvent(ContextClosedEvent event) {
dubboBootstrap.stop();
}
当spring容器刷新完成时,调用DubboBootstrap#start()方法暴露和引用dubbo服务
public DubboBootstrap start() {
if (started.compareAndSet(false, true)) {
initialize();
if (logger.isInfoEnabled()) {
logger.info(NAME + " is starting...");
}
// 1. export Dubbo Services
// 注册消费者服务
exportServices();
// Not only provider register
if (!isOnlyRegisterProvider() || hasExportedServices()) {
// 2. export MetadataService
exportMetadataService();
//3. Register the local ServiceInstance if required
registerServiceInstance();
}
referServices();
if (logger.isInfoEnabled()) {
logger.info(NAME + " has started.");
}
}
return this;
}
