Dubbo SPI(Service Provider Interface)增强了 JDK 中提供的标准 SPI 功能,在 Dubbo 中的很多核心接口都是通过实现扩展点接口来提供服务的。Dubbo 解决了 JDK 标准的 SPI 的以下问题:
- JDK 标准的 SPI 会一次性实例化扩展点的所有实现类,如果有些扩展点实现类初始化很耗时,但当前又没用上它的功能,那么加载就很浪费资源。而 Dubbo SPI 则是在具体用某一个实现类时才会对具体实现类进行实例化。
- JDK 标准的 SPI 如果某个扩展点加载失败,是不会友好地向用户通知具体异常。
- Dubbo SPI 增加了对扩展点 IoC 和 AOP 的支持,一个扩展点可以直接使用 setter 方法注入其他扩展点,也可以对扩展点使用 Wrapper 类进行功能增强。
自定义扩展点
下面以 Dubbo 的 Protocol 接口为例,介绍如何自定义扩展点实现:
@SPI("dubbo")public interface Protocol {...}
扩展点接口的类都含有 @SPI 注解,这里注解中的 dubbo 值说明 Protocol 扩展接口 SPI 的默认实现为 DubboProtocol。如果我们想自己写一个 Protocol 扩展接口的实现类,则需要在实现类所在的 Jar 包内的 META-INF/dubbo/ 目录下创建一个名为 org.apache.dubbo.rpc.Protocol 的文本文件,内容为:
myprotocol=com.xxx.xxx.MyProtocol
那如何使用我们自定义的扩展实现呢?在 Dubbo 配置模块中,扩展点接口均有对应的配置属性或 XML 标签,我们可以在指定组件的 name 属性上赋值我们自定义的实现类名称,如下通过标签指定了使用哪个扩展:
<dubbo:protocol name="myprotocol" />
注意,这里的 name 必须要与 META-INF/dubbo/ 目录下的 org.apache.dubbo.rpc.Protocol 配置文件中的等号左侧的 key 的名字一致。
1. IOC 自动注入
当通过 ExtensionLoader 获取扩展接口实例时,在创建完对象实例后还会调用 injectExtension 方法来注入其依赖的扩展点实例,具体代码如下:
private T injectExtension(T instance) {try {// 遍历该对象实例的所有 setter 方法for (Method method : instance.getClass().getMethods()) {if (!isSetter(method)) {continue;}// 如果某个setter方法被@DisableInject注解修饰,表示不需要自动注入if (method.getAnnotation(DisableInject.class) != null) {continue;}// 如果第一个参数类型是原始类型,则不需要自动注入Class<?> pt = method.getParameterTypes()[0];if (ReflectUtils.isPrimitives(pt)) {continue;}// 查看setter方法设置的变量是不是有扩展接口实现try {// 获取setter对应的属性名,比如 setVersion,则返回 versionString property = getSetterProperty(method);// 查看该属性是否存在扩展实现Object object = objectFactory.getExtension(pt, property);// 如果存在则反射调用setter方法进行属性赋值if (object != null) {method.invoke(instance, object);}} catch (Exception e) {logger.error("Failed to inject via method " + method.getName() + " of interface " + type.getName() + ": " + e.getMessage(), e);}}} catch (Exception e) {logger.error(e.getMessage(), e);}return instance;}
2. AOP 增强
在 Spring AOP 中,我们可以使用多个切面对指定类的方法进行增强,在 Dubbo 中也提供了类似的功能。在 Dubbo 中你可以指定多个 Wrapper 类对指定的扩展点的实现类的方法进行增强。
那 Wrapper 类是如何被加载的呢?
private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL, Class<?> clazz, String name,boolean overridden) throws NoSuchMethodException {......if (isWrapperClass(clazz)) {cacheWrapperClass(clazz);}......}// Wrapper类必须有一个原始类型入参的构造器private boolean isWrapperClass(Class<?> clazz) {try {clazz.getConstructor(type);return true;} catch (NoSuchMethodException e) {return false;}}
可以看到,当在加载扩展接口的实现类的时候,如果配置文件中指定的类的全限定名对应的是一个 Wrapper 类,则会将其缓存在 cachedWrapperClasses 中。之后,在实例化对象实例时,会将 Wrapper 类也进行实例化,并将原始类型的对象实例通过 IOC 机制注入到 Wrapper 实例中,之后返回的就是这个 Wrapper 实例。
private T createExtension(String name, boolean wrap) {Class<?> clazz = getExtensionClasses().get(name);......// 如果有包装类型if (wrap) {List<Class<?>> wrapperClassesList = new ArrayList<>();if (cachedWrapperClasses != null) {wrapperClassesList.addAll(cachedWrapperClasses);// 根据@Activate注解中的order属性排序wrapperClassesList.sort(WrapperComparator.COMPARATOR);Collections.reverse(wrapperClassesList);}if (CollectionUtils.isNotEmpty(wrapperClassesList)) {for (Class<?> wrapperClass : wrapperClassesList) {// @Wrapper注解可提供一些判断条件Wrapper wrapper = wrapperClass.getAnnotation(Wrapper.class);if (wrapper == null|| (ArrayUtils.contains(wrapper.matches(), name) && !ArrayUtils.contains(wrapper.mismatches(), name))) {// 通过IOC机制注入原始类型对应的实例对象instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));}}}}......}
总结一下,Wrapper 包装类必须实现扩展接口,并且要提供一个原接口实现类对象的构造器,这样就可以在调用接口方法前后,注入我们自定义的逻辑,并可以通过原接口实现类完成原始逻辑的执行。
下面以 ProtocolFilterWrapper 为例讲解一下,该类属于 Protocol 接口实现类的包装类:
// 通过@Activate注解定义执行该Wrapper顺序@Activate(order = 100)public class ProtocolFilterWrapper implements Protocol {private final Protocol protocol;// 有一个原始接口实现类的实例的入参的构造器public ProtocolFilterWrapper(Protocol protocol) {if (protocol == null) {throw new IllegalArgumentException("protocol == null");}this.protocol = protocol;}private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {Invoker<T> last = invoker;List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);if (!filters.isEmpty()) {for (int i = filters.size() - 1; i >= 0; i--) {final Filter filter = filters.get(i);last = new FilterNode<T>(invoker, last, filter);}}return last;}@Overridepublic int getDefaultPort() {// 不进行扩展,直接调用原方法return protocol.getDefaultPort();}@Overridepublic <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {if (UrlUtils.isRegistry(invoker.getUrl())) {return protocol.export(invoker);}// 在执行原始的export方法前,调用Filter拦截器链return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));}......}
该 Wrapper 在 META-INF/dubbo/internal 路径下的 org.apache.dubbo.rpc.Protocol 文件中进行了定义:
filter=org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper
@Adaptive 适配器
Dubbo 框架在使用扩展点功能时是对接口进行依赖的,而一个扩展接口对应了一系列的扩展实现类。那么如何选择使用哪一个扩展接口的实现类呢?其实是使用适配器模式来做的。还是以 Protocol 接口为例:
@SPI("dubbo")public interface Protocol {@Adaptive<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;@Adaptive<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;}
可以看到,在方法上修饰了 @Adaptive 注解,这个注解表示对应的方法可以被适配器所适配。当调用 ExtensionLoader 类的 getAdaptiveExtension 方法时,Dubbo 会使用动态编译技术为接口 Protocol 生成一个适配器类 Protocol$Adaptive 的对象实例,这个适配类的内容如下:
package org.apache.dubbo.rpc;import org.apache.dubbo.common.extension.ExtensionLoader;public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");org.apache.dubbo.common.URL url = arg0.getUrl();String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);return extension.export(arg0);}public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException {if (arg1 == null) throw new IllegalArgumentException("url == null");org.apache.dubbo.common.URL url = arg1;String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);return extension.refer(arg0, arg1);}public java.util.List getServers() {throw new UnsupportedOperationException("The method public default java.util.List org.apache.dubbo.rpc.Protocol.getServers() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");}public void destroy() {throw new UnsupportedOperationException("The method public abstract void org.apache.dubbo.rpc.Protocol.destroy() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");}public int getDefaultPort() {throw new UnsupportedOperationException("The method public abstract int org.apache.dubbo.rpc.Protocol.getDefaultPort() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");}}
可以看到,这个适配器类只会针对修饰了 @Adaptive 注解的方法进行适配,其他方法直接抛出异常。在 Dubbo 框架中需要使用 Protocol 的实例时,实际上就是使用 Protocol$Adaptive 这个对象实例来获取具体的 SPI 实现类的。在适配类的 export 和 refer 方法中,会根据 Invoker 中 URL 里面的协议类型参数,通过 getExtension 方法根据扩展名称获取对应的扩展实现类,再调用其方法。这样,适配器类就可以在每次方法调用时,根据传递的协议参数的不同,加载不同的 Protocol 的 SPI 实现,具有极大的灵活性。
@Adaptive 注解可以修饰类和方法上,在修饰类的时候不会生成代理类,因为被修饰的这个类就作为代理类,当修饰在方法上的时候会生成代理类。生成代理类的源码如下:
private Class<?> createAdaptiveExtensionClass() {// 生成代码String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate();ClassLoader classLoader = findClassLoader();org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();// 编译成Classreturn compiler.compile(code, classLoader);}
@Activate
ExtensionLoader 不仅提供了获取某一个扩展接口实现类的方法,还提供了获取扩展接口全部实现类的方法。比如 ProtocolFilterWrapper 中的 buildInvokerChain 方法在建立 Filter 责任链时,需要把属于某一 group 的所有 Filter 都放到责任链里,其通过如下方法来获取属于某个组的 Filter 扩展实现类的:
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
比如,当服务提供端启动时只会加载 group 为 provider 的 Filter 扩展实现类:
@Activate(group = CommonConstants.PROVIDER, value = EXECUTES_KEY)public class ExecuteLimitFilter implements Filter, Filter.Listener {......}
当服务消费端启动时只会加载 group 为 consumer 的 Filter 扩展实现类:
@Activate(group = CONSUMER, value = ACTIVES_KEY)public class ActiveLimitFilter implements Filter, Filter.Listener {......}
@Activate 注解有三个属性,group 表示修饰在哪个端,是 provider 还是 consumer,value 表示在 URL 参数中出现才会被激活,order 表示实现类的顺序。只有 group 和 value 都满足,该 Filter 才会被执行。
下面分析下 getActivateExtension 方法的具体逻辑:
public List<T> getActivateExtension(URL url, String key, String group) {String value = url.getParameter(key);return getActivateExtension(url, StringUtils.isEmpty(value) ? null : COMMA_SPLIT_PATTERN.split(value), group);}public List<T> getActivateExtension(URL url, String[] values, String group) {List<T> activateExtensions = new ArrayList<>();List<String> names = values == null ? new ArrayList<>(0) : asList(values);if (!names.contains(REMOVE_VALUE_PREFIX + DEFAULT_KEY)) {// 获取扩展接口对应的所有扩展实现getExtensionClasses();// 在获取扩展实现的过程中,会解析@Activate注解信息// cachedActivates的key为扩展接口实现类的扩展名,value为扩展接口实现类上的@Activate注解元信息for (Map.Entry<String, Object> entry : cachedActivates.entrySet()) {String name = entry.getKey();Object activate = entry.getValue();String[] activateGroup, activateValue;// 如果含有@Activate注解,则获取注解的group和value值if (activate instanceof Activate) {activateGroup = ((Activate) activate).group();activateValue = ((Activate) activate).value();} else if (activate instanceof com.alibaba.dubbo.common.extension.Activate) {// 兼容activateGroup = ((com.alibaba.dubbo.common.extension.Activate) activate).group();activateValue = ((com.alibaba.dubbo.common.extension.Activate) activate).value();} else {continue;}// 如果当前扩展实现的组与我们传递的group匹配,且value值在URL组存在if (isMatchGroup(group, activateGroup)&& !names.contains(name)&& !names.contains(REMOVE_VALUE_PREFIX + name)&& isActive(activateValue, url)) {activateExtensions.add(getExtension(name));}}activateExtensions.sort(ActivateComparator.COMPARATOR);}......return activateExtensions;}
