- (1)dubbo的架构原理-探索
- (2)dubbo自己的spi实现
- (3)SPI机制的adaptive原理
- (4)dubbo自己的IOC和AOP原理
- (5)dubbo的动态编译
- (6)dubbo如何和spring完美融合
- (7)服务发现 - 原理探索
- (8)服务发现 - netty的服务暴露
- (9)zookeeper的连接-创建-订阅
- (10)dubbo如何连接zookeeper?
- (11)dubbo如何创建zookeeper节点?
- (12)dubbo如何订阅zookeeper信息?
- (13)服务发布 - 整体架构设计图
- (14)服务引用的设计原理
- (15)服务引用 - 整体架构设计图
- (17)集群容错之directory目录
- (18)集群容错之router路由规则
- (19)集群容错之cluster集群
- (20)集群容错之loadbalance负载均衡
- (21)dubbo如何实现SOA的服务降级?
- (22)网络通信-consumer发送原理
- (23)网络通信-provider的接收与发送原理
- (24)网络通信-consumer的接收原理
- (25)如何把网络通信的IO异步变同步?
- (26)dubbo的核心级概念-invoker
- (27)网络通信–编码解码
(1)dubbo的架构原理-探索
节点角色说明
节点 | 角色说明 |
---|---|
Provider | 暴露服务的服务提供方 |
Consumer | 调用远程服务的服务消费方 |
Registry | 服务注册与发现的注册中心 |
Monitor | 统计服务的调用次数和调用时间的监控中心 |
Container | 服务运行容器 |
调用关系说明
- provider启动时,会把所有接口注册到注册中心,并且订阅动态配置configurators
- consumer启动时,向注册中心订阅自己所需的providers,configurators,routers
- 订阅内容变更时,注册中心将基于长连接推送变更数据给consumer,包括providers,configurators,routers
- consumer启动时,从provider地址列表中,基于软负载均衡算法,选一台provider进行调用,如果调用失败,再选另一台调用,建立长连接,然后进行数据通信(consumer->provider)
- consumer、provider启动后,在内存中累计调用次数和调用时间,定时每分钟发送一次统计数据到monitor
Dubbo 架构具有以下几个特点,分别是连通性、健壮性、伸缩性、以及向未来架构的升级性。
连通性
- 注册中心负责服务地址的注册与查找,相当于目录服务,服务提供者和消费者只在启动时与注册中心交互,注册中心不转发请求,压力较小
- 监控中心负责统计各服务调用次数,调用时间等,统计先在内存汇总后每分钟一次发送到监控中心服务器,并以报表展示
- 服务提供者向注册中心注册其提供的服务,并汇报调用时间到监控中心,此时间不包含网络开销
- 服务消费者向注册中心获取服务提供者地址列表,并根据负载算法直接调用提供者,同时汇报调用时间到监控中心,此时间包含网络开销
- 注册中心,服务提供者,服务消费者三者之间均为长连接,监控中心除外
- 注册中心通过长连接感知服务提供者的存在,服务提供者宕机,注册中心将立即推送事件通知消费者
- 注册中心和监控中心全部宕机,不影响已运行的提供者和消费者,消费者在本地缓存了提供者列表
- 注册中心和监控中心都是可选的,服务消费者可以直连服务提供者
健壮性
- 监控中心宕掉不影响使用,只是丢失部分采样数据
- 数据库宕掉后,注册中心仍能通过缓存提供服务列表查询,但不能注册新服务
- 注册中心对等集群,任意一台宕掉后,将自动切换到另一台
- 注册中心全部宕掉后,服务提供者和服务消费者仍能通过本地缓存通讯
- 服务提供者无状态,任意一台宕掉后,不影响使用
- 服务提供者全部宕掉后,服务消费者应用将无法使用,并无限次重连等待服务提供者恢复
伸缩性
- 注册中心为对等集群,可动态增加机器部署实例,所有客户端将自动发现新的注册中心
- 服务提供者无状态,可动态增加机器部署实例,注册中心将推送新的服务提供者信息给消费者
升级性
- 当服务集群规模进一步扩大,带动IT治理结构进一步升级,需要实现动态部署,进行流动计算,现有分布式服务架构不会带来阻力。
- 下图是未来可能的一种架构:
节点角色说明
节点 | 角色说明 |
---|---|
Deployer | 自动部署服务的本地代理 |
Repository | 仓库用于存储服务应用发布包 |
Scheduler | 调度中心基于访问压力自动增减服务提供者 |
Admin | 统一管理控制台 |
Registry | 服务注册与发现的注册中心 |
Monitor | 统计服务的调用次数和调用时间的监控中心 |
(2)dubbo自己的spi实现
dubbo内核包括四个:SPI(模仿JDK的SPI)、AOP(模仿Spring)、IOC(模仿Spring)、compiler(动态编译)
SPI的设计目标:
- 面向对象的设计里,模块之间基于接口编程,模块之间不对实现类进行硬编码(硬编码:数据直接嵌入到程序)
- 一旦代码涉及具体的实现类,就违反了可拔插的原则,如果需要替换一种实现,就需要修改代码
- 为了实现在模块中装配的时候,不在模块里写死代码,这就需要一种服务发现机制
- 为某个接口寻找服务实现的机制,有点类似IOC的思想,就是将装配的控制权转移到代码之外
SPI的具体约定:
- 当服务的提供者(provide),提供了一个接口多种实现时,一般会在jar包的META_INF/services/目录下,创建该接口的同名文件,该文件里面的内容就是该服务接口的具体实现类的名称
- 当外部加载这个模块的时候,就能通过jar包的META_INF/services/目录的配置文件得到具体的实现类名,并加载实例化,完成模块的装配
dubbo为什么不直接使用JDK的SPI?
- JDK标准的SPI会一次性实例化扩展点所有实现,如果有扩展实现初始化很耗时,但如果没用上也加载,会很浪费资源
- dubbo的SPI增加了对扩展点IoC和AOP的支持,一个扩展点可以直接setter注入其他扩展点,JDK的SPI是没有的
dubbo的SPI目的:获取一个实现类的对象
途径:ExtensionLoader.getExtension(String name)
实现路径:
- getExtensionLoader(Class< Type > type)就是为该接口new 一个ExtensionLoader,然后缓存起来。
- getAdaptiveExtension() 获取一个扩展类,如果@Adaptive注解在类上就是一个装饰类;如果注解在方法上就是一个动态代理类,例如Protocol$Adaptive对象。
- getExtension(String name) 获取一个指定对象。
ExtensionLoader
从ExtensionLoader.getExtensionLoader(Class< Type > type)讲起
-----------------------ExtensionLoader.getExtensionLoader(Class<T> type)
ExtensionLoader.getExtensionLoader(Container.class)
-->this.type = type;
-->objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
-->ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension()
-->this.type = type;
-->objectFactory =null;
执行以上代码完成了2个属性的初始化
1、每个一个ExtensionLoader都包含了2个值type 和objectFactory
Class< ? > type:
- 构造器 初始化要得到的接口名
ExtensionFactory objectFactory :
- 构造器 初始化AdaptiveExtensionFactory[ SpiExtensionFactory, SpringExtensionFactory]
- new一个ExtensionLoader存储在ConcurrentMap< Class< ? >, ExtensionLoader< ? > > EXTENSION_LOADERS
关于objectFactory的一些细节:
- objectFactory 就是ExtensionFactory ,也是通过ExtensionLoader.getExtensionLoader(ExtensionFactory.class)来实现,但objectFactory = null;
- objectFactory 的作用就是为dubbo的IOC提供所有对象
(3)SPI机制的adaptive原理
先来看一下adaptive的源码
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD}) //只能注解在类、接口、方法上面
public @interface Adaptive {
String[] value() default {};
}
@adaptive注解在类上面:AdaptiveExtensionFactory
@adaptive注解在方法上面:Protocol
@adaptive注解在类和方法上的区别:
- 注解在类上:代表人工实现编码,即实现了一个装饰类(设计模式中的装饰模式),例如:ExtensionFactory
- 注解在方法上:代表自动生成和编译一个动态的adpative类,例如:Protocol$adpative
private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class)
.getAdaptiveExtension();
-----------------------getAdaptiveExtension()
-->getAdaptiveExtension() //目的为 cachedAdaptiveInstance赋值
-->createAdaptiveExtension()
-->getAdaptiveExtension()
-->getExtensionClasses() //目的为cachedClasses赋值
-->loadExtensionClasses() //加载
-->loadFile() //加载配置信息(主要是META_INF/services/下)
-->createAdaptiveExtensionClass() //自动生成和编译一个动态的adpative类,这个类是个代理类
-->createAdaptiveExtensionClassCode()//通过adaptive模板生成代码
-->ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();//编译
-->compile(String code, ClassLoader classLoader)
-->injectExtension()//作用:进入IOC的反转控制模式,实现了动态入注
adaptive模板
package <扩展点接口所在包>;
public class <扩展点接口名>$Adpative implements <扩展点接口> {
public <有@Adaptive注解的接口方法>(<方法参数>) {
if(是否有URL类型方法参数?) 使用该URL参数
else if(是否有方法类型上有URL属性) 使用该URL属性
# <else 在加载扩展点生成自适应扩展点类时抛异常,即加载扩展点失败!>
if(获取的URL == null) {
throw new IllegalArgumentException("url == null");
}
根据@Adaptive注解上声明的Key的顺序,从URL获致Value,作为实际扩展点名。
如URL没有Value,则使用缺省扩展点实现。如没有扩展点, throw new IllegalStateException("Fail to get extension");
在扩展点实现调用该方法,并返回结果。
}
public <有@Adaptive注解的接口方法>(<方法参数>) {
throw new UnsupportedOperationException("is not adaptive method!");
}
}
例如com.alibaba.dubbo.rpc.Protocol接口的动态编译的扩展类Protocol$Adpative为:
package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Protocol$Adpative implements Protocol {
public void destroy() {
throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public int getDefaultPort() {
throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public com.alibaba.dubbo.rpc.Exporter export(Invoker invoker) throws RpcException {
if (invoker == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (invoker.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
com.alibaba.dubbo.common.URL url = invoker.getUrl();
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
Protocol extension = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);
return extension.export(invoker);
}
public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws RpcException {
if (arg1 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg1;
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
}
关于loadFile() 的一些细节:
目的:把配置文件META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Protocol的内容存储在缓存变量里面,下面四种缓存变量:
- cachedAdaptiveClass //如果这个类有Adaptive注解就赋值,而Protocol在这个环节是没有的
- cachedWrapperClasses //只有当该class无Adaptive注解,并且构造函数包含目标接口(type),例如protocol里面的spi就只有ProtocolFilterWrapper、ProtocolListenerWrapper能命中
- cachedActivates //剩下的类包含adaptive注解
- cachedNames //剩下的类就存储在这里
(4)dubbo自己的IOC和AOP原理
-----------------------getExtension(String name)
getExtension(String name) //指定对象缓存在cachedInstances;get出来的对象wrapper对象,例如protocol就是ProtocolFilterWrapper和ProtocolListenerWrapper其中一个。
-->createExtension(String name)
-->getExtensionClasses()
-->injectExtension(T instance)//dubbo的IOC反转控制,就是从spi和spring里面提取对象赋值。
-->objectFactory.getExtension(pt, property)
-->SpiExtensionFactory.getExtension(type, name)
-->ExtensionLoader.getExtensionLoader(type)
-->loader.getAdaptiveExtension()
-->SpringExtensionFactory.getExtension(type, name)
-->context.getBean(name)
-->injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance))//AOP的简单设计
dubbo的spi流程:
(5)dubbo的动态编译
Compile接口定义:
@SPI("javassist")
public interface Compiler {
/**
* Compile java source code.
*
* @param code Java source code
* @param classLoader TODO
* @return Compiled class
*/
Class<?> compile(String code, ClassLoader classLoader);
}
- @SPI(“javassist”):表示如果没有配置,dubbo默认选用javassist编译源代码
- 接口方法compile第一个入参code,就是java的源代码
- 接口方法compile第二个入参classLoader,按理是类加载器用来加载编译后的字节码,其实没用到,都是根据当前线程或者调用方的classLoader加载的
AdaptiveCompiler是Compiler的设配类,它的作用是Compiler策略的选择,根据条件选择使用何种编译策略来编译动态生成SPI扩展 ,默认为javassist.
AbstractCompiler是一个抽象类,它通过正则表达式获取到对象的包名以及Class名称。这样就可以获取对象的全类名(包名+Class名称)。通过反射Class.forName()来判断当前ClassLoader是否有这个类,如果有就返回,如果没有就通过JdkCompiler或者JavassistCompiler通过传入的code编译这个类。
com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
getExtensionLoader ():new一个ExtensionLoader对象,用到单例模式、工厂模式,然后换成起来
getAdaptiveExtension() :为了获取扩展装饰类或代理类的对像,不过有个规则:如果@Adaptive注解在类上就是一个装饰类;如果注解在方法上就是一个动态代理类。
Javassist
Javassist是一款字节码编辑工具,同时也是一个动态类库,它可以直接检查、修改以及创建 Java类。
以下例子就是通过javassist创建一个动态编译的例子,dubbo使用的JavassistCompiler类跟下面例子基本差不多
public class CompilerByJavassist {
public static void main(String[] args) throws Exception {
// ClassPool:CtClass对象的容器
ClassPool pool = ClassPool.getDefault();
// 通过ClassPool生成一个public新类Emp.java
CtClass ctClass = pool.makeClass("com.study.javassist.Emp");
// 添加属性
// 首先添加属性private String ename
CtField enameField = new CtField(pool.getCtClass("java.lang.String"),
"ename", ctClass);
enameField.setModifiers(Modifier.PRIVATE);
ctClass.addField(enameField);
// 其次添加熟悉privtae int eno
CtField enoField = new CtField(pool.getCtClass("int"), "eno", ctClass);
enoField.setModifiers(Modifier.PRIVATE);
ctClass.addField(enoField);
// 为属性ename和eno添加getXXX和setXXX方法
ctClass.addMethod(CtNewMethod.getter("getEname", enameField));
ctClass.addMethod(CtNewMethod.setter("setEname", enameField));
ctClass.addMethod(CtNewMethod.getter("getEno", enoField));
ctClass.addMethod(CtNewMethod.setter("setEno", enoField));
// 添加构造函数
CtConstructor ctConstructor = new CtConstructor(new CtClass[] {},
ctClass);
// 为构造函数设置函数体
StringBuffer buffer = new StringBuffer();
buffer.append("{\n").append("ename=\"yy\";\n").append("eno=001;\n}");
ctConstructor.setBody(buffer.toString());
// 把构造函数添加到新的类中
ctClass.addConstructor(ctConstructor);
// 添加自定义方法
CtMethod ctMethod = new CtMethod(CtClass.voidType, "printInfo",
new CtClass[] {}, ctClass);
// 为自定义方法设置修饰符
ctMethod.setModifiers(Modifier.PUBLIC);
// 为自定义方法设置函数体
StringBuffer buffer2 = new StringBuffer();
buffer2.append("{\nSystem.out.println(\"begin!\");\n")
.append("System.out.println(ename);\n")
.append("System.out.println(eno);\n")
.append("System.out.println(\"over!\");\n").append("}");
ctMethod.setBody(buffer2.toString());
ctClass.addMethod(ctMethod);
//最好生成一个class
Class<?> clazz = ctClass.toClass();
Object obj = clazz.newInstance();
//反射 执行方法
obj.getClass().getMethod("printInfo", new Class[] {})
.invoke(obj, new Object[] {});
// 把生成的class文件写入文件
byte[] byteArr = ctClass.toBytecode();
FileOutputStream fos = new FileOutputStream(new File("D://Emp.class"));
fos.write(byteArr);
fos.close();
}
}
public class Emp {
private String ename;
private int eno;
public Emp(){
ename="yy";
eno=001;
}
public String getEname() {
return ename;
}
public void setEname(String ename) {
this.ename = ename;
}
public int getEno() {
return eno;
}
public void setEno(int eno) {
this.eno = eno;
}
//添加一个自定义方法
public void printInfo(){
System.out.println("begin!");
System.out.println(ename);
System.out.println(eno);
System.out.println("over!");
}
}
(6)dubbo如何和spring完美融合
1、dubbo采取通过配置文件来启动container容器,dubbo是使用spring来做容器
<!-- 提供方应用信息,用于计算依赖关系 -->
<dubbo:application name="demo-provider"/>
<!-- 使用multicast广播注册中心暴露服务地址 -->
<dubbo:registry address="multicast://224.5.6.7:1234"/>
<!-- 用dubbo协议在20880端口暴露服务 -->
<dubbo:protocol name="dubbo" port="20880"/>
<!-- 声明需要暴露的服务接口 -->
<bean id="demoService" class="com.alibaba.dubbo.demo.provider.DemoServiceImpl"/>
<!-- 和本地bean一样实现服务 -->
<dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService"/>
dubbo实现通过下面的配置schema自定义配置
完成一个spring的自定义配置一般需要以下5个步骤:
- 设计配置属性和JavaBean
- 编写XSD文件 全称就是 XML Schema 它就是校验XML,定义了一些列的语法来规范XML
- 编写NamespaceHandler和BeanDefinitionParser完成解析工作
- 编写两个类spring.handlers和spring.schemas串联起所有部件
- 在Bean文件中应用
详细步骤链接:spring的自定义配置
学习spring如何实现自定义配置后,我们再来看看dubbo的代码,差不多是一样的
<!-- 和本地bean一样实现服务 -->
<dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService"/>
dubbo.xsd,作用是约束interface和ref属性
<xsd:element name="service" type="serviceType">
<xsd:annotation>
<xsd:documentation><![CDATA[ Export service config ]]></xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:complexType name="serviceType">
<xsd:complexContent>
<xsd:extension base="abstractServiceType">
<xsd:choice minOccurs="0" maxOccurs="unbounded">
<xsd:element ref="method" minOccurs="0" maxOccurs="unbounded"/>
<xsd:element ref="parameter" minOccurs="0" maxOccurs="unbounded"/>
<xsd:element ref="beans:property" minOccurs="0" maxOccurs="unbounded"/>
</xsd:choice>
<xsd:attribute name="interface" type="xsd:token" use="required">
<xsd:annotation>
<xsd:documentation>
<![CDATA[ Defines the interface to advertise for this service in the service registry. ]]></xsd:documentation>
<xsd:appinfo>
<tool:annotation>
<tool:expected-type type="java.lang.Class"/>
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="ref" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation>
<![CDATA[ The service implementation instance bean id. ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:extension>
</xsd:complexContent>
</xsd:complexType>
然后就是处理器DubboNamespaceHandler,将初始化时将各种bean注册到解析器上,将配置文件的值赋值到bean上
public class DubboNamespaceHandler extends NamespaceHandlerSupport {
static {
Version.checkDuplicate(DubboNamespaceHandler.class);
}
public void init() {
registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
registerBeanDefinitionParser("annotation", new DubboBeanDefinitionParser(AnnotationBean.class, true));
}
}
解析器DubboBeanDefinitionParser,代码太长就不贴了
最后也是通过两个类spring.handlers和spring.schemas串联起所有部件
spring.handlers
http\://code.alibabatech.com/schema/dubbo=com.alibaba.dubbo.config.spring.schema.DubboNamespaceHandler
spring.schemas
http\://code.alibabatech.com/schema/dubbo/dubbo.xsd=META-INF/dubbo.xsd
(7)服务发现 - 原理探索
启动服务提供者的时候通过打印出来的日志知道整个服务发现流程:
第一个发布的动作:暴露本地服务
Export dubbo service com.alibaba.dubbo.demo.DemoService to local registry, dubbo version: 2.0.0, current host: 127.0.0.1
第二个发布动作:暴露远程服务
Export dubbo service com.alibaba.dubbo.demo.DemoService to url dubbo://192.168.100.38:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&loadbalance=roundrobin&methods=sayHello&owner=william&pid=8484&side=provider×tamp=1473908495465, dubbo version: 2.0.0, current host: 127.0.0.1
Register dubbo service com.alibaba.dubbo.demo.DemoService url dubbo://192.168.100.38:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&loadbalance=roundrobin&methods=sayHello&monitor=dubbo%3A%2F%2F192.168.48.117%3A2181%2Fcom.alibaba.dubbo.registry.RegistryService%3Fapplication%3Ddemo-provider%26backup%3D192.168.48.120%3A2181%2C192.168.48.123%3A2181%26dubbo%3D2.0.0%26owner%3Dwilliam%26pid%3D8484%26protocol%3Dregistry%26refer%3Ddubbo%253D2.0.0%2526interface%253Dcom.alibaba.dubbo.monitor.MonitorService%2526pid%253D8484%2526timestamp%253D1473908495729%26registry%3Dzookeeper%26timestamp%3D1473908495398&owner=william&pid=8484&side=provider×tamp=1473908495465 to registry registry://192.168.48.117:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&backup=192.168.48.120:2181,192.168.48.123:2181&dubbo=2.0.0&owner=william&pid=8484®istry=zookeeper×tamp=1473908495398, dubbo version: 2.0.0, current host: 127.0.0.1
第三个发布动作:启动netty
Start NettyServer bind /0.0.0.0:20880, export /192.168.100.38:20880, dubbo version: 2.0.0, current host: 127.0.0.1
第四个发布动作:打开连接zk
INFO zookeeper.ClientCnxn: Opening socket connection to server /192.168.48.117:2181
第五个发布动作:到zk注册
Register: dubbo://192.168.100.38:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&loadbalance=roundrobin&methods=sayHello&owner=william&pid=8484&side=provider×tamp=1473908495465, dubbo version: 2.0.0, current host: 127.0.0.1
第六个发布动作;监听zk
Subscribe: provider://192.168.100.38:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&loadbalance=roundrobin&methods=sayHello&owner=william&pid=8484&side=provider×tamp=1473908495465, dubbo version: 2.0.0, current host: 127.0.0.1
Notify urls for subscribe url provider://192.168.100.38:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&loadbalance=roundrobin&methods=sayHello&owner=william&pid=8484&side=provider×tamp=1473908495465, urls: [empty://192.168.100.38:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&loadbalance=roundrobin&methods=sayHello&owner=william&pid=8484&side=provider×tamp=1473908495465], dubbo version: 2.0.0, current host: 127.0.0.1
暴露本地服务和暴露远程服务的区别是什么?
- 暴露本地服务:指暴露在用一个JVM里面,不用通过调用zk来进行远程通信。例如:在同一个服务,自己调用自己的接口,就没必要进行网络IP连接来通信。
- 暴露远程服务:指暴露给远程客户端的IP和端口号,通过网络来实现通信。
ServiceBean.onApplicationEvent
-->export()
-->ServiceConfig.export()
-->doExport()
-->doExportUrls()//里面有一个for循环,代表了一个服务可以有多个通信协议,例如 tcp协议 http协议,默认是tcp协议
-->loadRegistries(true)//从dubbo.properties里面组装registry的url信息
-->doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs)
//配置不是remote的情况下做本地暴露 (配置为remote,则表示只暴露远程服务)
-->exportLocal(URL url)
-->proxyFactory.getInvoker(ref, (Class) interfaceClass, local)
-->ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension("javassist");
-->extension.getInvoker(arg0, arg1, arg2)
-->StubProxyFactoryWrapper.getInvoker(T proxy, Class<T> type, URL url)
-->proxyFactory.getInvoker(proxy, type, url)
-->JavassistProxyFactory.getInvoker(T proxy, Class<T> type, URL url)
-->Wrapper.getWrapper(com.alibaba.dubbo.demo.provider.DemoServiceImpl)
-->makeWrapper(Class<?> c)
-->return new AbstractProxyInvoker<T>(proxy, type, url)
-->protocol.export
-->Protocol$Adpative.export
-->ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension("injvm");
-->extension.export(arg0)
-->ProtocolFilterWrapper.export
-->buildInvokerChain //创建8个filter
-->ProtocolListenerWrapper.export
-->InjvmProtocol.export
-->return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap)
-->目的:exporterMap.put(key, this)//key=com.alibaba.dubbo.demo.DemoService, this=InjvmExporter
//如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露本地服务)
-->proxyFactory.getInvoker//原理和本地暴露一样都是为了获取一个Invoker对象
-->protocol.export(invoker)
-->Protocol$Adpative.export
-->ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension("registry");
-->extension.export(arg0)
-->ProtocolFilterWrapper.export
-->ProtocolListenerWrapper.export
-->RegistryProtocol.export
-->doLocalExport(originInvoker)
-->getCacheKey(originInvoker);//读取 dubbo://192.168.100.51:20880/
-->rotocol.export
-->Protocol$Adpative.export
-->ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension("dubbo");
-->extension.export(arg0)
-->ProtocolFilterWrapper.export
-->buildInvokerChain//创建8个filter
-->ProtocolListenerWrapper.export
---------1.netty服务暴露的开始------- -->DubboProtocol.export
-->serviceKey(url)//组装key=com.alibaba.dubbo.demo.DemoService:20880
-->目的:exporterMap.put(key, this)//key=com.alibaba.dubbo.demo.DemoService:20880, this=DubboExporter
-->openServer(url)
-->createServer(url)
--------2.信息交换层 exchanger 开始-------------->Exchangers.bind(url, requestHandler)//exchaanger是一个信息交换层
-->getExchanger(url)
-->getExchanger(type)
-->ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension("header")
-->HeaderExchanger.bind
-->Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))
-->new HeaderExchangeHandler(handler)//this.handler = handler
-->new DecodeHandler
-->new AbstractChannelHandlerDelegate//this.handler = handler;
---------3.网络传输层 transporter--------------------->Transporters.bind
-->getTransporter()
-->ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension()
-->Transporter$Adpative.bind
-->ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension("netty");
-->extension.bind(arg0, arg1)
-->NettyTransporter.bind
--new NettyServer(url, listener)
-->AbstractPeer //this.url = url; this.handler = handler;
-->AbstractEndpoint//codec timeout=1000 connectTimeout=3000
-->AbstractServer //bindAddress accepts=0 idleTimeout=600000
---------4.打开断开,暴露netty服务-------------------------------->doOpen()
-->设置 NioServerSocketChannelFactory boss worker的线程池 线程个数为3
-->设置编解码 hander
-->bootstrap.bind(getBindAddress())
-->new HeaderExchangeServer
-->this.server=NettyServer
-->heartbeat=60000
-->heartbeatTimeout=180000
-->startHeatbeatTimer()//这是一个心跳定时器,采用了线程池,如果断开就心跳重连。
-->getRegistry(originInvoker)//zk 连接
-->registryFactory.getRegistry(registryUrl)
-->ExtensionLoader.getExtensionLoader(RegistryFactory.class).getExtension("zookeeper");
-->extension.getRegistry(arg0)
-->AbstractRegistryFactory.getRegistry//创建一个注册中心,存储在REGISTRIES
-->createRegistry(url)
-->new ZookeeperRegistry(url, zookeeperTransporter)
-->AbstractRegistry
-->loadProperties()//目的:把C:\Users\bobo\.dubbo\dubbo-registry-192.168.48.117.cache
文件中的内容加载为properties
-->notify(url.getBackupUrls())//不做任何事
-->FailbackRegistry
-->retryExecutor.scheduleWithFixedDelay(new Runnable()//建立线程池,检测并连接注册中心,如果失败了就重连
-->ZookeeperRegistry
-->zookeeperTransporter.connect(url)
-->ZookeeperTransporter$Adpative.connect(url)
-->ExtensionLoader.getExtensionLoader(ZookeeperTransporter.class).getExtension("zkclient");
-->extension.connect(arg0)
-->ZkclientZookeeperTransporter.connect
-->new ZkclientZookeeperClient(url)
-->AbstractZookeeperClient
-->ZkclientZookeeperClient
-->new ZkClient(url.getBackupAddress());//连接ZK
-->client.subscribeStateChanges(new IZkStateListener()//订阅的目标:连接断开,重连
-->zkClient.addStateListener(new StateListener()
-->recover //连接失败 重连
-->registry.register(registedProviderUrl)//创建节点
-->AbstractRegistry.register
-->FailbackRegistry.register
-->doRegister(url)//向zk服务器端发送注册请求
-->ZookeeperRegistry.doRegister
-->zkClient.create
-->AbstractZookeeperClient.create//dubbo/com.alibaba.dubbo.demo.DemoService/providers/
dubbo%3A%2F%2F192.168.100.52%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26
application%3Ddemo-provider%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3D
com.alibaba.dubbo.demo.DemoService%26loadbalance%3Droundrobin%26methods%3DsayHello%26owner%3
Dwilliam%26pid%3D2416%26side%3Dprovider%26timestamp%3D1474276306353
-->createEphemeral(path);//临时节点 dubbo%3A%2F%2F192.168.100.52%3A20880%2F.............
-->createPersistent(path);//持久化节点 dubbo/com.alibaba.dubbo.demo.DemoService/providers
-->registry.subscribe//订阅ZK
-->AbstractRegistry.subscribe
-->FailbackRegistry.subscribe
-->doSubscribe(url, listener)// 向服务器端发送订阅请求
-->ZookeeperRegistry.doSubscribe
-->new ChildListener()
-->实现了 childChanged
-->实现并执行 ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
//A
-->zkClient.create(path, false);//第一步:先创建持久化节点/dubbo/com.alibaba.dubbo.demo.DemoService/configurators
-->zkClient.addChildListener(path, zkListener)
-->AbstractZookeeperClient.addChildListener
//C
-->createTargetChildListener(path, listener)//第三步:收到订阅后的处理,交给FailbackRegistry.notify处理
-->ZkclientZookeeperClient.createTargetChildListener
-->new IZkChildListener()
-->实现了 handleChildChange //收到订阅后的处理
-->listener.childChanged(parentPath, currentChilds);
-->实现并执行ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
-->收到订阅后处理 FailbackRegistry.notify
//B
-->addTargetChildListener(path, targetListener)第二步
-->ZkclientZookeeperClient.addTargetChildListener
-->client.subscribeChildChanges(path, listener)//第二步:启动加入订阅/dubbo/com.alibaba.dubbo.demo.DemoService/configurators
-->notify(url, listener, urls)
-->FailbackRegistry.notify
-->doNotify(url, listener, urls);
-->AbstractRegistry.notify
-->saveProperties(url);//把服务端的注册url信息更新到C:\Users\bobo\.dubbo\dubbo-registry-192.168.48.117.cache
-->registryCacheExecutor.execute(new SaveProperties(version));//采用线程池来处理
-->listener.notify(categoryList)
-->RegistryProtocol.notify
-->RegistryProtocol.this.getProviderUrl(originInvoker)//通过invoker的url 获取 providerUrl的地址
重要概念
1、proxyFactory:为了获取一个接口的代理类,例如获取一个远程接口的代理
它有2个方法,代表2个作用:
- getInvoker():针对server端,将服务对象,如DemoServiceImpl包装成一个Invoker对象
- getProxy():针对client端,创建接口的代理对象,例如DemoService的接口
2、Wrapper:它类似spring的beanWrapper,它就是包装了一个接口或一个类,可以通过wrapper对实例对象进行赋值以及制定方法的调用
3、Invoker:一个可执行的对象,能够根据方法的名称、参数得到相应的执行结果。
它里面有一个很重要的方法Result invoke(Invocation invocation),Invocation是包含了需要执行的方法和参数等重要信息,目前只有两个实现类。RpcInvocation 、MockInvocation
它有三种类型的Invoker:
- 本地执行的Invoker
- 远程通信的Invoker
- 多个远程通信执行类的Invoker聚合成集群版的Invoker
4、Protocol
- export:暴露远程服务(用于服务端),就是将proxyFactory.getInvoker创建的代理类invoker对象,通过协议暴露给外部
- refer:引用远程服务(用于客户端),通过proxyFactory.getProxy来创建远程的动态代理类,例如DemoDemoService的接口
5、exporter:维护invoder的生命周期
6、exchanger:信息交换层,封装请求相应模式,同步转异步
7、transporter:网络传输层,用来抽象netty和mina的统一接口
(8)服务发现 - netty的服务暴露
dubbo 做为 RPC 框架,需要进行跨 JVM 通信,要保证高性、稳定的进行远程通信。dubbo 底层通信选择了 netty 这个 nio 框架做为默认的网络通信框架并且通过自定义协议进行通信。dubbo 支持以下网络通信框架:
- Netty(默认)
- Mina
- Grizzly
为了更好了解dubbo是如何使用netty进行通信,我们先来看看netty的例子
服务端
服务端启动类,下面有详细的注释
public class Server {
private ChannelFactory factory;
public static ChannelGroup channelGroup = new DefaultChannelGroup();
public void start() {
// NioServerSocketChannelFactory用于创建基于NIO的服务端
// ServerSocketChannel。本身包含2种线程,boss线程和worker线程。
// 每个ServerSocketChannel会都会拥有自己的boss线程,
// 当一个连接被服务端接受(accepted),
// boss线程就会将接收到的Channel传递给一个worker线程处理,
// 而worker线程以非阻塞的方式为一个或多个Channel提供非阻塞的读写
factory = new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(), // boss线程池
Executors.newCachedThreadPool(), // worker线程池
8); // worker线程数
// ServerBootstrap用于帮助服务器启动
ServerBootstrap bootstrap = new ServerBootstrap(factory);
// 没有child.前缀,则该选项是为ServerSocketChannel设置
bootstrap.setOption("reuseAddress", true);
// 有child.前缀,则该选项是为Channel设置
// bootstrap.setOption("child.tcpNoDelay", true);
// bootstrap.setOption("child.keepAlive", true);
// 对每一个连接(channel),server都会调用
// ChannelPipelineFactory为该连接创建一个ChannelPipeline
ServerChannelPiplineFactory channelPiplineFactory = new ServerChannelPiplineFactory();
bootstrap.setPipelineFactory(channelPiplineFactory);
// 这里绑定服务端监听的IP和端口
Channel channel = bootstrap.bind(new InetSocketAddress("127.0.0.1",8000));
Server.channelGroup.add(channel);
System.out.println("Server is started...");
}
public void stop() {
// ChannelGroup为其管理的Channels提供一系列的批量操作
// 关闭的Channel会自动从ChannelGroup中移除
ChannelGroupFuture channelGroupFuture = Server.channelGroup.close();
channelGroupFuture.awaitUninterruptibly();
factory.releaseExternalResources();
System.out.println("Server is stopped.");
}
public static void main(String[] args) throws Exception {
Server server = new Server();
server.start();
Thread.sleep(300 * 1000);
server.stop();
}
}
管道工厂类,设置了netty自带的编解码,还设置了逻辑处理类
public class ServerChannelPiplineFactory implements ChannelPipelineFactory {
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline channelPipeline = Channels.pipeline();
channelPipeline.addLast("decoder", new StringDecoder());
channelPipeline.addLast("encoder", new StringEncoder());
channelPipeline.addLast("handler", new ServerLogicHandler());
return channelPipeline;
}
}
服务端逻辑处理类,里面主要有三个方法,channelConnected连接、messageReceived接收和exceptionCaught异常处理
public class ServerLogicHandler extends SimpleChannelHandler {
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
System.out.println("######channelConnected");
// channel group is thread safe
Server.channelGroup.add(e.getChannel());
System.out.println(e.getChannel().toString());
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
System.out.println("######messageReceived");
// 经过了ServerReadDecoder的处理,这里可以直接得到String类型的message
String msg = (String)e.getMessage();
System.out.println("The message sent by client is : " + msg);
Channel ch = e.getChannel();
String str = "Hi, Client.";
// 由于IO操作是异步的,当方法返回时并不能保证IO操作一定完成了
// 因此返回一个ChannelFuture对象实例
// 该实例中保存了IO操作的状态信息
ChannelFuture cf = ch.write(str);
// 为ChannelFuture对象实例添加监听,如果数据发送完毕则关闭连接
cf.addListener(new ChannelFutureListener(){
@Override
public void operationComplete(ChannelFuture future)
throws Exception {
Channel ch = future.getChannel();
ch.close();
}
});
System.out.println("The message has sent to client.");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
e.getCause().printStackTrace();
Channel ch = e.getChannel();
ch.close();
}
}
客户端
客户端启动类
public class Client {
public static void main(String[] args){
// 同服务端相同,只是这里使用的是NioClientSocketChannelFactory
final ChannelFactory factory = new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool(),
8);
// ClientBootstrap用于帮助客户端启动
ClientBootstrap bootstrap = new ClientBootstrap(factory);
// 由于客户端不包含ServerSocketChannel,所以参数名不能带有child.前缀
bootstrap.setOption("tcpNoDelay", true);
// bootstrap.setOption("keepAlive", true);
//设置管道工厂,里面也设置了编解码
bootstrap.setPipelineFactory(new ChannelPipelineFactory(){
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline channelPipeline = Channels.pipeline();
channelPipeline.addLast("decoder", new StringDecoder());
channelPipeline.addLast("encoder", new StringEncoder());
channelPipeline.addLast("handler", new ClientLogicHandler());
return channelPipeline;
}
});
// 这里连接服务端绑定的IP和端口 ,连接服务端
bootstrap.connect(new InetSocketAddress("127.0.0.1", 8000));
System.out.println("Client is started...");
}
}
客户端逻辑处理类,里面同样主要有三个方法,channelConnected连接、messageReceived接收和exceptionCaught异常处理
public class ClientLogicHandler extends SimpleChannelHandler {
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
System.out.println("######channelConnected");
Channel ch = e.getChannel();
String msg = "Hi, Server.by agan";
ch.write(msg);
}
@Override
public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e)
throws Exception {
System.out.println("######writeComplete");
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
System.out.println("######messageReceived");
String msg = (String)e.getMessage();
System.out.println("The message gotten from server is : " + msg);
ChannelFuture channelFuture = e.getChannel().close();
channelFuture.addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
e.getCause().printStackTrace();
Channel ch = e.getChannel();
ch.close();
}
}
(9)zookeeper的连接-创建-订阅
(10)dubbo如何连接zookeeper?
(11)dubbo如何创建zookeeper节点?
(12)dubbo如何订阅zookeeper信息?
(13)服务发布 - 整体架构设计图
(14)服务引用的设计原理
ReferenceBean.getObject()
-->ReferenceConfig.get()
-->init()
-->createProxy(map)
-->refprotocol.refer(interfaceClass, urls.get(0))
-->ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("registry");
-->extension.refer(arg0, arg1);
-->ProtocolFilterWrapper.refer
-->RegistryProtocol.refer
-->registryFactory.getRegistry(url)//建立zk的连接,和服务端发布一样(省略代码)
-->doRefer(cluster, registry, type, url)
-->registry.register//创建zk的节点,和服务端发布一样(省略代码)。节点名为:dubbo/com.alibaba.dubbo.demo.DemoService/consumers
-->registry.subscribe//订阅zk的节点,和服务端发布一样(省略代码)。 /dubbo/com.alibaba.dubbo.demo.DemoService/providers,
/dubbo/com.alibaba.dubbo.demo.DemoService/configurators,
/dubbo/com.alibaba.dubbo.demo.DemoService/routers]
-->notify(url, listener, urls);
-->FailbackRegistry.notify
-->doNotify(url, listener, urls);
-->AbstractRegistry.notify
-->saveProperties(url);//把服务端的注册url信息更新到C:\Users\bobo\.dubbo\dubbo-registry-192.168.48.117.cache
-->registryCacheExecutor.execute(new SaveProperties(version));//采用线程池来处理
-->listener.notify(categoryList)
-->RegistryDirectory.notify
-->refreshInvoker(invokerUrls)//刷新缓存中的invoker列表
-->destroyUnusedInvokers(oldUrlInvokerMap,newUrlInvokerMap); // 关闭未使用的Invoker
-->最终目的:刷新Map<String, Invoker<T>> urlInvokerMap 对象
刷新Map<String, List<Invoker<T>>> methodInvokerMap对象
-->cluster.join(directory)//加入集群路由
-->ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.cluster.Cluster.class).getExtension("failover");
-->MockClusterWrapper.join
-->this.cluster.join(directory)
-->FailoverCluster.join
-->return new FailoverClusterInvoker<T>(directory)
-->new MockClusterInvoker
-->proxyFactory.getProxy(invoker)//创建服务代理
-->ProxyFactory$Adpative.getProxy
-->ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension("javassist");
-->StubProxyFactoryWrapper.getProxy
-->proxyFactory.getProxy(invoker)
-->AbstractProxyFactory.getProxy
-->getProxy(invoker, interfaces)
-->Proxy.getProxy(interfaces)//目前代理对象interface com.alibaba.dubbo.demo.DemoService, interface com.alibaba.dubbo.rpc.service.EchoService
-->InvokerInvocationHandler// 采用jdk自带的InvocationHandler,创建InvokerInvocationHandler对象。
(15)服务引用 - 整体架构设计图
(17)集群容错之directory目录
通过目录来查找服务,它代表多个invoker,从methodInvokerMap提取,但是他的值是动态,例如注册中心的变更,下面是dubbo集群容错的设计图
public interface Directory<T> extends Node {
/**
* get service type.
*
* @return service type.
*/
Class<T> getInterface();
/**
* list invokers.
*
* @return invokers
*/
List<Invoker<T>> list(Invocation invocation) throws RpcException;
}
Directory有两个实现类,一个静态Directory(不常用)、一个注册中心Directory
Directory目录服务
- StaticDirectory:静态目录服务,他的Invoker是固定的
- RegistryDirectory:注册目录服务,他的Invoker集合数据来源于zk注册中心,他实现了NotifyListener接口,并且实现回调函数 notify(List< URL > urls),整个过程有一个重要的map变量,methodInvokerMap(它是数据的来源,同时也是notify的重要操作对象;重点是写操作),注册中心有变更就刷新map变量,通过doList()来读,通过notify()来写
(18)集群容错之router路由规则
在 dubbo 中路由规则决定一次服务调用的目标服务器,分为条件路由规则和脚本路由规则,并且支持可扩展(SPI)。
public interface Router extends Comparable<Router> {
URL getUrl();
<T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
}
调用 route 方法,传入从目录服务获取到的 Invoke 列表,通过 URL 或者 Invocation 里面配置的条件(路由规则)筛选出满足条件的 Invoke 列表。例如应用隔离或读写分离或灰度发布等等
下面是 dubbo 路由服务的类图:
第一个问题:启动路由规则,它触发哪些动作?
a. 什么时候加入ConditionRouter?
答:默认是MockInvokersSelector,只要修改后台管理或注册中心改变的时候就加入ConditionRouter
b. ConditionRouter是怎么过滤的?
第二个问题:路由规则有哪些实现类?
MockInvokersSelector:默认
ConditionRouter:条件路由,后台管理的路由配置都是条件路由,默认是MockInvokersSelector,只要修改后台管理或注册中心改变的时候就加入ConditionRouter
ScriptRouter:脚本路由
dubbo 默认会在 AbstractDirectory#setRouters 自动添加 MockInvokersSelector 路由规则。
MockInvokersSelector
MockInvokersSelector:其实就是用于路由 Mock 服务与非 Mock 服务。
public <T> List<Invoker<T>> route(final List<Invoker<T>> invokers,
URL url, final Invocation invocation) throws RpcException {
if (invocation.getAttachments() == null) {
return getNormalInvokers(invokers);
} else {
String value = invocation.getAttachments().get(Constants.INVOCATION_NEED_MOCK);
if (value == null)
return getNormalInvokers(invokers);
else if (Boolean.TRUE.toString().equalsIgnoreCase(value)) {
return getMockedInvokers(invokers);
}
}
return invokers;
}
上面的代码逻辑其实就是:
- 如果 Invocation 的扩展参数不为空 并且 Invocation 的扩展参数里面包含 invocation.need.mock 参数并且值为 true 就获取 Invoke 列表里面 protocol 为 mock 的 Invoke 列表。
- 否则获取Invoke 列表里面 protocol 为非 mock 的 Invoke 列表。
ConditionRouter
ConditionRouter:基于条件表达式的路由规则,它的条件规则如下:
- => 之前的为消费者匹配条件,所有参数和消费者的 URL 进行对比,当消费者满足匹配条件时,对该消费者执行后面的过滤规则。
- => 之后为提供者地址列表的过滤条件,所有参数和提供者的 URL 进行对比,消费者最终只拿到过滤后的地址列表。
- 如果匹配条件为空,表示对所有消费方应用,如:=> host != 10.20.153.11
- 如果过滤条件为空,表示禁止访问,如:host = 10.20.153.10 =>
参数支持:
- 服务调用信息,如:method, argument 等,暂不支持参数路由
- URL 本身的字段,如:protocol, host, port 等
- 以及 URL 上的所有参数,如:application, organization 等
条件支持:
- 等号 = 表示”匹配”,如:host = 10.20.153.10
- 不等号 != 表示”不匹配”,如:host != 10.20.153.10
值支持:
- 以逗号 , 分隔多个值,如:host != 10.20.153.10,10.20.153.11
- 以星号 结尾,表示通配,如:host != 10.20.
- 以美元符 $ 开头,表示引用消费者参数,如:host = $host
ScriptRouter
ScriptRouter:脚本路由规则,脚本路由规则支持 JDK 脚本引擎的所有脚本,比如:javascript, jruby, groovy 等,通过 type=javascript 参数设置脚本类型,缺省为 javascript。
基于脚本引擎的路由规则,如:
(function route(invokers) {
var result = new java.util.ArrayList(invokers.size());
for (i = 0; i < invokers.size(); i ++) {
if ("10.20.153.10".equals(invokers.get(i).getUrl().getHost())) {
result.add(invokers.get(i));
}
}
return result;
} (invokers)); // 表示立即执行方法
Route 功能
通过配置不同的 Route 规则,我们可以实现以下功能。
排除预发布机:
=> host != 172.22.3.91
白名单:
host != 10.20.153.10,10.20.153.11 =>
黑名单:
host = 10.20.153.10,10.20.153.11 =>
服务寄宿在应用上,只暴露一部分的机器,防止整个集群挂掉:
=> host = 172.22.3.1*,172.22.3.2*
为重要应用提供额外的机器:
application != kylin => host != 172.22.3.95,172.22.3.96
读写分离:
method = find*,list*,get*,is* => host = 172.22.3.94,172.22.3.95,172.22.3.96
method != find*,list*,get*,is* => host = 172.22.3.97,172.22.3.98
前后台分离:
application = bops => host = 172.22.3.91,172.22.3.92,172.22.3.93
application != bops => host = 172.22.3.94,172.22.3.95,172.22.3.96
隔离不同机房网段:
host != 172.22.3.* => host != 172.22.3.*
提供者与消费者部署在同集群内,本机只访问本机的服务:
=> host = $host
灰度发布例子:
灰度发布是指在黑与白之间,能够平滑过渡的一种发布方式。AB test就是一种灰度发布方式,让一部分用户继续用A,一部分用户开始用B,如果用户对B没有什么反对意见,那么逐步扩大范围,把所有用户都迁移到B上面来。灰度发布可以保证整体系统的稳定,在初始灰度的时候就可以发现、调整问题,以保证其影响度。
provider 192.168.100.38 192.168.48.32
- 发布192.168.48.32,切断192.168.48.32访问流量(不匹配, 已禁用),然后进行服务的发布。
- 192.168.48.32发布成功后,恢复192.168.48.32的流量,切断192.168.100.38,继续发布192.168.100.38
(19)集群容错之cluster集群
Cluster将Directory中的多个Invoker伪装成一个Invoker来容错,调用失败重试。
在服务引用在加入join()方法
@SPI(FailoverCluster.NAME)//失败转移,当失败的时候重试其他服务器
public interface Cluster {
/**
* Merge the directory invokers to a virtual invoker.
*
* @param <T>
* @param directory
* @return cluster invoker
* @throws RpcException
*/
@Adaptive
<T> Invoker<T> join(Directory<T> directory) throws RpcException;
}
Cluster有八个实现类,也就是有八个集群算法
- FailoverCluster:(默认)失败转移,当出现失败,重试其它服务器,通常用于读操作,但重试会带来更长延迟。
- FailfastCluster:快速失败,只发起一次调用,失败立即报错,通常用于非幂等性的写操作。
- FailbackCluster:失败自动恢复,后台记录失败请求,定时重发,通常用于消息通知操作。
- FailsafeCluster:失败安全,出现异常时,直接忽略,通常用于写入审计日志等操作。
- ForkingCluster: 并行调用,只要一个成功即返回,通常用于实时性要求较高的操作,但需要浪费更多服务资源。
- BroadcastCluster: 广播调用。遍历所有Invokers, 逐个调用每个调用catch住异常不影响其他invoker调用
- MergeableCluster: 分组聚合, 按组合并返回结果,比如菜单服务,接口一样,但有多种实现,用group区分,现在消费方需从每种group中调用一次返回结果,合并结果返回,这样就可以实现聚合菜单项。
- AvailableCluster: 获取可用的调用。遍历所有Invokers判断Invoker.isAvalible,只要一个有为true直接调用返回,不管成不成功
失败转移源码
失败转移和快速失败的区别,是失败转移出现异常会存储异常,而快速失败出现异常会直接抛出去
public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {
private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class);
public FailoverClusterInvoker(Directory<T> directory) {
super(directory);
}
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyinvokers = invokers;
checkInvokers(copyinvokers, invocation);
String methodName = RpcUtils.getMethodName(invocation);
//获取重试的次数,默认是3
int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
// retry loop.
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
//重试时,进行重新选择,避免重试时invoker列表已发生变化
//注意:如果列表发生变化,那么invoker判断会失效,因为invoker示例已经改变
if (i > 0) {
checkWhetherDestroyed();
copyinvokers = list(invocation);
// check again
checkInvokers(copyinvokers, invocation);
}
//从负载均衡获取一个invoker
Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {
Result result = invoker.invoke(invocation);
if (le != null && logger.isWarnEnabled()) {
logger.warn("Although retry the method " + methodName
+ " in the service " + getInterface().getName()
+ " was successful by the provider " + invoker.getUrl().getAddress()
+ ", but there have been failed providers " + providers
+ " (" + providers.size() + "/" + copyinvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost()
+ " using the dubbo version " + Version.getVersion() + ". Last error is: "
+ le.getMessage(), le);
}
return result;
} catch (RpcException e) {
if (e.isBiz()) { // biz exception.
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
throw new RpcException(le.getCode(), "Failed to invoke the method "
+ methodName + " in the service " + getInterface().getName()
+ ". Tried " + len + " times of the providers " + providers
+ " (" + providers.size() + "/" + copyinvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
+ Version.getVersion() + ". Last error is: "
+ le.getMessage(), le.getCause() != null ? le.getCause() : le);
}
}
快速失败源码
快速失败,只发起一次调用,失败立即报错,通常用于非幂等性的写操作
public class FailfastClusterInvoker<T> extends AbstractClusterInvoker<T> {
public FailfastClusterInvoker(Directory<T> directory) {
super(directory);
}
@Override
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
checkInvokers(invokers, invocation);
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
try {
return invoker.invoke(invocation);
} catch (Throwable e) {
if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.
throw (RpcException) e;
}
//失败转移和快速失败的区别,是失败转移出现异常会存储异常,而快速失败出现异常会直接抛出去
throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0,
"Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName()
+ " select from all providers " + invokers + " for service " + getInterface().getName()
+ " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost()
+ " use dubbo version " + Version.getVersion()
+ ", but no luck to perform the invocation. Last error is: " + e.getMessage(),
e.getCause() != null ? e.getCause() : e);
}
}
}
(20)集群容错之loadbalance负载均衡
loadbalance负载均衡:从多个Invoker选取一个做本次调用,具体包含很多负载均衡算法
@SPI(RandomLoadBalance.NAME)//默认是随机
public interface LoadBalance {
/**
* select one invoker in list.
*
* @param invokers invokers.
* @param url refer url
* @param invocation invocation.
* @return selected invoker.
*/
@Adaptive("loadbalance")//动态编译
<T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
}
loadbalance负载均衡有四个实现类
- RandomLoadBalance:随机,按权重设置随机概率。在一个截面上碰撞的概率高,但调用量越大分布越均匀,而且按概率使用权重后也比较均匀,有利于动态调整提供者权重。
- RoundRobin LoadBalance:轮循,按公约后的权重设置轮循比率。存在慢的提供者累积请求问题,比如:第二台机器很慢,但没挂,当请求调到第二台时就卡在那,久而久之,所有请求都卡在调到第二台上。
- LeastActiveLoadBalance:最少活跃调用数,相同活跃数的随机,活跃数指调用前后计数差。使慢的提供者收到更少请求,因为越慢的提供者的调用前后计数差会越大。
- ConsistentHash LoadBalance:一致性Hash,相同参数的请求总是发到同一提供者。当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变动。
RoundRobin轮循算法源码
public class RoundRobinLoadBalance extends AbstractLoadBalance {
public static final String NAME = "roundrobin";
private final ConcurrentMap<String, AtomicPositiveInteger> sequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
int length = invokers.size(); // Number of invokers
int maxWeight = 0; // The maximum weight
int minWeight = Integer.MAX_VALUE; // The minimum weight
final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>();
int weightSum = 0;
for (int i = 0; i < length; i++) {
int weight = getWeight(invokers.get(i), invocation);
maxWeight = Math.max(maxWeight, weight); // Choose the maximum weight
minWeight = Math.min(minWeight, weight); // Choose the minimum weight
if (weight > 0) {
invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight));
weightSum += weight;
}
}
AtomicPositiveInteger sequence = sequences.get(key);
if (sequence == null) {
sequences.putIfAbsent(key, new AtomicPositiveInteger());
sequence = sequences.get(key);
}
int currentSequence = sequence.getAndIncrement();
if (maxWeight > 0 && minWeight < maxWeight) {
int mod = currentSequence % weightSum;
for (int i = 0; i < maxWeight; i++) {
for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) {
final Invoker<T> k = each.getKey();
final IntegerWrapper v = each.getValue();
if (mod == 0 && v.getValue() > 0) {
return k;
}
if (v.getValue() > 0) {
v.decrement();
mod--;
}
}
}
}
// Round robin
return invokers.get(currentSequence % length);
}
private static final class IntegerWrapper {
private int value;
public IntegerWrapper(int value) {
this.value = value;
}
public int getValue() {
return value;
}
public void setValue(int value) {
this.value = value;
}
public void decrement() {
this.value--;
}
}
}
(21)dubbo如何实现SOA的服务降级?
什么是服务开关?
先讲一下开关的由来,例如淘宝在11月11日做促销活动,在交易下单环节,可能需要调用A、B、C三个接口来完成,但是其实A和B是必须的, C只是附加的功能(例如在下单的时候做一下推荐,或push消息),可有可无,在平时系统没有压力,容量充足的情况下,调用下没问题,但是在类似店庆之类的大促环节, 系统已经满负荷了,这时候其实完全可以不去调用C接口,怎么实现这个呢? 改代码?
什么是服务降级
服务降级,当服务器压力剧增的情况下,根据当前业务情况及流量对一些服务和页面有策略的降级,以此释放服务器资源以保证核心任务的正常运行。
dubbo如何实现服务降级?
1、容错:当系统出现非业务异常(比如并发数太高导致超时,网络异常等)时,不对该接口进行处理。(不可知)
mock=fail:return null
2、 屏蔽:在大促,促销活动的可预知情况下,例如双11活动。采用直接屏蔽接口访问。(可知)
mock=force:return null
(22)网络通信-consumer发送原理
dubbo的调用链图,也可以说网络通信架构图
demoService.sayHello("world" + i)
-->InvokerInvocationHandler.invoke
-->invoker.invoke
-->RpcInvocation//所有请求参数都会转换为RpcInvocation
-->MockClusterInvoker.invoke //1.进入集群
-->invoker.invoke(invocation)
-->AbstractClusterInvoker.invoke
-->list(invocation)
-->directory.list//2.进入目录查找 从this.methodInvokerMap里面查找一个Invoker
-->AbstractDirectory.list
-->doList(invocation)
-->RegistryDirectory.doList// 从this.methodInvokerMap里面查找一个Invoker
-->router.route //3.进入路由
-->MockInvokersSelector.route
-->getNormalInvokers
-->ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension("roundrobin")
-->doInvoke
-->FailoverClusterInvoker.doInvoke
-->select//4.进入负载均衡
-->AbstractClusterInvoker.select
-->doselect
-->loadbalance.select
-->AbstractLoadBalance.select
-->doSelect
-->RoundRobinLoadBalance.doSelect
-->invokers.get(currentSequence % length)//取模轮循
-->Result result = invoker.invoke(invocation)
--------------------------------------------------------------------------扩展点----------------
-->InvokerWrapper.invoke
-->ProtocolFilterWrapper.invoke
-->ConsumerContextFilter.invoke
-->ProtocolFilterWrapper.invoke
-->MonitorFilter.invoke
-->ProtocolFilterWrapper.invoke
-->FutureFilter.invoke
-->ListenerInvokerWrapper.invoke
-->AbstractInvoker.invoke
---------------------------------------------------------------------------扩展点---------------
-->doInvoke(invocation)
-->DubboInvoker.doInvoke//为什么DubboInvoker是个protocol? 因为RegistryDirectory.refreshInvoker.toInvokers: protocol.refer
-->ReferenceCountExchangeClient.request
-->HeaderExchangeClient.request
-->HeaderExchangeChannel.request
-->NettyClient.send
-->AbstractPeer.send
-->NettyChannel.send
-->ChannelFuture future = channel.write(message);//最终的目的:通过netty的channel发送网络数据
//consumer的接收原理
NettyHandler.messageReceived
-->AbstractPeer.received
-->MultiMessageHandler.received
-->HeartbeatHandler.received
-->AllChannelHandler.received
-->ChannelEventRunnable.run //线程池 执行线程
-->DecodeHandler.received
-->HeaderExchangeHandler.received
-->handleResponse(channel, (Response) message);
-->HeaderExchangeHandler.handleResponse
-->DefaultFuture.received
-->DefaultFuture.doReceived
private void doReceived(Response res) {
lock.lock();
try {
response = res;
if (done != null) {
done.signal();
}
} finally {
lock.unlock();
}
if (callback != null) {
invokeCallback(callback);
}
}
(23)网络通信-provider的接收与发送原理
NettyHandler.messageReceived
-->AbstractPeer.received
-->MultiMessageHandler.received
-->HeartbeatHandler.received
-->AllChannelHandler.received
-->ChannelEventRunnable.run //线程池 执行线程
-->DecodeHandler.received
-->HeaderExchangeHandler.received
-->handleRequest(exchangeChannel, request)//网络通信接收处理
-->DubboProtocol.reply
-->getInvoker
-->exporterMap.get(serviceKey)//从服务暴露里面提取
-->DubboExporter.getInvoker()//最终得到一个invoker
-------------------------------------------------------------------------扩展点--------------
-->ProtocolFilterWrapper.invoke
-->EchoFilter.invoke
-->ClassLoaderFilter.invoke
-->GenericFilter.invoke
-->TraceFilter.invoke
-->MonitorFilter.invoke
-->TimeoutFilter.invoke
-->ExceptionFilter.invoke
-->InvokerWrapper.invoke
-------------------------------------------------------------------------扩展点--------------
-->AbstractProxyInvoker.invoke
-->JavassistProxyFactory.AbstractProxyInvoker.doInvoke
--> 进入真正执行的实现类 DemoServiceImpl.sayHello
....................................
-->channel.send(response);//把接收处理的结果,发送回去
-->AbstractPeer.send
-->NettyChannel.send
-->ChannelFuture future = channel.write(message);//数据发回consumer
(24)网络通信-consumer的接收原理
(25)如何把网络通信的IO异步变同步?
先讲一下单工、全双工 、半双工 区别
- 单工:在同一时间只允许一方向另一方传送信息,而另一方不能向一方传送
- 全双工:是指在发送数据的同时也能够接收数据,两者同步进行,这好像我们平时打电话一样,说话的同时也能够听到对方的声音。目前的网卡一般都支持全双工。
- 半双工:所谓半双工就是指一个时间段内只有一个动作发生,举个简单例子,一条窄窄的马路,同时只能有一辆车通过,当目前有两量车对开,这种情况下就只能一辆先过,等到头后另一辆再开,这个例子就形象的说明了半双工的原理。
dubbo 是基于netty NIO的非阻塞 并行调用通信。 (阻塞 非阻塞 异步 同步 区别 )
dubbo从头到脚都是异步的
dubbo 的通信方式 有3类类型:
1、异步,无返回值
这种请求最简单,consumer 把请求信息发送给 provider 就行了。只是需要在 consumer 端把请求方式配置成异步请求就好了。如下:
<dubbo:method name="sayHello" return="false"></dubbo:method>
2、 异步,有返回值
这种情况下consumer首先把请求信息发送给provider,这个时候在consumer端不仅把请求方式配置成异步,并且需要RpcContext这个ThreadLocal对象获取到Future对象,然后通过Future#get( )阻塞式获取provider的相应,那么这个Future是如何添加到RpcContext中呢?
在第二小节讲服务发送的时候, 在 DubboInvoke 里面有三种调用方式,之前只具体请求了同步请求的发送方式而且没有异步请求的发送。异步请求发送代码如下:
DubboInvoker#doInvoke 中的 else if (isAsync) 分支
ResponseFuture future = currentClient.request(inv, timeout);
FutureAdapter<T> futureAdapter = new FutureAdapter<>(future);
RpcContext.getContext().setFuture(futureAdapter);
Result result;
if (RpcUtils.isAsyncFuture(getUrl(), inv)) {
result = new AsyncRpcResult<>(futureAdapter);
} else {
result = new RpcResult();
}
return result;
上面的代码逻辑是直接发送请求到 provider 返回一个 ResponseFuture 实例,然后把这个 Future 对象保存到 RpcContext#LOCAL 这个 ThreadLocal 当前线程对象当中,并且返回一个空的 RpcResult对象。如果要获取到 provider响应的信息,需要进行以下操作:
// 拿到调用的Future引用,当结果返回后,会被通知和设置到此Future
Future<String> temp= RpcContext.getContext().getFuture();
// 同理等待bar返回
hello=temp.get();
配置为异步,有返回值
<dubbo:method name="sayHello" async="true"></dubbo:method>
3、异步,变同步(默认的通信方式)
异步,变同步其实原理和异步请求的通过 Future#get 等待 provider 响应返回一样,只不过异步有返回值是显示调用而默认是 dubbo 内部把这步完成了。
A. 当前线程怎么让它 “暂停,等结果回来后,再执行”?
B. socket是一个全双工的通信方式,那么在多线程的情况下,如何知道那个返回结果对应原先那条线程的调用?
通过一个全局唯一的ID来做consumer 和 provider 来回传输。
我们都知道在 consumer 发送请求的时候会调用 HeaderExchangeChannel#request 方法:
HeaderExchangeChannel#request
public ResponseFuture request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// create request.
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try {
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
它首先会通过 dubbo 自定义的 Channel、Request 与 timeout(int) 构造一个 DefaultFuture 对象。然后再通过 NettyChannel 发送请求到 provider,最后返回这个 DefaultFuture。下面我们来看一下通过构造方法是如何创建 DefaultFuture 的。我只把主要涉及到的属性展示出来:
public class DefaultFuture implements ResponseFuture {
private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<Long, Channel>();
private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();
private final long id;
private final Channel channel;
private final Request request;
private final int timeout;
public DefaultFuture(Channel channel, Request request, int timeout) {
this.channel = channel;
this.request = request;
this.id = request.getId();
this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
// put into waiting map.
FUTURES.put(id, this);
CHANNELS.put(id, channel);
}
}
这个 id 是在创建 Request 的时候使用 AtomicLong#getAndIncrement 生成的。从 1 开始并且如果它一直增加直到生成负数也能保证这台机器这个值是唯一的,且不冲突的。符合唯一主键原则。 dubbo 默认同步变异步其实和异步调用一样,也是在 DubboInvoker#doInvoke 实现的。
DubboInvoker#doInvoke
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
关键就在 ResponseFuture#get 方法上面,下面我们来看一下这个方法的源码:
public Object get(int timeout) throws RemotingException {
if (timeout <= 0) {
timeout = Constants.DEFAULT_TIMEOUT;
}
if (!isDone()) {
long start = System.currentTimeMillis();
lock.lock();
try {
while (!isDone()) {
done.await(timeout, TimeUnit.MILLISECONDS);
if (isDone() || System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
if (!isDone()) {
throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
}
}
return returnFromResponse();
}
其实就是 while 循环,利用 java 的 lock 机制判断如果在超时时间范围内 DefaultFuture#response 如果赋值成不为空就返回响应,否则抛出 TimeoutException 异常。
还记得 consumer 接收 provider 响应的最后一步吗?就是 DefaultFuture#received,在 provider 端会带回 consumer请求的 id。我们来看一下它的具体处理逻辑:
public static void received(Channel channel, Response response) {
try {
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
future.doReceived(response);
} else {
logger.warn("The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
+ ", response " + response
+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()
+ " -> " + channel.getRemoteAddress()));
}
} finally {
CHANNELS.remove(response.getId());
}
}
它会从最开始通过构造函数传进去的 DefaultFuture#FUTURES 根据请求的 id 拿到 DefaultFuture ,然后根据这个 DefaultFuture 调用 DefaultFuture#doReceived 方法。通过 Java 里面的 lock 机制把 provider 的值赋值给 DefaultFuture#response。此时 consumer 也正在调用 DefaultFuture#get 方法进行阻塞,当这个 DefaultFuture#response 被赋值后,它的值就不为空。阻塞操作完成,且根据请求号的 id 把 consumer 端的 Request以及 Provider 端返回的 Response 关联了起来。
(26)dubbo的核心级概念-invoker
(27)网络通信–编码解码
什么是编码、解码?
- 编码(Encode)称为序列化(serialization),它将对象序列化为字节数组,用于网络传输、数据持久化或者其它用途。
- 解码(Decode)反序列化(deserialization)把从网络、磁盘等读取的字节数组还原成原始对象(通常是原始对象的拷贝),以方便后续的业务逻辑操作。
tcp 为什么会出现粘包 拆包的问题?
- 应用程序写入数据的字节大小大于套接字发送缓冲区的大小
- 可能是IP分片传输导致的,也可能是传输过程中丢失部分包导致出现的半包,还有可能就是一个包可能被分成了两次传输,在取数据的时候,先取到了一部分(还可能与接收的缓冲区大小有关系),总之就是一个数据包被分成了多次接收。
tcp 怎么解决粘包 拆包的问题?
- 消息的定长,例如定1000个字节
- 就是在包尾增加回车或空格等特殊字符作为切割,典型的FTP协议
- 将消息分为消息头消息体。例如 dubbo
下面我们来看一下 dubbo 的协议头约定:
dubbo 使用长度为 16 的 byte 数组作为协议头。1 个 byte 对应 8 位。所以 dubbo 的协议头有 128 位 (也就是上图的从 0 到 127)。我们来看一下这 128 位协议头分别代表什么意思。
0 ~ 7 : dubbo 魔数((short) 0xdabb) 高位,也就是 (short) 0xda。
8 ~ 15: dubbo 魔数((short) 0xdabb) 低位,也就是 (short) 0xbb。
16 ~ 20:序列化 id(Serialization id),也就是 dubbo 支持的序列化中的 contentTypeId,比如 Hessian2Serialization#ID 为 2
21 :是否事件(event )
22 : 是否 Two way 模式(Two way)。默认是 Two-way 模式,dubbo:method 标签的 return 属性配置为false,则是oneway模式
23 :标记是请求对象还是响应对象(Req/res)
24 ~ 31:response 的结果响应码 ,例如 OK=20
32 ~ 95:id(long),异步变同步的全局唯一ID,用来做consumer和provider的来回通信标记。
96 ~ 127: data length,请求或响应数据体的数据长度也就是消息头+请求数据的长度。用于处理 dubbo 通信的粘包与拆包问题。
1、 consumer请求编码
consumer 在请求 provider 的时候需要把 Request 对象转化成 byte 数组,所以它是一个需要编码的过程。
----------1------consumer请求编码----------------------
-->NettyCodecAdapter.InternalEncoder.encode
-->DubboCountCodec.encode
-->ExchangeCodec.encode
-->ExchangeCodec.encodeRequest
-->DubboCodec.encodeRequestData
2、provider 请求解码
provider 在接收 consumer 请求的时候需要把 byte 数组转化成 Request 对象,所以它是一个需要解码的过程。
----------2------provider 请求解码----------------------
--NettyCodecAdapter.InternalDecoder.messageReceived
-->DubboCountCodec.decode
-->ExchangeCodec.decode
-->ExchangeCodec.decodeBody
3、provider响应结果编码
provider 在处理完成 consumer 请求需要响应结果的时候需要把 Response 对象转化成 byte 数组,所以它是一个需要编码的过程。
----------3------provider响应结果编码----------------------
-->NettyCodecAdapter.InternalEncoder.encode
-->DubboCountCodec.encode
-->ExchangeCodec.encode
-->ExchangeCodec.encodeResponse
-->DubboCodec.encodeResponseData//先写入一个字节 这个字节可能是RESPONSE_NULL_VALUE RESPONSE_VALUE RESPONSE_WITH_EXCEPTION
4、consumer响应结果解码
consumer 在接收 provider 响应的时候需要把 byte 数组转化成 Response 对象,所以它是一个需要解码的过程。
----------4------consumer响应结果解码----------------------
--NettyCodecAdapter.InternalDecoder.messageReceived
-->DubboCountCodec.decode
-->ExchangeCodec.decode
-->DubboCodec.decodeBody
-->DecodeableRpcResult.decode//根据RESPONSE_NULL_VALUE RESPONSE_VALUE RESPONSE_WITH_EXCEPTION进行响应的处理
dubbo的消息头是一个定长的 16个字节。
第1-2个字节:是一个魔数数字:就是一个固定的数字
第3个字节:序列号组件类型,它用于和客户端约定的序列号编码号
第四个字节:它是response的结果响应码 例如 OK=20
第5-12个字节:请求id:long型8个字节。异步变同步的全局唯一ID,用来做consumer和provider的来回通信标记。
第13-16个字节:消息体的长度,也就是消息头+请求数据的长度。