动手实现RPC

最近利用业于时间实现一个了一个简简单单的 RPC demo,在这里过程中,遇到了几个问题,也收获一些东西,分享一下这个过程.

前期准备

RPC 是我们在企业开发中比较常见的,同时也是比较熟悉的,但是对于开发一个 RPC 来说,我们需要掌握一些基本的理论知识.

1、RPC 是网络通信(进程之间的通信),所以相比单进程的通信来的慢

2、网络传输的是 字节 ,而不是 字符,或者其他的传输媒介.

3、了解or掌握了 动态代理的运用

本质是传输信息,将 我(消费端) 所知的信息通过通信告诉你 (服务端) ,然后你给我返回最终的信息.

动手实现

有了上述的前期准备之后,先剖析一下我们是如何使用 RPC ,服务提供端需要将服务暴露出去,服务消费端需要接入需要的服务。这个过程如下所示,我的讲述也围绕这个过程展开.

服务端暴露服务 —> 消费端引用服务 —> 动态代理 —> 序列化 —> 网络请求 —-> 服务端处理 —> 序列化返回 —> 消费端返回结果.

服务暴露

服务暴露就是将服务发布到(zookeeper)上,提供给其他的消费者访问,当消费者拿到这个信息之后,就可以连接服务端,并发起请求。
暴露就是需要将该服务的基本信息发布出去,这里列举的主要信息有 服务机器host 服务监听端口 服务接口 服务序列化方式 服务权重 服务版本
代码如下

1、服务暴露的信息

  1. public class Provider implements Serializable, Cloneable {
  2. private String serviceName;
  3. private String host;
  4. private Integer port;
  5. private String version;
  6. private Integer weight;
  7. private String serialization;
  8. get and set
  9. }

2、暴露服务的过程,这里以 zookeeper 为例,主要是将服务以节点的形式添加到 zk

  1. @Override
  2. public void registerService(List<Provider> providerList) {
  3. assert zkClient != null;
  4. providerList.parallelStream().forEach(provider -> {
  5. String host = provider.getHost();
  6. Integer port = provider.getPort();
  7. String serviceName = provider.getServiceName();
  8. String version = provider.getVersion();
  9. Integer weight = provider.getWeight();
  10. String serverPath = root_path + "/" + serviceName + root_provider;
  11. if (!zkClient.exists(serverPath)) {
  12. zkClient.createPersistent(serverPath, true);
  13. }
  14. String finalInfo = host + split + port + split + serviceName + split + version + split + weight + split + provider.getSerialization();
  15. String path = serverPath + "/" + finalInfo;
  16. if (!zkClient.exists(path)) {
  17. log.info("注册服务:{}到ZooKeeper", serverPath);
  18. zkClient.createEphemeral(path);
  19. } else {
  20. log.warn("服务:{}已被注册", serverPath);
  21. }
  22. });
  23. }

然后可以通过 zkCli 命令查看zk上服务的状况.

  1. [zk: localhost:2181(CONNECTED) 0] ls /fuck/top.huzhurong.fuck.UserService/provider
  2. []

消费端引用服务和动态代理

消费端引用服务端方式大多数还是通过 Spring 的自定义标签引入.使用方式如下.

  1. <fuck:reference id="test" interface="top.huzhurong.fuck.UserService" version="0.0.1"/>

这样就引用了 top.huzhurong.fuck.UserService 这个版本为 0.0.1 的服务. 在使用Spring的时候,我们需要通过使用动态代理去走网络,调用服务上的接口,那么就不可能想普通的 bean
一样去配置,一般说来都是通过 FactoryBean 来进行配置,因为在 getObject 中可以进行 bean的定制化(大部分的框架也是FactoryBean引入的)。

FactoryBean的使用方式如下

  1. public class ProxyBean implements FactoryBean, InitializingBean {
  2. private Class name;
  3. private Object object;
  4. @Override
  5. public Object getObject() {
  6. return object;
  7. }
  8. @Override
  9. public Class<?> getObjectType() {
  10. return name;
  11. }
  12. @Override
  13. public void afterPropertiesSet() {
  14. this.build();
  15. }
  16. private void build() {
  17. //动态代理产生一个代理bean
  18. this.object = Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[]{this.name}, (proxy, method, args) -> {
  19. //在invoke中定制我们的服务,可以走tcp/http等等
  20. if (method.getName().equalsIgnoreCase("name")) {
  21. return "调用name方法";
  22. }
  23. if (method.getName().equalsIgnoreCase("toString")) {
  24. return "调用toString方法";
  25. }
  26. return "111";
  27. });
  28. }
  29. }

这里只是一个简单的说明服务引用的过程,具体的过程如下

  • 注册消费节点到zookeeper
  • 获取服务者列表
  • 订阅该服务接口
  • 搭建动态代理

篇幅有限,我就介绍下搭建动态代理,其他三个还挺简单的。

  1. Object object = Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[]{Class.forName(this.interfaceName)}
  2. , new FuckRpcInvocationHandler(this));
  3. class FuckRpcInvocationHandler implements InvocationHandler {
  4. //字段
  5. //构造方法
  6. @Override
  7. public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  8. //获取服务列表
  9. List<Provider> all = ProviderSet.getAll(this.className);
  10. //软负载均衡
  11. Provider provider = loadBalance.getProvider(all);
  12. String info = provider.buildIfno();
  13. SocketChannel channel = ChannelMap.get(info);
  14. if (channel == null) {
  15. //建立TCP连接
  16. Client client = new NettyClient(provider, this.serialization);
  17. client.connect(provider.getHost(), provider.getPort());
  18. channel = ChannelMap.get(info);
  19. }
  20. SocketChannel finalChannel = channel;
  21. Future<Response> submit = TempResultSet.executorService.submit(() -> {
  22. //请求写入tcp通道
  23. finalChannel.writeAndFlush(request);
  24. //这里可以改造成 CountDownLatch,可以比 ;; 循环要好
  25. for (; ; ) {
  26. Response response = TempResultSet.get(request.getRequestId());
  27. if (response != null) {
  28. return response;
  29. }
  30. }
  31. });
  32. Response response = submit.get(this.timeout, TimeUnit.SECONDS);
  33. }
  34. }

动态代理做的主要是,获取服务列表,软件负载处理,建立tcp连接,通信这几步.

动态代理仅仅是实际调用的时候才会进入invoke方法,实例化不进入,每一次代理调用都会进入 invoke

序列化和网络请求

序列化网上都文章很多,我也只是支持了 protostuffjdk 序列化方式. 本身很难,都是封装之后还是ok的,重点介绍一下网络传输的处理

当解决完序列化之后,就是网络传输了,都知道网络传输的是字节,但是怎么去使用网络进行透明传输我们都很头痛,而netty在使用上降低了我们的入门难度,其简单的api可以快速的进行上手,如果你还没有通过,可以看看netty的example就可以动手写了。

在网络传输部分的处理中,一个是协议的处理,一个是如何将处理完的请求结果赋值给正确的请求对象.

1、协议的处理也非常简单,就是一个常规的length + data , 作为自定义协议,这种处理可以让我们快速的处理而不会纠结于协议的正确性

  1. @Override
  2. protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> list) {
  3. if (byteBuf.readableBytes() <= HEAD_LENGTH) {
  4. return;
  5. }
  6. byteBuf.markReaderIndex();//标记位置
  7. int dataLength = byteBuf.readInt();
  8. if (byteBuf.readableBytes() < dataLength) {
  9. byteBuf.resetReaderIndex();//可读取的数据不够
  10. return;
  11. }
  12. byte[] dataArray = new byte[dataLength];
  13. byteBuf.readBytes(dataArray);
  14. Request request = serialization.deSerialize(dataArray, Request.class);
  15. if (log.isDebugEnabled()) {
  16. log.debug("接受到消费者请求:{},请求内容:{}", ctx.channel().toString(), request);
  17. }
  18. list.add(request);
  19. }

2、如何将处理结果返回给正确的请求对象,我们都知道在执行请求的时候都需要一个 RI --> requestId 去标示这个请求,在我们的rpc当中也是如此,确保请求和响应的是一家人

  1. public class Request implements Serializable {
  2. private String requestId;//请求标示
  3. private String serviceName;//服务名称
  4. private String methodName;//方法名,Method不能被序列化
  5. private Class<?>[] parameters;//参数类型,用于获取对于的执行方法
  6. private Object[] args;//实际参数
  7. }

3、服务处理,可以选择新建立线程池而不是直接使用 netty 的work io 线程池。

  1. @Override
  2. protected void channelRead0(ChannelHandlerContext channelHandlerContext, Serializable serializable) {
  3. if (serializable instanceof Request) {
  4. Request request = (Request) serializable;
  5. responseTask.execute(new ResponseTask(request, channelHandlerContext, this.applicationContext));
  6. }
  7. }
  8. @Override
  9. public void run() {
  10. String serviceName = request.getServiceName();
  11. String methodName = request.getMethodName();
  12. Class<?>[] parameters = request.getParameters();
  13. Object[] args = request.getArgs();
  14. Response response = new Response();
  15. response.setRequestId(request.getRequestId());
  16. response.setSuccess(false);
  17. try {
  18. //获取服务
  19. Object service = ServiceCache.getService(serviceName);
  20. if (service == null) {
  21. Class<?> aClass = ClassUtils.forName(serviceName, ClassUtils.getDefaultClassLoader());
  22. service = applicationContext.getBean(aClass);
  23. ServiceCache.put(serviceName, service);
  24. }
  25. //获取方法
  26. Method method = service.getClass().getDeclaredMethod(methodName, parameters);
  27. Object invoke = method.invoke(service, args);
  28. System.out.println("invoke:" + invoke);
  29. response.setSuccess(true);
  30. response.setObject(invoke);
  31. } catch (ClassNotFoundException | IllegalAccessException e) {
  32. response.setException(e);
  33. } catch (NoSuchMethodException e) {
  34. e.printStackTrace();
  35. response.setException(e);
  36. } catch (InvocationTargetException e) {
  37. e.printStackTrace();
  38. response.setException(e.getTargetException());
  39. }
  40. //写入channel中
  41. channelHandlerContext.writeAndFlush(response);
  42. }

消费端返回结果

当服务端写入数据的时候,如果网络通畅,基本上一下就到了客户端,我们的处理也很简单,就是放到一个Map中,而消费线程一直在map中找这个数据

  1. @Override
  2. protected void channelRead0(ChannelHandlerContext channelHandlerContext, Serializable serializable) {
  3. if (serializable instanceof Response) {
  4. Response response = (Response) serializable;
  5. //写入map
  6. TempResultSet.put(response.getRequestId(), response);
  7. }
  8. }
  9. //循环写入
  10. for (; ; ) {
  11. Response response = TempResultSet.get(request.getRequestId());
  12. if (response != null) {
  13. return response;
  14. }
  15. }

到这里一个简单的rpc调用就可以起来了。

小结

一个简单的 rpc 就新鲜出炉了,但是其实还有很多可以改造的点,例如拦截(责任链+SPI),例如观察者模式的运用,不过也是从中学习到了一个RPC基本的使用方法,还需要深入到了解线程池使用,这里的使用也是很粗制滥造的。不过能在几天之内写完还是很开心,项目在这里 ——> fuck-rpc 的简单实现,你也可以试试哦

2018-12-05