Java 类名:com.alibaba.alink.operator.stream.regression.KerasSequentialRegressorPredictStreamOp
Python 类名:KerasSequentialRegressorPredictStreamOp

功能介绍

与 KerasSequential 回归训练组件对应的流预测组件。

参数说明

| 名称 | 中文名称 | 描述 | 类型 | 是否必须? | 取值范围 | 默认值 | | —- | —- | —- | —- | —- | —- | —- |

| predictionCol | 预测结果列名 | 预测结果列名 | String | ✓ | | |

| inferBatchSize | 推理数据批大小 | 推理数据批大小 | Integer | | | 256 |

| modelFilePath | 模型的文件路径 | 模型的文件路径 | String | | | null |

| reservedCols | 算法保留列名 | 算法保留列 | String[] | | | null |

| modelStreamFilePath | 模型流的文件路径 | 模型流的文件路径 | String | | | null |

| modelStreamScanInterval | 扫描模型路径的时间间隔 | 描模型路径的时间间隔,单位秒 | Integer | | | 10 |

| modelStreamStartTime | 模型流的起始时间 | 模型流的起始时间。默认从当前时刻开始读。使用yyyy-mm-dd hh:mm:ss.fffffffff格式,详见Timestamp.valueOf(String s) | String | | | null |

代码示例

以下代码仅用于示意,可能需要修改部分代码或者配置环境后才能正常运行!

Python 代码

  1. source = CsvSourceBatchOp() \
  2. .setFilePath("https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/random_tensor.csv") \
  3. .setSchemaStr("tensor string, label double")
  4. source = ToTensorBatchOp() \
  5. .setSelectedCol("tensor") \
  6. .setTensorDataType("DOUBLE") \
  7. .setTensorShape([200, 3]) \
  8. .linkFrom(source)
  9. streamSource = CsvSourceStreamOp() \
  10. .setFilePath("https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/random_tensor.csv") \
  11. .setSchemaStr("tensor string, label double")
  12. trainBatchOp = KerasSequentialRegressorTrainBatchOp() \
  13. .setTensorCol("tensor") \
  14. .setLabelCol("label") \
  15. .setLayers([
  16. "Conv1D(256, 5, padding='same', activation='relu')",
  17. "Conv1D(128, 5, padding='same', activation='relu')",
  18. "Dropout(0.1)",
  19. "MaxPooling1D(pool_size=8)",
  20. "Conv1D(128, 5, padding='same', activation='relu')",
  21. "Conv1D(128, 5, padding='same', activation='relu')",
  22. "Flatten()"
  23. ]) \
  24. .setOptimizer("Adam()") \
  25. .setNumEpochs(1) \
  26. .linkFrom(source)
  27. predictStreamOp = KerasSequentialRegressorPredictStreamOp(trainBatchOp) \
  28. .setPredictionCol("pred") \
  29. .setReservedCols(["label"]) \
  30. .linkFrom(streamSource)
  31. predictStreamOp.print()
  32. StreamOperator.execute()

Java 代码

  1. import com.alibaba.alink.operator.batch.BatchOperator;
  2. import com.alibaba.alink.operator.batch.dataproc.ToTensorBatchOp;
  3. import com.alibaba.alink.operator.batch.regression.KerasSequentialRegressorTrainBatchOp;
  4. import com.alibaba.alink.operator.batch.source.CsvSourceBatchOp;
  5. import com.alibaba.alink.operator.stream.StreamOperator;
  6. import com.alibaba.alink.operator.stream.regression.KerasSequentialRegressorPredictStreamOp;
  7. import com.alibaba.alink.operator.stream.source.CsvSourceStreamOp;
  8. import org.junit.Test;
  9. public class KerasSequentialRegressorPredictStreamOpTest {
  10. @Test
  11. public void testKerasSequentialRegressorPredictStreamOp() throws Exception {
  12. BatchOperator <?> source = new CsvSourceBatchOp()
  13. .setFilePath("https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/random_tensor.csv")
  14. .setSchemaStr("tensor string, label double");
  15. source = new ToTensorBatchOp()
  16. .setSelectedCol("tensor")
  17. .setTensorDataType("DOUBLE")
  18. .setTensorShape(200, 3)
  19. .linkFrom(source);
  20. StreamOperator <?> streamSource = new CsvSourceStreamOp()
  21. .setFilePath("https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/random_tensor.csv")
  22. .setSchemaStr("tensor string, label double");
  23. KerasSequentialRegressorTrainBatchOp trainBatchOp = new KerasSequentialRegressorTrainBatchOp()
  24. .setTensorCol("tensor")
  25. .setLabelCol("label")
  26. .setLayers(new String[] {
  27. "Conv1D(256, 5, padding='same', activation='relu')",
  28. "Conv1D(128, 5, padding='same', activation='relu')",
  29. "Dropout(0.1)",
  30. "MaxPooling1D(pool_size=8)",
  31. "Conv1D(128, 5, padding='same', activation='relu')",
  32. "Conv1D(128, 5, padding='same', activation='relu')",
  33. "Flatten()"
  34. })
  35. .setOptimizer("Adam()")
  36. .setNumEpochs(1)
  37. .linkFrom(source);
  38. KerasSequentialRegressorPredictStreamOp predictStreamOp = new KerasSequentialRegressorPredictStreamOp(trainBatchOp)
  39. .setPredictionCol("pred")
  40. .setReservedCols("label")
  41. .linkFrom(streamSource);
  42. predictStreamOp.print();
  43. StreamOperator.execute();
  44. }
  45. }

运行结果

| label | pred | pred_detail | | —- | —- | —- |

| 0 | 0 | {“0”:0.636155836712713,”1”:0.36384416328728697} |

| 1 | 0 | {“0”:0.6334926095655181,”1”:0.3665073904344819} |

| 1 | 0 | {“0”:0.6381823204965642,”1”:0.3618176795034358} |

| 1 | 0 | {“0”:0.6376416296248051,”1”:0.362358370375195} |

| 1 | 0 | {“0”:0.6345856985385896,”1”:0.36541430146141035} |

| 1 | 0 | {“0”:0.6357593109428179,”1”:0.364240689057182} |

| 0 | 0 | {“0”:0.6404387449594703,”1”:0.3595612550405296} |

| 1 | 0 | {“0”:0.6372702905549685,”1”:0.36272970944503136} |

| 0 | 0 | {“0”:0.635502012172225,”1”:0.36449798782777487} |

| 0 | 0 | {“0”:0.6262401788033837,”1”:0.37375982119661644} |