1. Guest端如何获取模型

    Package:com.webank.ai.fate.serving.guest.interceptors

    Class:GuestModelInterceptor

    Line:47

    1. guset端请求host端预测结果

    Package:com.webank.ai.fate.serving.rpc

    Class:DefaultFederatedRpcInvoker

    function1:Line146: singleInferenceRpcWithCache

    function2:Line170:async

    function3:Line65:build

    description:

    这里实际上是从GuestSingleInferenceProvider点进来的,function1首先查看缓存中是否有要查询的数据,没有才会想host端发起推理请求。function1中首先调用function2,function2是对host端发起grpc预测请求的一个封装,function2会调用function3对数据封装成进行数据转发的Package(这个Package是在proto中定义的,这里没有发现有任何加密的过程,说明数据在传输过程中仅仅是以byte形式的明文传递的),最终请求向host端,

    (此处实际的请求应该是向proxy端,我们此处忽略了中间的转发过程,proto文件为proxy.proto下的unaryCall服务,其中包括了模型信息、guest端partyId、host端partId、要调用的方法名称、version、serviceId、applyId、CaseId、host端表明和命名空间….)

    最终host端的server与proxy端一样,会有一个unaryCall方法来接收通过proxy转发的方法

    1. guest端的数据预测与合并

    Package:com.webank.ai.fate.serving.federatedml

    Class:PipelineModelProcessor

    Line:132

    description:

    1. 这里是guest端和host进行预测的一个类,132行是guest端进行预测的地方,首先它进行了本地的预测,然后获取了host端预测的结果,host端预测结果是以future模式呈现的,这样就做到了host端与guest端的同步训练(future设计模式),在所有结果都获取之后,会将这些结果通过singleMerge做一个合并,这时的结果就已经是最终的结果了

    下面是function3的全部代码,包括了数据是如何被存储的,返回值是 Proxy服务下的Package数据格式

    1. private Proxy.Packet build(Context context, RpcDataWraper rpcDataWraper) {
    2. Model model = ((ServingServerContext) context).getModel();
    3. Preconditions.checkArgument(model != null);
    4. Proxy.Packet.Builder packetBuilder = Proxy.Packet.newBuilder();
    5. packetBuilder.setBody(Proxy.Data.newBuilder().setValue(ByteString.copyFrom(JsonUtil.object2Json(rpcDataWraper.getData()).getBytes())).build());
    6. Proxy.Metadata.Builder metaDataBuilder = Proxy.Metadata.newBuilder();
    7. Proxy.Topic.Builder topicBuilder = Proxy.Topic.newBuilder();
    8. metaDataBuilder.setSrc(topicBuilder.setPartyId(String.valueOf(model.getPartId())).setRole(MetaInfo.PROPERTY_SERVICE_ROLE_NAME).setName(Dict.PARTNER_PARTY_NAME).build());
    9. metaDataBuilder.setDst(topicBuilder.setPartyId(String.valueOf(rpcDataWraper.getHostModel().getPartId())).setRole(MetaInfo.PROPERTY_SERVICE_ROLE_NAME).setName(Dict.PARTY_NAME).build());
    10. metaDataBuilder.setCommand(Proxy.Command.newBuilder().setName(rpcDataWraper.getRemoteMethodName()).build());
    11. String version = Long.toString(MetaInfo.CURRENT_VERSION);
    12. metaDataBuilder.setOperator(version);
    13. Proxy.Task.Builder taskBuilder = com.webank.ai.fate.api.networking.proxy.Proxy.Task.newBuilder();
    14. Proxy.Model.Builder modelBuilder = Proxy.Model.newBuilder();
    15. modelBuilder.setNamespace(rpcDataWraper.getHostModel().getNamespace());
    16. modelBuilder.setTableName(rpcDataWraper.getHostModel().getTableName());
    17. taskBuilder.setModel(modelBuilder.build());
    18. metaDataBuilder.setTask(taskBuilder.build());
    19. packetBuilder.setHeader(metaDataBuilder.build());
    20. Proxy.AuthInfo.Builder authBuilder = Proxy.AuthInfo.newBuilder();
    21. if (context.getCaseId() != null) {
    22. authBuilder.setNonce(context.getCaseId());
    23. }
    24. if (version != null) {
    25. authBuilder.setVersion(version);
    26. }
    27. if (context.getServiceId() != null) {
    28. authBuilder.setServiceId(context.getServiceId());
    29. }
    30. if (context.getApplyId() != null) {
    31. authBuilder.setApplyId(context.getApplyId());
    32. }
    33. packetBuilder.setAuth(authBuilder.build());
    34. return packetBuilder.build();
    35. }

    另外附上proxy.proto文件

    1. /*
    2. * Copyright 2019 The FATE Authors. All Rights Reserved.
    3. *
    4. * Licensed under the Apache License, Version 2.0 (the "License");
    5. * you may not use this file except in compliance with the License.
    6. * You may obtain a copy of the License at
    7. *
    8. * http://www.apache.org/licenses/LICENSE-2.0
    9. *
    10. * Unless required by applicable law or agreed to in writing, software
    11. * distributed under the License is distributed on an "AS IS" BASIS,
    12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13. * See the License for the specific language governing permissions and
    14. * limitations under the License.
    15. */
    16. syntax = "proto3";
    17. import "basic-meta.proto";
    18. package com.webank.ai.fate.api.networking.proxy;
    19. // metadata of event
    20. message Model {
    21. string namespace = 1;
    22. string tableName = 2;
    23. }
    24. // metadata of task
    25. message Task {
    26. string taskId = 1;
    27. Model model = 2;
    28. }
    29. message Topic {
    30. string name = 1;
    31. string partyId = 2;
    32. string role = 3;
    33. com.webank.ai.fate.api.core.Endpoint callback = 4; // implication of pub/sub model, necessary for http-based senario
    34. }
    35. // task admin command
    36. message Command {
    37. string name = 1;
    38. }
    39. message Conf {
    40. int64 overallTimeout = 1; // total timeout, in ms
    41. int64 completionWaitTimeout = 2; // timeout for waiting for complete, in ms
    42. int64 packetIntervalTimeout = 3; // timeout for packet interval, in ms
    43. int32 maxRetries = 4;
    44. }
    45. // metadata used for network data transfer
    46. message Metadata {
    47. Task task = 1; // task description
    48. Topic src = 2; // source topic
    49. Topic dst = 3; // destincation topic
    50. Command command = 4; // task managing command (if any)
    51. string operator = 5; // model operator
    52. int64 version = 6; // 接口版本
    53. int64 ack = 7; // stream ack (reserved)
    54. Conf conf = 8; // operation config
    55. }
    56. // includes key and value field, supporting sequential and random data transfer
    57. message Data {
    58. string key = 1; // compatible with list / dict
    59. bytes value = 2; // actual value
    60. }
    61. // authentication info
    62. message AuthInfo {
    63. int64 timestamp = 1; // timestamp of request, millisecond
    64. string appKey = 2;
    65. string signature = 3; // signature
    66. string applyId = 4;
    67. string nonce = 5;
    68. string version = 6;
    69. string serviceId = 7;
    70. }
    71. // data streaming packet
    72. message Packet {
    73. Metadata header = 1; // packet header
    74. Data body = 2; // packet body
    75. AuthInfo auth = 3; // authentication info, optional
    76. }
    77. // data transfer service
    78. service DataTransferService {
    79. rpc unaryCall (Packet) returns (Packet);
    80. }
    1. proxy收到数据后添加认证信息

    Package:com.webank.ai.fate.serving.proxy.rpc.services
    Class:UnaryCallService
    Line:69
    Description:
    这里只做了一件事,就是增加一个Auth信息,会判断这个消息是哪来的,如果是partyId相同可以判断是同一端的,这时候就添加,不同端只需要验证

    1. proxy验证Auth信息

    Package:com.webank.ai.fate.serving.proxy.security
    Class:DefaultAuthentication
    Line:49
    Description:
    判断Auth信息