ONNX(Open Neural Network Exchange,开放神经网络交换)是为人工智能模型(深度学习和传统ML)提供的一种开放格式,可使模型在不同框架之间进行转移。详见 https://github.com/onnx/onnx

Alink提供了OnnxModelPredictBatchOp、OnnxModelPredictStreamOp和�OnnxModelPredictor组件,分别处理批式场景、流式场景和进行Pipeline封装。
��
各组件都需要指定 ONNX 模型的modelPath模型路径参数。模型路径可以是以下形式:

  • 本地文件:file:// 加绝对路径,例如 file:///tmp/dnn.py
  • Java 包中的资源文件:res:// 加路径,例如 res:///dnn.py
  • http/https 文件:http://https:// 路径;
  • OSS 文件:oss:// 加路径和 Endpoint 和 access key
    等信息,例如oss://bucket/xxx/xxx/xxx.py?host=xxx&access_key_id=xxx&access_key_secret=xxx
  • HDFS 文件:hdfs:// 加路径;

参与模型预测的数据通过参数 selectedCols 设置,需要注意以下几点:

  • ONNX 模型使用 input name 来标识模型输入桩的,因此需要设置 inputNames,与 selectedCols 一一对应,表明某列对应某输入桩。inputNames 不填写时,默认与列名一致。
  • 仅支持输入桩为 Tensor 类型,不支持 SequencesMaps 类型。
  • 所选择的列的类型需要是float, double, int, long, byte, string 类型及其对应的 Alink Tensor 类型。

模型输出信息通过参数 outputSchemaStr 指定,包括输出列名以及名称,需要注意以下几点:

  • ONNX 模型使用 output name 来标识模型输出桩的,因此需要设置 outputNames,与 outputSchemaStr 一一对应,表明某列对应某输入桩。outputNames 不填写时,默认与列名一致。
  • 仅支持输出桩为 Tensor 类型,不支持 SequencesMaps 类型。
  • outputSchemaStr 填写的输出类型需要是对应的输出桩类型,例如 输出桩类型 为 Float 类型的 Tensor 时,对应的 Alink 类型可以是 TENSOR 或者 FLOAT_TENSOR,当输出仅包含一个元素时,还可以是 FLOAT

组件使用的是 ONNX 1.11.0 版本,当有 GPU 时,自动使用 GPU 进行推理,否则使用 CPU 进行推理。

25.7.1 批式任务中使用ONNX模型

使用OnnxModelPredictBatchOp组件,可以加载ONNX模型进行批式预测。关于该组件的详细说明参见Alink文档 https://www.yuque.com/pinshu/alink_doc/onnxmodelpredictbatchop .
使用ONNX模型前,还需要将输入数据列的类型转换为Tensor格式,可以使用VectorToTensorBatchOp组件。具体代码如下所示:

  1. AkSourceBatchOp()\
  2. .setFilePath(Chap13_DATA_DIR + Chap13_DENSE_TEST_FILE)\
  3. .link(
  4. VectorToTensorBatchOp()\
  5. .setTensorDataType("float")\
  6. .setTensorShape([1, 1, 28, 28])\
  7. .setSelectedCol("vec")\
  8. .setOutputCol("tensor")\
  9. .setReservedCols(["label"])
  10. )\
  11. .link(
  12. OnnxModelPredictBatchOp()\
  13. .setModelPath(
  14. "https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/cnn_mnist_pytorch.onnx")\
  15. .setSelectedCols(["tensor"])\
  16. .setInputNames(["0"])\
  17. .setOutputNames(["21"])\
  18. .setOutputSchemaStr("probabilities FLOAT_TENSOR")
  19. )\
  20. .link(
  21. UDFBatchOp()\
  22. .setFunc(get_max_index)\
  23. .setSelectedCols(["probabilities"])\
  24. .setOutputCol("pred")
  25. )\
  26. .lazyPrint(3)\
  27. .link(
  28. EvalMultiClassBatchOp()\
  29. .setLabelCol("label")\
  30. .setPredictionCol("pred")\
  31. .lazyPrintMetrics()
  32. )
  33. BatchOperator.execute()

这里用到了一个自定义函数,具体定义如下:

  1. import numpy as np
  2. @udf(input_types=[AlinkDataTypes.TENSOR()], result_type=AlinkDataTypes.INT())
  3. def get_max_index(tensor: np.ndarray):
  4. return tensor.argmax().item()

批式任务的运行结果为:
image.png

  1. -------------------------------- Metrics: --------------------------------
  2. Accuracy:0.9904 Macro F1:0.9904 Micro F1:0.9904 Kappa:0.9893
  3. |Pred\Real| 9| 8| 7|...| 2| 1| 0|
  4. |---------|---|---|----|---|----|----|---|
  5. | 9|988| 3| 3|...| 0| 0| 0|
  6. | 8| 1|962| 0|...| 1| 0| 1|
  7. | 7| 6| 2|1014|...| 3| 0| 1|
  8. | ...|...|...| ...|...| ...| ...|...|
  9. | 2| 0| 2| 3|...|1021| 0| 0|
  10. | 1| 3| 0| 6|...| 3|1134| 0|
  11. | 0| 2| 4| 0|...| 1| 0|978|

25.7.2 流式任务中使用ONNX模型

使用OnnxModelPredictStreamOp组件,可以加载ONNX模型进行批式预测。关于该组件的详细说明参见Alink文档 https://www.yuque.com/pinshu/alink_doc/onnxmodelpredictstreamop .
使用ONNX模型前,还需要将输入数据列的类型转换为Tensor格式,可以使用VectorToTensorStreamOp组件。具体代码如下所示:

  1. AkSourceStreamOp()\
  2. .setFilePath(Chap13_DATA_DIR + Chap13_DENSE_TEST_FILE)\
  3. .link(
  4. VectorToTensorStreamOp()\
  5. .setTensorDataType("float")\
  6. .setTensorShape([1, 1, 28, 28])\
  7. .setSelectedCol("vec")\
  8. .setOutputCol("tensor")\
  9. .setReservedCols(["label"])
  10. )\
  11. .link(
  12. OnnxModelPredictStreamOp()\
  13. .setModelPath("https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/cnn_mnist_pytorch.onnx")\
  14. .setSelectedCols(["tensor"])\
  15. .setInputNames(["0"])\
  16. .setOutputNames(["21"])\
  17. .setOutputSchemaStr("probabilities FLOAT_TENSOR")
  18. )\
  19. .link(
  20. UDFStreamOp()\
  21. .setFunc(get_max_index)\
  22. .setSelectedCols(["probabilities"])\
  23. .setOutputCol("pred")
  24. )\
  25. .sample(0.001)\
  26. .print()
  27. StreamOperator.execute()

运行结果为:
image.png

25.7.3 Pipeline中使用ONNX模型

学习了如何在批式任务和流式任务中使用ONNX模型,我们很容易在Pipeline中使用ONNX模型进行预测,只要将其中的批式/流式组件对应到Pipeline组件即可。具体代码如下:

  1. PipelineModel(
  2. VectorToTensor()\
  3. .setTensorDataType("float")\
  4. .setTensorShape([1, 1, 28, 28])\
  5. .setSelectedCol("vec")\
  6. .setOutputCol("tensor")\
  7. .setReservedCols(["label"]),
  8. OnnxModelPredictor()\
  9. .setModelPath("https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/cnn_mnist_pytorch.onnx")\
  10. .setSelectedCols(["tensor"])\
  11. .setInputNames(["0"])\
  12. .setOutputNames(["21"])\
  13. .setOutputSchemaStr("probabilities FLOAT_TENSOR")
  14. ).save(Chap13_DATA_DIR + PIPELINE_ONNX_MODEL, True)
  15. BatchOperator.execute()
  16. PipelineModel\
  17. .load(Chap13_DATA_DIR + PIPELINE_ONNX_MODEL)\
  18. .transform(
  19. AkSourceStreamOp()\
  20. .setFilePath(Chap13_DATA_DIR + Chap13_DENSE_TEST_FILE)
  21. )\
  22. .link(
  23. UDFStreamOp()\
  24. .setFunc(get_max_index)\
  25. .setSelectedCols(["probabilities"])\
  26. .setOutputCol("pred")
  27. )\
  28. .sample(0.001)\
  29. .print()
  30. StreamOperator.execute()

运行结果为:
image.png

25.7.4 LocalPredictor中使用ONNX模型

除了通过Alink任务使用ONNX模型,也可以使用LocalPredictor进行嵌入式预测。示例代码如下,首先从数据集中抽取一行数据,输入数据的SchemaStr为“vec string, label int”;然后通过导入上一节保存的Pipeline模型,并设置输入数据的SchemaStr,得到LocalPredictor类型的实例localPredictor;如果不确定预测结果各列的含义,可以打印输出localPredictor的OutputSchema;使用localPredictor的map方法获得预测结果。

  1. source = AkSourceBatchOp().setFilePath(Chap13_DATA_DIR + Chap13_DENSE_TEST_FILE)
  2. print(source.getSchemaStr())
  3. df = source.firstN(1).collectToDataframe()
  4. row = [df.iat[0,0], df.iat[0,1].item()]
  5. localPredictor = LocalPredictor(Chap13_DATA_DIR + PIPELINE_ONNX_MODEL, "vec string, label int")
  6. print(localPredictor.getOutputSchemaStr())
  7. r = localPredictor.map(row)
  8. print(str(r[0]) + " | " + str(r[2]))

运行结果为:

  1. vec VARCHAR, label INT
  2. label INT, tensor ANY<com.alibaba.alink.common.linalg.tensor.FloatTensor>, probabilities ANY<com.alibaba.alink.common.linalg.tensor.FloatTensor>
  3. 2 | FloatTensor(1,10)
  4. [[-1397.7644 -999.7633 0.0 ... -1533.649 -1142.1455 -1608.5127]]