1. 组件介绍

PyAlink 组件是 PAI-Designer 平台上的一个算法组件,该组件能够让用户通过编写PyAlink脚本的方式调用Alink算法,并且可以与其他 Designer 的算法组件无缝衔接,共同帮助客户完成业务链路的搭建及效果验证。有了这个组件,用户可以非常容易在PAI产品端调用Alink的所有算法。比如,通过该组件调用Alink的分类算法做分类,调用Alink的回归算法做回归,调用Alink的推荐算法做推荐,或者调用多种算法去解决一个更加复杂的业务问题。

2. 组件使用方式

下面将介绍如何在 PAI-Designer 产品中通过 PyAlink 组件将一个用 PyAlink 脚本实现的业务流程在产品中使用阿里云资源运行该业务流程。这里通过对notebook中的demo(用ItemCf模型对movielens数据集进行打分)使用PyAlink组件完成其在阿里云上运行来展开介绍。首先给出的是notebook中的demo脚本:

  1. from pyalink.alink import *
  2. useLocalEnv(2)
  3. PATH = "http://alink-test.oss-cn-beijing.aliyuncs.com/yuhe/movielens/"
  4. RATING_FILE = "ratings.csv"
  5. PREDICT_FILE = "predict.csv"
  6. RATING_SCHEMA_STRING = "user_id long, item_id long, rating int, ts long"
  7. ratingsData = CsvSourceBatchOp() \
  8. .setFilePath(PATH + RATING_FILE) \
  9. .setFieldDelimiter("\t") \
  10. .setSchemaStr(RATING_SCHEMA_STRING)
  11. predictData = CsvSourceBatchOp() \
  12. .setFilePath(PATH + PREDICT_FILE) \
  13. .setFieldDelimiter("\t") \
  14. .setSchemaStr(RATING_SCHEMA_STRING)
  15. itemCFModel = ItemCfTrainBatchOp() \
  16. .setUserCol("user_id").setItemCol("item_id") \
  17. .setRateCol("rating").linkFrom(ratingsData)
  18. itemCF = ItemCfRateRecommender() \
  19. .setModelData(itemCFModel) \
  20. .setItemCol("item_id") \
  21. .setUserCol("user_id") \
  22. .setReservedCols(["user_id", "item_id"]) \
  23. .setRecommCol("prediction_score")
  24. itemCF.transform(predictData).print()

下面分四步操作完成脚本在PAI-Designer 上运行。

Step1. 打开PAI-Designer,并创建工作流

image.png
image.png

Step2. 进入新创建的工作流,拉取PyAlink组件

image.png

Step3. 将notebook的 PyAlink 脚本改写并填到代码对话框中

image.png填写脚本内容如下:

  1. from pyalink.alink import *
  2. def main(sources, sinks, parameter):
  3. PATH = "http://alink-test.oss-cn-beijing.aliyuncs.com/yuhe/movielens/"
  4. RATING_FILE = "ratings.csv"
  5. PREDICT_FILE = "predict.csv"
  6. RATING_SCHEMA_STRING = "user_id long, item_id long, rating int, ts long"
  7. ratingsData = CsvSourceBatchOp() \
  8. .setFilePath(PATH + RATING_FILE) \
  9. .setFieldDelimiter("\t") \
  10. .setSchemaStr(RATING_SCHEMA_STRING)
  11. predictData = CsvSourceBatchOp() \
  12. .setFilePath(PATH + PREDICT_FILE) \
  13. .setFieldDelimiter("\t") \
  14. .setSchemaStr(RATING_SCHEMA_STRING)
  15. itemCFModel = ItemCfTrainBatchOp() \
  16. .setUserCol("user_id").setItemCol("item_id") \
  17. .setRateCol("rating").linkFrom(ratingsData);
  18. itemCF = ItemCfRateRecommender() \
  19. .setModelData(itemCFModel) \
  20. .setItemCol("item_id") \
  21. .setUserCol("user_id") \
  22. .setReservedCols(["user_id", "item_id"]) \
  23. .setRecommCol("prediction_score")
  24. result = itemCF.transform(predictData)
  25. result.link(sinks[0])
  26. BatchOperator.execute()

和notebook脚本的差别非常小,只是在数据source和sink的时候有细微差别。其中代码主体部分的修改只有一行,就是 result.link(sinks[0]) 这一行,该行将数据写出并在输出桩可以访问。由于这个demo中的数据是读取http文件,这个操作在PAI-Designer中和notebook是相同的,无需修改。

如果输入数是一个ODPS表则需要修改。此时可以直接通过输入桩读取ODPS表的数据,代码中sources[0]表示第一个输入桩对应的表,sources[1]表示第二个输入桩对应的表,依次类推,最多支持4个输入的ODPS表。具体示例代码如下:

  1. train_data = sources[0]
  2. test_data = sources[1]

输出和输入类似,但是是将数据写出到对应的输出桩。最多支持4个输出结果表。具体代码如下:

  1. result0.link(sinks[0])
  2. result1.link(sinks[1])

Step4. 执行组件并察看结果

在执行组件之前需要设置资源,目前支持DLC单机多并发和MaxCompute两种模式,如果数据量比较小且是在调试验证阶段,建议使用DLC单机多并发模式,如果是数据规模很大或者是实际生产任务,则建议使用MaxCompute模式。
image.png
鼠标选中组件并右键选择执行该节点,就可以执行该PyAlink任务。
image.png
执行完后,可以右键选择查看数据。image.png

3. 数据读入写出介绍

读入数据方式

  1. 读取ODPS表,通过输入桩的方式从上游传入,代码示例:

    1. train_data = sources[0]
    2. test_data = sources[1]
  2. 读取网络文件系统的数据,通过Alink的Source组件(CsvSourceBatchOp,AkSourceBatchOP)在代码中实现数据的读入

    1. 读入http格式的网络共享文件,代码示例:

      1. ratingsData = CsvSourceBatchOp() \
      2. .setFilePath(PATH + RATING_FILE) \
      3. .setFieldDelimiter("\t") \
      4. .setSchemaStr(RATING_SCHEMA_STRING)
    2. 读入OSS网络文件,该方式需要设置数据读取路径,设置方式是在控制台,选取对应OSS路径,设置完后,代码示例如下:

      1. model_data = AkSourceBatchOp().setFilePath("oss://xxxxxxxx/model_20220323.ak")

写出数据方式

  1. 写出ODPS表,通过输出桩的方式写出到下游,代码示例:

    1. result0.link(sinks[0])
    2. result1.link(sinks[1])
    3. BatchOperator.execute()
  2. 写出OSS网络文件,该方式需要设置数据写出路径,设置方式是在控制台,选取对应OSS路径,设置完后,代码示例如下:

    1. result.link(AkSinkBatchOp() \
    2. .setFilePath("oss://xxxxxxxx/model_20220323.ak") \
    3. .setOverwriteSink(True))
    4. BatchOperator.execute()

    其中 xxxxxxxx 是bucket名字 + 模型路径。

    4. 与其他Designer组件组合使用

    image.png
    上图给出的是PyAlink脚本与其他的算法组件组合使用的例子,这个例子展示了PyAlink的输入输出桩和其他组件无任何差别,可以相互连接共同使用。

    5. PyAlink 组件训练生成的模型如何部署到EAS

    1. 生成待部署的模型

    目前PyAlink生成PipelineModel可以部署到EAS,下面给出一个生成PipelineModel的脚本示例: ```python from pyalink.alink import *

def main(sources, sinks, parameter): PATH = “http://alink-test.oss-cn-beijing.aliyuncs.com/yuhe/movielens/“ RATING_FILE = “ratings.csv” PREDICT_FILE = “predict.csv” RATING_SCHEMA_STRING = “user_id long, item_id long, rating int, ts long”

  1. ratingsData = CsvSourceBatchOp() \
  2. .setFilePath(PATH + RATING_FILE) \
  3. .setFieldDelimiter("\t") \
  4. .setSchemaStr(RATING_SCHEMA_STRING)
  5. predictData = CsvSourceBatchOp() \
  6. .setFilePath(PATH + PREDICT_FILE) \
  7. .setFieldDelimiter("\t") \
  8. .setSchemaStr(RATING_SCHEMA_STRING)
  9. itemCFModel = ItemCfTrainBatchOp() \
  10. .setUserCol("user_id").setItemCol("item_id") \
  11. .setRateCol("rating").linkFrom(ratingsData);
  12. itemCF = ItemCfRateRecommender() \
  13. .setModelData(itemCFModel) \
  14. .setItemCol("item_id") \
  15. .setUserCol("user_id") \
  16. .setReservedCols(["user_id", "item_id"]) \
  17. .setRecommCol("prediction_score")
  18. model = PipelineModel(itemCF)
  19. model.save().link(AkSinkBatchOp() \
  20. .setFilePath("oss://weibozhao/model.ak") \
  21. .setOverwriteSink(True))
  22. BatchOperator.execute()
  1. 上面的脚本会生成一个模型文件,存储在oss中,该模型文件可以够直接部署到EAS
  2. <a name="yGnyJ"></a>
  3. ### 2. 生成EAS配置文件
  4. 配置文件可以通过下面的脚本生成。不妨假设生成的配置文件为config.json
  5. ```python
  6. # EAS的配置文件
  7. import json
  8. # 生成 EAS 模型配置
  9. model_config = {}
  10. model_config['inputDataSchema'] = "id long, movieid long" # EAS接受数据的schema
  11. model_config['modelVersion'] = "v0.2"
  12. eas_config = {
  13. "name": "recomm_demo",
  14. "model_path": "http://weibozhao.oss-cn-hangzhou-zmf.aliyuncs.com/model/recomm_model.ak",
  15. "processor": "pai_desiginer_alink_processor",
  16. "metadata": {
  17. "instance": 1,
  18. "memory": 2048,
  19. "region":"cn-beijing"
  20. },
  21. "model_config": model_config
  22. }
  23. print(json.dumps(eas_config, indent=4))

config文件中的关键字解释:
name 是该部署模型的名字,用户自己定义。
model_path 是模型的OSS路径。
processor 是推理时调用的Jar包。
其他一些参数可以参考EAS部署文档:https://help.aliyun.com/document_detail/111031.html?spm=a2c4g.11186623.6.763.6c307773fEXVaJ

3. 将模型部署到EAS

  1. # 下载 eascmd: https://help.aliyun.com/document_detail/195500.html?spm=a2c4g.11186623.6.728.6d7437553LnENf
  2. # 配置好 ak、endpoint等信息
  3. # 将上面的输出保存到文件 config.json ,然后在命令行运行下面的命令
  4. ./eascmd64 -i {EAS AccessKeyId} -k {EAS AccessKeySecret} -e pai-eas.cn-beijing.aliyuncs.com create config.json