Dispatcher
dubbo的Dispatcher策略:
- all 所有消息都派发到线程池,包括请求,响应,连接事件,断开事件,心跳等。
- direct 所有消息都不派发到线程池,全部在 IO 线程上直接执行。
- message 只有请求响应消息派发到线程池,其它连接断开事件,心跳等消息,直接在 IO 线程上执行。
- execution 只请求消息派发到线程池,不含响应,响应和其它连接断开事件,心跳等消息,直接在 IO 线程上执行。
- connection 在 IO 线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到线程池。
Dispatcher的操作基本都是基于内部维护的一个线程池。线程池的构造是基于ThreadPool对象实现。以这个场景为例,在熟悉这个场景的同时,了解一下dubbo的SPI机制。
在provider中添加一个配置
在Dispatcher类中,dubbo根据配置来选择具体的dispatcher实现类,我们配置的是all,进入AllDispatcher
AllDispatcherl类直接调用了AllChannelHandler 。AllChannelHandler 又直接调用了父类WrappedChannelHandler的构造方法。所以我们在WrappedChannelHandler类中打下断点
AllChannelHandler 中对于connected, disconnected, received等事件都是通过线程池cexecutor来完成的,这个cexecutor的构造在它的父类—WrappedChannelHandler 的构造方法中完成。url对象作为参数被传入了线程池的构造方法。
启动系统,断点走到了线程池的构造 方法中。作为参数的url对象长这样
ExtensionLoader.getExtensionLoader()即通过spi动态的获取配置对应的ThreadPool类,这里我们配置的是cached,查看dubbo的spi配置文件
如果SPI生效,那么cached对应的应该是CachedThreadPool 这个类。在CachedThreadPool里也打一个断点,验证一下猜测对不对
果然,在释放掉WrappedChannelHandler构造方法里的断点之后,断点走到了CachedThreadPool 里。说明spi成功的根据配置走到了对应的类。方法也很简单,就是新建了一个ThreadPoolExecutor,ThreadPoolExecutor是jdk自带的线程池实现类。
SPI
ExtensionLoader.getExtensionLoader() ,记住这个方法,是dubbo框架里SPI的一个典型运用,通过给定的扩展加载器来加载需要的类。这里传入了ThreadPool的class对象,即通过配置来加载。
spi技术类似于插件,通过配置文件来对接口的实现进行配置和扩展,将程序装配的控制权移到系统代码之外。java spi的具体约定为:当服务的提供者,提供了服务接口的一种实现之后,在jar包的META-INF/services/目录里同时创建一个以服务接口命名的文件。该文件里就是实现该服务接口的具体实现类。而当外部程序装配这个模块的时候,就能通过该jar包META-INF/services/里的配置文件找到具体的实现类名,并装载实例化,完成模块的注入。
还是以ThreadPool的加载为例来看看dubbo的SPI是怎么把类给加载进来的。从ExtensionLoader.getAdaptiveExtension一路追踪。
可以看到实例是在createAdaptiveExtension方法中被加载出来。进入这个方法
先用getAdaptiveExtensionClass().newInstance()获取目标实例,然后传入injectExtension
injectExtension这个方法稍后再分析,先进入getAdaptiveExtensionClass方法中
getAdaptiveExtensionClass
在这个方法中先调用getExtensionClasses方法将配置文件里的类缓存,然后调用createAdaptiveExtensionClass。
getExtensionClasses
进入到getExtensionClasses方法中
方法中又调用了loadExtensionClasses方法。进入loadExtensionClasses方法。loadExtensionClasses主要是获取类上的SPI注解,并拿到注解的值。随后调用loadFile对配置目录进行扫描
重点关注loadFile方法
loadFile根据已经配置完成的目录加上当前的类名,加载出文件的内容,最后解析得到一个url对象。url对象中包含了我们所需要读取的配置文件的路径等信息
接下来就是BufferReader,文件按行读取
解析内容后用Class.forName,创建类的实例
将配置文件里的类全部读出来然后统一实例化,并且缓存到extensionClasses里
回到getAdaptiveExtensionClass方法中,代码进入到createAdaptiveExtensionClass
createAdaptiveExtensionClass
此时程序执行进入到createAdaptiveExtensionClass方法。
首先进入到createAdaptiveExtensionClassCode方法获取目标类的代理类的代码。
type,也就是我们要加载的类是ThreadPool类。首先反射获取了类所有的方法,并且逐一判断方法有没有Adaptive注解,很显然,ThreadPool有一个方法getExecutor,且方法上存在Adaptive注解。完全符合条件,代码继续执行。
接下来就开始咔咔拼代码,我们可以看到拼完之后的,整个字符串基本上就是一个类的开头。为目标类的代理类开始写代码。
类的头写完了,接下来开始写类里的代码。循环method数组,判断这个方法是否有Adaptive注解,从断点我们可以得知,这个方法上注解的值是threadpool
注解不为空代码继续下行,进行参数的校验。如果参数中存在URL类型的参数,那么需要做一个特殊处理,即参数不能为null,添加一个参数的校验,为null抛出异常。
很不巧的是我们的ThreadPool的getExecute方法的参数正好是URL对象。于是代码中就会被加上了这么一句。
接下来是一些判断,例如注解的值是不是空的,参数类型是不是Invocation等等,不巧的是ThreadPool都不符合,所以全部跳过。接下来就是正式开始用我们的参数加载返回值了
从上面这串代码可以看到,对于ThreadPool,dubbo有一个默认的配置,是fixed,也就是说如果你不配,那就是初始化fixed。另外对于Invocation类型,代码的写法也有所不同,难怪要先进行判断。
接下来又是一系列的代码拼装,空值校验之类的
拼装完之后,代码已经被写成了下边这样
至此 ,createAdaptiveExtensionClassCode执行完毕,成功获取到代理类的所有代码。随后将这堆字符串原地编译,编译出代理类并返回。
终于,代码又回到了createAdaptiveExtension中。随后执行injectExtension方法
这个方法中的操作也不复杂,判断是否有属性需要注入,如果有的话就注入
遗憾的是,我们的ThreadPool类并没有这样的方法。
所以最终返回的实例就是前一步编译完成的代理对象。ExtensionLoader.getAdaptiveExtension方法到此结束。
ExtensionLoader是dubbo对SPI机制的一个封装。基本上所有的扩展点特性都在这个类里实现。了解了这个类也基本上就明白了SPI机制的实现