- Guest端如何获取模型
Package:com.webank.ai.fate.serving.guest.interceptors
Class:GuestModelInterceptor
Line:47
- guset端请求host端预测结果
Package:com.webank.ai.fate.serving.rpc
Class:DefaultFederatedRpcInvoker
function1:Line146: singleInferenceRpcWithCache
function2:Line170:async
function3:Line65:build
description:
这里实际上是从GuestSingleInferenceProvider点进来的,function1首先查看缓存中是否有要查询的数据,没有才会想host端发起推理请求。function1中首先调用function2,function2是对host端发起grpc预测请求的一个封装,function2会调用function3对数据封装成进行数据转发的Package(这个Package是在proto中定义的,这里没有发现有任何加密的过程,说明数据在传输过程中仅仅是以byte形式的明文传递的),最终请求向host端,
(此处实际的请求应该是向proxy端,我们此处忽略了中间的转发过程,proto文件为proxy.proto下的unaryCall服务,其中包括了模型信息、guest端partyId、host端partId、要调用的方法名称、version、serviceId、applyId、CaseId、host端表明和命名空间….)
最终host端的server与proxy端一样,会有一个unaryCall方法来接收通过proxy转发的方法
- guest端的数据预测与合并
Package:com.webank.ai.fate.serving.federatedml
Class:PipelineModelProcessor
Line:132
description:
这里是guest端和host进行预测的一个类,132行是guest端进行预测的地方,首先它进行了本地的预测,然后获取了host端预测的结果,host端预测结果是以future模式呈现的,这样就做到了host端与guest端的同步训练(future设计模式),在所有结果都获取之后,会将这些结果通过singleMerge做一个合并,这时的结果就已经是最终的结果了
下面是function3的全部代码,包括了数据是如何被存储的,返回值是 Proxy服务下的Package数据格式
private Proxy.Packet build(Context context, RpcDataWraper rpcDataWraper) {Model model = ((ServingServerContext) context).getModel();Preconditions.checkArgument(model != null);Proxy.Packet.Builder packetBuilder = Proxy.Packet.newBuilder();packetBuilder.setBody(Proxy.Data.newBuilder().setValue(ByteString.copyFrom(JsonUtil.object2Json(rpcDataWraper.getData()).getBytes())).build());Proxy.Metadata.Builder metaDataBuilder = Proxy.Metadata.newBuilder();Proxy.Topic.Builder topicBuilder = Proxy.Topic.newBuilder();metaDataBuilder.setSrc(topicBuilder.setPartyId(String.valueOf(model.getPartId())).setRole(MetaInfo.PROPERTY_SERVICE_ROLE_NAME).setName(Dict.PARTNER_PARTY_NAME).build());metaDataBuilder.setDst(topicBuilder.setPartyId(String.valueOf(rpcDataWraper.getHostModel().getPartId())).setRole(MetaInfo.PROPERTY_SERVICE_ROLE_NAME).setName(Dict.PARTY_NAME).build());metaDataBuilder.setCommand(Proxy.Command.newBuilder().setName(rpcDataWraper.getRemoteMethodName()).build());String version = Long.toString(MetaInfo.CURRENT_VERSION);metaDataBuilder.setOperator(version);Proxy.Task.Builder taskBuilder = com.webank.ai.fate.api.networking.proxy.Proxy.Task.newBuilder();Proxy.Model.Builder modelBuilder = Proxy.Model.newBuilder();modelBuilder.setNamespace(rpcDataWraper.getHostModel().getNamespace());modelBuilder.setTableName(rpcDataWraper.getHostModel().getTableName());taskBuilder.setModel(modelBuilder.build());metaDataBuilder.setTask(taskBuilder.build());packetBuilder.setHeader(metaDataBuilder.build());Proxy.AuthInfo.Builder authBuilder = Proxy.AuthInfo.newBuilder();if (context.getCaseId() != null) {authBuilder.setNonce(context.getCaseId());}if (version != null) {authBuilder.setVersion(version);}if (context.getServiceId() != null) {authBuilder.setServiceId(context.getServiceId());}if (context.getApplyId() != null) {authBuilder.setApplyId(context.getApplyId());}packetBuilder.setAuth(authBuilder.build());return packetBuilder.build();}
另外附上proxy.proto文件
/** Copyright 2019 The FATE Authors. All Rights Reserved.** Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/syntax = "proto3";import "basic-meta.proto";package com.webank.ai.fate.api.networking.proxy;// metadata of eventmessage Model {string namespace = 1;string tableName = 2;}// metadata of taskmessage Task {string taskId = 1;Model model = 2;}message Topic {string name = 1;string partyId = 2;string role = 3;com.webank.ai.fate.api.core.Endpoint callback = 4; // implication of pub/sub model, necessary for http-based senario}// task admin commandmessage Command {string name = 1;}message Conf {int64 overallTimeout = 1; // total timeout, in msint64 completionWaitTimeout = 2; // timeout for waiting for complete, in msint64 packetIntervalTimeout = 3; // timeout for packet interval, in msint32 maxRetries = 4;}// metadata used for network data transfermessage Metadata {Task task = 1; // task descriptionTopic src = 2; // source topicTopic dst = 3; // destincation topicCommand command = 4; // task managing command (if any)string operator = 5; // model operatorint64 version = 6; // 接口版本int64 ack = 7; // stream ack (reserved)Conf conf = 8; // operation config}// includes key and value field, supporting sequential and random data transfermessage Data {string key = 1; // compatible with list / dictbytes value = 2; // actual value}// authentication infomessage AuthInfo {int64 timestamp = 1; // timestamp of request, millisecondstring appKey = 2;string signature = 3; // signaturestring applyId = 4;string nonce = 5;string version = 6;string serviceId = 7;}// data streaming packetmessage Packet {Metadata header = 1; // packet headerData body = 2; // packet bodyAuthInfo auth = 3; // authentication info, optional}// data transfer serviceservice DataTransferService {rpc unaryCall (Packet) returns (Packet);}
- proxy收到数据后添加认证信息
Package:com.webank.ai.fate.serving.proxy.rpc.services
Class:UnaryCallService
Line:69
Description:
这里只做了一件事,就是增加一个Auth信息,会判断这个消息是哪来的,如果是partyId相同可以判断是同一端的,这时候就添加,不同端只需要验证
- proxy验证Auth信息
Package:com.webank.ai.fate.serving.proxy.security
Class:DefaultAuthentication
Line:49
Description:
判断Auth信息
