一、引言
批量推理与单例推理唯一的区别就是发送时guest端发送多条数据、host处理时逐条处理,所以本文中我们以单例推理为例,分析在单例推理的过程中,fate-serving的各个子模块是如何相互协作的。
另外需要说明的是,对于fate-servinig-server模块,它既有作为guest端请求推理的功能,也有作为host端进行推理的能力,我们探究的主要时guest端与host端如何进行协作完成一次推理。
二、admin端如何请求到server端
2.1 前端请求推理流程
在部署完fate-serving在线推理模块后,我们通过浏览器抓包工具捕获推理过程中发送的请求
通过第二张图我们可以看到,前端通过post方式请求了 /api/validate/infernce
接口
我们在 fate-serving-admin
的controller层中找到对应的接口
我们发现这里使用了restful接口的方式,把 inference 这个值存到了callname中去,其他参数存到了params的map中
分析:
- 创建了一个BaseContext的实例,并存储了callName
创建了一个BaseContext的实例,这个BaseContext继承了Context接口,用于存储一些在线推理的基本信息,这里context通过setActionType和setBody设置了推理的类型,inference代表的就是单例推理
- 创建了InboundPackage实例,并存储params
创建了InboundPackage实例,这个inboundPackage可以理解为向外发送的请求的一个封装,而与之对应的就是OutboundPackage,用于存储返回信息的一个封装
- 调用validateServiceProvider的service方法,并获取返回结果OutboundPackage的实例
通过阅读源码可以看出,这个validateServiceProvider.service方法是发送请求的过程,中间可能会对数据进行一些处理,而它的参数包括了一个context上下文和inboundPackage请求参数
- 判断返回结果并转发给前端
通过以上的代码分析,我们可以看出validateServiceProvider.service是核心,我们点开这个类具体看看他都有什么
2.2 ……ServiceProvider的工作原理
总览fate-serving的目录结构,我们可以发现里面有各种各样的……ServiceProvider,我们接着上文通过一个validateServiceProvider来理解这些……ServiceProvide是如何工作的
2.1.1 ValidateServiceProvider
我们首先找到ValidateServiceProvider的位置
我们分析一下它的源码的核心部分:
- 类的声明:
这个类上有一个 @FateService 注解,并且它继承了AbstractAdminServiceProvider
类
- 方法声明
其中有两个核心方法,有 @FateServiceMethod 注解
我们可以注意到之前在外面调用到的validateServiceProvider.service方法并没有在这里有对应的service方法,所以service方法一定是在它继承过来的,而且还可以发现,这两个方法的参数类型与service方法相同,所以这两个方法很有可能在service中被调用了。
继续在它的父类 AbstractAdminServiceProvider
上找
2.1.2 AbstractServiceAdaptor
我们可以看到在, AbstractAdminServiceProvider
类上仅仅重写了一个 doService
源码分析:
- 获取了一个methodMap
AbstractAdminServiceProvider
类上没有getMethodMap方法,说明这个方法还是被继承过来的
- 通过context.getActionType 获取了actionType
我们在之前曾经也创建了一个BaseContext实例,并且通过setActionType设置了一个 “inference”
- 通过actionType获取了一个方法,并且通过invoke执行了这个方法
这个doService方法只是获取了一个方法并执行,我们需要继续向父类 AbstractServiceAdaptor
去寻找
2.1.3 AbstractServiceAdaptor
我们最终在 AbstractServiceAdaptor
找到了这个service方法
service方法非常长,但是大概只是分为了两个部分
- 调用preChain的dePreProcess方法,调用doService方法,调用postChain的doPostProcess方法
- 处理异常,打日志
我们主要关心的是第一步,那么通过第 2.1.2 章,我们知道了doService只是根据actionType调用了一个方法,但是我们发现这里只是声明了一个methodMap,所以我们要找到这个map里的方法都是哪里来的
此时我们注意到在 fate-serving-admin 的目录下还有一个叫做 FateServiceRegister
的类
2.1.4 FateServiceRegister
这个类一共实现了三个接口 ServiceRegister, ApplicationContextAware, ApplicationListener
其中:
- ApplicationContextAware
- 用于获取applicationContext
- ApplicationListener
- 通过设置内置事件,这里是 ApplicationReadyEvent ,在事件触发后调用 onApplicationEvent 方法
在这个类中获取了applicationContext重写了 onApplicationEvent 方法
我们先看一下 2.1.1 的ValidateServiceProvider
方便做对照
总结:
类上有@FateService注解
方法上有@FateServiceMethod注解
源码分析:
- 通过applicationContext获取了所有类型为
AbstractServiceAdaptor
的类的名字
AbstractServiceAdaptor也就是我们之前看到的ValidateServiceProvider
的父类的父类,在 fate-serving-admin 中,只有 ValidateServiceProvider
上有一个@service注解,被注册到了容器中,所以相当于获取到了 ValidateServiceProvider 类的名称
- 在for循环中根据第一步获取的名称和AbstractServiceAdaptor.class获取到了具体一个Bean,这个bean就是
ValidateServiceProvider
的一个实例
- 获取这个这个Bean的 @FateService 注解
- 获取这个Bean所有的方法,如果方法上是否有 @FateServiceMethod 注解,就将方法放到Bean的methodMap中去,map的key就是注解上的name,也就对应了context中的actionType
- 根据第三步获取的注解上的name,给获取到的bean设置serviceName服务名称,并且根据注解上的preChain预处理拦截器名称和postChain请求拦截器名称设置拦截器
看到这里也就解决了之前的方法从哪里来的问题,顺便也看到了拦截器从哪里来。
2.15 总结!!!
调用流程:
- preChain上的拦截方法
- 具体的某个方法
- postChain的拦截方法
- 在controller中我们用到了一个validateServiceProvider.service方法,其实这个方法最终就是调用的
ValidateServiceProvider
类中有@FateServiceMethod 的方法,context的actionType与哪个 @FateServiceMethod 注解的name相同,最终就会调用哪个方法。 - 在调用这个方法之前,调用了 @FateServiceMethod 注解preChain上所有拦截器的doPreProcess方法。
- 在调用这个方法之后,调用了 @FateServiceMethod 注解postChain上所有拦截器的doPostProcess方法。
最后要注意一下,…….ServiceProvider也可以直接重写doService方法,这样就会覆盖掉父类用于选择Method执行的过程直接执行重写的doService方法。
2.3 admin模块请求guest端server模块
通过2.2章明白了,单例推理最终调用的就是ValidateServiceProvider
的inference方法。
我们具体看一下这个方法都做了什么事情
接下来的内容使用到了grpc框架,如果对grpc不了解,需要先查看相关的资料,本文不做过多赘述,直接使用
inference方法代码解析:
- 获取serving端的ip和port
这个ip和port是在前端传过来的,也在之前提到的preChain中验证过存在,这里直接使用,不探究具体如何获取的,只需要知道这个ip和port是serving端的ip和port
- 检查参数
这里对前端传过来的参数进行检查,确定不为空(如果做前端的二次开发,自己定义传递的参数,这个地方也需要增加相应参数的检查)
- 将参数封装到 InferenceRequest 的实例中去
- grpc通过传入ip和port,调用在serving端声明的方法
这里的 InferenceServiceProto.InferenceMessage.Builder对应的就是.proto文件中的InferenceMessage
我们可以看到,这里一共定义了两个服务,我们调用的是其中的 inference 服务。
inference的参数和返回值都是InferenceMessage,我们把第三步的的inferenceRequest实例通过谷歌的ByteString转化成grpc数据传输时所用的bytes(java的btye与grpc的byte不是一个东西,必须要经过转化),在grpc提供服务端,相应的也需要将这个byte转化成原来的inferenceRequest对象,这个过程在第3.3.1章的GuestSingleInferenceProvider的preChain的其中一个拦截器有具体实现。
总结:admin端调用了grpc的一个方法,参数为前端传入的所有参数,如果不关注grpc注册过程,跳到 3.3章
三、server模块如何进行单例推理
3.1 准备工作
我们首先进入 server 端的启动器。
源码分析:
- parseConfig!!!
将resource目录下的serving-sever.properties文件解析到MetaInfo中去,MetaInfo是一个存储全局变量的类,用于自定义一些参数
- 该类实现了InitializingBean方法,用于在bean被注册之后通过执行被重写的afterPropertiesSet方法初始化Bean
在这个方法中,继承了SeverBuilder的FateServerBuilder类通过addService方法注册了几个grpc服务,监听的端口在配置文件中配置
由于在线推理的过程是从guest端到host端再到guest端的,我们在这里看一下GuestInferenceService具体如何实现服务。
3.3 GuestInferenceService对grpc接口实现
- 在GuestInferenceService中,分别为单例预测和批量预测自动装配了 …….Provider
- 之后在单例预测Inference方法中调用了GuestSingleInferenceProvider的方法,并将结果返回
这个inference方法实际上就是当初在admin端调用的inference方法,InferenceMessage为在 .proto 文件中定义的类型
此时InferenceMessage就是在admin端定义的InferenceRequest的二进制形式,所以后续肯定会有操作将他转换成普通的InferenceRequest对象
(方法上的@RegisterService 用于将这个方法注册进注册中心,3.2 中 SevingServer
初始化最下面的代码判断配置文件中是否使用注册中心,并注册中心Zookeeper中去,Zookeeper如何实现注册不是本次讨论的重点,暂时不深入下去)
我们继续点开 GuestSingleInferenceProvider
查看是如何实现推理的
3.4 GuestSingleInferenceProvider
如果对 ……Provider 工作原理不清楚,请回到 2.2 章回顾
GuestSingelInferenceProvider会先通过拦截器对数据进行预处理并放在context中
再执行doService方法,继续进行预测流程
3.4.1 拦截器预处理
这里首先经过了4个拦截器:
在经历拦截器前
context:是一个全新的 ServingServerContext
实例
inboundPackage:它的body是InferenceRequest的byte形式
- RequestOverloadBreaker(用于做请求过载的熔断)
这个类只是为了安全检测,此处不进行展开
context:是一个全新的 ServingServerContext
实例
inboundPackage:它的body是InferenceRequest的byte形式
- GuestSingleParamInterceptor(用于解析byte形式的InferenceRequest实例,解析后进行参数检查)
首先先将byte形式的InferenceRequest通过JsonUtil转换成InferenceRequest实例
之后给将inboundPackage的body设置为InferenceRequest的实例,这个实例包含了我们在admin端传过来的所有参数
接下来对这些参数进行了非空检查
最后给context设置了一个随机的CaseId,并将参数中的serviceId保存到其中
context:
- CaseId:一个随机的UUID
- serviceId:admin请求的serviceId
inboundPackage:InferenceRequest实例,包括了admin传来的全部数据
- GuestModelInterceptor(根据serviceId获取模型)
首先通过modelManager.getModelByServiceId获取了一个模型
然后然后做了一个流量控制
context:
- CaseId:一个随机的UUID
- serviceId:admin请求的serviceId
- model:根据serviceId获取的模型
inboundPackage:InferenceRequest实例,包括了admin传来的全部数据
- FederationRouterInterceptor(用于获取请求的路由信息使用zookeeper就上zookeeper里找,没使用就在配置中找,最终找到的是proxy的路由信息)
context:
- CaseId:一个随机的UUID
- serviceId:admin请求的serviceId
- model:根据serviceId获取的模型
- routerInfo:proxy端的路由信息
inboundPackage:InferenceRequest实例,包括了admin传来的全部数据
3.4.2 doSerivce
此处的核心有两个地方
- 向多个host端发起预测请求,并返回结果
- guest端进行本地预测,并将远程的结果与本地结果进行合并(将在后续章节3.8具体展开)
这里的本地预测不是等待host端的请求返回之后再开始的,而是在向host端发出请求之后就开始了,这里使用了future设计模式,保证了预测的同步进行,不会浪费时间
我们先进入图中 “1” 所标识的代码部分
3.4.3 向host端发起预测请求

这里的代码逻辑总体可以分为三步:
- 判断本地缓存中有没有要预测的,有的话直接拿
- 向host端发起请求获取一个 ListenableFuture对象
- 根据第二步的对象,返回一个AbstractFuture对象
第二步如何向host端发起请求是我们要关注的核心
第三步返回的对象,我们此时就默认它已经拿到了host端的结果,并且将在后面用于合并
我们此处对第二步进行展开,我们来看看这个async到底做了什么
第一行代码,获取了要发给host端用于grpc传输的Packet对象
接下来就是通过grpc,向host端发送请求了
(在此声明:这里是最终结果发给host端,实际上这个请求是发给guest端的proxy,再转发给host端的proxy,最终再转发给host端的proxy,在每个proxy端和host的server端,都有对DataTransferService服务的unaryCall接口的具体实现,此处省略掉proxy端的转发,直接进入host端的server)
我们知道此时请求已经向host端发出了,且host端的server已经接到这个这个请求了,在进入host端前,我们还需要看看这个Packet究竟传输了什么东西
我们点开第一行代码的build函数
Packet包括了:模型信息、guest端partyId、host端partId、要调用的方法名称(制定了这个请求是预测请求)、version、serviceId、applyId、CaseId、host端表明和命名空间,以及认证的信息(认证信息在proxy端进行加入和鉴权,此处是空的,且认证默认在配置文件中是不开启的)
3.5 guest端server模块总结
- 对admin模块传来的数据进行解析,并获取本地模型和远程模型信息
- 向host端发起预测请求
- 在本地进行预测,并将host端传来的结果与本地的结果合并
- 将最终的结果返回给admin模块
3.6 HostInferenceService对grpc接口实现
对于host端的server模块也有与 3.2章相同的grpc注册过程,我们直接看被注册的服务是如何实现的
guest端的server向host端的server发起了一个叫unaryCall的grpc请求,那么host端应该有这个unaryCall的具体实现。
- 自动装配了…Provider,用于提供预测服务
- 对传过来的数据进行处理
此处在context中放入了namespace,tableName,actionType(跑哪个方法,这里是单例预测),版本信息,CaseId
- 根据actionType获取结果,并将结果返回
由于我们请求的是单例预测,所以会进入第一个case,并且通过hostSingleInferenceProvider.service方法进行预测
3.7 HostSingleInferenceProvider
Provider工作流程:
- 跑拦截器
- 跑doService
3.7.1 拦截器预处理
这里首先经过了4个拦截器:
在经历拦截器前
context:是一个 ServingServerContext
实例
- CaseId:guest端传来的
- tableName:表名
- nameSpace:表的命名空间
- version:版本信息
- actionType:跑哪个方法
inboundPackage:它的body是InferenceRequest的byte形式,包括了 sendToRemoteFeatureData
- sendToRemoteFeatureData 用于根据指定的id或自定义的参数获取数据,自定义要改源码
- RequestOverloadBreaker(用于做请求过载的熔断)
这个类只是为了安全检测,此处不进行展开
context:
- CaseId:guest端传来的
- tableName:表名
- nameSpace:表的命名空间
- version:版本信息
- actionType:跑哪个方法
inboundPackage:它的body是InferenceRequest的byte形式,包括了 sendToRemoteFeatureData(byte)
- HostParamInterceptor(用于解析byte形式的InferenceRequest实例,解析后进行参数检查)
这里根据请求类型走的是else的代码块
首先先将byte形式的InferenceRequest通过JsonUtil转换成InferenceRequest实例
之后给将inboundPackage的body设置为InferenceRequest的实例,这个实例包含sendToRemoteFeatureData
接下来对这些参数进行了非空检查
context:
- CaseId:guest端传来的
- tableName:表名
- nameSpace:表的命名空间
- version:版本信息
- actionType:跑哪个方法
inboundPackage:InferenceRequest实例,包括了 sendToRemoteFeatureData
- HostModelInterceptor
首先通过guest传来的tablename和nameSpace
然后然后做了一个流量控制
context:
- CaseId:guest端传来的
- tableName:表名
- nameSpace:表的命名空间
- version:版本信息
- actionType:跑哪个方法
- model:用于推理的模型
inboundPackage:InferenceRequest实例,包括了 sendToRemoteFeatureData
- HostSingleFeatureAdaptorInterceptor
这个拦截器实现了一个InitializingBean,会在初始化的时候执行afterPropertiesSet方法
这里首先会获取一个Adaptor用于处理sendtoRemoteFeatureData中的参数,来获取用于预测的数据源
Adpator如何编写见:
https://www.yuque.com/docs/share/e221cf05-7017-499f-9609-b2eadafd6147?# 《FateServing Adapter开发》
然后执行doPreProcess方法进行拦截
Adptor获取了数据源,数据源被放进inferenceRequest中
context:
- CaseId:guest端传来的
- tableName:表名
- nameSpace:表的命名空间
- version:版本信息
- actionType:跑哪个方法
inboundPackage:InferenceRequest实例,包括了 sendToRemoteFeatureData、获取的数据源
3.7.2 执行federatedInference进行预测
这里HostSingleInferenceProvider由于没有对doService方法进行重写,所以会根据context中的ActionType选择执行的函数,我们进行的是单例预测,所以actionType为federatedInference
最终进入hostInference进行预测
3.8 双方预测过程与结果合并
上文 3.4.2 章所用到的guestInference与 3.7.2 章的hostInference最终都会在同一个类中处理
3.8.1 host端预测
host端执行hostInference方法
里面只执行一个本地预测singleLocalPredict方法,并将预测的结果返回给guest端
3.8.2 guest端预测
guest端执行了guestInference
首先进行本地预测singleLocalPredict
- 然后将多个host端的结果存进remoteResultMap中(通过future实现guest端与host端同步预测)
- 执行singleMerge将本地预测的结果与host端传来的结果进行合并
3.8.3 本地预测singleLocalPredict
这里首先获取了算法组件,然后通过每个算法组件节点以上一个组件的结果为参数进行处理
最终经过所有的组件后得到最终的结果