Java 类名:com.alibaba.alink.operator.batch.regression.KerasSequentialRegressorTrainBatchOp
Python 类名:KerasSequentialRegressorTrainBatchOp

功能介绍

构建一个 Keras 的 Sequential 模型
训练回归模型。
通过 layers 参数指定构成 Sequential 模型的网络层,Alink 会自动在最开始添加 Input 层,在最后添加 Dense 层和激活层,得到完整的模型用于训练。
指定 layers 参数时,使用的是 Python 语句,例如

  1. "Conv1D(256, 5, padding='same', activation='relu')",
  2. "Conv1D(128, 5, padding='same', activation='relu')",
  3. "Dropout(0.1)",
  4. "MaxPooling1D(pool_size=8)",
  5. "Conv1D(128, 5, padding='same', activation='relu')",
  6. "Conv1D(128, 5, padding='same', activation='relu')",
  7. "Flatten()"

tf.keras.layers 内的网络层已经提前 import,可以直接使用。
使用的 TensorFlow 版本是 2.3.1。
该组件可以接 KerasSequentialRegressorPredictBatchOpKerasSequentialRegressorPredictStreamOp 进行推理。

参数说明

名称 中文名称 描述 类型 是否必须? 取值范围 默认值
labelCol 标签列名 输入表中的标签列名 String
layers 各 layer 的描述 各 layer 的描述,使用 Python 语法,例如 “Conv1D(256, 5, padding=’same’, activation=’relu’)” String[]
tensorCol tensor列 tensor列 String [DENSE_VECTOR, SPARSE_VECTOR, STRING, VECTOR]
batchSize 数据批大小 数据批大小 Integer
bestMetric 最优指标 判断模型最优时用的指标,仅在总并发度为 1 时起作用。都支持的有:loss;二分类还支持:auc, precision, recall, binary_accuracy, false_negatives, false_positives, true_negatives, true_positives;多分类还支持:sparse_categorical_accuracy;回归还支持:mean_absolute_error, mean_absolute_percentage_error, mean_squared_error, mean_squared_logarithmic_error, root_mean_squared_error String
checkpointFilePath 保存 checkpoint 的路径 用于保存中间结果的路径,将作为 TensorFlow 中 Estimatormodel_dir 传入,需要为所有 worker 都能访问到的目录 String
intraOpParallelism Op 间并发度 Op 间并发度 Integer
learningRate 学习率 学习率 Double
numEpochs epoch数 epoch数 Integer
numPSs PS 角色数 PS 角色的数量。值未设置时,如果 Worker 角色数也未设置,则为作业总并发度的 1/4(需要取整),否则为总并发度减去 Worker 角色数。 Integer
numWorkers Worker 角色数 Worker 角色的数量。值未设置时,如果 PS

代码示例

以下代码仅用于示意,可能需要修改部分代码或者配置环境后才能正常运行!

Python 代码

  1. source = CsvSourceBatchOp() \
  2. .setFilePath("https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/random_tensor.csv") \
  3. .setSchemaStr("tensor string, label double")
  4. source = ToTensorBatchOp() \
  5. .setSelectedCol("tensor") \
  6. .setTensorDataType("DOUBLE") \
  7. .setTensorShape([200, 3]) \
  8. .linkFrom(source)
  9. trainBatchOp = KerasSequentialRegressorTrainBatchOp() \
  10. .setTensorCol("tensor") \
  11. .setLabelCol("label") \
  12. .setLayers([
  13. "Conv1D(256, 5, padding='same', activation='relu')",
  14. "Conv1D(128, 5, padding='same', activation='relu')",
  15. "Dropout(0.1)",
  16. "MaxPooling1D(pool_size=8)",
  17. "Conv1D(128, 5, padding='same', activation='relu')",
  18. "Conv1D(128, 5, padding='same', activation='relu')",
  19. "Flatten()"
  20. ]) \
  21. .setOptimizer("Adam()") \
  22. .setNumEpochs(1) \
  23. .linkFrom(source)
  24. predictBatchOp = KerasSequentialRegressorPredictBatchOp() \
  25. .setPredictionCol("pred") \
  26. .setReservedCols(["label"]) \
  27. .linkFrom(trainBatchOp, source)
  28. predictBatchOp.lazyPrint(10)
  29. BatchOperator.execute()

Java 代码

  1. import com.alibaba.alink.operator.batch.BatchOperator;
  2. import com.alibaba.alink.operator.batch.dataproc.ToTensorBatchOp;
  3. import com.alibaba.alink.operator.batch.regression.KerasSequentialRegressorPredictBatchOp;
  4. import com.alibaba.alink.operator.batch.regression.KerasSequentialRegressorTrainBatchOp;
  5. import com.alibaba.alink.operator.batch.source.CsvSourceBatchOp;
  6. import org.junit.Test;
  7. public class KerasSequentialRegressorTrainBatchOpTest {
  8. @Test
  9. public void testKerasSequentialRegressorTrainBatchOp() throws Exception {
  10. BatchOperator<?> source = new CsvSourceBatchOp()
  11. .setFilePath("https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/random_tensor.csv")
  12. .setSchemaStr("tensor string, label double");
  13. source = new ToTensorBatchOp()
  14. .setSelectedCol("tensor")
  15. .setTensorDataType("DOUBLE")
  16. .setTensorShape(200, 3)
  17. .linkFrom(source);
  18. KerasSequentialRegressorTrainBatchOp trainBatchOp = new KerasSequentialRegressorTrainBatchOp()
  19. .setTensorCol("tensor")
  20. .setLabelCol("label")
  21. .setLayers(new String[] {
  22. "Conv1D(256, 5, padding='same', activation='relu')",
  23. "Conv1D(128, 5, padding='same', activation='relu')",
  24. "Dropout(0.1)",
  25. "MaxPooling1D(pool_size=8)",
  26. "Conv1D(128, 5, padding='same', activation='relu')",
  27. "Conv1D(128, 5, padding='same', activation='relu')",
  28. "Flatten()"
  29. })
  30. .setOptimizer("Adam()")
  31. .setNumEpochs(1)
  32. .linkFrom(source);
  33. KerasSequentialRegressorPredictBatchOp predictBatchOp = new KerasSequentialRegressorPredictBatchOp()
  34. .setPredictionCol("pred")
  35. .setReservedCols("label")
  36. .linkFrom(trainBatchOp, source);
  37. predictBatchOp.lazyPrint(10);
  38. BatchOperator.execute();
  39. }
  40. }

运行结果

label pred
1.0000 0.4822
0.0000 0.4826
0.0000 0.4752
0.0000 0.4702
1.0000 0.4907
1.0000 0.4992
0.0000 0.4866
1.0000 0.5045
0.0000 0.4994
1.0000 0.4837