Java class name: com.alibaba.alink.operator.batch.dataproc.ImputerTrainBatchOp
Python class name: ImputerTrainBatchOp

Description

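Trains a model for filling missing values.
Four fill strategies are supported: maximum, minimum, mean, and a user-specified value. When the strategy is the user-specified value, the fillValue parameter must be set.
To apply the trained model to other data, see ImputerPredictBatchOp.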
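
The split between training and prediction can be illustrated without Alink. The following is a minimal plain-Python sketch, assuming the default mean strategy; `train_imputer` and `apply_imputer` are hypothetical helper names used only for illustration and are not part of the Alink API.

```python
from statistics import mean


def train_imputer(rows, selected_cols):
    """Learn one fill value (the column mean) per selected column, ignoring None."""
    model = {}
    for col in selected_cols:
        observed = [row[col] for row in rows if row[col] is not None]
        model[col] = mean(observed)
    return model


def apply_imputer(model, rows):
    """Return copies of rows in which None is replaced by the learned fill value."""
    return [
        {col: (model[col] if col in model and val is None else val)
         for col, val in row.items()}
        for row in rows
    ]


# Hypothetical data: the model is learned on one data set and reused on another.
train_rows = [{"x": 1.0}, {"x": 3.0}, {"x": None}]
new_rows = [{"x": None}, {"x": 10.0}]

model = train_imputer(train_rows, ["x"])
filled = apply_imputer(model, new_rows)
print(model)   # {'x': 2.0}
print(filled)  # [{'x': 2.0}, {'x': 10.0}]
```

The fill value comes from the training data only, which is why the trained model can be applied to other data afterwards.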

Parameters

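| Name | Display name | Description | Type | Required? | Allowed values | Default |
| --- | --- | --- | --- | --- | --- | --- |
| selectedCols | Selected columns | Names of the columns to compute on | String[] | ✓ | Column types must be one of [BIGDECIMAL, BIGINTEGER, BYTE, DOUBLE, FLOAT, INTEGER, LONG, SHORT] | |
| fillValue | Fill value | Custom fill value, read when strategy is VALUE | String | | | null |
| strategy | Fill strategy | Rule for filling missing values: MEAN, MAX, MIN, or VALUE. With VALUE, the fill value is read from fillValue | String | | "MEAN", "MIN", "MAX", "VALUE" | "MEAN" |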
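
How `strategy` and `fillValue` interact can be sketched in plain Python. This is an illustration of the parameter semantics described in the table, not Alink's implementation: `resolve_fill_value` is a hypothetical helper, and both the error raised for a missing `fillValue` and the conversion of the string to a float are assumptions.

```python
def resolve_fill_value(values, strategy="MEAN", fill_value=None):
    """Pick the value used to replace missing entries of one numeric column."""
    observed = [v for v in values if v is not None]
    if strategy == "MEAN":
        return sum(observed) / len(observed)
    if strategy == "MIN":
        return min(observed)
    if strategy == "MAX":
        return max(observed)
    if strategy == "VALUE":
        if fill_value is None:
            # Assumed behavior: VALUE is unusable without fillValue.
            raise ValueError("strategy VALUE requires fillValue")
        # fillValue is declared as a String, so it is parsed here.
        return float(fill_value)
    raise ValueError(f"unsupported strategy: {strategy}")


column = [4.0, None, 1.0, 7.0]  # hypothetical column with one missing entry
print(resolve_fill_value(column))                          # 4.0
print(resolve_fill_value(column, "MIN"))                   # 1.0
print(resolve_fill_value(column, "MAX"))                   # 7.0
print(resolve_fill_value(column, "VALUE", fill_value="-1"))  # -1.0
```

Only the VALUE strategy reads `fillValue`; the other three derive the fill value from the observed entries of the column.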

Code examples

Python code

    from pyalink.alink import *
    import pandas as pd

    useLocalEnv(1)

    # Sample data; every value in the last row is missing.
    df_data = pd.DataFrame([
        ["a", 10.0, 100],
        ["b", -2.5, 9],
        ["c", 100.2, 1],
        ["d", -99.9, 100],
        ["a", 1.4, 1],
        ["b", -2.2, 9],
        ["c", 100.9, 1],
        [None, None, None]
    ])

    selectedColNames = ["col2", "col3"]

    inOp = BatchOperator.fromDataframe(df_data, schemaStr='col1 string, col2 double, col3 double')

    # train (strategy is left at its default, MEAN)
    trainOp = ImputerTrainBatchOp()\
        .setSelectedCols(selectedColNames)
    model = trainOp.linkFrom(inOp)

    # batch predict
    predictOp = ImputerPredictBatchOp()
    predictOp.linkFrom(model, inOp).print()

    # stream predict
    sinOp = StreamOperator.fromDataframe(df_data, schemaStr='col1 string, col2 double, col3 double')
    predictStreamOp = ImputerPredictStreamOp(model)
    predictStreamOp.linkFrom(sinOp).print()
    StreamOperator.execute()

Java code

    import org.apache.flink.types.Row;

    import com.alibaba.alink.operator.batch.BatchOperator;
    import com.alibaba.alink.operator.batch.dataproc.ImputerPredictBatchOp;
    import com.alibaba.alink.operator.batch.dataproc.ImputerTrainBatchOp;
    import com.alibaba.alink.operator.batch.source.MemSourceBatchOp;
    import com.alibaba.alink.operator.stream.StreamOperator;
    import com.alibaba.alink.operator.stream.dataproc.ImputerPredictStreamOp;
    import com.alibaba.alink.operator.stream.source.MemSourceStreamOp;
    import org.junit.Test;

    import java.util.Arrays;
    import java.util.List;

    public class ImputerTrainBatchOpTest {
        @Test
        public void testImputerTrainBatchOp() throws Exception {
            // Sample data; every value in the last row is missing.
            List <Row> df_data = Arrays.asList(
                Row.of("a", 10.0, 100),
                Row.of("b", -2.5, 9),
                Row.of("c", 100.2, 1),
                Row.of("d", -99.9, 100),
                Row.of("a", 1.4, 1),
                Row.of("b", -2.2, 9),
                Row.of("c", 100.9, 1),
                Row.of(null, null, null)
            );
            String[] selectedColNames = new String[] {"col2", "col3"};
            BatchOperator <?> inOp = new MemSourceBatchOp(df_data, "col1 string, col2 double, col3 int");

            // train (strategy is left at its default, MEAN)
            BatchOperator <?> trainOp = new ImputerTrainBatchOp()
                .setSelectedCols(selectedColNames);
            BatchOperator <?> model = trainOp.linkFrom(inOp);

            // batch predict
            BatchOperator <?> predictOp = new ImputerPredictBatchOp();
            predictOp.linkFrom(model, inOp).print();

            // stream predict
            StreamOperator <?> sinOp = new MemSourceStreamOp(df_data, "col1 string, col2 double, col3 int");
            StreamOperator <?> predictStreamOp = new ImputerPredictStreamOp(model);
            predictStreamOp.linkFrom(sinOp).print();
            StreamOperator.execute();
        }
    }

Results

| col1 | col2 | col3 |
| --- | --- | --- |
| a | 10.000000 | 100 |
| b | -2.500000 | 9 |
| c | 100.200000 | 1 |
| d | -99.900000 | 100 |
| a | 1.400000 | 1 |
| b | -2.200000 | 9 |
| c | 100.900000 | 1 |
| null | 15.414286 | 31 |
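
The filled values in the last row can be checked by hand, since the example uses the default MEAN strategy. The short sketch below recomputes the column means from the seven non-missing rows. That the displayed 31 results from casting the mean of col3 to the column's int type is an inference from the output above, not documented behavior.

```python
# Non-missing values of the two selected columns, copied from the example data.
col2 = [10.0, -2.5, 100.2, -99.9, 1.4, -2.2, 100.9]
col3 = [100, 9, 1, 100, 1, 9, 1]

mean_col2 = sum(col2) / len(col2)
mean_col3 = sum(col3) / len(col3)  # 221 / 7, about 31.57

print(f"{mean_col2:.6f}")  # 15.414286
print(int(mean_col3))      # 31 (assumed truncation to the int column type)
```

col1 is not among the selected columns, so its missing value stays null.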