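Sections 2 and 3 of this chapter showed how to train classification and regression models, and predict with them, using Alink's deep learning components KerasSequentialClassifier and KerasSequentialRegressor.
In practice, we often need to run predictions on streaming or batch data with a model that was already trained in TensorFlow or PyTorch. Alink provides streaming, batch, and Pipeline components that adapt TensorFlow and PyTorch models for this purpose.
This section focuses on working with PyTorch models.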

25.5.1 Generating a PyTorch Model

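The PyTorch model file used in this section, mnist_model_pytorch.pt, has already been uploaded to OSS, and the experiments later in this section read it directly over the network: https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/mnist_model_pytorch.pt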

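Interested readers can generate the model themselves by running the code below in a PyTorch environment; the resulting file can then be used by Alink's components. Note: the PyTorch model must be exported as a TorchScript ".pt" file so that Alink's components can load it. The last lines of the code below show the recommended way to do this, using torch.jit.trace and torch.jit.save.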

  import torch
  from torchvision import datasets
  from torchvision.transforms import ToTensor
  # Download the MNIST training set; ToTensor() turns each image into a (1, 28, 28) float tensor in [0, 1]
  train_data = datasets.MNIST(
      root="data",
      train=True,
      download=True,
      transform=ToTensor()
  )
  train_loader = torch.utils.data.DataLoader(dataset=train_data, batch_size=64, shuffle=True)
  class Net(torch.nn.Module):
      def __init__(self):
          super(Net, self).__init__()
          # Three conv + max-pool stages shrink the image: 28x28 -> 14x14 -> 7x7 -> 3x3
          self.conv1 = torch.nn.Sequential(
              torch.nn.Conv2d(1, 32, 3, 1, 1),
              torch.nn.ReLU(),
              torch.nn.MaxPool2d(2)
          )
          self.conv2 = torch.nn.Sequential(
              torch.nn.Conv2d(32, 64, 3, 1, 1),
              torch.nn.ReLU(),
              torch.nn.MaxPool2d(2)
          )
          self.conv3 = torch.nn.Sequential(
              torch.nn.Conv2d(64, 64, 3, 1, 1),
              torch.nn.ReLU(),
              torch.nn.MaxPool2d(2)
          )
          # Fully connected classifier: 64 channels * 3 * 3 features -> 10 class scores (logits)
          self.dense = torch.nn.Sequential(
              torch.nn.Linear(64 * 3 * 3, 128),
              torch.nn.ReLU(),
              torch.nn.Linear(128, 10)
          )
      def forward(self, x):
          conv1_out = self.conv1(x)
          conv2_out = self.conv2(conv1_out)
          conv3_out = self.conv3(conv2_out)
          # Flatten to shape (batch_size, 64 * 3 * 3)
          res = conv3_out.view(conv3_out.size(0), -1)
          out = self.dense(res)
          return out
  model = Net()
  print(model)
  optimizer = torch.optim.Adam(model.parameters())
  loss_func = torch.nn.CrossEntropyLoss()
  for epoch in range(5):
      print('epoch {}'.format(epoch + 1))
      train_loss = 0.
      train_acc = 0.
      for batch_x, batch_y in train_loader:
          out = model(batch_x)
          loss = loss_func(out, batch_y)
          # loss.item() is the mean loss of the batch; since the sum is later divided by the
          # number of samples, the printed "Train Loss" is roughly the mean loss divided by 64
          train_loss += loss.item()
          # The predicted class is the index of the largest score
          pred = torch.max(out, 1)[1]
          train_acc += (pred == batch_y).sum().item()
          optimizer.zero_grad()
          loss.backward()
          optimizer.step()
      print('Train Loss: {:.6f}, Acc: {:.6f}'.format(
          train_loss / len(train_data), train_acc / len(train_data)))
  # Export to TorchScript by tracing the model with a dummy input of shape (1, 1, 28, 28)
  model.eval()
  traced = torch.jit.trace(model, torch.rand(1, 1, 28, 28))
  torch.jit.save(traced, "mnist_model_pytorch.pt")

The printed model structure and training log are as follows:

  Net(
    (conv1): Sequential(
      (0): Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (1): ReLU()
      (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    )
    (conv2): Sequential(
      (0): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (1): ReLU()
      (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    )
    (conv3): Sequential(
      (0): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (1): ReLU()
      (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    )
    (dense): Sequential(
      (0): Linear(in_features=576, out_features=128, bias=True)
      (1): ReLU()
      (2): Linear(in_features=128, out_features=10, bias=True)
    )
  )
  epoch 1
  Train Loss: 0.003327, Acc: 0.933000
  epoch 2
  Train Loss: 0.000876, Acc: 0.982667
  epoch 3
  Train Loss: 0.000592, Acc: 0.987783
  epoch 4
  Train Loss: 0.000466, Acc: 0.990467
  epoch 5
  Train Loss: 0.000403, Acc: 0.991733
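The in_features=576 of the first Linear layer follows from how the three conv + pool stages shrink the 28x28 input. The sketch below (plain Java; the class and method names are ours, not part of PyTorch or Alink) applies the standard PyTorch output-size formula for dilation 1, floor((n + 2p - k) / s) + 1, to each stage: Conv2d(k=3, s=1, p=1) keeps the size and MaxPool2d(2) halves it, rounding down, so the size goes 28 -> 14 -> 7 -> 3 and the flattened feature count is 64 * 3 * 3.

```java
// Tracks the spatial size of a 28x28 MNIST image through the three
// conv + max-pool stages of Net, using PyTorch's output-size formula (dilation 1).
final class MnistShapes {
    // Output size of Conv2d / MaxPool2d: floor((n + 2p - k) / s) + 1.
    // Integer division floors here because all operands are non-negative.
    static int outSize(int n, int kernel, int stride, int padding) {
        return (n + 2 * padding - kernel) / stride + 1;
    }

    // One stage = Conv2d(kernel 3, stride 1, padding 1) then MaxPool2d(2), whose stride defaults to 2.
    static int afterStage(int n) {
        int conv = outSize(n, 3, 1, 1); // padding 1 keeps the size unchanged
        return outSize(conv, 2, 2, 0);  // pooling halves the size, rounding down
    }

    // Number of features entering the first Linear layer after flattening.
    static int flattenedFeatures(int imageSize, int channels, int stages) {
        int n = imageSize;
        for (int i = 0; i < stages; i++) {
            n = afterStage(n);
        }
        return channels * n * n;
    }

    public static void main(String[] args) {
        System.out.println(flattenedFeatures(28, 64, 3)); // prints 576
    }
}
```

The result matches in_features=576 in the printed model; note that the last pooling step maps 7 to 3, not 3.5, because of the floor.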

25.5.2 Using a PyTorch Model in Batch Jobs

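The TorchModelPredictBatchOp component loads a PyTorch model and uses it for batch prediction. For details, see the Alink documentation at https://www.yuque.com/pinshu/alink_doc/torchmodelpredictbatchop .
Before the model can be applied, the input column must be converted to the Tensor type, which the VectorToTensorBatchOp component does. Here it turns each 784-value vector in the column vec into a float tensor of shape (1, 1, 28, 28), the same input shape the model was traced with. The code is as follows: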

  // Read the MNIST test set; each row has a dense vector column "vec" and a "label" column
  new AkSourceBatchOp()
      .setFilePath(Chap13.DATA_DIR + Chap13.DENSE_TEST_FILE)
      // Convert the 784-value vector into a float tensor of shape (1, 1, 28, 28), keeping "label"
      .link(
          new VectorToTensorBatchOp()
              .setTensorDataType("float")
              .setTensorShape(1, 1, 28, 28)
              .setSelectedCol("vec")
              .setOutputCol("tensor")
              .setReservedCols("label")
      )
      // Run the TorchScript model; "output_1" holds the 10 class scores as a (1, 10) tensor
      .link(
          new TorchModelPredictBatchOp()
              .setModelPath(
                  "https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/mnist_model_pytorch.pt")
              .setSelectedCols("tensor")
              .setOutputSchemaStr("output_1 FLOAT_TENSOR")
      )
      .lazyPrint(3)
      // Take the index of the largest score as the predicted digit
      .link(
          new UDFBatchOp()
              .setFunc(new GetMaxIndex())
              .setSelectedCols("output_1")
              .setOutputCol("pred")
      )
      .lazyPrint(3)
      // Compare the predictions with the true labels
      .link(
          new EvalMultiClassBatchOp()
              .setLabelCol("label")
              .setPredictionCol("pred")
              .lazyPrintMetrics()
      );
  BatchOperator.execute();
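VectorToTensorBatchOp is configured above to turn each 784-value vector into a (1, 1, 28, 28) float tensor. The hypothetical helper below sketches that reshaping in plain Java; it is not Alink's implementation. It assumes the vec column stores the dense vector as space-separated numbers and that the values are laid out row by row (row-major), so flat index i lands at row i / 28, column i % 28.

```java
// Hypothetical sketch of the reshaping configured on VectorToTensorBatchOp:
// a flat dense-vector string becomes a float tensor of shape (1, 1, height, width).
// Assumes space-separated values in row-major order; not Alink's actual code.
final class VecToTensorSketch {
    static float[][][][] toTensor(String vec, int height, int width) {
        String[] parts = vec.trim().split("\\s+");
        if (parts.length != height * width) {
            throw new IllegalArgumentException(
                "expected " + (height * width) + " values, got " + parts.length);
        }
        // Batch size 1 and 1 channel, matching setTensorShape(1, 1, 28, 28).
        float[][][][] tensor = new float[1][1][height][width];
        for (int i = 0; i < parts.length; i++) {
            // Row-major layout: flat index i goes to row i / width, column i % width.
            tensor[0][0][i / width][i % width] = Float.parseFloat(parts[i]);
        }
        return tensor;
    }

    public static void main(String[] args) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 28 * 28; i++) {
            sb.append(i).append(' ');
        }
        float[][][][] t = toTensor(sb.toString(), 28, 28);
        System.out.println(t[0][0][1][0]); // prints 28.0, the value at flat index 28
    }
}
```

The leading two dimensions of size 1 are the batch and channel axes the traced model expects, which is why the model output shows up as FloatTensor(1,10) rather than a plain 10-element vector.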

The pipeline above uses a user-defined function, GetMaxIndex, which is defined as follows:

  // Returns the index (0-9) of the largest of the 10 class scores in a (1, 10) tensor
  public static class GetMaxIndex extends ScalarFunction {
      public int eval(FloatTensor tensor) {
          int k = 0;
          float max = tensor.getFloat(0, 0);
          // 10 = number of output classes of the MNIST model
          for (int i = 1; i < 10; i++) {
              if (tensor.getFloat(0, i) > max) {
                  k = i;
                  max = tensor.getFloat(0, i);
              }
          }
          return k;
      }
  }
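GetMaxIndex is an argmax over the 10 scores in output_1: the predicted digit is the index of the largest raw score. The model ends in a Linear layer and outputs logits, and since softmax preserves ordering, no softmax is needed to find the maximum. The sketch below restates the same logic in plain Java over a float[] instead of Alink's FloatTensor (the class name ArgMax is ours); it reads the length from the array rather than hard-coding 10 and, like GetMaxIndex, keeps the first index when scores tie.

```java
// Plain-Java restatement of GetMaxIndex's argmax, decoupled from FloatTensor.
final class ArgMax {
    static int argmax(float[] scores) {
        if (scores.length == 0) {
            throw new IllegalArgumentException("scores must not be empty");
        }
        int best = 0;
        for (int i = 1; i < scores.length; i++) {
            // Strict '>' keeps the first index on ties, as GetMaxIndex does.
            if (scores[i] > scores[best]) {
                best = i;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        float[] logits = {-2.5f, 0.3f, 7.1f, 1.0f}; // illustrative scores, not model output
        System.out.println(argmax(logits)); // prints 2
    }
}
```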

The batch job prints the following:

  label|tensor|output_1
  -----|------|--------
  0|FloatTensor(1,1,28,28) |FloatTensor(1,10)
  |[[[[0.0 0.0 0.0 ... 0.0 0.0 0.0]|[[4044.1326 -3060.9893 -34.796455 ... -1278.3772 -2814.0508 -1284.4863]]
  | [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
  | [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
  | ... |
  | [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
  | ... ... |
  9|FloatTensor(1,1,28,28) |FloatTensor(1,10)
  |[[[[0.0 0.0 0.0 ... 0.0 0.0 0.0]|[[-2608.7097 -439.29208 414.57578 ... -343.33176 209.20328 2627.7]]
  | [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
  | [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
  | ... |
  | [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
  | ... ... |
  6|FloatTensor(1,1,28,28) |FloatTensor(1,10)
  |[[[[0.0 0.0 0.0 ... 0.0 0.0 0.0]|[[700.5958 -2671.2515 -10.615548 ... -3413.485 915.7342 -1291.0757]]
  | [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
  | [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
  | ... |
  | [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
  | ... ... |
  label|tensor|output_1|pred
  -----|------|--------|----
  0|FloatTensor(1,1,28,28) |FloatTensor(1,10) |0
  |[[[[0.0 0.0 0.0 ... 0.0 0.0 0.0]|[[4044.1326 -3060.9893 -34.796455 ... -1278.3772 -2814.0508 -1284.4863]]|
  | [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
  | [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
  | ... |
  | [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
  | ... ... |
  9|FloatTensor(1,1,28,28) |FloatTensor(1,10) |9
  |[[[[0.0 0.0 0.0 ... 0.0 0.0 0.0]|[[-2608.7097 -439.29208 414.57578 ... -343.33176 209.20328 2627.7]]|
  | [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
  | [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
  | ... |
  | [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
  | ... ... |
  6|FloatTensor(1,1,28,28) |FloatTensor(1,10) |6
  |[[[[0.0 0.0 0.0 ... 0.0 0.0 0.0]|[[700.5958 -2671.2515 -10.615548 ... -3413.485 915.7342 -1291.0757]]|
  | [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
  | [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
  | ... |
  | [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
  | ... ... |
  -------------------------------- Metrics: --------------------------------
  Accuracy:0.9903 Macro F1:0.9902 Micro F1:0.9903 Kappa:0.9892
  |Pred\Real|  9|  8|   7|...|   2|   1|  0|
  |---------|---|---|----|---|----|----|---|
  |        9|992|  1|   4|...|   0|   0|  0|
  |        8|  2|965|   1|...|   0|   1|  1|
  |        7|  5|  2|1012|...|   2|   0|  1|
  |      ...|...|...| ...|...| ...| ...|...|
  |        2|  2|  4|   9|...|1030|   3|  2|
  |        1|  0|  0|   2|...|   0|1128|  0|
  |        0|  0|  2|   0|...|   0|   0|973|
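The summary metrics can be reproduced from a confusion matrix. The sketch below (plain Java; the class name is ours, and it is not Alink's evaluator) uses the same layout as the printed matrix, with rows for predicted labels and columns for true labels. Accuracy is the diagonal sum divided by the total; macro F1 averages the per-class F1 = 2PR / (P + R), where precision P divides the diagonal entry by its row sum and recall R divides it by its column sum. For single-label multi-class data, micro F1 always equals accuracy, which is why both read 0.9903 above. How Alink handles a class that is never predicted may differ from the zero fallback assumed here.

```java
// Accuracy and macro F1 from a confusion matrix whose rows are predicted
// labels and whose columns are true labels (the layout printed above).
final class MultiClassMetrics {
    static double accuracy(long[][] m) {
        long correct = 0;
        long total = 0;
        for (int i = 0; i < m.length; i++) {
            for (int j = 0; j < m.length; j++) {
                total += m[i][j];
                if (i == j) {
                    correct += m[i][j];
                }
            }
        }
        return (double) correct / total;
    }

    static double macroF1(long[][] m) {
        int k = m.length;
        double sum = 0.0;
        for (int c = 0; c < k; c++) {
            long predicted = 0; // row sum: samples predicted as class c
            long actual = 0;    // column sum: samples whose true label is c
            for (int j = 0; j < k; j++) {
                predicted += m[c][j];
                actual += m[j][c];
            }
            // Assumption: an undefined precision or recall counts as 0.
            double precision = predicted == 0 ? 0.0 : (double) m[c][c] / predicted;
            double recall = actual == 0 ? 0.0 : (double) m[c][c] / actual;
            double f1 = precision + recall == 0.0
                ? 0.0 : 2 * precision * recall / (precision + recall);
            sum += f1;
        }
        return sum / k;
    }
}
```

For example, the toy matrix {{5, 1, 0}, {0, 4, 1}, {1, 0, 3}} gives accuracy 12 / 15 = 0.8 and macro F1 (5/6 + 4/5 + 3/4) / 3 = 143/180, about 0.794.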

25.5.3 Using a PyTorch Model in Streaming Jobs

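The TorchModelPredictStreamOp component loads a PyTorch model and uses it for streaming prediction. For details, see the Alink documentation at https://www.yuque.com/pinshu/alink_doc/torchmodelpredictstreamop .
As in the batch case, the input column must first be converted to the Tensor type, here with the VectorToTensorStreamOp component. The code is as follows: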

  // Read the MNIST test set as a stream
  new AkSourceStreamOp()
      .setFilePath(Chap13.DATA_DIR + Chap13.DENSE_TEST_FILE)
      // Convert the 784-value vector into a float tensor of shape (1, 1, 28, 28), keeping "label"
      .link(
          new VectorToTensorStreamOp()
              .setTensorDataType("float")
              .setTensorShape(1, 1, 28, 28)
              .setSelectedCol("vec")
              .setOutputCol("tensor")
              .setReservedCols("label")
      )
      // Run the TorchScript model on each record
      .link(
          new TorchModelPredictStreamOp()
              .setModelPath(
                  "https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/mnist_model_pytorch.pt")
              .setSelectedCols("tensor")
              .setOutputSchemaStr("output_1 FLOAT_TENSOR")
      )
      // Take the index of the largest score as the predicted digit
      .link(
          new UDFStreamOp()
              .setFunc(new GetMaxIndex())
              .setSelectedCols("output_1")
              .setOutputCol("pred")
      )
      // Print only a small random sample (about 0.1%) of the results
      .sample(0.001)
      .print();
  StreamOperator.execute();
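The sample(0.001) call keeps the printed output short. Assuming it performs Bernoulli sampling, keeping each record independently with probability 0.001 (we have not verified the internals of Alink's stream sampling), a test file of N records yields roughly N / 1000 printed rows, and which rows appear changes from run to run. The sketch below shows Bernoulli sampling in plain Java; the class name and the fixed seed are our own choices for reproducibility.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Bernoulli sampling sketch: each record is kept independently with probability `ratio`.
final class BernoulliSample {
    static <T> List<T> sample(Iterable<T> records, double ratio, long seed) {
        Random rnd = new Random(seed); // fixed seed only to make the sketch reproducible
        List<T> kept = new ArrayList<>();
        for (T record : records) {
            // nextDouble() is uniform in [0, 1), so ratio 0 keeps nothing and ratio 1 keeps everything
            if (rnd.nextDouble() < ratio) {
                kept.add(record);
            }
        }
        return kept;
    }
}
```

Because each record is decided on its own, this works on an unbounded stream without buffering, which is what makes it a natural fit for sampling a streaming job's output.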

The output is:

  label|tensor|output_1|pred
  -----|------|--------|----
  3|FloatTensor(1,1,28,28) |FloatTensor(1,10) |3
  |[[[[0.0 0.0 0.0 ... 0.0 0.0 0.0]|[[-1590.9832 -1487.7247 591.7295 ... -468.07892 671.602 -1350.5359]]|
  | [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
  | [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
  | ... |
  | [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
  | ... ... |
  8|FloatTensor(1,1,28,28) |FloatTensor(1,10) |8
  |[[[[0.0 0.0 0.0 ... 0.0 0.0 0.0]|[[-503.25494 -1483.4431 684.2502 ... -2311.5735 2921.0408 118.283745]]|
  | [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
  | [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
  | ... |
  | [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
  | ... ... |
  6|FloatTensor(1,1,28,28) |FloatTensor(1,10) |6
  |[[[[0.0 0.0 0.0 ... 0.0 0.0 0.0]|[[-375.05377 -1370.912 -769.6774 ... -3442.0344 -582.13983 -2177.5767]]|
  | [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
  | [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
  | ... |
  | [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
  | ... ... |
  ......

25.5.4 Using a PyTorch Model in a Pipeline

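Having seen how to use a PyTorch model in batch and streaming jobs, we can easily use it for prediction in a Pipeline as well: simply replace each batch/stream component with its Pipeline counterpart, VectorToTensor and TorchModelPredictor. The code below assembles these two stages into a PipelineModel, saves it, then loads it and applies it to streaming data: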

  // Assemble the two stages into a PipelineModel and save it (true: overwrite an existing file)
  new PipelineModel(
      new VectorToTensor()
          .setTensorDataType("float")
          .setTensorShape(1, 1, 28, 28)
          .setSelectedCol("vec")
          .setOutputCol("tensor")
          .setReservedCols("label"),
      new TorchModelPredictor()
          .setModelPath(
              "https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/mnist_model_pytorch.pt")
          .setSelectedCols("tensor")
          .setOutputSchemaStr("output_1 FLOAT_TENSOR")
  ).save(Chap13.DATA_DIR + PIPELINE_PYTORCH_MODEL, true);
  // Run the batch job that writes the model file
  BatchOperator.execute();
  // Load the saved model and apply it to streaming data
  PipelineModel
      .load(Chap13.DATA_DIR + PIPELINE_PYTORCH_MODEL)
      .transform(
          new AkSourceStreamOp()
              .setFilePath(Chap13.DATA_DIR + Chap13.DENSE_TEST_FILE)
      )
      // Take the index of the largest score as the predicted digit
      .link(
          new UDFStreamOp()
              .setFunc(new GetMaxIndex())
              .setSelectedCols("output_1")
              .setOutputCol("pred")
      )
      .sample(0.001)
      .print();
  StreamOperator.execute();

The output is:

  label|tensor|output_1|pred
  -----|------|--------|----
  4|FloatTensor(1,1,28,28) |FloatTensor(1,10) |4
  |[[[[0.0 0.0 0.0 ... 0.0 0.0 0.0]|[[-993.6407 -458.57587 1282.9576 ... -2273.686 -2319.7793 -2746.323]]|
  | [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
  | [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
  | ... |
  | [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
  | ... ... |
  1|FloatTensor(1,1,28,28) |FloatTensor(1,10) |1
  |[[[[0.0 0.0 0.0 ... 0.0 0.0 0.0]|[[-1004.3513 2189.4736 -777.72845 ... -1088.758 355.25262 -921.88556]]|
  | [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
  | [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
  | ... |
  | [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
  | ... ... |
  2|FloatTensor(1,1,28,28) |FloatTensor(1,10) |2
  |[[[[0.0 0.0 0.0 ... 0.0 0.0 0.0]|[[-1155.4056 -317.33963 4966.4814 ... -910.1352 -2354.1606 -3428.952]]|
  | [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
  | [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
  | ... |
  | [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
  | ... ... |
  ......

25.5.5 Using a PyTorch Model in LocalPredictor

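Besides running PyTorch models inside Alink jobs, you can also embed prediction in an application with LocalPredictor. In the example below, we first take one row from the dataset; the schema string of this input data is "vec string, label int". We then create a LocalPredictor instance, localPredictor, from the Pipeline model saved in the previous section together with that input schema string. If you are unsure what each column of the prediction result means, print localPredictor's output schema. Finally, call localPredictor's map method to get the prediction.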

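  AkSourceBatchOp source = new AkSourceBatchOp()
      .setFilePath(Chap13.DATA_DIR + Chap13.DENSE_TEST_FILE);
  // Print the input schema: vec STRING, label INT
  System.out.println(source.getSchema());
  // Take the first row of the dataset as the sample to predict
  Row row = source.firstN(1).collect().get(0);
  // Load the Pipeline model saved in the previous section, declaring the input schema
  LocalPredictor localPredictor
      = new LocalPredictor(Chap13.DATA_DIR + PIPELINE_PYTORCH_MODEL, "vec string, label int");
  // Print the output schema to see what each output column holds
  System.out.println(localPredictor.getOutputSchema());
  // Predict a single row; field 0 is "label" and field 2 is "output_1"
  Row r = localPredictor.map(row);
  System.out.println(r.getField(0).toString() + " | " + r.getField(2).toString());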

The output is:

  root
   |-- vec: STRING
   |-- label: INT
  root
   |-- label: INT
   |-- tensor: LEGACY(GenericType<com.alibaba.alink.common.linalg.tensor.FloatTensor>)
   |-- output_1: LEGACY(GenericType<com.alibaba.alink.common.linalg.tensor.FloatTensor>)
  1 | [[-794.75903 2662.567 -658.8216 ... -173.1484 -263.41855 -712.4674]]
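The output schema explains why the example reads r.getField(2): the input column vec is gone, and the columns are label, tensor, output_1. This is consistent with setReservedCols("label") on VectorToTensor keeping only label alongside the new tensor column, while TorchModelPredictor keeps its input columns and appends output_1. The sketch below models that column flow in plain Java, with a row as an ordered map from column name to value; the class, the method names, and the stub values are hypothetical and only mimic the schema shown above, not Alink's implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical model of how the two pipeline stages change a row's columns.
// A row is an ordered map from column name to value; insertion order = column order.
final class PipelineColumnsSketch {
    // Mimics VectorToTensor with setReservedCols: keeps only the reserved columns
    // and appends the output column (the tensor itself is stubbed as a string).
    static UnaryOperator<Map<String, Object>> vectorToTensor(
            String selectedCol, String outputCol, String... reservedCols) {
        return row -> {
            Map<String, Object> out = new LinkedHashMap<>();
            for (String col : reservedCols) {
                out.put(col, row.get(col));
            }
            out.put(outputCol, "tensor(" + row.get(selectedCol) + ")");
            return out;
        };
    }

    // Mimics TorchModelPredictor as configured here: keeps every input column
    // and appends the model output (stubbed as a string).
    static UnaryOperator<Map<String, Object>> predictor(String outputCol) {
        return row -> {
            Map<String, Object> out = new LinkedHashMap<>(row);
            out.put(outputCol, "scores");
            return out;
        };
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("vec", "0.0 0.0 0.0");
        row.put("label", 1);
        Map<String, Object> result = predictor("output_1")
            .apply(vectorToTensor("vec", "tensor", "label").apply(row));
        System.out.println(result.keySet()); // prints [label, tensor, output_1]
    }
}
```

With this column order, index 0 is label and index 2 is output_1, matching the two fields printed by the LocalPredictor example.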