- Guest端如何获取模型
Package:com.webank.ai.fate.serving.guest.interceptors
Class:GuestModelInterceptor
Line:47
- guset端请求host端预测结果
Package:com.webank.ai.fate.serving.rpc
Class:DefaultFederatedRpcInvoker
function1:Line146: singleInferenceRpcWithCache
function2:Line170:async
function3:Line65:build
description:
这里实际上是从GuestSingleInferenceProvider点进来的,function1首先查看缓存中是否有要查询的数据,没有才会想host端发起推理请求。function1中首先调用function2,function2是对host端发起grpc预测请求的一个封装,function2会调用function3对数据封装成进行数据转发的Package(这个Package是在proto中定义的,这里没有发现有任何加密的过程,说明数据在传输过程中仅仅是以byte形式的明文传递的),最终请求向host端,
(此处实际的请求应该是向proxy端,我们此处忽略了中间的转发过程,proto文件为proxy.proto下的unaryCall服务,其中包括了模型信息、guest端partyId、host端partId、要调用的方法名称、version、serviceId、applyId、CaseId、host端表明和命名空间….)
最终host端的server与proxy端一样,会有一个unaryCall方法来接收通过proxy转发的方法
- guest端的数据预测与合并
Package:com.webank.ai.fate.serving.federatedml
Class:PipelineModelProcessor
Line:132
description:
这里是guest端和host进行预测的一个类,132行是guest端进行预测的地方,首先它进行了本地的预测,然后获取了host端预测的结果,host端预测结果是以future模式呈现的,这样就做到了host端与guest端的同步训练(future设计模式),在所有结果都获取之后,会将这些结果通过singleMerge做一个合并,这时的结果就已经是最终的结果了
下面是function3的全部代码,包括了数据是如何被存储的,返回值是 Proxy服务下的Package数据格式
private Proxy.Packet build(Context context, RpcDataWraper rpcDataWraper) {
Model model = ((ServingServerContext) context).getModel();
Preconditions.checkArgument(model != null);
Proxy.Packet.Builder packetBuilder = Proxy.Packet.newBuilder();
packetBuilder.setBody(Proxy.Data.newBuilder().setValue(ByteString.copyFrom(JsonUtil.object2Json(rpcDataWraper.getData()).getBytes())).build());
Proxy.Metadata.Builder metaDataBuilder = Proxy.Metadata.newBuilder();
Proxy.Topic.Builder topicBuilder = Proxy.Topic.newBuilder();
metaDataBuilder.setSrc(topicBuilder.setPartyId(String.valueOf(model.getPartId())).setRole(MetaInfo.PROPERTY_SERVICE_ROLE_NAME).setName(Dict.PARTNER_PARTY_NAME).build());
metaDataBuilder.setDst(topicBuilder.setPartyId(String.valueOf(rpcDataWraper.getHostModel().getPartId())).setRole(MetaInfo.PROPERTY_SERVICE_ROLE_NAME).setName(Dict.PARTY_NAME).build());
metaDataBuilder.setCommand(Proxy.Command.newBuilder().setName(rpcDataWraper.getRemoteMethodName()).build());
String version = Long.toString(MetaInfo.CURRENT_VERSION);
metaDataBuilder.setOperator(version);
Proxy.Task.Builder taskBuilder = com.webank.ai.fate.api.networking.proxy.Proxy.Task.newBuilder();
Proxy.Model.Builder modelBuilder = Proxy.Model.newBuilder();
modelBuilder.setNamespace(rpcDataWraper.getHostModel().getNamespace());
modelBuilder.setTableName(rpcDataWraper.getHostModel().getTableName());
taskBuilder.setModel(modelBuilder.build());
metaDataBuilder.setTask(taskBuilder.build());
packetBuilder.setHeader(metaDataBuilder.build());
Proxy.AuthInfo.Builder authBuilder = Proxy.AuthInfo.newBuilder();
if (context.getCaseId() != null) {
authBuilder.setNonce(context.getCaseId());
}
if (version != null) {
authBuilder.setVersion(version);
}
if (context.getServiceId() != null) {
authBuilder.setServiceId(context.getServiceId());
}
if (context.getApplyId() != null) {
authBuilder.setApplyId(context.getApplyId());
}
packetBuilder.setAuth(authBuilder.build());
return packetBuilder.build();
}
另外附上proxy.proto文件
/*
* Copyright 2019 The FATE Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
syntax = "proto3";
import "basic-meta.proto";
package com.webank.ai.fate.api.networking.proxy;
// metadata of event
message Model {
string namespace = 1;
string tableName = 2;
}
// metadata of task
message Task {
string taskId = 1;
Model model = 2;
}
message Topic {
string name = 1;
string partyId = 2;
string role = 3;
com.webank.ai.fate.api.core.Endpoint callback = 4; // implication of pub/sub model, necessary for http-based senario
}
// task admin command
message Command {
string name = 1;
}
message Conf {
int64 overallTimeout = 1; // total timeout, in ms
int64 completionWaitTimeout = 2; // timeout for waiting for complete, in ms
int64 packetIntervalTimeout = 3; // timeout for packet interval, in ms
int32 maxRetries = 4;
}
// metadata used for network data transfer
message Metadata {
Task task = 1; // task description
Topic src = 2; // source topic
Topic dst = 3; // destincation topic
Command command = 4; // task managing command (if any)
string operator = 5; // model operator
int64 version = 6; // 接口版本
int64 ack = 7; // stream ack (reserved)
Conf conf = 8; // operation config
}
// includes key and value field, supporting sequential and random data transfer
message Data {
string key = 1; // compatible with list / dict
bytes value = 2; // actual value
}
// authentication info
message AuthInfo {
int64 timestamp = 1; // timestamp of request, millisecond
string appKey = 2;
string signature = 3; // signature
string applyId = 4;
string nonce = 5;
string version = 6;
string serviceId = 7;
}
// data streaming packet
message Packet {
Metadata header = 1; // packet header
Data body = 2; // packet body
AuthInfo auth = 3; // authentication info, optional
}
// data transfer service
service DataTransferService {
rpc unaryCall (Packet) returns (Packet);
}
- proxy收到数据后添加认证信息
Package:com.webank.ai.fate.serving.proxy.rpc.services
Class:UnaryCallService
Line:69
Description:
这里只做了一件事,就是增加一个Auth信息,会判断这个消息是哪来的,如果是partyId相同可以判断是同一端的,这时候就添加,不同端只需要验证
- proxy验证Auth信息
Package:com.webank.ai.fate.serving.proxy.security
Class:DefaultAuthentication
Line:49
Description:
判断Auth信息