简介
Dubbo 框架本身就是一个 RPC 框架,不同的是消费者要连接的服务端 IP 与端口号不是硬编码在服务端的,而是从 zk 中读取到的。
一个 Dubbo 应用中会存在很多服务提供者与消费者。每个提供者都是一个 Netty Server,其会对外暴露自己所在主机的 IP 与 Port。每个消费者都是一个 Netty Client,其会通过连接相应主机的 IP 与 Port 来获取相应的服务。
服务提供者的 IP 与 Port 是如何对外暴露的呢?其会为自己所提供的服务起一个服务名称,一般为业务接口名。然后将该服务名称与对应提供者主机的 IP 与 Port 相绑定注册到zk 中。
服务消费者会从服务注册中心 zk 中查找自己所需要的服务名称,一般为业务接口名,然后获取到该服务名称对应的所有提供者主机信息,并通过负载均衡方式选取一个主机进行连接,获取相应服务。
自定义新增需求
当客户端通过负载均衡策略选择了某一提供者主机后,我们这里新增了一个需求:提供者主机中提供同一服务名称(接口名)的实现类有多个。这样,消费者可以指定其要调用的实现类。若消费者没有指定要调用的实现类,其会调用到注册到中第一个注册的实现类。
为了实现提供者端业务接口可以有多个实现类供客户端选择,这里要求实现类名必须是一个前辍 prefix 后是业务接口名。这样,消费者在进行消费时,可以通过前辍来指定要调用的是哪个实现类。
思路定义
- 核心工具类:springboot-dubbo-core
- ZKConstant:主要定义连接zk时的字符串常量定义。
- Invocation:RPC接口远程调用的信息。
- LoadBalance:负载均衡接口定义。
- RandomLoadBalance:负载均衡-随机接口实现。
- GcService:业务接口定义。
- 服务端:springboot-dubbo-clinet
- ServiceDiscovery:服务发现具体定义。
- ZKServiceDiscovery:服务发现具体实现。
- RpcProxy:使用代理调用RPC远程接口。
- RpcClientHandler:Netty的客户端业务处理器。
- RpcClinetStartApplication:客户度启动入口。
服务端:springboot-dubbo-server
- RegistryCenter: zk注册具体定义。
- ZKRegistryCenter:RegistryCenter的具体实现,连接ZK并注册暴露的RPC接口。
- RpcServer:将指定的目录下的RPC接口加载到缓存中,并进行注册到zk中。
- RpcServerHandler:主要处理RPC的连接请求。
- RpcServerStartApplication:服务端启动入口。
项目结构图
编码实现
API包
ZKConstant
package com.gc.core;
/**
* @description: 注册中心zk常量配置
* @author: GC
* @create: 2020-11-16 15:16
**/
public class ZKConstant {
//连接集群机器
public final static String ZK_CLUSTER = "127.0.0.1:2181";
//创建默认跟节点
public final static String ZK_DUBBO_ROOT_PATH = "/gcdubbo";
}
Invocation
package com.gc.core;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import java.io.Serializable;
/**
* @description: 封装调用远程RPC信息
* @author: GC
* @create: 2020-11-16 15:19
**/
@Data
@Builder
@AllArgsConstructor
public class Invocation implements Serializable {
//接口名
private String className;
//方法名
private String methodName;
//参数类型列表
private Class<?>[] paramTypes;
//参数值列表
private Object[] paramValues;
//业务接口的前缀
private String prefix;
}
LoadBalance
package com.gc.loadbalance;
import java.util.List;
public interface LoadBalance {
/**
*
* @param invokerPaths 所有提供者主机列表
* @return 负载均衡后的结果
*/
String choose(List<String> invokerPaths);
}
RandomLoadBalance
package com.gc.loadbalance;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Random;
/**
* 随机负载均衡策略
*/
@Component
public class RandomLoadBalance implements LoadBalance {
@Override
public String choose(List<String> invokerPaths) {
int index = new Random().nextInt(invokerPaths.size());
return invokerPaths.get(index);
}
}
GcService
package com.gc.service;
/**
* @description: 用户个人信息
* @author: GC
* @create: 2020-11-16 15:50
**/
public interface GcService {
/**
* 获取GC的信息
* @param name
* @return
*/
public String getGcInfo(String name);
}
pom
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.16</version>
<scope>compile</scope>
</dependency>
客户端
ServiceDiscovery
package com.gc.discovery;
/**
* @description: 服务发现规范定义
* @author: GC
* @create: 2020-11-16 16:14
**/
public interface ServiceDiscovery {
String discovery(String serviceName);
}
ZKServiceDiscovery
package com.gc.discovery;
import com.gc.core.ZKConstant;
import com.gc.loadbalance.LoadBalance;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @description: 注册具体实现
* @author: GC
* @create: 2020-11-16 16:14
**/
@Component
public class ZKServiceDiscovery implements ServiceDiscovery {
@Autowired
private LoadBalance loadBalance;
//ZK的工具类客户端
private CuratorFramework client;
//读取Server端的主机
private List<String> servers;
public ZKServiceDiscovery() {
client = CuratorFrameworkFactory.builder()
.connectString(ZKConstant.ZK_CLUSTER)//连接注册中心的地址
.connectionTimeoutMs(10000) //连接超时上线
.sessionTimeoutMs(4000) //会话过期时间
//连接失败时的重试策略。最多10次,每连接一次sheel一秒
.retryPolicy(new ExponentialBackoffRetry(1000, 10))
.build();
client.start();
}
@Override
public String discovery(String serviceName) {
try {
//配置的根路径 + 服务名称的类路径
String servicePath = ZKConstant.ZK_DUBBO_ROOT_PATH + "/" + serviceName;
servers = client.getChildren()
//在该路径在注册一个子节点列表变更监听器。
.usingWatcher((CuratorWatcher) event -> {
servers = client.getChildren().forPath(servicePath);
}).forPath(servicePath);//获得该路径下的所有服务节点
if(servers.size() == 0) {
return null;
}
} catch (Exception e) {
e.printStackTrace();
}
return loadBalance.choose(servers);
}
}
RpcProxy
package com.gc.client;
import com.gc.core.Invocation;
import com.gc.discovery.ServiceDiscovery;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
/**
* @description: 使用代理调用远程接口
* @author: GC
* @create: 2020-11-16 17:55
**/
@Component
public class RpcProxy {
@Autowired
private ServiceDiscovery serviceDiscovery;
public <T> T create(Class<?> clazz, String prefix){
return (T)Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//如果是本地方法,则直接本地调用
if (Object.class.equals(method.getDeclaringClass())) {
return method.invoke(args);
}
//否则使用RPC远程调用
return rpcInvoke(clazz, method, args, prefix);
}
});
}
/**
* @param clazz 调用的类
* @param method 方法名称
* @param args 参数
* @param prefix 前缀
* @return Object 远程调用结果集
*/
private Object rpcInvoke(Class<?> clazz, Method method, Object[] args, String prefix) {
RpcClientHandler rpcClientHandler = new RpcClientHandler();
EventLoopGroup clinetLoopGroup = new NioEventLoopGroup();
//客户端连接服务器的地址
String serviceAddress = null;
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(clinetLoopGroup)
//开启Nagle算法,它默认是关闭的
//该算法的目的就是为了尽可能发送大块数据,避免网络中充斥着许多小数据块。
.option(ChannelOption.TCP_NODELAY, true)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ObjectEncoder());
pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
pipeline.addLast(rpcClientHandler);
}
});
/**
* 服务发现
* 拿到类名称,从zk中读取到远程RPC接口的暴露地址。用于本次Netty的连接地址
*/
serviceAddress = serviceDiscovery.discovery(clazz.getName());
//如果没有拿到连接地址,那么本次连接终止
if (serviceAddress == null) {
return null;
}
String ip = serviceAddress.split(":")[0];
String port = serviceAddress.split(":")[1];
ChannelFuture future = bootstrap.connect(ip, Integer.parseInt(port)).sync();
//组装调用信息,传递给服务端
future.channel().writeAndFlush(
Invocation.builder()
.className(clazz.getName())
.methodName(method.getName())
.paramTypes(method.getParameterTypes())
.prefix(prefix)
.paramValues(args)
.build()
).sync();
future.channel().closeFuture().sync();
}catch (Exception e){
e.printStackTrace();
}finally {
clinetLoopGroup.shutdownGracefully();
}
return rpcClientHandler.getResult();
}
}
RpcClientHandler
package com.gc.client;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
* @description: 使用代理调用远程接口返回的结果集
* @author: GC
* @create: 2020-11-16 17:55
**/
public class RpcClientHandler extends SimpleChannelInboundHandler<Object> {
private Object result;
public Object getResult() {
return result;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
this.result = msg;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
RpcClinetStartApplication
package com.gc;
import com.gc.client.RpcProxy;
import com.gc.service.GcService;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import javax.annotation.Resource;
@SpringBootApplication
public class RpcClinetStartApplication implements CommandLineRunner {
@Resource
private RpcProxy proxy;
public static void main(String[] args) {
SpringApplication.run(RpcClinetStartApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
GcService infoService = proxy.create(GcService.class, "Info");
System.out.println(infoService.getGcInfo("阿伟"));
GcService planInfoService = proxy.create(GcService.class, "PlanInfo");
System.err.println(planInfoService.getGcInfo("GC"));
}
}
pom
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- api依赖 -->
<dependency>
<groupId>com.gc.core</groupId>
<artifactId>springboot-dubbo-core</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<!--curator依赖-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
<!-- netty-all依赖 -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.36.Final</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
服务端
RegistryCenter
package com.gc.registry;
/**
* @description: 注册规范定义
* @author: GC
* @create: 2020-11-16 16:14
**/
public interface RegistryCenter {
/**
*
* @param serviceName 注册到注册中心的服务名称,一般为业务接口名
* @param serviceAddress 提供该服务的主机的ip:port
*/
void register(String serviceName, String serviceAddress) throws Exception;
}
ZKRegistryCenter
package com.gc.registry;
import com.gc.core.ZKConstant;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.retry.ExponentialBackoffRetry;import org.apache.zookeeper.CreateMode;import org.springframework.stereotype.Component;
/**
* @description: 注册规范实现
* @author: GC
* @create: 2020-11-16 16:15
**/
@Component
public class ZKRegistryCenter implements RegistryCenter {
private CuratorFramework client;
public ZKRegistryCenter(){
// 创建并初始化zk的客户端
client = CuratorFrameworkFactory.builder()
.connectString(ZKConstant.ZK_CLUSTER)//连接到当前集群地址
.connectionTimeoutMs(10000) //连接超时时间
.sessionTimeoutMs(4000) //一次会话失效时间
//当连接失败时,重试的连接策略,每1秒连接1次,最多连接10次
.retryPolicy(new ExponentialBackoffRetry(1000, 10)).build();
client.start();
}
@Override
public void register(String serviceName, String serviceAddress) throws Exception {
String pathRoot = ZKConstant.ZK_DUBBO_ROOT_PATH + "/" + serviceName;
//创建临时节点
String hostPath = pathRoot + "/" + serviceAddress;
if(client.checkExists().forPath(hostPath) == null){
String host =client.create()
.creatingParentsIfNeeded() //如果当前要创建的子节点还没有父节点,那么就在此时创建
.withMode(CreateMode.EPHEMERAL)//创建的节点类型,临时节点
.forPath(hostPath);//创建节点路径
System.err.println("创建节点:"+host);
}
}
}
RpcServer
package com.gc.server;
import com.gc.registry.RegistryCenter;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolver;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import lombok.Data;
import org.springframework.stereotype.Component;
import java.io.File;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @description: 将暴露的RPC接口注册到解析并以临时节点注册到zk中
* @author: GC
* @create: 2020-11-16 16:54
**/
@Component
@Data
public class RpcServer {
// 服务注册表
private Map<String, Object> registryMap = new HashMap<>();
//获取到指定包的需要暴露的RPC接口全限定类名
private List<String> classCache = new ArrayList<>();
//ZK的连接,和创建节点工具类
private RegistryCenter center;
//当前服务端的ip:prot
private String serviceAddress;
// 添加上无参构造器
public RpcServer() {
}
public RpcServer(RegistryCenter center, String serviceAddress) {
this.center = center;
this.serviceAddress = serviceAddress;
}
public void publish(String basePackage) throws Exception {
// 将指定包下的提供者类名写入到classCache中进行缓存
getProviderClass(basePackage);
// 将提供者名与实例的对应关系写入到注册表,并完成到zk的注册
doRegister();
}
//该方法和手写tomcat是一样的原理
private void getProviderClass(String basePackage) {
// 加载指定的包为URL
URL resource = this.getClass().getClassLoader().getResource(basePackage.replaceAll("\\.", "/"));
// 若没有指定的资源,则直接结束
if (resource == null) {
return;
}
// 将URL资源转化为File
File dir = new File(resource.getFile());
// 遍历指定包及其子孙包中的所有文件,查找.class文件
for (File file : dir.listFiles()) {
if (file.isDirectory()) {
// 若当前file为目录,则递归
getProviderClass(basePackage + "." + file.getName());
} else if (file.getName().endsWith(".class")) {
// 去年.class的扩展名,获取到简单类名
String fileName = file.getName().replace(".class", "");
classCache.add(basePackage + "." + fileName);
}
}
}
private void doRegister() throws Exception {
// 若指定包下没有任何实现类,则直接结束
if (classCache.size() == 0) {
return;
}
for (String className : classCache) {
//根据类名获取这个类对象
Class<?> clazz = Class.forName(className);
// 将实现类名与实现类实例写入到map
registryMap.put(className, clazz.newInstance());
//获取接口名称
Class<?>[] interfaces = clazz.getInterfaces();
if (interfaces.length == 0) {
return;
}
// 将服务名称与提供者主机地址写入到zk(也就是当前主机的ip和port)。
center.register(interfaces[0].getName(), serviceAddress);
}
}
//注册完后,启动Netty。用来做RPC间的通信信息传递工作
public void start() throws Exception {
EventLoopGroup parentGroup = new NioEventLoopGroup();
EventLoopGroup childGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(parentGroup, childGroup)
//如果出现并发,来不及处理的连接。方法队列里面。长度最大是1024(这里面的连接都是已经通过三次握手了的)
.option(ChannelOption.SO_BACKLOG, 1024)
//上方的队列,如果存在空闲监测。开启这个配置,那么达到空闲监测的时候后,这些长连接不会被清楚
.childOption(ChannelOption.SO_KEEPALIVE, true)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ObjectEncoder());
pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
pipeline.addLast(new RpcServerHandler(registryMap));
}
});
String host = serviceAddress.split(":")[0];
String port = serviceAddress.split(":")[1];
ChannelFuture future = serverBootstrap.bind(host, Integer.valueOf(port)).sync();
System.out.println("Doubbo服务Server启动成功");
future.channel().closeFuture().sync();
}finally {
parentGroup.shutdownGracefully();
childGroup.shutdownGracefully();
}
}
}
RpcServerHandler
package com.gc.server;
import com.gc.core.Invocation;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.internal.StringUtil;
import java.util.Map;
/**
* @description: 服务端自定义业务处理器
* @author: GC
* @create: 2020-11-16 17:20
**/
public class RpcServerHandler extends SimpleChannelInboundHandler<Invocation> {
private Map<String, Object> registryMap;
//已经像注册中心注册过的RPC接口
public RpcServerHandler(Map<String, Object> registryMap) {
this.registryMap = registryMap;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Invocation msg) throws Exception {
Object result = "404";
// 获取要访问的业务接口名
String interfaceName = msg.getClassName();
// 获取接口的简单类名
String simpleInterfaceName = interfaceName.substring(interfaceName.lastIndexOf(".") + 1);
// 获取接口所在的包名
String basePackage = interfaceName.substring(0, interfaceName.lastIndexOf("."));
// 获取用户要访问的提供者实现类的前辍
String prefix = msg.getPrefix();
// 构建客户端要访问的提供者的key
String key = basePackage + "." + prefix + simpleInterfaceName;
// 若没有指定前辍,则从registryMap中查找第一个指定接口的实现类
if (StringUtil.isNullOrEmpty(prefix)) {
// 查找第一个以接口名结尾的实现类名
for (String registryKey : registryMap.keySet()) {
if (registryKey.endsWith(simpleInterfaceName)) {
key = registryKey;
break;
}
}
}
// 判断注册表中是否存在指定名称(接口名)的服务
if (registryMap.containsKey(key)) {
// 从注册表中获取指定的提供者
Object provider = registryMap.get(key);
// 按照客户端要求进行本地方法调用
result = provider.getClass()
.getMethod(msg.getMethodName(), msg.getParamTypes())
.invoke(provider, msg.getParamValues());
}
//将调用方法返回的值回复给Clinet
ctx.writeAndFlush(result);
ctx.close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
cause.printStackTrace();
ctx.close();
}
}
自定义业务逻辑
package com.gc.service;
/**
* @description: 用户个人信息具体实现
* @author: GC
* @create: 2020-11-16 15:54
**/
public class InfoGcService implements GcService {
public String getGcInfo(String name) {
return "GC的信息获取成功:年纪100。真实姓名:"+name;
}
}
package com.gc.service;
/**
* @description: GC的计划
* @author: GC
* @create: 2020-11-16 15:57
**/
public class PlanInfoGcService implements GcService {
@Override
public String getGcInfo(String name) {
return name+"的计划目前是学习Netty";
}
}
RpcServerStartApplication
package com.gc;
import com.gc.registry.RegistryCenter;
import com.gc.server.RpcServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RpcServerStartApplication implements CommandLineRunner {
@Autowired
private RpcServer rpcServer;
@Autowired
private RegistryCenter center;
public static void main(String[] args) {
SpringApplication.run(RpcServerStartApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
//注入ZK工具类,在此时传递
rpcServer.setCenter(center);
//在此时设置IP:PORT 传递给Netty使用
rpcServer.setServiceAddress("127.0.0.1:8888");
//将该包下的接口以RPC的形式暴露出去到注册中心中
rpcServer.publish("com.gc.service");
rpcServer.start();
}
}
pom
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- api依赖 -->
<dependency>
<groupId>com.gc.core</groupId>
<artifactId>springboot-dubbo-core</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<!--curator依赖-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
<!-- netty-all依赖 -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.36.Final</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
测试运行
总结
API包
既然我们使用的是分布式的微服务架构,那么它肯定是一个大项目,被分成了很多个小项目。那么这其中,有一些公用的自定义工具类,或者其它的一些配置。就需要建立一个全局的工具类项目,让它可以以pom文件的方式引入到其它的微服务当中就可以使用了。而不是每一个服务都取重写这些工具类,这样会常用整个项目的代码的冗余。
在API包里面我们主要做的事情有:
- 远程调用的信息封装类。在注册中心中要创建的节点跟节点和连接地址。
- 负载均衡客户端定义以及负载均衡中的随机算法实现。
- 具体需要实现的业务接口定义。
客户端
客户端又可以被称为消费者,既服务端即是提供者。当客户端需要调用服务端的接口时,需要从zk中拿到到该业务接口的调用地址,然后去进行调用。在去zk中拿信息的时候还会给当前路径注册一个Watcher监听器,来监听该节点的内容数据变更。
在客户端里面我们主要做的事情有:
- 在做服务发现的时候,读取zk节点数据的时候会注册一个监听器。读到节点数据并解析为本次RPC远程调用地址。
- 创建一个代理对象去调用具体的方法,如果调用的方法是本地方法,那么就使用invoke( ) 进行调用,否则就使用RPC远程调用。
- 封装调用消息,使用writeAndFlush( ) 传递给服务端,然后RpcClientHandler中的getResult( ) 获取本次调用的接口。
服务端
服务端又可以成为提供者,既客户端即是消费者。当接收到客户单的远程调用后执行本地方法调用并返回结果,服务端启动的时候会扫描指定包路径下的业务类,将暴露的RPC接口注册到解析并以临时节点注册到zk中。
在服务端主要做的事情有:
- 扫描指定包路径下的类写成节点加载到zk做服务注册。
- 在Netty的自定义业务处理器中,接受到客户端发过来的消息并解析出具体路径进行本地方法调用,然后使用writeAndFlush( ) 返回具体的结果给客户端。