FATE Pipeline

Pipeline是一种高级的python API,它允许使用者以顺序方式对FATE任务进行设计、启动和查询。FATEPipeline具有用户友好的特点,且与FATE命令行工具保持一致。用户可以通过向Pipeline中添加组件来自定义任务工作流,然后通过一次调用即可启动任务。此外, Pipeline提供了训练后进行预测和查询信息的功能。通过运行简单示例可以了解FATEPipeline的工作方式。参与方id和工作模式的默认值可能需要根据部署设置进行修改。

  1. python pipeline-mini-demo.py config.yaml

更多有关Pipeline的示例,请参考examples

FATE任务是有向无环图

FATE任务是由算法组件节点组成的。FATEPipeline提供了易于使用的工具,用于配置任务的顺序和设置。FATE以模块化风格编写。模块被设计为具有输入输出数据和模型。因此,通过设置一个模块的输出为另一个模块的输入,可将两个模块相连接。通过观察FATE的各个模块如何处理数据,我们可以看出FATE任务实际上是由一系列子任务组成的。例如,在前文提及的简单示例中,guest方的数据首先通过Reader模块读取,然后导入至DataIO模块。通过将数据输入至Intersection模块,可寻找到guest方和host方的交叠id。最终,基于数据训练得到HeteroLR模型。每个提及的模块都是关于数据的一个小任务,组合起来就构成了一个模型训练任务。

除了给出的简单示例,一个任务可能包含多个数据集和模型。更多有关Pipeline的示例,请参考examples

安装Pipeline

PipelineCLI

在安装FATE客户端成功后,用户需为Pipeline配置服务器信息和日志目录。Pipeline提供了命令行工具用于快速设置。运行以下命令以获取更多信息。

  1. pipeline init --help

Pipeline接口

组件

FATE模块被包装在PipelineAPI的component中。每个组件可以输入输出DataModel。组件参数可在初始化时便捷地进行设定。未指定的参数将采用默认值。所有组件均有可随意设置的name。组件的名称是其标识符,因此其名称在一个Pipeline中必须唯一。为便于追踪,我们建议每个组件的名称包含一个数字尾缀。

每个组件具有输入和/或输出Data和/或Model。有关如何使用组件的细节,请参考guide

一个采用指定参数值进行组件初始化的示例如下:

  1. hetero_lr_0 = HeteroLR(name="hetero_lr_0", early_stop="weight_diff", max_iter=10, early_stopping_rounds=2, validation_freqs=2)

输入

输入封装了组件全部的输入,包括DataModel输入。要访问组件的input,请设置其input属性:

  1. input_all = dataio_0.input

输出

输出封装了组件的全部输出结果,包括DataModel输出。要访问组件的output,请设置其output属性:

  1. output_all = dataio_0.output

数据

Data包装了组件全部数据类型的输入输出。FATEPipeline包括包含5种data,每种用于不同场景。更多相关信息,请参考here

模型

Model定义了组件的模型输入和输出。类似于Data,两种models可用于不同目的。更多相关信息,请参考here

构建Pipeline

以下是构建Pipeline的概要指南。请参考简单示例以获取解释性的演示。

初始化Pipeline后,应制定任务参与方和发起方。以下是一个初始化Pipeline的示例:

  1. pipeline = PipeLine()
  2. pipeline.set_initiator(role='guest', party_id=9999)
  3. pipeline.set_roles(guest=9999, host=10000, arbiter=10000)

需要设置Reader进行读取数据,以便其他组件可以处理数据。定义Reader组件:

  1. reader_0 = Reader(name="reader_0")

在大多数情况下,DataIO位于Reader之后,用于将数据转换为DataInstance格式,该格式后续可用于数据工程和模型训练。部分组件(如UnionIntersection)可以直接在非DataInstance表上运行。

各参与方可通过设置get_party_instance,独立配置所有的Pipeline组件。例如,guest方可通过如下命令配置DataIO组件:

  1. dataio_0 = DataIO(name="dataio_0")
  2. guest_component_instance = dataio_0.get_party_instance(role='guest', arty_id=9999)
  3. guest_component_instance.component_param(with_label=True, output_format="dense")

可使用add_component将组件加入至Pipeline中。将DataIO组件加入至先前已创建的Pipeline的命令如下:

  1. pipeline.add_component(dataio_0, data=Data(data=reader_0.output.data))

以Keras方式构建FATE NN 模型

在Pipeline中,用户能够以Keras方式构建NN结构。以Homo-NN为例:

首先,导入Keras,定义你的nn结构:

  1. from tensorflow.keras import optimizers
  2. from tensorflow.keras.layers import Dense
  3. layer_0 = Dense(units=6, input_shape=(10,), activation="relu")
  4. layer_1 = Dense(units=1, activation="sigmoid")

然后,像Keras中使用序列类型一样,将nn层加入至Homo-NN模型中:

  1. from pipeline.component.homo_nn import HomoNN
  2. # set parameter
  3. homo_nn_0 = HomoNN(name="homo_nn_0", max_iter=10, batch_size=-1, early_stop={"early_stop": "diff", "eps": 0.0001})
  4. homo_nn_0.add(layer_0)
  5. homo_nn_0.add(layer_1)

设置优化器,编译Homo-NN模型:

  1. homo_nn_0.compile(optimizer=optimizers.Adam(learning_rate=0.05), metrics=["Hinge", "accuracy", "AUC"], loss="binary_crossentropy")

将其加入至Pipeline中:

  1. pipeline.add_component(homo_nn, data=Data(train_data=dataio_0.output.data))

初始化运行任务参数

为了训练或预测,用户需初始化运行环境,如“backend”和“work_mode”,

  1. from pipeline.runtime.entity import JobParameters
  2. job_parameters = JobParameters(backend=Backend.EGGROLL, work_mode=WorkMode.STANDALONE

运行Pipeline

在添加全部组件后,用于在运行设计好的任务前,需要首先编译Pipeline。编译后,Pipeline可在相适应的BackendWorkMode下进行拟合(运行训练任务)。

  1. pipeline.compile()
  2. pipeline.fit(job_parameters)

任务查询

FATEPipeline提供了API用于查询组件信息,包括数据、模型和概要。所有查询API均具有与FlowPy相匹配的名称,而Pipeline检索查询结果并将结果直接返回给用户。

  1. summary = pipeline.get_component("hetero_lr_0").get_summary()

部署组件

在完成Pipeline拟合后,可在新的数据集上进行预测。在预测前,首先需要部署必要的组件。这一步中,预测Pipeline选择组件用于预测任务。

  1. pipeline.deploy_component([dataio_0, hetero_lr_0])

基于Pipeline预测

首先,初始化一个新的Pipeline,然后指定用于预测的数据源。

  1. predict_pipeline = PipeLine()
  2. predict_pipeline.add_component(reader_0)
  3. predict_pipeline.add_component(pipeline,
  4. data=Data(predict_input={pipeline.dataio_0.input.data: reader_0.output.data}))

然后可在新的Pipeline上执行预测任务。

  1. predict_pipeline.predict(job_parameters)

此外,由于Pipeline是模块化的,用户可在执行预测前为原Pipeline添加新的组件。

  1. predict_pipeline.add_component(evaluation_0, data=Data(data=pipeline.hetero_lr_0.output.data))
  2. predict_pipeline.predict(job_parameters)

保存和恢复Pipeline

可使用dump接口保存Pipeline。

  1. pipeline.dump("pipeline_saved.pkl")

可使用load_model_from_file接口恢复Pipeline。

  1. from pipeline.backend.pipeline import PineLine
  2. PipeLine.load_model_from_file("pipeline_saved.pkl")

Pipeline的概要信息

可使用describe接口获取Pipeline信息,该接口可打印创建时间、拟合或预测状态以及构造的dsl(如存在)。

  1. pipeline.describe()

上传数据

Pipeline提供了上传本地数据表格的功能。请参考upload_demo获取快速示例。请注意,可以一次性添加所有上传数据,并且用于执行上传的Pipeline可以是训练Pipeline或预测Pipeline(或演示中的单独Pipeline)。

Pipeline vs. CLI

在过去的版本中,用户通过命令行接口与FATE交互,通常是手动配置conf和dsl的json文件。手动配置枯燥且易错。FATEPipeline在编译时自动生成任务配置文件,从而允许快速进行任务设计实验。