Java 类名:com.alibaba.alink.operator.batch.dataproc.ImputerPredictBatchOp
Python 类名:ImputerPredictBatchOp

功能介绍

数据缺失值填充处理,批式预测组件
运行时需要指定缺失值模型,由ImputerTrainBatchOp产生。缺失值填充的4种策略,即最大值、最小值、均值、指定数值,在生成缺失值模型时指定。

参数说明

名称 中文名称 描述 类型 是否必须? 取值范围 默认值
modelFilePath 模型的文件路径 模型的文件路径 String null
outputCols 输出结果列列名数组 输出结果列列名数组,可选,默认null String[] null
numThreads 组件多线程线程个数 组件多线程线程个数 Integer 1

代码示例

Python 代码

  1. from pyalink.alink import *
  2. import pandas as pd
  3. useLocalEnv(1)
  4. df_data = pd.DataFrame([
  5. ["a", 10.0, 100],
  6. ["b", -2.5, 9],
  7. ["c", 100.2, 1],
  8. ["d", -99.9, 100],
  9. ["a", 1.4, 1],
  10. ["b", -2.2, 9],
  11. ["c", 100.9, 1],
  12. [None, None, None]
  13. ])
  14. colnames = ["col1", "col2", "col3"]
  15. selectedColNames = ["col2", "col3"]
  16. inOp = BatchOperator.fromDataframe(df_data, schemaStr='col1 string, col2 double, col3 double')
  17. # train
  18. trainOp = ImputerTrainBatchOp()\
  19. .setSelectedCols(selectedColNames)
  20. model = trainOp.linkFrom(inOp)
  21. # batch predict
  22. predictOp = ImputerPredictBatchOp()
  23. predictOp.linkFrom(model, inOp).print()
  24. # stream predict
  25. sinOp = StreamOperator.fromDataframe(df_data, schemaStr='col1 string, col2 double, col3 double')
  26. predictStreamOp = ImputerPredictStreamOp(model)
  27. predictStreamOp.linkFrom(sinOp).print()
  28. StreamOperator.execute()

Java 代码

  1. import org.apache.flink.types.Row;
  2. import com.alibaba.alink.operator.batch.BatchOperator;
  3. import com.alibaba.alink.operator.batch.dataproc.ImputerPredictBatchOp;
  4. import com.alibaba.alink.operator.batch.dataproc.ImputerTrainBatchOp;
  5. import com.alibaba.alink.operator.batch.source.MemSourceBatchOp;
  6. import com.alibaba.alink.operator.stream.StreamOperator;
  7. import com.alibaba.alink.operator.stream.dataproc.ImputerPredictStreamOp;
  8. import com.alibaba.alink.operator.stream.source.MemSourceStreamOp;
  9. import org.junit.Test;
  10. import java.util.Arrays;
  11. import java.util.List;
  12. public class ImputerPredictBatchOpTest {
  13. @Test
  14. public void testImputerPredictBatchOp() throws Exception {
  15. List <Row> df_data = Arrays.asList(
  16. Row.of("a", 10.0, 100),
  17. Row.of("b", -2.5, 9),
  18. Row.of("c", 100.2, 1),
  19. Row.of("d", -99.9, 100),
  20. Row.of("a", 1.4, 1),
  21. Row.of("b", -2.2, 9),
  22. Row.of("c", 100.9, 1),
  23. Row.of(null, null, null)
  24. );
  25. String[] selectedColNames = new String[] {"col2", "col3"};
  26. BatchOperator <?> inOp = new MemSourceBatchOp(df_data, "col1 string, col2 double, col3 int");
  27. BatchOperator <?> trainOp = new ImputerTrainBatchOp()
  28. .setSelectedCols(selectedColNames);
  29. BatchOperator model = trainOp.linkFrom(inOp);
  30. BatchOperator <?> predictOp = new ImputerPredictBatchOp();
  31. predictOp.linkFrom(model, inOp).print();
  32. StreamOperator <?> sinOp = new MemSourceStreamOp(df_data, "col1 string, col2 double, col3 int");
  33. StreamOperator <?> predictStreamOp = new ImputerPredictStreamOp(model);
  34. predictStreamOp.linkFrom(sinOp).print();
  35. StreamOperator.execute();
  36. }
  37. }

运行结果

| col1 | col2 | col3 | | —- | —- | —- |

| a | 10.000000 | 100 |

| b | -2.500000 | 9 |

| c | 100.200000 | 1 |

| d | -99.900000 | 100 |

| a | 1.400000 | 1 |

| b | -2.200000 | 9 |

| c | 100.900000 | 1 |

| null | 15.414286 | 31 |