HSF是一个RPC的中间件,微服务之间的远程调用,是现在各类微服务之间通信的主要采用的一个中间件。既然是远程调用,其中与其相关的序列化、网络通信知识必不可少。

    前置知识

    1、对象序列化。
    java里把对象转换成字节码叫做对象的序列化。把字节码恢复陈对象叫做对象的反序列化。
    为什么要这么做?主要为了方便对象持久化(对象的字节序列一直存在硬盘的一个文件里,随时可以整出来)+网络传输对象。
    如何序列化?实现Serializable接口

    1. public class person implements Serializable {
    2. // 序列化uid,用来保证版本一致性。
    3. private static final long serialVersionUID = -5809782578272943999L;
    4. // ...
    5. }
    6. public class Test{
    7. private static void SerializaPerson() throws FileNotFoundException, IOException {
    8. Person person = new Person();
    9. // set...
    10. ObjectOutputStream oo = new ObjectOutputStream(new FileOuputStream(new File("D://xxx.txt")));
    11. oo.writeObject(person);
    12. }
    13. private static Person DeserializePerson() throws Exception, IOException {
    14. ObjectInputStream ois = new ObjectInputStream(new FileInputStream(new File("D://xxx.txt")));
    15. Person person = (Person) ois.readObject();
    16. return person;

    Java序列化和Hessian序列化的区别?Java序列化会把所有的信息,包含类的继承关系都序列化了,所以效率较低,而且字节流比较大。Hessian序列化着重于数据,附带简单的类型信息方法,比如Integer a = 1,简化为I 1这样的流。Hessian把所有的对象属性当成一个Map来序列化,包含了基本的类型描述和数据内容。如果遇到重复的对象,还会插入一个引用块来避免多次序列化和反序列,所以相当于提高了效率和节省了空间。
    Hessian2序列化(Dubbo的默认序列化器),参考https://www.cnblogs.com/yougewe/articles/10491667.html
    HSF在1.0的时候只支持Java和Hessian的序列化方式,2.0的时候增加了Hessian2, json, kyro的方式。
    其中,kyro的序列化方式效率最高但适配最差,而java原生序列化方式效率最低但适配最好;hession在性能上处于kyro以及java原生之间,效果还是不错的;

    1. Netty
      Netty是一个先进的NIO的解决方案,完善了java中的NIO并对其进行了优化。
      这有一个极简的采用Netty搭建的一个RPC服务的案例:https://www.v2ex.com/amp/t/447130

    HSF简单说明:
    image.png
    上面是一个更加清晰的图,描述了各个角色都能做什么。
    对Provider,可以声明和发布服务。
    对Consumer,主要是获取服务,在代理这里,不同的代理方式会产生差异。
    对配置中心,保存了很多的三元组,一方面接受Provider的注册服务,另一方面对消费者连接和心跳推送服务。
    除了上述的三个实体之外,还有另外一些参与的实体:

    1. 持久化配置,主要基于Diamond,存储HSF中使用到的治理规则,包括路由规则、归组规则、权重规则等,从而根据规则对调用过程的选址逻辑进行干预。
    2. 元数据存储中心,用于存储HSF服务对应的方法列表以及参数结构等信息,该部分不会对HSF服务的调用过程产生影响。
    3. HSFOPS,在线的进行上述的各类管理,而且可以在线进行测试。

    image.png
    HSF发布流程:
    一共三个阶段:1. 首先Spring容器初始化,这时候主要监听上下文的事件, 2. HSFSpringProviderBean初始化,这里主要是元数据的初始化和存储,3. 发布服务,这里主要调用RPC协议发布服务,然后ConfigServer注册服务发布元数据,最后存储元数据。

    1. Spring容器初始化时发生了什么?

    1)调用了setApplicationContext方法。
    HSFSpringProviderBean实现了ApplicationContextAware接口,意味着Spring容器初始化后,HSFSpringProviderBean中的setApplicationContext方法将被调用,同时获取Spring容器ApplicationContext。

    1. /* HSFSpringProviderBean.java */
    2. public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    3. ...
    4. if (applicationContext instanceof AbstractApplicationContext){
    5. ...
    6. Method method = AbstractApplicationContext.class.getDeclareMethod("addListener",
    7. new Class<?>[] {ApplicationListener.class});
    8. method.setAccessible(true);
    9. // 注册HSFApplicationListener
    10. method.invoke(applicationContext, new Object[] {new HSFApplicationListener() });
    11. isInSpringCOntainer = true;
    12. }
    13. }

    2)监听Spring事件,触发onApplicationEvent

    1. /* HSFSpringProviderBean.java */
    2. private final class HSFApplicationListener implements ApplicationListener {
    3. @Override
    4. public void onApplicationEvent(ApplicationEvent event) {
    5. if (event instanceof ContextRefreshedEvent) {
    6. // 如果是刷新事件,发布服务,更新服务状态
    7. HSFSpringProviderBean.this.providerBean.publish();
    8. setAppInitedStatus();
    9. } else if (event instanceof ContextClosedEvent) {
    10. // 如果是关闭事件,关闭HSF服务器
    11. if (AppInfoUtils.appRunning.compareAndSet(true, false)) {
    12. providerBean.shutdownHSFServer();
    13. }
    14. }
    15. }
    16. }

    实例化HSFApiProviderBean,并且设置各个元数据:

    1. /* HSFSpringProviderBean.java */
    2. public final class HSFSpringProviderBean implements InitializingBean, ApplicationContextAware {
    3. ...
    4. private final HSFApiProviderBean providerBean = new HSFApiProviderBean();
    5. ...
    6. }

    其中构造函数里包含服务元数据ServiceMetadata的相关属性,包括版本号、组别、超时时间等

    1. /* HSFApiProviderBean.java */
    2. private final ServiceMetadata metadata = new ServiceMetadata(true);
    3. ...
    4. public HSFApiProviderBean(){
    5. metadata.setVersion("1.0.0");
    6. metadata.setGroup("HSF");
    7. // 默认客户端调用超时时间
    8. metadata.addProperty(TRConstraints.TIMEOUT_TYPE_KEY, "3000");
    9. // 默认客户端连接空闲超时时间
    10. metadata.addProperty(TRConstraints.IDLE_TIMEOUT_KEY, "10");
    11. // 序列化类型,默认Hessian
    12. metadata.addProperty(TRConstraints.SERIALIZE_TYPE_KEY, HSFContraints.HESSIAN_SERIALIZE);
    13. metadata.addProperty(HSFContraints.RPC_VERSION, "2.0");
    14. metadata.addProperty(HSFContraints.PREFER_SERIALIZIER, HSFServiceContainer.getInstance(ConfigurationService.class).getPreferedSerializer());
    15. }

    HSFSpringProviderBean实现了initialingBean,意味着Bean被初始化时,Spring设置完Bean所有属性后,将触发afterPropertiesSet方法

    1. /* HSFSpringProviderBean.java */
    2. public final class HSFSpringProviderBean implements InitializingBean, ApplicationContextAware {
    3. ...
    4. public void afterPropertiesSet() throws Exception {
    5. init();
    6. }
    7. }

    init里在做什么:

    1. /* HSFSpringProviderBean.java */
    2. public void init() throws Exception {
    3. // 避免多次初始化
    4. if (!providerBean.getInited().compareAndSet(false, true)) {return;}
    5. // 初始化HSF日志
    6. LoggerInit.initHSFLog();
    7. // 初始化AppName
    8. AppInfoUtils.initAppNames(providerBean.getMetadata());
    9. // 初始化Spas
    10. SpasInit.initSpas();
    11. // 检查配置,初始化服务名称
    12. providerBean.checkConfig();
    13. // 发布服务,如果providerBean不在Spring容器中,则发布服务
    14. publishIfNotInSpringContainer();
    15. }
    16. private void publishIfNotInSpringContainer(){
    17. if(!isInSpringContainer){
    18. LOGGER.warn("[SpringProviderBean]不是在Spring容器中创建,不推荐使用");
    19. providerBean.publish();
    20. }
    21. }
    1. 发布服务

    HSF服务发布入口:HSFApiProviderBean.publish()

    /* HSFApiProviderBean.java */
    public void publish(){
        if (!isPublished.compareAndSet(false, true)) {return;}
        ...
        // 获取ProcessService实例,将服务元数据作为参数,调用publish
        HSFServiceContainer.getInstance(ProcessService.class).publish(metadata);
    }
    

    根据metadata获取RPC协议,调用响应的RPC服务完成服务发布:

    /* ProcessComponent.java */
    
    @Override
    public void publish(ServiceMetadata metadata) throws HSFException {
        ...
        // 根据相应RPC服务实现完成服务发布
        rpcProtocolService.registerProvider(rpcProtocolType, metadata);
        ...
        hookService.prePublish(metadata);
        ...
        if (!metadata.isDelayedPublish()){
            // 向ConfigService注册服务,发布元数据
            metadataService.publish(metadata);
        }
        ...
        hookService.afterPublish(metadata);
        // 存储元数据
        metadataInfoStoreService.store(metadata);
    }
    

    根据相应RPC服务实现完成服务发布:

    /* RPCprotocolTemplateComponent.java */
    @Override
    public void registerProvider(String protocol, ServiceMetadata metadata) throws HSFException {
        // 调用RemotingRPCProtocolComponent.registerProvider()
        rpcService.registerProvider(metadata);
    }
    

    调用RemotingRPCProtocolComponent.registerProvider():

    /* RemotingRPCProtocolComponent.java */
    
    public void registerProvider(ServiceMetadata metadata) throws HSFException {
        // 启动HSF Server
        if (isProviderStarted.compareAndSet(false, true)){
            providerServer.startHSFServer();
            // server.start(configService.getHSFServerPort())
            ...
        }
        // 分配线程池
        int corePoolSize = metadata.getCorePoolSize();
        int maxPoolSize  = metadata.getMaxPoolSize();
        if (corePoolSize > 0 && maxPoolSize > 0 && maxPoolSize >= corePoolSize) {
            providerServer.allocThreadPool(metadata.getUniqueName(), corePoolSize, maxPoolSize);
        }
    
        // 注册对象到HSF Server
        providerServer.addMetadata(metadata.getUniqueName(), metadata, metadata.getTarget());
    }
    

    最后NettyService.start(),本地开始监听端口,处理请求:

    /* NettyServer.java */
    public void start(int listenPort) throws Exception {
        ServerBootstrap bootstrap = new ServerBootstrap();
        ...
        long tryBind = 3;
        while (tryBind > 0) {
            ChannelFuture cf = bootstrap.bind(new InetSocketAddress(bindHost, listenPort));
            cf.await();
            if (cf.isSuccess()) {
                LOGGER.warn("Server started, listen at: " + listenPort);
                retunr;
            } else {
                tryBind --;
            }
        }
    }
    

    服务发布后,服务端如何处理远程调用:
    发布服务后,Server(HSF Provider)已经启动,并监听相应的端口等待Client(HSF Consumer)连接。
    当新请求来的时候,会出发NettyServerHandler的messageReceived()方法:

    /* NettyServerHandler.java */
    public void messageReceived(final ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        ...
        handleRequest(ctx, message);
    }
    
    private void handleRequest(final ChannelHandlerContext ctx, final Object message) {
        final Connection connect = new NettyConnection(ctx.getChannel());
        if (message instanceof List) {
            List<BaseRequest> messages = (List<BaseRequest>) message;
            for (Object messageObject : messages) {
                // 处理单一请求
                processOneRequest(messageObject, connection);
            }
        } else {
            processOneRequest(message, connection);
        }
    }
    
    private void processOneRequest(final Object message, final Connection connection){
        BaseRequest request = (BaseRequest) message;
        ServerHandler<BaseRequest> serverhandler = (ServerHandler<BaseRequest>) request.getServerHandler();
    
        if (serverhandler.getExecutor(request) == null) {
            BaseResponse responseWrapper = serverhandle.handleRequest(request, connection);
            connection.writeResponseToChannel(responseWrapper);
        } else {
            // 启动线程,处理请求
            serverhandler.getExecutor(request).execute(new HandlerRunnable(connection, request, serverhandler));
        }
        RemotingRuntimeInfoHolder.getInstance().increaseCountProcessRequests();
    }
    
    private static class HandlerRunnable implements Runnable {
        ...
        public void run() {
            ...
            // 调用RPCServerHandler.handleRequest根据相应RPC协议处理请求
            BaseResponse responseWrapper = serverHandler.handleRequest(message, connection);
            ...
            connection.writeResponseToChannel(responseWrapper);
        }
    }
    

    调用RPCServerHandler.handleRequest根据相应RPC协议处理请求:

    /* ProviderProcessor.java */
    public void handleRequest(final HSFRequest hsfRequest, final ServerOutput outputStream) {
        final String serviceUniqueName = hsfRequest.getTargetServiceUniqueName();
        final String methodName = hsfRequest.getMethodName();
        final Connection connection = outputStream.getConnection();
        final String clientIp = connection.getPeerIp();
    
        ...
    
        ProviderServiceModel psm = ApplicationModel.instance().getProvidedServiceModel(serviceUniqueName);
    
        HSFResponse hsfResponse = null;
    
        if (hsfResponse == null) {
            // 序列化协议
            final byte protocol = hsfRequest.getSerializeType();
            if (TRConstraints.TOP_SERIALIZE == protocol){
                hsfResponse = handleTopRequest(hsfRequest, serviceUniqueName, methodName, outputStream);
            } else {
                hsfResponse = handleRequest0(hsfRequest, outputStream, null, null);
            }
            ThreadLocalUtil.remove();
            if (hsfResponse != null) {
                handleEagleEyeResponseSend(hsfResponse);
                int responseSize = outputStream.writeHSFResponse(hsfResponse);
                logService.responseSize(responseSize);
            }
        }
    }
    
    private HSFResponse handleRequest0(final HSFRequest hsfRequest, final ServerOutput output, 
                                       final Object[] originalArgs, final String[] originalArgTypes) {
        final String serviceUniqueName = hsfRequest.getTargetServiceUniqueName();
        final ProviderServiceModel serviceModel = applicationModel.getProvidedServiceModel(serviceUniqueName);
        final HSFResponse hsfResponse = new HSFResponse();
        final String remoteHost = output.getConnection().getPeerIp();
    
        ...
    
        if (null == serviceModel){
            hsfResponse.setErrorMsg("[HSF-Provider] 未找到业务服务,服务名称:" + serviceUniqueName);
            return hsfResponse;
        }
        ...
        ClassLoader tcl = Thread.currentThread().getContextClassLoader();
        ProviderMethodModel methodModel = null;
        // 获取方法名
        String methodName = hsfRequest.getMethodName();
        ...
        if (needInvoke) {
            // 获得方法参数
            Object[] methodArgs = hsfRequest.getMethodArgs();
            // 调用业务应用时,切换为应用Class Loader
            Thread.currentThread().setContextClassLoader(serviceModel.getMetadata().getServicePojoClassLoader());
            // 通过动态代理得到结果
            Object appResp = workerMethod.invoke(serviceModel.getServiceInstance(), methodArgs);
            ...
            hsfResponse.setAppResponse(appResp);
            retobj = appResp;
        }
    }
    

    返回结果为:

    /* NettyConnection.java */
    public void writeResponseToChannel(final BaseResponse response) {
        if (response != null) {
            ChannelFuture wf = channel.writeAndFlush(response);
            wf.addListener(new ChannelFutureListener() {
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()){
                        LOGGER.error();
                    }
                    if(!channel.isActive()) {
                        channel.close();
                    }
                }
            });
        }
    }