系统分层

image.png

  • config 配置层:对外配置接口,以 ServiceConfig, ReferenceConfig 为中心,可以直接初始化配置类,也可以通过 spring 解析配置生成配置类
  • proxy 服务代理层:服务接口透明代理,生成服务的客户端 Stub 和服务器端 Skeleton, 以 ServiceProxy 为中心,扩展接口为 ProxyFactory
  • registry 注册中心层:封装服务地址的注册与发现,以服务 URL 为中心,扩展接口为 RegistryFactory, Registry, RegistryService
  • cluster 路由层:封装多个提供者的路由及负载均衡,并桥接注册中心,以 Invoker 为中心,扩展接口为 Cluster, Directory, Router, LoadBalance
  • monitor 监控层:RPC 调用次数和调用时间监控,以 Statistics 为中心,扩展接口为 MonitorFactory, Monitor, MonitorService
  • protocol 远程调用层:封装 RPC 调用,以 Invocation, Result 为中心,扩展接口为 Protocol, Invoker, Exporter
  • exchange 信息交换层:封装请求响应模式,同步转异步,以 Request, Response 为中心,扩展接口为 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer
  • transport 网络传输层:抽象 mina 和 netty 为统一接口,以 Message 为中心,扩展接口为 Channel, Transporter, Client, Server, Codec
  • serialize 数据序列化层:可复用的一些工具,扩展接口为 Serialization, ObjectInput, ObjectOutput, ThreadPool

    调用示例

    dubbo服务

    1. public class MyDemoServiceImpl implements MyDemoService {
    2. @Override
    3. public String sayHello(String name) {
    4. return name;
    5. }
    6. @Override
    7. public String sayHello(int i) {
    8. return String.valueOf(i);
    9. }
    10. }

    provider启动

    1. public static void main(String[] args) throws Exception {
    2. ServiceConfig<MyDemoServiceImpl> service = new ServiceConfig<>();
    3. service.setInterface(MyDemoService.class);
    4. service.setRef(new MyDemoServiceImpl());
    5. DubboBootstrap bootstrap = DubboBootstrap.getInstance();
    6. bootstrap
    7. .application(new ApplicationConfig("dubbo-demo-api-provider"))
    8. .registry(new RegistryConfig("zookeeper://127.0.0.1:2181"))
    9. .service(service)
    10. .start()
    11. .await();
    12. }

    消费者调用服务

    1. public static void main(String[] args) {
    2. ReferenceConfig<MyDemoService> reference = new ReferenceConfig<>();
    3. reference.setInterface(MyDemoService.class);
    4. DubboBootstrap bootstrap = DubboBootstrap.getInstance();
    5. bootstrap
    6. .application(new ApplicationConfig("dubbo-demo-api-consumer"))
    7. .registry(new RegistryConfig("zookeeper://127.0.0.1:2181"))
    8. .reference(reference)
    9. .start();
    10. MyDemoService demoService = ReferenceConfigCache.getCache().get(reference);
    11. String message = demoService.sayHello("dubbo");
    12. String result = demoService.sayHello(1);
    13. System.out.println(message);
    14. }

    服务导出

    dubbo服务启动.jpg

  1. 通过JavassistProxyFactory为服务生成代理对象Invoker。其中会调用Wrapper的getWrapper方法生成代理类。可以通过参数调用真正的服务方法
  2. 通过InJvmProtocol暴露本地服务。当provider与consumer在同一工程中使用本地服务。将invoker存入InJvmProtocol的exporterMap中。可通过@Reference注解的injvm参数进行关闭
  3. 向DubboProtocol中的exporterMap中存储provider代理对象。key:分组名+接口全路径名+版本
  4. 启动NettyServer并绑定端口号
  5. 通过ZookeeperRegistry(继承自FailbackRegistry)向zk中创建服务节点。

完整路径为:
zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-api-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F172.19.20.86%3A20880%2Forg.apache.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddubbo-demo-api-provider%26bind.ip%3D172.19.20.86%26bind.port%3D20880%26default%3Dtrue%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dorg.apache.dubbo.demo.DemoService%26methods%3DsayHello%2CsayHelloAsync%26pid%3D1693%26release%3D%26side%3Dprovider%26timestamp%3D1656563296673&pid=1693&timestamp=1656563296662
其中向注册中心注册的路径为/dubbo/org.apache.dubbo.demo.DemoService/provider,127.0.0.1
上述案例中存在方法重载,但是dubbo服务暴露时只会注册一个方法,在真正调用时通过参数类型调用具体的重载方法

  1. 订阅override数据

    服务引用

    consumer引用服务.jpg

  2. 先查询本进程内是否有该服务,如果有,直接从InJvmProtocol中获取

  3. 获取注册中心配置,获取所有注册中心信息
  4. 循环注册中心,获取对应的服务信息。上述案例中通过ZookeeperRegistry从注册中心拉取对应服务的地址。即查询/dubbo/org.apache.dubbo.demo.DemoService/provider路径下的服务有哪些
  5. 创建/dubbo/org.apache.dubbo.demo.DemoService/consumer节点,并set消费者的ip地址
  6. 订阅providers、configurators、routers等节点信息,监听提供者或配置信息的变化
  7. 初始化NettyClient
  8. 创建InvokerInvocationHandler代理对象并将invoker设置到InvokerInvocationHandler的invoker成员变量中

    服务调用

    dubbo调用过程.png

  9. 当我们在调用DemoService#sayHello方法时其实是调用InvokerInvocationHandler的invoke方法,而InvokerInvocationHandler真正的业务逻辑,是由其持有的invoker开始执行的(基于责任链模式)

  10. MockClusterInvoker:用于服务降级
  11. FailoverClusterInvoker:集群容错。默认调用1次,重试2次
  12. ProtocolFilterWrapper:包装了各个过滤器,每个filter包装成invoker,然后执行各filter逻辑
  13. AsyncToSyncInvoker:异步转同步。dubbo底层都是异步调用,返回future。如果是同步调用的话,就在这里使用future#get()进行阻塞,等待返回结果
  14. 调用netty发送消息
  15. 服务端接收到消息后根据调用的分组+接口全路径名+版本从DubboProtocol的exporterMap中查询对应的invoker
  16. 执行invoker的过滤器链
  17. 通过Wrapper代理的invokeMethod来调用目标方法

    生成代理对象

    dubbo调用真正的对象的逻辑是由Wrapper的makeWrapper动态生成的,其中生成之后的类如下:
    1. public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException{
    2. org.apache.dubbo.demo.provider.MyDemoServiceImpl w;
    3. try{
    4. w = ((org.apache.dubbo.demo.provider.MyDemoServiceImpl)$1);
    5. }catch(Throwable e){
    6. throw new IllegalArgumentException(e);
    7. }
    8. try{
    9. if( "sayHello".equals( $2 ) && $3.length == 1 && $3[0].getName().equals("java.lang.String") ) {
    10. return ($w)w.sayHello((java.lang.String)$4[0]);
    11. }
    12. if( "sayHello".equals( $2 ) && $3.length == 1 && $3[0].getName().equals("int") ) {
    13. return ($w)w.sayHello(((Number)$4[0]).intValue());
    14. }
    15. } catch(Throwable e) {
    16. throw new java.lang.reflect.InvocationTargetException(e);
    17. }
    18. throw new org.apache.dubbo.common.bytecode.NoSuchMethodException("Not found method \""+$2+"\" in class org.apache.dubbo.demo.provider.MyDemoServiceImpl.");
    19. }
    根据传入方法名+参数(可能重载)判断调用实际类的方法

    序列化

    在服务消费者通过netty调用前会将数据先序列化,服务提供者在收到消息后也需要先反序列化。dubbo数据包分为head和body两个部分,具体参考官网截图
    image.png
    image.png ```java public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException { if (msg instanceof Request) {
    1. encodeRequest(channel, buffer, (Request) msg);
    } else if (msg instanceof Response) {
     encodeResponse(channel, buffer, (Response) msg);
    
    } else {
     super.encode(channel, buffer, msg);
    
    } }

protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException { Serialization serialization = getSerialization(channel); // header.16位长度的header byte[] header = new byte[HEADER_LENGTH]; // set magic number.魔数 Bytes.short2bytes(MAGIC, header);

    // set request and serialization flag.
    // 数据包类型 request/response 和序列化器编号
    header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
    // 通信方向 单向或双向
    if (req.isTwoWay()) {
        header[2] |= FLAG_TWOWAY;
    }
    // 设置事件标识
    if (req.isEvent()) {
        header[2] |= FLAG_EVENT;
    }

    // set request id.
    // 设置请求编号,8个字节,从第4个字节开始设置
    Bytes.long2bytes(req.getId(), header, 4);

    // encode request data.
    int savedWriteIndex = buffer.writerIndex();
    // 为消息头预留16个字节
    buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
    ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
    ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
    if (req.isEvent()) {
        encodeEventData(channel, out, req.getData());
    } else {
        encodeRequestData(channel, out, req.getData(), req.getVersion());
    }
    out.flushBuffer();
    if (out instanceof Cleanable) {
        ((Cleanable) out).cleanup();
    }
    bos.flush();
    bos.close();
    int len = bos.writtenBytes();
    checkPayload(channel, len);
    Bytes.int2bytes(len, header, 12);

    // write
    buffer.writerIndex(savedWriteIndex);
    buffer.writeBytes(header); // write header.
    buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
}

protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response res) throws IOException { int savedWriteIndex = buffer.writerIndex(); try { Serialization serialization = getSerialization(channel); // header. byte[] header = new byte[HEADER_LENGTH]; // set magic number. Bytes.short2bytes(MAGIC, header); // set request and serialization flag. header[2] = serialization.getContentTypeId(); if (res.isHeartbeat()) { header[2] |= FLAG_EVENT; } // set response status. byte status = res.getStatus(); header[3] = status; // set request id. Bytes.long2bytes(res.getId(), header, 4);

    buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
    ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
    ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
    // encode response data or error message.
    if (status == Response.OK) {
        if (res.isHeartbeat()) {
            encodeEventData(channel, out, res.getResult());
        } else {
            encodeResponseData(channel, out, res.getResult(), res.getVersion());
        }
    } else {
        out.writeUTF(res.getErrorMessage());
    }
    out.flushBuffer();
    if (out instanceof Cleanable) {
        ((Cleanable) out).cleanup();
    }
    bos.flush();
    bos.close();

    int len = bos.writtenBytes();
    checkPayload(channel, len);
    Bytes.int2bytes(len, header, 12);
    // write
    buffer.writerIndex(savedWriteIndex);
    buffer.writeBytes(header); // write header.
    buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
} catch (Throwable t) {
    // clear buffer
    buffer.writerIndex(savedWriteIndex);
    // send error message to Consumer, otherwise, Consumer will wait till timeout.
    if (!res.isEvent() && res.getStatus() != Response.BAD_RESPONSE) {
        Response r = new Response(res.getId(), res.getVersion());
        r.setStatus(Response.BAD_RESPONSE);

        if (t instanceof ExceedPayloadLimitException) {
            logger.warn(t.getMessage(), t);
            try {
                r.setErrorMessage(t.getMessage());
                channel.send(r);
                return;
            } catch (RemotingException e) {
                logger.warn("Failed to send bad_response info back: " + t.getMessage() + ", cause: " + e.getMessage(), e);
            }
        } else {
            // FIXME log error message in Codec and handle in caught() of IoHanndler?
            logger.warn("Fail to encode response: " + res + ", send bad_response info instead, cause: " + t.getMessage(), t);
            try {
                r.setErrorMessage("Failed to send response: " + res + ", cause: " + StringUtils.toString(t));
                channel.send(r);
                return;
            } catch (RemotingException e) {
                logger.warn("Failed to send bad_response info back: " + res + ", cause: " + e.getMessage(), e);
            }
        }
    }

    // Rethrow exception
    if (t instanceof IOException) {
        throw (IOException) t;
    } else if (t instanceof RuntimeException) {
        throw (RuntimeException) t;
    } else if (t instanceof Error) {
        throw (Error) t;
    } else {
        throw new RuntimeException(t.getMessage(), t);
    }
}

}

<a name="e96t4"></a>
## 集群容错
集群容错相关的类都继承自AbstractClusterInvoker,其使用了模板方法模式invoke方法中获取了所有服务invoker和负载均衡策略后,把数据提交给doInvoker(抽象方法)由子类实现
<a name="vSUXY"></a>
### ![集群容错.png](https://cdn.nlark.com/yuque/0/2022/png/492083/1657176875105-c4332f31-c7a0-44ca-86f2-51eef76f22c8.png#clientId=ub973ed5c-061f-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=940&id=u68ee7c3a&margin=%5Bobject%20Object%5D&name=%E9%9B%86%E7%BE%A4%E5%AE%B9%E9%94%99.png&originHeight=1880&originWidth=8038&originalType=binary&ratio=1&rotation=0&showTitle=false&size=378498&status=done&style=none&taskId=u16c3ce26-d4c9-43b5-b942-2383a55b97f&title=&width=4019)
```java
public Result invoke(final Invocation invocation) throws RpcException {
    checkWhetherDestroyed();

    // binding attachments into invocation.
    Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
    if (contextAttachments != null && contextAttachments.size() != 0) {
        ((RpcInvocation) invocation).addAttachments(contextAttachments);
    }
    // 获取所有服务
    List<Invoker<T>> invokers = list(invocation);
    // 获取负载均衡策略
    LoadBalance loadbalance = initLoadBalance(invokers, invocation);
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    // 模板方法,由子类实现
    return doInvoke(invocation, invokers, loadbalance);
}

FailoverClusterInvoker

� 该策略由@Reference注解的retries(默认为2)属性控制,即调用一次,如果失败了,重试2次,如果还失败,那么抛出异常。
当第一次调用时会记录之前调用过的invoker,重试时会重新负载均衡并尽量避开之前调用过的invoker

public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        List<Invoker<T>> copyInvokers = invokers;
        checkInvokers(copyInvokers, invocation);
        String methodName = RpcUtils.getMethodName(invocation);
        int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
        // retry loop.
        // 记录上一次失败的exception
        RpcException le = null; // last exception.
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
        // 记录上次调用的provider的地址
        Set<String> providers = new HashSet<String>(len);
        for (int i = 0; i < len; i++) {
            //Reselect before retry to avoid a change of candidate `invokers`.
            //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
            if (i > 0) {
                checkWhetherDestroyed();
                copyInvokers = list(invocation);
                // check again
                checkInvokers(copyInvokers, invocation);
            }
            Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List) invoked);
            try {
                Result result = invoker.invoke(invocation);
                if (le != null && logger.isWarnEnabled()) {
                    logger.warn("Although retry the method " + methodName
                            + " in the service " + getInterface().getName()
                            + " was successful by the provider " + invoker.getUrl().getAddress()
                            + ", but there have been failed providers " + providers
                            + " (" + providers.size() + "/" + copyInvokers.size()
                            + ") from the registry " + directory.getUrl().getAddress()
                            + " on the consumer " + NetUtils.getLocalHost()
                            + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                            + le.getMessage(), le);
                }
                return result;
            } catch (RpcException e) {
                if (e.isBiz()) { // biz exception.
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                providers.add(invoker.getUrl().getAddress());
            }
        }
        throw new RpcException(le.getCode(), "Failed to invoke the method "
                + methodName + " in the service " + getInterface().getName()
                + ". Tried " + len + " times of the providers " + providers
                + " (" + providers.size() + "/" + copyInvokers.size()
                + ") from the registry " + directory.getUrl().getAddress()
                + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                + Version.getVersion() + ". Last error is: "
                + le.getMessage(), le.getCause() != null ? le.getCause() : le);
    }

FailfastClusterInvoker

� 快速失败,当调用失败后直接抛出异常,不进行重试

public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    checkInvokers(invokers, invocation);
    Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
    try {
        return invoker.invoke(invocation);
    } catch (Throwable e) {
        if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.
            throw (RpcException) e;
        }
        throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0,
                               "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName()
                               + " select from all providers " + invokers + " for service " + getInterface().getName()
                               + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost()
                               + " use dubbo version " + Version.getVersion()
                               + ", but no luck to perform the invocation. Last error is: " + e.getMessage(),
                               e.getCause() != null ? e.getCause() : e);
    }
}

FailsafeClusterInvoker

� 如果调用失败,捕获异常,返回空对象

public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    try {
        checkInvokers(invokers, invocation);
        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
        return invoker.invoke(invocation);
    } catch (Throwable e) {
        logger.error("Failsafe ignore exception: " + e.getMessage(), e);
        return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation); // ignore
    }
}

FailbackClusterInvoker

� 如果调用失败,返回空对象给调用者,然后启动定时任务进行重试。适用于消息通知类的场景

protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    Invoker<T> invoker = null;
    try {
        checkInvokers(invokers, invocation);
        invoker = select(loadbalance, invocation, invokers, null);
        return invoker.invoke(invocation);
    } catch (Throwable e) {
        logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "
                     + e.getMessage() + ", ", e);
        addFailed(loadbalance, invocation, invokers, invoker);
        return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation); // ignore
    }
}

private void addFailed(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, Invoker<T> lastInvoker) {
    if (failTimer == null) {
        synchronized (this) {
            if (failTimer == null) {
                failTimer = new HashedWheelTimer(
                    new NamedThreadFactory("failback-cluster-timer", true),
                    1,
                    TimeUnit.SECONDS, 32, failbackTasks);
            }
        }
    }
    RetryTimerTask retryTimerTask = new RetryTimerTask(loadbalance, invocation, invokers, lastInvoker, retries, RETRY_FAILED_PERIOD);
    try {
        failTimer.newTimeout(retryTimerTask, RETRY_FAILED_PERIOD, TimeUnit.SECONDS);
    } catch (Throwable e) {
        logger.error("Failback background works error,invocation->" + invocation + ", exception: " + e.getMessage());
    }
}

BroadcastClusterInvoker

� 遍历所有的invoker,循环调用。如果有多个异常,只抛出最后一次的异常

public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    checkInvokers(invokers, invocation);
    RpcContext.getContext().setInvokers((List) invokers);
    RpcException exception = null;
    Result result = null;
    for (Invoker<T> invoker : invokers) {
        try {
            result = invoker.invoke(invocation);
        } catch (RpcException e) {
            exception = e;
            logger.warn(e.getMessage(), e);
        } catch (Throwable e) {
            exception = new RpcException(e.getMessage(), e);
            logger.warn(e.getMessage(), e);
        }
    }
    if (exception != null) {
        throw exception;
    }
    return result;
}

ForkingClusterInvoker

� 运行时通过线程池并行调用多个服务,只要有一个提供者返回了结果,就立即结束运行

public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        try {
            checkInvokers(invokers, invocation);
            final List<Invoker<T>> selected;
            final int forks = getUrl().getParameter(FORKS_KEY, DEFAULT_FORKS);
            final int timeout = getUrl().getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
            if (forks <= 0 || forks >= invokers.size()) {
                selected = invokers;
            } else {
                selected = new ArrayList<>();
                for (int i = 0; i < forks; i++) {
                    Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
                    if (!selected.contains(invoker)) {
                        //Avoid add the same invoker several times.
                        selected.add(invoker);
                    }
                }
            }
            RpcContext.getContext().setInvokers((List) selected);
            final AtomicInteger count = new AtomicInteger();
            final BlockingQueue<Object> ref = new LinkedBlockingQueue<>();
            for (final Invoker<T> invoker : selected) {
                executor.execute(() -> {
                    try {
                        Result result = invoker.invoke(invocation);
                        ref.offer(result);
                    } catch (Throwable e) {
                        int value = count.incrementAndGet();
                        if (value >= selected.size()) {
                            ref.offer(e);
                        }
                    }
                });
            }
            try {
                Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
                if (ret instanceof Throwable) {
                    Throwable e = (Throwable) ret;
                    throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
                }
                return (Result) ret;
            } catch (InterruptedException e) {
                throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
        } finally {
            // clear attachments which is binding to current thread.
            RpcContext.getContext().clearAttachments();
        }
    }

负载均衡

负载均衡的通用逻辑封装在AbstractLoadBalance类的select方法中。如果invoker只有1个,那么就返回这个invoker即可。如果invoker的数量大于1,调用子类实现的doSelect方法进行扩展
LoadBalance.png

public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    if (CollectionUtils.isEmpty(invokers)) {
        return null;
    }
    if (invokers.size() == 1) {
        return invokers.get(0);
    }
    return doSelect(invokers, url, invocation);
}

RandomLoadBalance

随机负载均衡。随机负载均衡也是要基于权重进行实现的。�例如:invoker权重[5, 3, 2],totalWeight=10,取0-9的随机数offset,假设offset=9,循环invoker列表,用offset减去当前invoker的权重值,直到小于0,返回invoker。具体执行过程 9-5>0不是当前invoker,9-5-4>0 不是当前invoker, 9-5-3-2<0是当前invoker

protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    // Number of invokers
    int length = invokers.size();
    // Every invoker has the same weight?
    boolean sameWeight = true;
    // the weight of every invokers
    int[] weights = new int[length];
    // the first invoker's weight
    int firstWeight = getWeight(invokers.get(0), invocation);
    weights[0] = firstWeight;
    // The sum of weights
    int totalWeight = firstWeight;
    for (int i = 1; i < length; i++) {
        int weight = getWeight(invokers.get(i), invocation);
        // save for later use
        weights[i] = weight;
        // Sum
        totalWeight += weight;
        if (sameWeight && weight != firstWeight) {
            sameWeight = false;
        }
    }
    //例如:invoker权重[5, 3, 2]totalWeight=10,取0-9的随机数offset,假设offset=9,循环invoker列表,用offset减去当前invoker的权重值,直到小于0,返回invoker
    // 9-5>0不是当前invoker,9-5-4>0 不是当前invoker, 9-5-3-2<0是当前invoker
    if (totalWeight > 0 && !sameWeight) {
        // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
        int offset = ThreadLocalRandom.current().nextInt(totalWeight);
        // Return a invoker based on the random value.
        for (int i = 0; i < length; i++) {
            offset -= weights[i];
            if (offset < 0) {
                return invokers.get(i);
            }
        }
    }
    // If all invokers have the same weight value or totalWeight=0, return evenly.
    // 都没配置权重或所有invoker的权重一致,随机取
    return invokers.get(ThreadLocalRandom.current().nextInt(length));
}

LeastActiveLoadBalance

� 最少活跃数负载均衡。dubbo中的实现也是基于权重进行实现的,如果有多个服务的活跃数一致,那么通过权重获取具体invoker。
在消费者端会通过ActiveLimitFilter来记录接口方法的调用次数,调用的时候对RpcStatus#active(AtomicInteger)进行+1,调用完成后对其-1

protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        // Number of invokers
        int length = invokers.size();
        // The least active value of all invokers
        int leastActive = -1;
        // The number of invokers having the same least active value (leastActive)
        int leastCount = 0;
        // The index of invokers having the same least active value (leastActive)
        int[] leastIndexes = new int[length];
        // the weight of every invokers
        int[] weights = new int[length];
        // The sum of the warmup weights of all the least active invokers
        int totalWeight = 0;
        // The weight of the first least active invoker
        int firstWeight = 0;
        // Every least active invoker has the same weight value?
        boolean sameWeight = true;


        // Filter out all the least active invokers
        for (int i = 0; i < length; i++) {
            Invoker<T> invoker = invokers.get(i);
            // Get the active number of the invoker
            int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
            // Get the weight of the invoker's configuration. The default value is 100.
            int afterWarmup = getWeight(invoker, invocation);
            // save for later use
            weights[i] = afterWarmup;
            // If it is the first invoker or the active number of the invoker is less than the current least active number
            // 第一个invoker或当前活跃数比最小活跃数小,则重置计数器,并将leastIndexes数组的第一个元素设置为invoker下标
            if (leastActive == -1 || active < leastActive) {
                // Reset the active number of the current invoker to the least active number
                leastActive = active;
                // Reset the number of least active invokers
                leastCount = 1;
                // Put the first least active invoker first in leastIndexes
                leastIndexes[0] = i;
                // Reset totalWeight
                totalWeight = afterWarmup;
                // Record the weight the first least active invoker
                firstWeight = afterWarmup;
                // Each invoke has the same weight (only one invoker here)
                sameWeight = true;
                // If current invoker's active value equals with leaseActive, then accumulating.
                // 如果当前活跃数和最小活跃数一致,则往leastIndexes数组中添加元素,并累加leastCount计数器
            } else if (active == leastActive) {
                // Record the index of the least active invoker in leastIndexes order
                leastIndexes[leastCount++] = i;
                // Accumulate the total weight of the least active invoker
                totalWeight += afterWarmup;
                // If every invoker has the same weight?
                if (sameWeight && i > 0
                        && afterWarmup != firstWeight) {
                    sameWeight = false;
                }
            }
        }
        // Choose an invoker from all the least active invokers
        if (leastCount == 1) {
            // If we got exactly one invoker having the least active value, return this invoker directly.
            return invokers.get(leastIndexes[0]);
        }
        if (!sameWeight && totalWeight > 0) {
            // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on 
            // totalWeight.
            int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
            // Return a invoker based on the random value.
            for (int i = 0; i < leastCount; i++) {
                int leastIndex = leastIndexes[i];
                offsetWeight -= weights[leastIndex];
                if (offsetWeight < 0) {
                    return invokers.get(leastIndex);
                }
            }
        }
        // If all invokers have the same weight value or totalWeight=0, return evenly.
        return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);
    }

RoundRobinLoadBalance 待补充

� 加权轮询负载均衡。待补充

protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
    ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
    if (map == null) {
        methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<String, WeightedRoundRobin>());
        map = methodWeightMap.get(key);
    }
    int totalWeight = 0;
    long maxCurrent = Long.MIN_VALUE;
    long now = System.currentTimeMillis();
    Invoker<T> selectedInvoker = null;
    WeightedRoundRobin selectedWRR = null;
    for (Invoker<T> invoker : invokers) {
        String identifyString = invoker.getUrl().toIdentityString();
        WeightedRoundRobin weightedRoundRobin = map.get(identifyString);
        int weight = getWeight(invoker, invocation);

        if (weightedRoundRobin == null) {
            weightedRoundRobin = new WeightedRoundRobin();
            weightedRoundRobin.setWeight(weight);
            map.putIfAbsent(identifyString, weightedRoundRobin);
        }
        if (weight != weightedRoundRobin.getWeight()) {
            //weight changed
            weightedRoundRobin.setWeight(weight);
        }
        long cur = weightedRoundRobin.increaseCurrent();
        weightedRoundRobin.setLastUpdate(now);
        if (cur > maxCurrent) {
            maxCurrent = cur;
            selectedInvoker = invoker;
            selectedWRR = weightedRoundRobin;
        }
        totalWeight += weight;
    }
    if (!updateLock.get() && invokers.size() != map.size()) {
        if (updateLock.compareAndSet(false, true)) {
            try {
                // copy -> modify -> update reference
                ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<>(map);
                newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD);
                methodWeightMap.put(key, newMap);
            } finally {
                updateLock.set(false);
            }
        }
    }
    if (selectedInvoker != null) {
        selectedWRR.sel(totalWeight);
        return selectedInvoker;
    }
    // should not happen here
    return invokers.get(0);
}

ConsistentHashLoadBalance 待补充

� 一致性哈希负载均衡。

protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        String methodName = RpcUtils.getMethodName(invocation);
        String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;
        int identityHashCode = System.identityHashCode(invokers);
        ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
        if (selector == null || selector.identityHashCode != identityHashCode) {
            selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, identityHashCode));
            selector = (ConsistentHashSelector<T>) selectors.get(key);
        }
        return selector.select(invocation);
    }

    private static final class ConsistentHashSelector<T> {

        private final TreeMap<Long, Invoker<T>> virtualInvokers;

        private final int replicaNumber;

        private final int identityHashCode;

        private final int[] argumentIndex;

        ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
            this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
            this.identityHashCode = identityHashCode;
            URL url = invokers.get(0).getUrl();
            this.replicaNumber = url.getMethodParameter(methodName, HASH_NODES, 160);
            String[] index = COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, HASH_ARGUMENTS, "0"));
            argumentIndex = new int[index.length];
            for (int i = 0; i < index.length; i++) {
                argumentIndex[i] = Integer.parseInt(index[i]);
            }
            for (Invoker<T> invoker : invokers) {
                String address = invoker.getUrl().getAddress();
                for (int i = 0; i < replicaNumber / 4; i++) {
                    byte[] digest = md5(address + i);
                    for (int h = 0; h < 4; h++) {
                        long m = hash(digest, h);
                        virtualInvokers.put(m, invoker);
                    }
                }
            }
        }

        public Invoker<T> select(Invocation invocation) {
            String key = toKey(invocation.getArguments());
            byte[] digest = md5(key);
            return selectForKey(hash(digest, 0));
        }

        private String toKey(Object[] args) {
            StringBuilder buf = new StringBuilder();
            for (int i : argumentIndex) {
                if (i >= 0 && i < args.length) {
                    buf.append(args[i]);
                }
            }
            return buf.toString();
        }

        private Invoker<T> selectForKey(long hash) {
            Map.Entry<Long, Invoker<T>> entry = virtualInvokers.ceilingEntry(hash);
            if (entry == null) {
                entry = virtualInvokers.firstEntry();
            }
            return entry.getValue();
        }

        private long hash(byte[] digest, int number) {
            return (((long) (digest[3 + number * 4] & 0xFF) << 24)
                    | ((long) (digest[2 + number * 4] & 0xFF) << 16)
                    | ((long) (digest[1 + number * 4] & 0xFF) << 8)
                    | (digest[number * 4] & 0xFF))
                    & 0xFFFFFFFFL;
        }

        private byte[] md5(String value) {
            MessageDigest md5;
            try {
                md5 = MessageDigest.getInstance("MD5");
            } catch (NoSuchAlgorithmException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
            md5.reset();
            byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
            md5.update(bytes);
            return md5.digest();
        }

    }

SPI机制

dubbo的spi机制会扫描META-INF/services、META-INF/dubbo/、META-INF/dubbo/internal包路径下的配置文件,文件名为接口,文件内容配置为kv形式,配置名=扩展实现类全限定名。其改进了jdk的扩展机制:

  1. jdk的标准spi会一次性加载所有扩展点实现,即时没用上。dubbo实现了按需加载
  2. jdk的spi扩展点如果加载失败,连扩展点的名称都拿不到
  3. 增加了对扩展点ioc和aop的支持

    spring整合dubbo

    当我们在spring项目中整合dubbo时,只需要使用@EnableDubbo注解即可自动装配dubbo的服务和消费者

    @EnableDubbo

    � 该注解是一个复合注解,主要是引入了@EnableDubboConfig、@DubboComponentScan、@EnableDubboLifecycle这3个注解 ```java @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Inherited @Documented @EnableDubboConfig @DubboComponentScan @EnableDubboLifecycle public @interface EnableDubbo {

    /**

    • Base packages to scan for annotated @Service classes.
    • Use {@link #scanBasePackageClasses()} for a type-safe alternative to String-based
    • package names. *
    • @return the base packages to scan
    • @see DubboComponentScan#basePackages() */ @AliasFor(annotation = DubboComponentScan.class, attribute = “basePackages”) String[] scanBasePackages() default {};

      /**

    • Type-safe alternative to {@link #scanBasePackages()} for specifying the packages to
    • scan for annotated @Service classes. The package of each class specified will be
    • scanned. *
    • @return classes from the base packages to scan
    • @see DubboComponentScan#basePackageClasses */ @AliasFor(annotation = DubboComponentScan.class, attribute = “basePackageClasses”) Class<?>[] scanBasePackageClasses() default {};
/**
 * It indicates whether {@link AbstractConfig} binding to multiple Spring Beans.
 *
 * @return the default value is <code>false</code>
 * @see EnableDubboConfig#multiple()
 */
@AliasFor(annotation = EnableDubboConfig.class, attribute = "multiple")
boolean multipleConfig() default true;

}

<a name="zkmfC"></a>
### @EnableDubboConfig
该注解引入了DubboConfigConfigurationRegistrar,其中的registerBeanDefinitions方法为dubbo开启了自动配置,会从配置信息中读取相关配置并向spring容器注册对应的配置bean<br />![](https://cdn.nlark.com/yuque/0/2022/jpeg/492083/1657193272559-e9ff1d10-3458-433b-ada6-364ad6a7bfd2.jpeg)
<a name="Clloo"></a>
### @DubboComponentScan
![](https://cdn.nlark.com/yuque/0/2022/jpeg/492083/1657195118782-acb8a288-d3bc-4cb2-b510-e1360e37bef0.jpeg)<br />该注解导入了DubboComponentScanRegistrar类,其registerBeanDefinitions方法导入了2个spring的后置处理器ServiceAnnotationBeanPostProcessor和ReferenceAnnotationBeanPostProcessor
<a name="oIwkl"></a>
#### 注册Provider服务
ServiceAnnotationBeanPostProcessor会向spring容器中注册ServiceBean和本身类型的bean(用于@Autowired注入).ServiceBean的名称为ServiceBean + 接口名 + 版本 + 分组名
```java
private void registerServiceBeans(Set<String> packagesToScan, BeanDefinitionRegistry registry) {

        DubboClassPathBeanDefinitionScanner scanner =
                new DubboClassPathBeanDefinitionScanner(registry, environment, resourceLoader);

        BeanNameGenerator beanNameGenerator = resolveBeanNameGenerator(registry);

        scanner.setBeanNameGenerator(beanNameGenerator);

        scanner.addIncludeFilter(new AnnotationTypeFilter(Service.class));

        /**
         * Add the compatibility for legacy Dubbo's @Service
         *
         * The issue : https://github.com/apache/dubbo/issues/4330
         * @since 2.7.3
         */
        scanner.addIncludeFilter(new AnnotationTypeFilter(com.alibaba.dubbo.config.annotation.Service.class));

        for (String packageToScan : packagesToScan) {

            // Registers @Service Bean first
            // 先在spring容器中注册bean
            scanner.scan(packageToScan);

            // Finds all BeanDefinitionHolders of @Service whether @ComponentScan scans or not.
            Set<BeanDefinitionHolder> beanDefinitionHolders =
                    findServiceBeanDefinitionHolders(scanner, packageToScan, registry, beanNameGenerator);

            if (!CollectionUtils.isEmpty(beanDefinitionHolders)) {

                for (BeanDefinitionHolder beanDefinitionHolder : beanDefinitionHolders) {
                    // 注册ServcieBean
                    registerServiceBean(beanDefinitionHolder, registry, scanner);
                }

                if (logger.isInfoEnabled()) {
                    logger.info(beanDefinitionHolders.size() + " annotated Dubbo's @Service Components { " +
                            beanDefinitionHolders +
                            " } were scanned under package[" + packageToScan + "]");
                }

            } else {

                if (logger.isWarnEnabled()) {
                    logger.warn("No Spring Bean annotating Dubbo's @Service was found under package["
                            + packageToScan + "]");
                }

            }

        }

    }

注册consumer服务

通过扫描配置的路径向spring容器注册�ReferenceBean

protected Object doGetInjectedBean(AnnotationAttributes attributes, Object bean, String beanName, Class<?> injectedType,
                                       InjectionMetadata.InjectedElement injectedElement) throws Exception {
    /**
         * The name of bean that annotated Dubbo's {@link Service @Service} in local Spring {@link ApplicationContext}
         */
    String referencedBeanName = buildReferencedBeanName(attributes, injectedType);

    /**
         * The name of bean that is declared by {@link Reference @Reference} annotation injection
         */
    String referenceBeanName = getReferenceBeanName(attributes, injectedType);

    ReferenceBean referenceBean = buildReferenceBeanIfAbsent(referenceBeanName, attributes, injectedType);

    registerReferenceBean(referencedBeanName, referenceBean, attributes, injectedType);

    cacheInjectedReferenceBean(referenceBean, injectedElement);

    return getOrCreateProxy(referencedBeanName, referenceBeanName, referenceBean, injectedType);
}

@EnableDubboLifecycle

该注解导入了DubboLifecycleComponentRegistrar,由DubboLifecycleComponentRegistrar向spring容器注册DubboBootstrapApplicationListener监听器。该监听器处理spring容器刷新完成事件和销毁事件。

public void onApplicationContextEvent(ApplicationContextEvent event) {
        if (event instanceof ContextRefreshedEvent) {
            onContextRefreshedEvent((ContextRefreshedEvent) event);
        } else if (event instanceof ContextClosedEvent) {
            onContextClosedEvent((ContextClosedEvent) event);
        }
    }

    private void onContextRefreshedEvent(ContextRefreshedEvent event) {
        dubboBootstrap.start();
    }

    private void onContextClosedEvent(ContextClosedEvent event) {
        dubboBootstrap.stop();
    }
当spring容器刷新完成时,调用DubboBootstrap#start()方法暴露和引用dubbo服务
public DubboBootstrap start() {
    if (started.compareAndSet(false, true)) {
        initialize();
        if (logger.isInfoEnabled()) {
            logger.info(NAME + " is starting...");
        }
        // 1. export Dubbo Services
        // 注册消费者服务
        exportServices();

        // Not only provider register
        if (!isOnlyRegisterProvider() || hasExportedServices()) {
            // 2. export MetadataService
            exportMetadataService();
            //3. Register the local ServiceInstance if required
            registerServiceInstance();
        }

        referServices();

        if (logger.isInfoEnabled()) {
            logger.info(NAME + " has started.");
        }
    }
    return this;
}