动手实现RPC
最近利用业于时间实现一个了一个简简单单的 RPC
demo,在这里过程中,遇到了几个问题,也收获一些东西,分享一下这个过程.
前期准备
RPC
是我们在企业开发中比较常见的,同时也是比较熟悉的,但是对于开发一个 RPC
来说,我们需要掌握一些基本的理论知识.
1、RPC
是网络通信(进程之间的通信),所以相比单进程的通信来的慢
2、网络传输的是 字节
,而不是 字符
,或者其他的传输媒介.
3、了解or掌握了 动态代理的运用
。
本质是传输信息,将 我(消费端)
所知的信息通过通信告诉你 (服务端)
,然后你给我返回最终的信息.
动手实现
有了上述的前期准备之后,先剖析一下我们是如何使用 RPC
,服务提供端需要将服务暴露出去,服务消费端需要接入需要的服务。这个过程如下所示,我的讲述也围绕这个过程展开.
服务端暴露服务 —> 消费端引用服务 —> 动态代理 —> 序列化 —> 网络请求 —-> 服务端处理 —> 序列化返回 —> 消费端返回结果.
服务暴露
服务暴露就是将服务发布到(zookeeper)上,提供给其他的消费者访问,当消费者拿到这个信息之后,就可以连接服务端,并发起请求。
暴露就是需要将该服务的基本信息发布出去,这里列举的主要信息有 服务机器host
服务监听端口
服务接口
服务序列化方式
服务权重
服务版本
等
代码如下
1、服务暴露的信息
public class Provider implements Serializable, Cloneable {
private String serviceName;
private String host;
private Integer port;
private String version;
private Integer weight;
private String serialization;
get and set
}
2、暴露服务的过程,这里以 zookeeper
为例,主要是将服务以节点的形式添加到 zk
上
@Override
public void registerService(List<Provider> providerList) {
assert zkClient != null;
providerList.parallelStream().forEach(provider -> {
String host = provider.getHost();
Integer port = provider.getPort();
String serviceName = provider.getServiceName();
String version = provider.getVersion();
Integer weight = provider.getWeight();
String serverPath = root_path + "/" + serviceName + root_provider;
if (!zkClient.exists(serverPath)) {
zkClient.createPersistent(serverPath, true);
}
String finalInfo = host + split + port + split + serviceName + split + version + split + weight + split + provider.getSerialization();
String path = serverPath + "/" + finalInfo;
if (!zkClient.exists(path)) {
log.info("注册服务:{}到ZooKeeper", serverPath);
zkClient.createEphemeral(path);
} else {
log.warn("服务:{}已被注册", serverPath);
}
});
}
然后可以通过 zkCli
命令查看zk上服务的状况.
[zk: localhost:2181(CONNECTED) 0] ls /fuck/top.huzhurong.fuck.UserService/provider
[]
消费端引用服务和动态代理
消费端引用服务端方式大多数还是通过 Spring
的自定义标签引入.使用方式如下.
<fuck:reference id="test" interface="top.huzhurong.fuck.UserService" version="0.0.1"/>
这样就引用了 top.huzhurong.fuck.UserService
这个版本为 0.0.1
的服务. 在使用Spring的时候,我们需要通过使用动态代理去走网络,调用服务上的接口,那么就不可能想普通的 bean
一样去配置,一般说来都是通过 FactoryBean
来进行配置,因为在 getObject
中可以进行 bean的定制化
(大部分的框架也是FactoryBean引入的)。
FactoryBean的使用方式如下
public class ProxyBean implements FactoryBean, InitializingBean {
private Class name;
private Object object;
@Override
public Object getObject() {
return object;
}
@Override
public Class<?> getObjectType() {
return name;
}
@Override
public void afterPropertiesSet() {
this.build();
}
private void build() {
//动态代理产生一个代理bean
this.object = Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[]{this.name}, (proxy, method, args) -> {
//在invoke中定制我们的服务,可以走tcp/http等等
if (method.getName().equalsIgnoreCase("name")) {
return "调用name方法";
}
if (method.getName().equalsIgnoreCase("toString")) {
return "调用toString方法";
}
return "111";
});
}
}
这里只是一个简单的说明服务引用的过程,具体的过程如下
- 注册消费节点到zookeeper
- 获取服务者列表
- 订阅该服务接口
- 搭建动态代理
篇幅有限,我就介绍下搭建动态代理,其他三个还挺简单的。
Object object = Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[]{Class.forName(this.interfaceName)}
, new FuckRpcInvocationHandler(this));
class FuckRpcInvocationHandler implements InvocationHandler {
//字段
//构造方法
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//获取服务列表
List<Provider> all = ProviderSet.getAll(this.className);
//软负载均衡
Provider provider = loadBalance.getProvider(all);
String info = provider.buildIfno();
SocketChannel channel = ChannelMap.get(info);
if (channel == null) {
//建立TCP连接
Client client = new NettyClient(provider, this.serialization);
client.connect(provider.getHost(), provider.getPort());
channel = ChannelMap.get(info);
}
SocketChannel finalChannel = channel;
Future<Response> submit = TempResultSet.executorService.submit(() -> {
//请求写入tcp通道
finalChannel.writeAndFlush(request);
//这里可以改造成 CountDownLatch,可以比 ;; 循环要好
for (; ; ) {
Response response = TempResultSet.get(request.getRequestId());
if (response != null) {
return response;
}
}
});
Response response = submit.get(this.timeout, TimeUnit.SECONDS);
}
}
动态代理做的主要是,获取服务列表,软件负载处理,建立tcp连接,通信这几步.
动态代理仅仅是实际调用的时候才会进入invoke方法,实例化不进入,每一次代理调用都会进入
invoke
序列化和网络请求
序列化网上都文章很多,我也只是支持了 protostuff
和 jdk
序列化方式. 本身很难,都是封装之后还是ok的,重点介绍一下网络传输的处理
当解决完序列化之后,就是网络传输了,都知道网络传输的是字节,但是怎么去使用网络进行透明传输我们都很头痛,而netty在使用上降低了我们的入门难度,其简单的api可以快速的进行上手,如果你还没有通过,可以看看netty的example就可以动手写了。
在网络传输部分的处理中,一个是协议的处理,一个是如何将处理完的请求结果赋值给正确的请求对象.
1、协议的处理也非常简单,就是一个常规的length + data , 作为自定义协议,这种处理可以让我们快速的处理而不会纠结于协议的正确性
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> list) {
if (byteBuf.readableBytes() <= HEAD_LENGTH) {
return;
}
byteBuf.markReaderIndex();//标记位置
int dataLength = byteBuf.readInt();
if (byteBuf.readableBytes() < dataLength) {
byteBuf.resetReaderIndex();//可读取的数据不够
return;
}
byte[] dataArray = new byte[dataLength];
byteBuf.readBytes(dataArray);
Request request = serialization.deSerialize(dataArray, Request.class);
if (log.isDebugEnabled()) {
log.debug("接受到消费者请求:{},请求内容:{}", ctx.channel().toString(), request);
}
list.add(request);
}
2、如何将处理结果返回给正确的请求对象,我们都知道在执行请求的时候都需要一个 RI --> requestId
去标示这个请求,在我们的rpc当中也是如此,确保请求和响应的是一家人
public class Request implements Serializable {
private String requestId;//请求标示
private String serviceName;//服务名称
private String methodName;//方法名,Method不能被序列化
private Class<?>[] parameters;//参数类型,用于获取对于的执行方法
private Object[] args;//实际参数
}
3、服务处理,可以选择新建立线程池而不是直接使用 netty
的work io 线程池。
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Serializable serializable) {
if (serializable instanceof Request) {
Request request = (Request) serializable;
responseTask.execute(new ResponseTask(request, channelHandlerContext, this.applicationContext));
}
}
@Override
public void run() {
String serviceName = request.getServiceName();
String methodName = request.getMethodName();
Class<?>[] parameters = request.getParameters();
Object[] args = request.getArgs();
Response response = new Response();
response.setRequestId(request.getRequestId());
response.setSuccess(false);
try {
//获取服务
Object service = ServiceCache.getService(serviceName);
if (service == null) {
Class<?> aClass = ClassUtils.forName(serviceName, ClassUtils.getDefaultClassLoader());
service = applicationContext.getBean(aClass);
ServiceCache.put(serviceName, service);
}
//获取方法
Method method = service.getClass().getDeclaredMethod(methodName, parameters);
Object invoke = method.invoke(service, args);
System.out.println("invoke:" + invoke);
response.setSuccess(true);
response.setObject(invoke);
} catch (ClassNotFoundException | IllegalAccessException e) {
response.setException(e);
} catch (NoSuchMethodException e) {
e.printStackTrace();
response.setException(e);
} catch (InvocationTargetException e) {
e.printStackTrace();
response.setException(e.getTargetException());
}
//写入channel中
channelHandlerContext.writeAndFlush(response);
}
消费端返回结果
当服务端写入数据的时候,如果网络通畅,基本上一下就到了客户端,我们的处理也很简单,就是放到一个Map中,而消费线程一直在map中找这个数据
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Serializable serializable) {
if (serializable instanceof Response) {
Response response = (Response) serializable;
//写入map
TempResultSet.put(response.getRequestId(), response);
}
}
//循环写入
for (; ; ) {
Response response = TempResultSet.get(request.getRequestId());
if (response != null) {
return response;
}
}
到这里一个简单的rpc调用就可以起来了。
小结
一个简单的 rpc
就新鲜出炉了,但是其实还有很多可以改造的点,例如拦截(责任链+SPI
),例如观察者模式的运用,不过也是从中学习到了一个RPC基本的使用方法,还需要深入到了解线程池使用,这里的使用也是很粗制滥造的。不过能在几天之内写完还是很开心,项目在这里 ——> fuck-rpc 的简单实现,你也可以试试哦
2018-12-05