I. Overview

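When a service is published, the interface's implementation class is wrapped by JavassistProxyFactory into an Invoker instance, which is then handed to Protocol$Adaptive to be exported. Protocol$Adaptive selects RegistryProtocol to carry out the export. RegistryProtocol is itself wrapped, however: the call passes through ProtocolListenerWrapper, QosProtocolWrapper, and ProtocolFilterWrapper before it reaches RegistryProtocol. Inside ProtocolFilterWrapper the invoker is additionally wrapped in layer upon layer of filters, so a remote RPC call has to pass through every filter before it finally reaches the implementation class. The classic exception handling is one of these filters.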
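To make the first step concrete, here is a minimal sketch of what "wrapping an implementation class into an Invoker" means. It is not Dubbo's code: `SimpleInvoker`, `GreetingService`, and `wrap` are hypothetical names, and plain reflection stands in for the bytecode-generated wrapper that JavassistProxyFactory produces.

```java
import java.lang.reflect.Method;

public class InvokerWrapSketch {
    /** Hypothetical, simplified stand-in for Dubbo's Invoker. */
    public interface SimpleInvoker {
        Object invoke(String methodName, Class<?>[] paramTypes, Object[] args);
    }

    /** Hypothetical service interface, used only for illustration. */
    public interface GreetingService {
        String greet(String name);
    }

    public static class GreetingServiceImpl implements GreetingService {
        @Override
        public String greet(String name) {
            return "hello " + name;
        }
    }

    /** Hides the implementation behind a uniform invoke(...) entry point. */
    public static SimpleInvoker wrap(Object impl, Class<?> iface) {
        return (methodName, paramTypes, args) -> {
            try {
                // Assumption: reflection replaces the generated wrapper class.
                Method method = iface.getMethod(methodName, paramTypes);
                return method.invoke(impl, args);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Invocation failed: " + methodName, e);
            }
        };
    }

    public static void main(String[] args) {
        SimpleInvoker invoker = wrap(new GreetingServiceImpl(), GreetingService.class);
        Object result = invoker.invoke("greet", new Class<?>[]{String.class}, new Object[]{"dubbo"});
        System.out.println(result); // prints "hello dubbo"
    }
}
```

Because every service is reduced to the same `invoke` entry point, later layers (protocol wrappers, filters) can decorate any service without knowing its interface.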

II. Source Code Analysis

1. ServiceConfig

    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        ...
        // "protocol" here refers to the generated Protocol$Adaptive instance
        Exporter<?> exporter = protocol.export(wrapperInvoker);
        ...
    }

2. Protocol$Adaptive

    public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {
        public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
            org.apache.dubbo.common.URL url = arg0.getUrl();
            String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
            if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
            // Look up the implementation that matches the configured extName;
            // getExtension returns the corresponding Protocol instance.
            // Let's step into getExtension in the debugger.
            org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
            return extension.export(arg0);
        }
    }
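The dispatch performed by the generated adaptive class can be sketched as a lookup keyed by the URL's protocol name. The sketch below is an assumption-laden illustration: `SimpleProtocol`, `EXTENSIONS`, and `protocolOf` are hypothetical, and a plain map stands in for the registry that ExtensionLoader maintains.

```java
import java.util.HashMap;
import java.util.Map;

public class AdaptiveDispatchSketch {
    /** Hypothetical, simplified stand-in for org.apache.dubbo.rpc.Protocol. */
    public interface SimpleProtocol {
        String export(String url);
    }

    // Assumption: a map replaces the extension registry kept by ExtensionLoader.
    private static final Map<String, SimpleProtocol> EXTENSIONS = new HashMap<>();

    static {
        EXTENSIONS.put("registry", url -> "registry exporter for " + url);
        EXTENSIONS.put("dubbo", url -> "dubbo exporter for " + url);
    }

    /** Returns the scheme of a URL string, or null when it has none. */
    static String protocolOf(String url) {
        int idx = url.indexOf("://");
        return idx > 0 ? url.substring(0, idx) : null;
    }

    /** Picks the extension by protocol name, falling back to "dubbo". */
    public static String export(String url) {
        String protocol = protocolOf(url);
        String extName = (protocol == null ? "dubbo" : protocol);
        SimpleProtocol extension = EXTENSIONS.get(extName);
        if (extension == null) {
            throw new IllegalStateException("No such extension: " + extName);
        }
        return extension.export(url);
    }

    public static void main(String[] args) {
        // A registry:// URL is routed to the "registry" extension.
        System.out.println(export("registry://127.0.0.1:2181/demo"));
        // A dubbo:// URL is routed to the "dubbo" extension.
        System.out.println(export("dubbo://127.0.0.1:20880/demo"));
    }
}
```

This is why the first export call lands in RegistryProtocol: at that point the invoker's URL carries the registry protocol, and only later is the dubbo protocol selected.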

3. ExtensionLoader

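  1) Step into the getExtension method.
  2) getExtension calls createExtension, which wraps the Protocol extension in its wrapper classes. The resulting call chain is ProtocolListenerWrapper -> QosProtocolWrapper -> ProtocolFilterWrapper -> RegistryProtocol. Execution finally reaches RegistryProtocol.doLocalExport, where the dubbo protocol goes through the same kind of wrapping and produces a call chain similar to the one above. This time, however, ProtocolFilterWrapper also applies the filter wrapping.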

Dubbo invoker wrapping process - Figure 1
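The wrapping done in createExtension follows the decorator pattern: each wrapper receives the current instance through its constructor and becomes the new outermost instance. The sketch below illustrates that idea only. `SimpleProtocol`, `NamedWrapper`, and the fixed wrapping order are assumptions made for illustration; the real ExtensionLoader discovers wrapper classes from the SPI configuration.

```java
import java.util.ArrayList;
import java.util.List;

public class WrapperChainSketch {
    /** Hypothetical, simplified stand-in for the Protocol extension point. */
    public interface SimpleProtocol {
        void export(String url, List<String> trace);
    }

    /** Plays the role of the innermost extension, e.g. RegistryProtocol. */
    static class CoreProtocol implements SimpleProtocol {
        @Override
        public void export(String url, List<String> trace) {
            trace.add("RegistryProtocol");
        }
    }

    /** A wrapper records its name and then delegates to the wrapped instance. */
    static class NamedWrapper implements SimpleProtocol {
        private final String name;
        private final SimpleProtocol delegate;

        NamedWrapper(String name, SimpleProtocol delegate) {
            this.name = name;
            this.delegate = delegate;
        }

        @Override
        public void export(String url, List<String> trace) {
            trace.add(name);
            delegate.export(url, trace);
        }
    }

    /** Wraps the core instance from the inside out; the last wrapper is outermost. */
    public static SimpleProtocol createExtension() {
        SimpleProtocol instance = new CoreProtocol();
        String[] wrappers = {"ProtocolFilterWrapper", "QosProtocolWrapper", "ProtocolListenerWrapper"};
        for (String wrapper : wrappers) {
            instance = new NamedWrapper(wrapper, instance);
        }
        return instance;
    }

    /** Runs one export call and returns the order in which the layers were hit. */
    public static List<String> exportTrace(String url) {
        List<String> trace = new ArrayList<>();
        createExtension().export(url, trace);
        return trace;
    }

    public static void main(String[] args) {
        // prints [ProtocolListenerWrapper, QosProtocolWrapper, ProtocolFilterWrapper, RegistryProtocol]
        System.out.println(exportTrace("registry://127.0.0.1:2181/demo"));
    }
}
```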

4. RegistryProtocol

    private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
        String key = getCacheKey(originInvoker);
        // Export only once per cache key; later calls reuse the cached exporter.
        return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
            Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
            // "protocol" is again the adaptive Protocol; providerUrl selects the dubbo protocol here.
            return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
        });
    }

Dubbo invoker wrapping process - Figure 2
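The role of computeIfAbsent in doLocalExport is easy to check in isolation: the mapping function runs only for the first call with a given key. The sketch below assumes a string-valued cache named `BOUNDS` and a hypothetical export counter; it says nothing about what the real exporter does.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

public class ExportCacheSketch {
    // Assumption: strings replace the ExporterChangeableWrapper values of the real cache.
    private static final ConcurrentMap<String, String> BOUNDS = new ConcurrentHashMap<>();
    // Counts how often the (simulated) export actually ran.
    private static final AtomicInteger EXPORT_COUNT = new AtomicInteger();

    /** Returns the cached exporter for the key, exporting only on the first call. */
    public static String doLocalExport(String cacheKey) {
        return BOUNDS.computeIfAbsent(cacheKey, key -> {
            EXPORT_COUNT.incrementAndGet();
            return "exporter for " + key;
        });
    }

    public static int exportCount() {
        return EXPORT_COUNT.get();
    }

    public static void main(String[] args) {
        String first = doLocalExport("dubbo://127.0.0.1:20880/demo");
        String second = doLocalExport("dubbo://127.0.0.1:20880/demo");
        System.out.println(first.equals(second)); // prints "true"
        System.out.println(exportCount());        // prints "1"
    }
}
```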

5. ProtocolFilterWrapper

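    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        // The registry protocol is passed through untouched; no filters are applied to it.
        if (REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }
        // Any other protocol (e.g. dubbo) gets its invoker wrapped in the provider-side filter chain.
        return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
    }
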
    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
        if (!filters.isEmpty()) {
            // Iterate in reverse so that filters.get(0) ends up outermost and runs first.
            for (int i = filters.size() - 1; i >= 0; i--) {
                final Filter filter = filters.get(i);
                final Invoker<T> next = last;
                last = new Invoker<T>() {
                    @Override
                    public Class<T> getInterface() {
                        return invoker.getInterface();
                    }

                    @Override
                    public URL getUrl() {
                        return invoker.getUrl();
                    }

                    @Override
                    public boolean isAvailable() {
                        return invoker.isAvailable();
                    }

                    @Override
                    public Result invoke(Invocation invocation) throws RpcException {
                        Result asyncResult;
                        try {
                            asyncResult = filter.invoke(next, invocation);
                        } catch (Exception e) {
                            // onError callback
                            if (filter instanceof ListenableFilter) {
                                Filter.Listener listener = ((ListenableFilter) filter).listener();
                                if (listener != null) {
                                    listener.onError(e, invoker, invocation);
                                }
                            }
                            throw e;
                        }
                        return asyncResult;
                    }

                    @Override
                    public void destroy() {
                        invoker.destroy();
                    }

                    @Override
                    public String toString() {
                        return invoker.toString();
                    }
                };
            }
        }
        // last and filters are wrapped together in a CallbackRegistrationInvoker
        return new CallbackRegistrationInvoker<>(last, filters);
    }

    static class CallbackRegistrationInvoker<T> implements Invoker<T> {
        private final Invoker<T> filterInvoker;
        private final List<Filter> filters;

        public CallbackRegistrationInvoker(Invoker<T> filterInvoker, List<Filter> filters) {
            this.filterInvoker = filterInvoker;
            this.filters = filters;
        }

        @Override
        public Result invoke(Invocation invocation) throws RpcException {
            Result asyncResult = filterInvoker.invoke(invocation);
            // Once the result completes, notify the filters in reverse order.
            asyncResult = asyncResult.whenCompleteWithContext((r, t) -> {
                for (int i = filters.size() - 1; i >= 0; i--) {
                    Filter filter = filters.get(i);
                    // onResponse callback
                    if (filter instanceof ListenableFilter) {
                        Filter.Listener listener = ((ListenableFilter) filter).listener();
                        if (listener != null) {
                            if (t == null) {
                                listener.onResponse(r, filterInvoker, invocation);
                            } else {
                                listener.onError(t, filterInvoker, invocation);
                            }
                        }
                    } else {
                        filter.onResponse(r, filterInvoker, invocation);
                    }
                }
            });
            return asyncResult;
        }

        @Override
        public Class<T> getInterface() {
            return filterInvoker.getInterface();
        }

        @Override
        public URL getUrl() {
            return filterInvoker.getUrl();
        }

        @Override
        public boolean isAvailable() {
            return filterInvoker.isAvailable();
        }

        @Override
        public void destroy() {
            filterInvoker.destroy();
        }
    }

Dubbo invoker wrapping process - Figure 3
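The reverse loop in buildInvokerChain is what makes the first filter in the list the first one to run. The following sketch reproduces only that loop with hypothetical `SimpleInvoker` and `SimpleFilter` types. The exception-handling filter here catches synchronously, which is a simplification chosen for illustration and not a description of how Dubbo's own exception filter is implemented.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FilterChainSketch {
    /** Hypothetical, simplified stand-ins for Invoker and Filter. */
    public interface SimpleInvoker {
        String invoke(String request);
    }

    public interface SimpleFilter {
        String invoke(SimpleInvoker next, String request);
    }

    /** A filter that records its name and passes the call on. */
    static SimpleFilter tracing(String name, List<String> trace) {
        return (next, request) -> {
            trace.add(name);
            return next.invoke(request);
        };
    }

    /** A filter that turns runtime exceptions from inner layers into an error result. */
    static SimpleFilter exceptionHandling(List<String> trace) {
        return (next, request) -> {
            trace.add("exception");
            try {
                return next.invoke(request);
            } catch (RuntimeException e) {
                return "error: " + e.getMessage();
            }
        };
    }

    /** Same shape as the loop above: wrap from the last filter to the first. */
    public static SimpleInvoker buildInvokerChain(SimpleInvoker invoker, List<SimpleFilter> filters) {
        SimpleInvoker last = invoker;
        for (int i = filters.size() - 1; i >= 0; i--) {
            final SimpleFilter filter = filters.get(i);
            final SimpleInvoker next = last;
            last = request -> filter.invoke(next, request);
        }
        return last;
    }

    /** Builds a two-filter chain around a toy implementation and invokes it once. */
    public static String demo(String request, List<String> trace) {
        SimpleInvoker impl = req -> {
            if ("bad".equals(req)) {
                throw new IllegalArgumentException("bad request");
            }
            trace.add("impl");
            return "echo " + req;
        };
        List<SimpleFilter> filters = Arrays.asList(exceptionHandling(trace), tracing("logging", trace));
        return buildInvokerChain(impl, filters).invoke(request);
    }

    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        System.out.println(demo("ping", trace)); // prints "echo ping"
        System.out.println(trace);               // prints "[exception, logging, impl]"
        System.out.println(demo("bad", new ArrayList<>())); // prints "error: bad request"
    }
}
```

Placing the exception-handling filter first means it is outermost, so it sees failures from every inner filter as well as from the implementation.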