简介

  1. Dubbo 框架本身就是一个 RPC 框架,不同的是消费者要连接的服务端 IP 与端口号不是硬编码在服务端的,而是从 zk 中读取到的。
  2. 一个 Dubbo 应用中会存在很多服务提供者与消费者。每个提供者都是一个 Netty Server,其会对外暴露自己所在主机的 IP Port。每个消费者都是一个 Netty Client,其会通过连接相应主机的 IP Port 来获取相应的服务。
  3. 服务提供者的 IP Port 是如何对外暴露的呢?其会为自己所提供的服务起一个服务名称,一般为业务接口名。然后将该服务名称与对应提供者主机的 IP Port 相绑定注册到zk 中。
  4. 服务消费者会从服务注册中心 zk 中查找自己所需要的服务名称,一般为业务接口名,然后获取到该服务名称对应的所有提供者主机信息,并通过负载均衡方式选取一个主机进行连接,获取相应服务。

自定义新增需求

  1. 当客户端通过负载均衡策略选择了某一提供者主机后,我们这里新增了一个需求:提供者主机中提供同一服务名称(接口名)的实现类有多个。这样,消费者可以指定其要调用的实现类。若消费者没有指定要调用的实现类,其会调用到注册到中第一个注册的实现类。
  2. 为了实现提供者端业务接口可以有多个实现类供客户端选择,这里要求实现类名必须是一个前辍 prefix 后是业务接口名。这样,消费者在进行消费时,可以通过前辍来指定要调用的是哪个实现类。

思路定义

  • 核心工具类:springboot-dubbo-core
    1. ZKConstant:主要定义连接zk时的字符串常量定义。
    2. Invocation:RPC接口远程调用的信息。
    3. LoadBalance:负载均衡接口定义。
    4. RandomLoadBalance:负载均衡-随机接口实现。
    5. GcService:业务接口定义。
  • 服务端:springboot-dubbo-clinet
    1. ServiceDiscovery:服务发现具体定义。
    2. ZKServiceDiscovery:服务发现具体实现。
    3. RpcProxy:使用代理调用RPC远程接口。
    4. RpcClientHandler:Netty的客户端业务处理器。
    5. RpcClinetStartApplication:客户度启动入口。
  • 服务端:springboot-dubbo-server

    1. RegistryCenter: zk注册具体定义。
    2. ZKRegistryCenter:RegistryCenter的具体实现,连接ZK并注册暴露的RPC接口。
    3. RpcServer:将指定的目录下的RPC接口加载到缓存中,并进行注册到zk中。
    4. RpcServerHandler:主要处理RPC的连接请求。
    5. RpcServerStartApplication:服务端启动入口。

项目结构图

Netty高级应用实战篇(下) - 图1

Netty高级应用实战篇(下) - 图2

Netty高级应用实战篇(下) - 图3

编码实现

API包

ZKConstant

  1. package com.gc.core;
  2. /**
  3. * @description: 注册中心zk常量配置
  4. * @author: GC
  5. * @create: 2020-11-16 15:16
  6. **/
  7. public class ZKConstant {
  8. //连接集群机器
  9. public final static String ZK_CLUSTER = "127.0.0.1:2181";
  10. //创建默认跟节点
  11. public final static String ZK_DUBBO_ROOT_PATH = "/gcdubbo";
  12. }

Invocation

  1. package com.gc.core;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Builder;
  4. import lombok.Data;
  5. import java.io.Serializable;
  6. /**
  7. * @description: 封装调用远程RPC信息
  8. * @author: GC
  9. * @create: 2020-11-16 15:19
  10. **/
  11. @Data
  12. @Builder
  13. @AllArgsConstructor
  14. public class Invocation implements Serializable {
  15. //接口名
  16. private String className;
  17. //方法名
  18. private String methodName;
  19. //参数类型列表
  20. private Class<?>[] paramTypes;
  21. //参数值列表
  22. private Object[] paramValues;
  23. //业务接口的前缀
  24. private String prefix;
  25. }

LoadBalance

  1. package com.gc.loadbalance;
  2. import java.util.List;
  3. public interface LoadBalance {
  4. /**
  5. *
  6. * @param invokerPaths 所有提供者主机列表
  7. * @return 负载均衡后的结果
  8. */
  9. String choose(List<String> invokerPaths);
  10. }

RandomLoadBalance

  1. package com.gc.loadbalance;
  2. import org.springframework.stereotype.Component;
  3. import java.util.List;
  4. import java.util.Random;
  5. /**
  6. * 随机负载均衡策略
  7. */
  8. @Component
  9. public class RandomLoadBalance implements LoadBalance {
  10. @Override
  11. public String choose(List<String> invokerPaths) {
  12. int index = new Random().nextInt(invokerPaths.size());
  13. return invokerPaths.get(index);
  14. }
  15. }

GcService

  1. package com.gc.service;
  2. /**
  3. * @description: 用户个人信息
  4. * @author: GC
  5. * @create: 2020-11-16 15:50
  6. **/
  7. public interface GcService {
  8. /**
  9. * 获取GC的信息
  10. * @param name
  11. * @return
  12. */
  13. public String getGcInfo(String name);
  14. }

pom

  1. <dependency>
  2. <groupId>org.projectlombok</groupId>
  3. <artifactId>lombok</artifactId>
  4. <version>1.18.16</version>
  5. <scope>compile</scope>
  6. </dependency>

客户端

ServiceDiscovery

  1. package com.gc.discovery;
  2. /**
  3. * @description: 服务发现规范定义
  4. * @author: GC
  5. * @create: 2020-11-16 16:14
  6. **/
  7. public interface ServiceDiscovery {
  8. String discovery(String serviceName);
  9. }

ZKServiceDiscovery

  1. package com.gc.discovery;
  2. import com.gc.core.ZKConstant;
  3. import com.gc.loadbalance.LoadBalance;
  4. import org.apache.curator.framework.CuratorFramework;
  5. import org.apache.curator.framework.CuratorFrameworkFactory;
  6. import org.apache.curator.framework.api.CuratorWatcher;
  7. import org.apache.curator.retry.ExponentialBackoffRetry;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.stereotype.Component;
  10. import java.util.List;
  11. /**
  12. * @description: 注册具体实现
  13. * @author: GC
  14. * @create: 2020-11-16 16:14
  15. **/
  16. @Component
  17. public class ZKServiceDiscovery implements ServiceDiscovery {
  18. @Autowired
  19. private LoadBalance loadBalance;
  20. //ZK的工具类客户端
  21. private CuratorFramework client;
  22. //读取Server端的主机
  23. private List<String> servers;
  24. public ZKServiceDiscovery() {
  25. client = CuratorFrameworkFactory.builder()
  26. .connectString(ZKConstant.ZK_CLUSTER)//连接注册中心的地址
  27. .connectionTimeoutMs(10000) //连接超时上线
  28. .sessionTimeoutMs(4000) //会话过期时间
  29. //连接失败时的重试策略。最多10次,每连接一次sheel一秒
  30. .retryPolicy(new ExponentialBackoffRetry(1000, 10))
  31. .build();
  32. client.start();
  33. }
  34. @Override
  35. public String discovery(String serviceName) {
  36. try {
  37. //配置的根路径 + 服务名称的类路径
  38. String servicePath = ZKConstant.ZK_DUBBO_ROOT_PATH + "/" + serviceName;
  39. servers = client.getChildren()
  40. //在该路径在注册一个子节点列表变更监听器。
  41. .usingWatcher((CuratorWatcher) event -> {
  42. servers = client.getChildren().forPath(servicePath);
  43. }).forPath(servicePath);//获得该路径下的所有服务节点
  44. if(servers.size() == 0) {
  45. return null;
  46. }
  47. } catch (Exception e) {
  48. e.printStackTrace();
  49. }
  50. return loadBalance.choose(servers);
  51. }
  52. }

RpcProxy

  1. package com.gc.client;
  2. import com.gc.core.Invocation;
  3. import com.gc.discovery.ServiceDiscovery;
  4. import io.netty.bootstrap.Bootstrap;
  5. import io.netty.channel.*;
  6. import io.netty.channel.nio.NioEventLoopGroup;
  7. import io.netty.channel.socket.SocketChannel;
  8. import io.netty.channel.socket.nio.NioSocketChannel;
  9. import io.netty.handler.codec.serialization.ClassResolvers;
  10. import io.netty.handler.codec.serialization.ObjectDecoder;
  11. import io.netty.handler.codec.serialization.ObjectEncoder;
  12. import org.springframework.beans.factory.annotation.Autowired;
  13. import org.springframework.stereotype.Component;
  14. import java.lang.reflect.InvocationHandler;
  15. import java.lang.reflect.Method;
  16. import java.lang.reflect.Proxy;
  17. /**
  18. * @description: 使用代理调用远程接口
  19. * @author: GC
  20. * @create: 2020-11-16 17:55
  21. **/
  22. @Component
  23. public class RpcProxy {
  24. @Autowired
  25. private ServiceDiscovery serviceDiscovery;
  26. public <T> T create(Class<?> clazz, String prefix){
  27. return (T)Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, new InvocationHandler() {
  28. @Override
  29. public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  30. //如果是本地方法,则直接本地调用
  31. if (Object.class.equals(method.getDeclaringClass())) {
  32. return method.invoke(args);
  33. }
  34. //否则使用RPC远程调用
  35. return rpcInvoke(clazz, method, args, prefix);
  36. }
  37. });
  38. }
  39. /**
  40. * @param clazz 调用的类
  41. * @param method 方法名称
  42. * @param args 参数
  43. * @param prefix 前缀
  44. * @return Object 远程调用结果集
  45. */
  46. private Object rpcInvoke(Class<?> clazz, Method method, Object[] args, String prefix) {
  47. RpcClientHandler rpcClientHandler = new RpcClientHandler();
  48. EventLoopGroup clinetLoopGroup = new NioEventLoopGroup();
  49. //客户端连接服务器的地址
  50. String serviceAddress = null;
  51. try {
  52. Bootstrap bootstrap = new Bootstrap();
  53. bootstrap.group(clinetLoopGroup)
  54. //开启Nagle算法,它默认是关闭的
  55. //该算法的目的就是为了尽可能发送大块数据,避免网络中充斥着许多小数据块。
  56. .option(ChannelOption.TCP_NODELAY, true)
  57. .channel(NioSocketChannel.class)
  58. .handler(new ChannelInitializer<SocketChannel>() {
  59. @Override
  60. protected void initChannel(SocketChannel ch) throws Exception {
  61. ChannelPipeline pipeline = ch.pipeline();
  62. pipeline.addLast(new ObjectEncoder());
  63. pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
  64. pipeline.addLast(rpcClientHandler);
  65. }
  66. });
  67. /**
  68. * 服务发现
  69. * 拿到类名称,从zk中读取到远程RPC接口的暴露地址。用于本次Netty的连接地址
  70. */
  71. serviceAddress = serviceDiscovery.discovery(clazz.getName());
  72. //如果没有拿到连接地址,那么本次连接终止
  73. if (serviceAddress == null) {
  74. return null;
  75. }
  76. String ip = serviceAddress.split(":")[0];
  77. String port = serviceAddress.split(":")[1];
  78. ChannelFuture future = bootstrap.connect(ip, Integer.parseInt(port)).sync();
  79. //组装调用信息,传递给服务端
  80. future.channel().writeAndFlush(
  81. Invocation.builder()
  82. .className(clazz.getName())
  83. .methodName(method.getName())
  84. .paramTypes(method.getParameterTypes())
  85. .prefix(prefix)
  86. .paramValues(args)
  87. .build()
  88. ).sync();
  89. future.channel().closeFuture().sync();
  90. }catch (Exception e){
  91. e.printStackTrace();
  92. }finally {
  93. clinetLoopGroup.shutdownGracefully();
  94. }
  95. return rpcClientHandler.getResult();
  96. }
  97. }

RpcClientHandler

  1. package com.gc.client;
  2. import io.netty.channel.ChannelHandlerContext;
  3. import io.netty.channel.SimpleChannelInboundHandler;
  4. /**
  5. * @description: 使用代理调用远程接口返回的结果集
  6. * @author: GC
  7. * @create: 2020-11-16 17:55
  8. **/
  9. public class RpcClientHandler extends SimpleChannelInboundHandler<Object> {
  10. private Object result;
  11. public Object getResult() {
  12. return result;
  13. }
  14. @Override
  15. protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
  16. this.result = msg;
  17. }
  18. @Override
  19. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  20. cause.printStackTrace();
  21. ctx.close();
  22. }
  23. }

RpcClinetStartApplication

  1. package com.gc;
  2. import com.gc.client.RpcProxy;
  3. import com.gc.service.GcService;
  4. import org.springframework.boot.CommandLineRunner;
  5. import org.springframework.boot.SpringApplication;
  6. import org.springframework.boot.autoconfigure.SpringBootApplication;
  7. import javax.annotation.Resource;
  8. @SpringBootApplication
  9. public class RpcClinetStartApplication implements CommandLineRunner {
  10. @Resource
  11. private RpcProxy proxy;
  12. public static void main(String[] args) {
  13. SpringApplication.run(RpcClinetStartApplication.class, args);
  14. }
  15. @Override
  16. public void run(String... args) throws Exception {
  17. GcService infoService = proxy.create(GcService.class, "Info");
  18. System.out.println(infoService.getGcInfo("阿伟"));
  19. GcService planInfoService = proxy.create(GcService.class, "PlanInfo");
  20. System.err.println(planInfoService.getGcInfo("GC"));
  21. }
  22. }

pom

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter</artifactId>
  4. </dependency>
  5. <!-- api依赖 -->
  6. <dependency>
  7. <groupId>com.gc.core</groupId>
  8. <artifactId>springboot-dubbo-core</artifactId>
  9. <version>0.0.1-SNAPSHOT</version>
  10. </dependency>
  11. <!--curator依赖-->
  12. <dependency>
  13. <groupId>org.apache.curator</groupId>
  14. <artifactId>curator-framework</artifactId>
  15. <version>2.12.0</version>
  16. </dependency>
  17. <dependency>
  18. <groupId>org.apache.curator</groupId>
  19. <artifactId>curator-recipes</artifactId>
  20. <version>2.12.0</version>
  21. </dependency>
  22. <!-- netty-all依赖 -->
  23. <dependency>
  24. <groupId>io.netty</groupId>
  25. <artifactId>netty-all</artifactId>
  26. <version>4.1.36.Final</version>
  27. </dependency>
  28. <dependency>
  29. <groupId>org.projectlombok</groupId>
  30. <artifactId>lombok</artifactId>
  31. <optional>true</optional>
  32. </dependency>

服务端

RegistryCenter

  1. package com.gc.registry;
  2. /**
  3. * @description: 注册规范定义
  4. * @author: GC
  5. * @create: 2020-11-16 16:14
  6. **/
  7. public interface RegistryCenter {
  8. /**
  9. *
  10. * @param serviceName 注册到注册中心的服务名称,一般为业务接口名
  11. * @param serviceAddress 提供该服务的主机的ip:port
  12. */
  13. void register(String serviceName, String serviceAddress) throws Exception;
  14. }

ZKRegistryCenter

  1. package com.gc.registry;
  2. import com.gc.core.ZKConstant;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.retry.ExponentialBackoffRetry;import org.apache.zookeeper.CreateMode;import org.springframework.stereotype.Component;
  3. /**
  4. * @description: 注册规范实现
  5. * @author: GC
  6. * @create: 2020-11-16 16:15
  7. **/
  8. @Component
  9. public class ZKRegistryCenter implements RegistryCenter {
  10. private CuratorFramework client;
  11. public ZKRegistryCenter(){
  12. // 创建并初始化zk的客户端
  13. client = CuratorFrameworkFactory.builder()
  14. .connectString(ZKConstant.ZK_CLUSTER)//连接到当前集群地址
  15. .connectionTimeoutMs(10000) //连接超时时间
  16. .sessionTimeoutMs(4000) //一次会话失效时间
  17. //当连接失败时,重试的连接策略,每1秒连接1次,最多连接10次
  18. .retryPolicy(new ExponentialBackoffRetry(1000, 10)).build();
  19. client.start();
  20. }
  21. @Override
  22. public void register(String serviceName, String serviceAddress) throws Exception {
  23. String pathRoot = ZKConstant.ZK_DUBBO_ROOT_PATH + "/" + serviceName;
  24. //创建临时节点
  25. String hostPath = pathRoot + "/" + serviceAddress;
  26. if(client.checkExists().forPath(hostPath) == null){
  27. String host =client.create()
  28. .creatingParentsIfNeeded() //如果当前要创建的子节点还没有父节点,那么就在此时创建
  29. .withMode(CreateMode.EPHEMERAL)//创建的节点类型,临时节点
  30. .forPath(hostPath);//创建节点路径
  31. System.err.println("创建节点:"+host);
  32. }
  33. }
  34. }

RpcServer

  1. package com.gc.server;
  2. import com.gc.registry.RegistryCenter;
  3. import io.netty.bootstrap.ServerBootstrap;
  4. import io.netty.channel.*;
  5. import io.netty.channel.nio.NioEventLoop;
  6. import io.netty.channel.nio.NioEventLoopGroup;
  7. import io.netty.channel.socket.SocketChannel;
  8. import io.netty.channel.socket.nio.NioServerSocketChannel;
  9. import io.netty.handler.codec.serialization.ClassResolver;
  10. import io.netty.handler.codec.serialization.ClassResolvers;
  11. import io.netty.handler.codec.serialization.ObjectDecoder;
  12. import io.netty.handler.codec.serialization.ObjectEncoder;
  13. import lombok.Data;
  14. import org.springframework.stereotype.Component;
  15. import java.io.File;
  16. import java.net.URL;
  17. import java.util.ArrayList;
  18. import java.util.HashMap;
  19. import java.util.List;
  20. import java.util.Map;
  21. /**
  22. * @description: 将暴露的RPC接口注册到解析并以临时节点注册到zk中
  23. * @author: GC
  24. * @create: 2020-11-16 16:54
  25. **/
  26. @Component
  27. @Data
  28. public class RpcServer {
  29. // 服务注册表
  30. private Map<String, Object> registryMap = new HashMap<>();
  31. //获取到指定包的需要暴露的RPC接口全限定类名
  32. private List<String> classCache = new ArrayList<>();
  33. //ZK的连接,和创建节点工具类
  34. private RegistryCenter center;
  35. //当前服务端的ip:prot
  36. private String serviceAddress;
  37. // 添加上无参构造器
  38. public RpcServer() {
  39. }
  40. public RpcServer(RegistryCenter center, String serviceAddress) {
  41. this.center = center;
  42. this.serviceAddress = serviceAddress;
  43. }
  44. public void publish(String basePackage) throws Exception {
  45. // 将指定包下的提供者类名写入到classCache中进行缓存
  46. getProviderClass(basePackage);
  47. // 将提供者名与实例的对应关系写入到注册表,并完成到zk的注册
  48. doRegister();
  49. }
  50. //该方法和手写tomcat是一样的原理
  51. private void getProviderClass(String basePackage) {
  52. // 加载指定的包为URL
  53. URL resource = this.getClass().getClassLoader().getResource(basePackage.replaceAll("\\.", "/"));
  54. // 若没有指定的资源,则直接结束
  55. if (resource == null) {
  56. return;
  57. }
  58. // 将URL资源转化为File
  59. File dir = new File(resource.getFile());
  60. // 遍历指定包及其子孙包中的所有文件,查找.class文件
  61. for (File file : dir.listFiles()) {
  62. if (file.isDirectory()) {
  63. // 若当前file为目录,则递归
  64. getProviderClass(basePackage + "." + file.getName());
  65. } else if (file.getName().endsWith(".class")) {
  66. // 去年.class的扩展名,获取到简单类名
  67. String fileName = file.getName().replace(".class", "");
  68. classCache.add(basePackage + "." + fileName);
  69. }
  70. }
  71. }
  72. private void doRegister() throws Exception {
  73. // 若指定包下没有任何实现类,则直接结束
  74. if (classCache.size() == 0) {
  75. return;
  76. }
  77. for (String className : classCache) {
  78. //根据类名获取这个类对象
  79. Class<?> clazz = Class.forName(className);
  80. // 将实现类名与实现类实例写入到map
  81. registryMap.put(className, clazz.newInstance());
  82. //获取接口名称
  83. Class<?>[] interfaces = clazz.getInterfaces();
  84. if (interfaces.length == 0) {
  85. return;
  86. }
  87. // 将服务名称与提供者主机地址写入到zk(也就是当前主机的ip和port)。
  88. center.register(interfaces[0].getName(), serviceAddress);
  89. }
  90. }
  91. //注册完后,启动Netty。用来做RPC间的通信信息传递工作
  92. public void start() throws Exception {
  93. EventLoopGroup parentGroup = new NioEventLoopGroup();
  94. EventLoopGroup childGroup = new NioEventLoopGroup();
  95. try {
  96. ServerBootstrap serverBootstrap = new ServerBootstrap();
  97. serverBootstrap.group(parentGroup, childGroup)
  98. //如果出现并发,来不及处理的连接。方法队列里面。长度最大是1024(这里面的连接都是已经通过三次握手了的)
  99. .option(ChannelOption.SO_BACKLOG, 1024)
  100. //上方的队列,如果存在空闲监测。开启这个配置,那么达到空闲监测的时候后,这些长连接不会被清楚
  101. .childOption(ChannelOption.SO_KEEPALIVE, true)
  102. .channel(NioServerSocketChannel.class)
  103. .childHandler(new ChannelInitializer<SocketChannel>() {
  104. @Override
  105. protected void initChannel(SocketChannel ch) throws Exception {
  106. ChannelPipeline pipeline = ch.pipeline();
  107. pipeline.addLast(new ObjectEncoder());
  108. pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
  109. pipeline.addLast(new RpcServerHandler(registryMap));
  110. }
  111. });
  112. String host = serviceAddress.split(":")[0];
  113. String port = serviceAddress.split(":")[1];
  114. ChannelFuture future = serverBootstrap.bind(host, Integer.valueOf(port)).sync();
  115. System.out.println("Doubbo服务Server启动成功");
  116. future.channel().closeFuture().sync();
  117. }finally {
  118. parentGroup.shutdownGracefully();
  119. childGroup.shutdownGracefully();
  120. }
  121. }
  122. }

RpcServerHandler

  1. package com.gc.server;
  2. import com.gc.core.Invocation;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import io.netty.channel.SimpleChannelInboundHandler;
  5. import io.netty.util.internal.StringUtil;
  6. import java.util.Map;
  7. /**
  8. * @description: 服务端自定义业务处理器
  9. * @author: GC
  10. * @create: 2020-11-16 17:20
  11. **/
  12. public class RpcServerHandler extends SimpleChannelInboundHandler<Invocation> {
  13. private Map<String, Object> registryMap;
  14. //已经像注册中心注册过的RPC接口
  15. public RpcServerHandler(Map<String, Object> registryMap) {
  16. this.registryMap = registryMap;
  17. }
  18. @Override
  19. protected void channelRead0(ChannelHandlerContext ctx, Invocation msg) throws Exception {
  20. Object result = "404";
  21. // 获取要访问的业务接口名
  22. String interfaceName = msg.getClassName();
  23. // 获取接口的简单类名
  24. String simpleInterfaceName = interfaceName.substring(interfaceName.lastIndexOf(".") + 1);
  25. // 获取接口所在的包名
  26. String basePackage = interfaceName.substring(0, interfaceName.lastIndexOf("."));
  27. // 获取用户要访问的提供者实现类的前辍
  28. String prefix = msg.getPrefix();
  29. // 构建客户端要访问的提供者的key
  30. String key = basePackage + "." + prefix + simpleInterfaceName;
  31. // 若没有指定前辍,则从registryMap中查找第一个指定接口的实现类
  32. if (StringUtil.isNullOrEmpty(prefix)) {
  33. // 查找第一个以接口名结尾的实现类名
  34. for (String registryKey : registryMap.keySet()) {
  35. if (registryKey.endsWith(simpleInterfaceName)) {
  36. key = registryKey;
  37. break;
  38. }
  39. }
  40. }
  41. // 判断注册表中是否存在指定名称(接口名)的服务
  42. if (registryMap.containsKey(key)) {
  43. // 从注册表中获取指定的提供者
  44. Object provider = registryMap.get(key);
  45. // 按照客户端要求进行本地方法调用
  46. result = provider.getClass()
  47. .getMethod(msg.getMethodName(), msg.getParamTypes())
  48. .invoke(provider, msg.getParamValues());
  49. }
  50. //将调用方法返回的值回复给Clinet
  51. ctx.writeAndFlush(result);
  52. ctx.close();
  53. }
  54. @Override
  55. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
  56. throws Exception {
  57. cause.printStackTrace();
  58. ctx.close();
  59. }
  60. }

自定义业务逻辑

  1. package com.gc.service;
  2. /**
  3. * @description: 用户个人信息具体实现
  4. * @author: GC
  5. * @create: 2020-11-16 15:54
  6. **/
  7. public class InfoGcService implements GcService {
  8. public String getGcInfo(String name) {
  9. return "GC的信息获取成功:年纪100。真实姓名:"+name;
  10. }
  11. }
  1. package com.gc.service;
  2. /**
  3. * @description: GC的计划
  4. * @author: GC
  5. * @create: 2020-11-16 15:57
  6. **/
  7. public class PlanInfoGcService implements GcService {
  8. @Override
  9. public String getGcInfo(String name) {
  10. return name+"的计划目前是学习Netty";
  11. }
  12. }

RpcServerStartApplication

  1. package com.gc;
  2. import com.gc.registry.RegistryCenter;
  3. import com.gc.server.RpcServer;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.boot.CommandLineRunner;
  6. import org.springframework.boot.SpringApplication;
  7. import org.springframework.boot.autoconfigure.SpringBootApplication;
  8. @SpringBootApplication
  9. public class RpcServerStartApplication implements CommandLineRunner {
  10. @Autowired
  11. private RpcServer rpcServer;
  12. @Autowired
  13. private RegistryCenter center;
  14. public static void main(String[] args) {
  15. SpringApplication.run(RpcServerStartApplication.class, args);
  16. }
  17. @Override
  18. public void run(String... args) throws Exception {
  19. //注入ZK工具类,在此时传递
  20. rpcServer.setCenter(center);
  21. //在此时设置IP:PORT 传递给Netty使用
  22. rpcServer.setServiceAddress("127.0.0.1:8888");
  23. //将该包下的接口以RPC的形式暴露出去到注册中心中
  24. rpcServer.publish("com.gc.service");
  25. rpcServer.start();
  26. }
  27. }

pom

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter</artifactId>
  4. </dependency>
  5. <!-- api依赖 -->
  6. <dependency>
  7. <groupId>com.gc.core</groupId>
  8. <artifactId>springboot-dubbo-core</artifactId>
  9. <version>0.0.1-SNAPSHOT</version>
  10. </dependency>
  11. <!--curator依赖-->
  12. <dependency>
  13. <groupId>org.apache.curator</groupId>
  14. <artifactId>curator-framework</artifactId>
  15. <version>2.12.0</version>
  16. </dependency>
  17. <dependency>
  18. <groupId>org.apache.curator</groupId>
  19. <artifactId>curator-recipes</artifactId>
  20. <version>2.12.0</version>
  21. </dependency>
  22. <!-- netty-all依赖 -->
  23. <dependency>
  24. <groupId>io.netty</groupId>
  25. <artifactId>netty-all</artifactId>
  26. <version>4.1.36.Final</version>
  27. </dependency>
  28. <dependency>
  29. <groupId>org.projectlombok</groupId>
  30. <artifactId>lombok</artifactId>
  31. <optional>true</optional>
  32. </dependency>

测试运行

Netty高级应用实战篇(下) - 图4

Netty高级应用实战篇(下) - 图5

Netty高级应用实战篇(下) - 图6

总结

API包

  1. 既然我们使用的是分布式的微服务架构,那么它肯定是一个大项目,被分成了很多个小项目。那么这其中,有一些公用的自定义工具类,或者其它的一些配置。就需要建立一个全局的工具类项目,让它可以以pom文件的方式引入到其它的微服务当中就可以使用了。而不是每一个服务都取重写这些工具类,这样会常用整个项目的代码的冗余。
  2. API包里面我们主要做的事情有:
  • 远程调用的信息封装类。在注册中心中要创建的节点跟节点和连接地址。
  • 负载均衡客户端定义以及负载均衡中的随机算法实现。
  • 具体需要实现的业务接口定义。

客户端

  1. 客户端又可以被称为消费者,既服务端即是提供者。当客户端需要调用服务端的接口时,需要从zk中拿到到该业务接口的调用地址,然后去进行调用。在去zk中拿信息的时候还会给当前路径注册一个Watcher监听器,来监听该节点的内容数据变更。
  2. 在客户端里面我们主要做的事情有:
  • 在做服务发现的时候,读取zk节点数据的时候会注册一个监听器。读到节点数据并解析为本次RPC远程调用地址。
  • 创建一个代理对象去调用具体的方法,如果调用的方法是本地方法,那么就使用invoke( ) 进行调用,否则就使用RPC远程调用。
  • 封装调用消息,使用writeAndFlush( ) 传递给服务端,然后RpcClientHandler中的getResult( ) 获取本次调用的接口。

服务端

  1. 服务端又可以成为提供者,既客户端即是消费者。当接收到客户单的远程调用后执行本地方法调用并返回结果,服务端启动的时候会扫描指定包路径下的业务类,将暴露的RPC接口注册到解析并以临时节点注册到zk中。
  2. 在服务端主要做的事情有:
  • 扫描指定包路径下的类写成节点加载到zk做服务注册。
  • 在Netty的自定义业务处理器中,接受到客户端发过来的消息并解析出具体路径进行本地方法调用,然后使用writeAndFlush( ) 返回具体的结果给客户端。