一、概述
服务发布时,接口的实现类会被JavassistProxyFactory封装成Invoker实例,然后交由Protocol$Adaptive发布(export)。Protocol$Adaptive会根据URL中的协议选择RegistryProtocol来实现服务发布。但是RegistryProtocol本身也是被封装的,要依次经过ProtocolListenerWrapper、QosProtocolWrapper、ProtocolFilterWrapper之后,才会调用到RegistryProtocol。而且在ProtocolFilterWrapper中,非registry协议的Invoker还要经过层层过滤器(Filter)的封装。所以远程RPC调用时,要经过层层Filter过滤之后才会最终调用到实现类,这其中就包括经典的异常处理。
二、源码分析
1、ServiceConfig
// Excerpt only: the surrounding statements of this method are elided ("...").
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs)
...
// `protocol` here refers to the adaptive extension Protocol$Adaptive;
// the wrapped invoker is handed to its export() to publish the service.
Exporter<?> exporter = protocol.export(wrapperInvoker);
...
}
2、Protocol$Adaptive
/**
 * Adaptive {@code Protocol} that picks the concrete Protocol extension at call time
 * from the protocol name carried by the invoker's URL.
 */
public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {

    /**
     * Exports the invoker through the Protocol extension named by its URL.
     *
     * @param arg0 the invoker to export; must not be null and must carry a non-null URL
     * @return the exporter produced by the selected Protocol extension
     * @throws IllegalArgumentException if {@code arg0} or its URL is null
     * @throws org.apache.dubbo.rpc.RpcException if the selected extension fails to export
     */
    public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
        // Fail fast with a descriptive message instead of a bare NullPointerException below.
        if (arg0 == null) {
            throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
        }
        if (arg0.getUrl() == null) {
            throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
        }
        org.apache.dubbo.common.URL url = arg0.getUrl();
        // Fall back to "dubbo" when the URL does not specify a protocol.
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        // Defensive check; with the default applied above extName cannot be null here.
        if (extName == null) {
            throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
        }
        // Resolve the implementation configured under extName: getExtension builds
        // (or returns) the Protocol instance registered for that name.
        org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader
                .getExtensionLoader(org.apache.dubbo.rpc.Protocol.class)
                .getExtension(extName);
        return extension.export(arg0);
    }
}
3、ExtensionLoader
1)进入getExtension方法
2)调用createExtension方法,对Protocol扩展实例进行Wrapper封装,调用链为ProtocolListenerWrapper -> QosProtocolWrapper
-> ProtocolFilterWrapper -> RegistryProtocol。最终进入RegistryProtocol的doLocalExport方法,在这里将
会针对dubbo协议再次调用export,形成和上面相似的调用链,但这一次在ProtocolFilterWrapper中会执行filter封装
4、RegistryProtocol
/**
 * Exports {@code originInvoker} through {@code protocol}, reusing the exporter already
 * stored in {@code bounds} under the invoker's cache key when one exists.
 *
 * @param originInvoker the invoker being exported
 * @param providerUrl   the URL the delegating invoker is created with
 * @return the exporter wrapper stored in {@code bounds} for this invoker's cache key
 */
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
    final String cacheKey = getCacheKey(originInvoker);
    return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(cacheKey, unusedKey -> {
        // Export a delegate bound to providerUrl rather than the origin invoker itself.
        Invoker<?> delegate = new InvokerDelegate<>(originInvoker, providerUrl);
        Exporter<T> exported = (Exporter<T>) protocol.export(delegate);
        return new ExporterChangeableWrapper<>(exported, originInvoker);
    });
}
5、ProtocolFilterWrapper
/**
 * Exports the invoker through the wrapped protocol.
 * Invokers whose URL protocol equals {@code REGISTRY_PROTOCOL} are passed through
 * untouched; every other invoker is first wrapped in the provider-side filter chain.
 *
 * @param invoker the invoker to export
 * @return the exporter returned by the wrapped protocol
 * @throws RpcException if the wrapped protocol fails to export
 */
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    final boolean registryUrl = REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol());
    final Invoker<T> target = registryUrl
            ? invoker
            : buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER);
    return protocol.export(target);
}
/**
 * Wraps {@code invoker} in one anonymous invoker per activated filter and returns the
 * resulting chain inside a {@code CallbackRegistrationInvoker}.
 *
 * @param invoker the invoker at the end of the chain
 * @param key     the URL parameter key passed to {@code getActivateExtension}
 * @param group   the activation group passed to {@code getActivateExtension}
 * @return a {@code CallbackRegistrationInvoker} holding the chain head and the filter list
 */
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
    List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class)
            .getActivateExtension(invoker.getUrl(), key, group);
    Invoker<T> head = invoker;
    // Walk the list from its tail so that filters.get(0) ends up as the outermost wrapper.
    for (int pos = filters.size(); pos > 0; pos--) {
        final Filter filter = filters.get(pos - 1);
        final Invoker<T> next = head;
        head = new Invoker<T>() {
            @Override
            public Class<T> getInterface() {
                return invoker.getInterface();
            }

            @Override
            public URL getUrl() {
                return invoker.getUrl();
            }

            @Override
            public boolean isAvailable() {
                return invoker.isAvailable();
            }

            @Override
            public Result invoke(Invocation invocation) throws RpcException {
                try {
                    return filter.invoke(next, invocation);
                } catch (Exception failure) {
                    // Give a listenable filter the chance to observe the failure, then rethrow it unchanged.
                    if (filter instanceof ListenableFilter) {
                        Filter.Listener callback = ((ListenableFilter) filter).listener();
                        if (callback != null) {
                            callback.onError(failure, invoker, invocation);
                        }
                    }
                    throw failure;
                }
            }

            @Override
            public void destroy() {
                invoker.destroy();
            }

            @Override
            public String toString() {
                return invoker.toString();
            }
        };
    }
    // The chain head and the filter list are both handed to CallbackRegistrationInvoker.
    return new CallbackRegistrationInvoker<>(head, filters);
}
/**
 * Invoker that delegates to the head of a filter chain and, on the returned result,
 * registers a completion callback that notifies every filter in reverse list order.
 */
static class CallbackRegistrationInvoker<T> implements Invoker<T> {
    private final Invoker<T> filterInvoker;
    private final List<Filter> filters;

    /**
     * @param filterInvoker head of the filter chain to delegate to
     * @param filters       the filters to notify once an invocation completes
     */
    public CallbackRegistrationInvoker(Invoker<T> filterInvoker, List<Filter> filters) {
        this.filterInvoker = filterInvoker;
        this.filters = filters;
    }

    @Override
    public Class<T> getInterface() {
        return filterInvoker.getInterface();
    }

    @Override
    public URL getUrl() {
        return filterInvoker.getUrl();
    }

    @Override
    public boolean isAvailable() {
        return filterInvoker.isAvailable();
    }

    /**
     * Invokes the chain and attaches the filter callbacks to the result via
     * {@code whenCompleteWithContext}.
     *
     * @param invocation the invocation to run through the chain
     * @return the result returned by {@code whenCompleteWithContext}
     * @throws RpcException if the chain throws while being invoked
     */
    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        return filterInvoker.invoke(invocation).whenCompleteWithContext((response, failure) -> {
            for (int pos = filters.size(); pos > 0; pos--) {
                Filter current = filters.get(pos - 1);
                if (!(current instanceof ListenableFilter)) {
                    // Plain filters are called through their own onResponse, whether or not a failure is present.
                    current.onResponse(response, filterInvoker, invocation);
                    continue;
                }
                // onResponse / onError callback of a listenable filter
                Filter.Listener callback = ((ListenableFilter) current).listener();
                if (callback == null) {
                    continue;
                }
                if (failure != null) {
                    callback.onError(failure, filterInvoker, invocation);
                } else {
                    callback.onResponse(response, filterInvoker, invocation);
                }
            }
        });
    }

    @Override
    public void destroy() {
        filterInvoker.destroy();
    }
}