Java 类名:com.alibaba.alink.operator.stream.feature.OneHotPredictStreamOp
Python 类名:OneHotPredictStreamOp

功能介绍

one-hot编码,也称独热编码,对于每一个特征,如果它有m个可能值,那么经过 独热编码后,就变成了m个二元特征。并且,这些特征互斥,每次只有一个激活。 因此,数据会变成稀疏的,输出结果也是kv的稀疏结构。
组件为独热编码的流式预测组件。

编码结果

输入

| selectedCol0 | selectedCol1 | | —- | —- |

| a | 1 |

| b | 1 |

| c | 1 |

| d | 2 |

| a | 2 |

| b | 2 |

| c | 2 |

| e | null |

| NULL | 2 |

Encode ——> INDEX

预测结果为单个token的index,如0, 1, 2 …

Encode ——> VECTOR

预测结果为稀疏向量:

  1. 1. dropLasttrue,向量中非零元个数为0或者1, $5, $5$0:1.0或者NULL
  2. 2. dropLastfalse,向量中非零元个数必定为1, 只能是$5$0:1.0或者NULL
Encode ——> ASSEMBLED_VECTOR

预测结果为稀疏向量,是预测选择列中,各列预测为VECTOR时,按照选择顺序ASSEMBLE的结果。

向量维度

Encode ——> Vector
  1. distinct token Number: 训练集中指定列的去重后的token数目
  2. dropLast: 预测参数
  3. enableElse: 训练时若填写discreteThresholdsdiscreteThresholdsArray则为true,默认为false
  4. handleInvalid: 预测参数
举例

输入列为selectedCol0

  1. 1. 如果没有填写discreteThresholds,那么enableElsefalsedistinct token Number为(a,b,c,d,e)一共5token
  2. 1.1 dropLastTrue
  3. 1.1.1 handleInvalidkeep: vectorSize=(5 - 1 + 0 + 1 = 5)
  4. 1.1.2 handleInvalidskip: vectorSize=(5 - 1 + 0 + 0 = 4)
  5. 1.1.3 handleInvaliderror: vectorSize=(5 - 1 + 0 + 0 = 4)
  6. 1.2 dropLastFalse
  7. 1.1.1 handleInvalidkeep: vectorSize=(5 - 0 + 0 + 1 = 6)
  8. 1.2.2 handleInvalidskip: vectorSize=(5 - 0 + 0 + 0 = 5)
  9. 1.2.3 handleInvaliderror: vectorSize=(5 - 0 + 0 + 0 = 5)
  10. 2. 如果discreteThresholds2, 那么enableElsetrue, distinct token Number为(a,b,c)一共3token
  11. 2.1 dropLastTrue
  12. 1.1.1 handleInvalidkeep: vectorSize=(3 - 1 + 1 + 1 = 4)
  13. 1.1.2 handleInvalidskip: vectorSize=(3 - 1 + 1 + 0 = 3)
  14. 1.1.3 handleInvaliderror: vectorSize=(3 - 1 + 1 + 0 = 3)
  15. 2.2 dropLastFalse
  16. 1.1.1 handleInvalidkeep: vectorSize=(3 - 0 + 1 + 1 = 5)
  17. 1.2.2 handleInvalidskip: vectorSize=(3 - 0 + 1 + 0 = 4)
  18. 1.2.3 handleInvaliderror: vectorSize=(3 - 0 + 1 + 0 = 4)

Token index

Encode ——> Vector
  1. 1. 训练集中出现过的token: 预测值为模型中token对应的token_index,若 dropLasttrue, token_index最大的值会被丢掉,预测结果为全零元
  2. 2. null:
  3. 2.1 handleInvalidkeep: 预测值为distinct token Number - dropLast(true: 1, false: 0)
  4. 2.2 handleInvalidskip: null
  5. 2.3 handleInvaliderror: 报错
  6. 3. 训练集中未出现过的token:
  7. 3.1 enableElsetrue
  8. 3.1.1 handleInvalidkeep: 预测值为:distinct token Number - dropLast(true: 1, false: 0) + 1
  9. 3.1.2 handleInvalidskip: 预测值为:distinct token Number - dropLast(true: 1, false: 0)
  10. 3.1.3 handleInvaliderror: 预测值为:distinct token Number - dropLast(true: 1, false: 0)
  11. 3.2 enableElsefalse
  12. 3.2.1 handleInvalidkeep: 预测值为:distinct token Number - dropLast(true: 1, false: 0)
  13. 3.2.2 handleInvalidskip: null
  14. 3.2.3 handleInvaliderror: 报错
举例

输入列为selectedCol0

  1. 如果没有填写discreteThresholds
    假设模型中a,b,c,d,e对应的token index为0,1,2,3,4
    1.1 dropLast为True
    1.1.1 handleInvalid为keep

| selectedCol0 | Encode为INDEX的输出 | Encode为VECTOR的输出 | | —- | —- | —- |

| a | 0 | $5$0:1.0 |

| b | 1 | $5$1:1.0 |

| c | 2 | $5$2:1.0 |

| d | 3 | $5$3:1.0 |

| e | 4 (Encode为IDNEX时,dropLast不起作用) | (最大的token index被drop了) |

| NULL | 5 | $5$4:1.0 |

1.1.2 handleInvalid为skip: vectorSize=(5 - 1 + 0 + 0 = 4)

| selectedCol0 | Encode为INDEX的输出 | Encode为VECTOR的输出 | | —- | —- | —- |

| a | 0 | $4$0:1.0 |

| b | 1 | $4$1:1.0 |

| c | 2 | $4$2:1.0 |

| d | 3 | $4$3:1.0 |

| e | 4 (Encode为IDNEX时,dropLast不起作用) | (最大的token index被drop了) |

| NULL | NULL | NULL |

1.1.3 handleInvalid为error: 直接报错

  1. 1.2 dropLastFalse
  2. 1.1.1 handleInvalidkeep

| selectedCol0 | Encode为INDEX的输出 | Encode为VECTOR的输出 | | —- | —- | —- |

| a | 0 | $6$0:1.0 |

| b | 1 | $6$1:1.0 |

| c | 2 | $6$2:1.0 |

| d | 3 | $6$3:1.0 |

| e | 4 (Encode为IDNEX时,dropLast不起作用) | $6$4:1.0 |

| NULL | 5 | $6$5:1.0 |

1.2.2 handleInvalid为skip

| selectedCol0 | Encode为INDEX的输出 | Encode为VECTOR的输出 | | —- | —- | —- |

| a | 0 | $5$0:1.0 |

| b | 1 | $5$1:1.0 |

| c | 2 | $5$2:1.0 |

| d | 3 | $5$3:1.0 |

| e | 4 (Encode为IDNEX时,dropLast不起作用) | $5$4:1.0 |

| NULL | NULL | NULL |

  1. 1.2.3 handleInvaliderror: 直接报错
  1. 如果discreteThresholds为2
    假设模型中a,b,c对应的token index为0,1,2
    2.1 dropLast为True
    1.1.1 handleInvalid为keep:

| selectedCol0 | Encode为INDEX的输出 | Encode为VECTOR的输出 | | —- | —- | —- |

| a | 0 | $4$0:1.0 |

| b | 1 | $4$1:1.0 |

| c | 2 | (最大的token index被drop了) |

| d | 4 | $4$3:1.0 (unknown token) |

| e | 4 | $4$3:1.0 (unknown token) |

| NULL | 3 | $4$2:1.0 |

1.1.2 handleInvalid为skip:

| selectedCol0 | Encode为INDEX的输出 | Encode为VECTOR的输出 | | —- | —- | —- |

| a | 0 | $3$0:1.0 |

| b | 1 | $3$1:1.0 |

| c | 2 | (最大的token index被drop了) |

| d | 3 | $3$2:1.0 (unknown token) |

| e | 3 | $4$2:1.0 (unknown token) |

| NULL | NULL | NULL |

  1. 1.1.3 handleInvaliderror: 直接报错

2.2 dropLast为False
1.1.1 handleInvalid为keep:

| selectedCol0 | Encode为INDEX的输出 | Encode为VECTOR的输出 | | —- | —- | —- |

| a | 0 | $5$0:1.0 |

| b | 1 | $5$1:1.0 |

| c | 2 | $5$2:1.0 |

| d | 4 | $5$4:1.0 (unknown token) |

| e | 4 | $5$4:1.0 (unknown token) |

| NULL | 3 | $5$3:1.0 |

1.2.2 handleInvalid为skip:

| selectedCol0 | Encode为INDEX的输出 | Encode为VECTOR的输出 | | —- | —- | —- |

| a | 0 | $4$0:1.0 |

| b | 1 | $4$1:1.0 |

| c | 2 | $4$2:1.0 |

| d | 3 | $4$3:1.0 (unknown token) |

| e | 3 | $4$3:1.0 (unknown token) |

| NULL | NULL | NULL |

1.2.3 handleInvalid为error: 直接报错

参数说明

| 名称 | 中文名称 | 描述 | 类型 | 是否必须? | 取值范围 | 默认值 | | —- | —- | —- | —- | —- | —- | —- |

| selectedCols | 选择的列名 | 计算列对应的列名列表 | String[] | ✓ | | |

| dropLast | 是否删除最后一个元素 | 删除最后一个元素是为了保证线性无关性。默认true | Boolean | | | true |

| encode | 编码方法 | 编码方法 | String | | “VECTOR”, “ASSEMBLED_VECTOR”, “INDEX” | “ASSEMBLED_VECTOR” |

| handleInvalid | 未知token处理策略 | 未知token处理策略。”keep”表示用最大id加1代替, “skip”表示补null, “error”表示抛异常 | String | | “KEEP”, “ERROR”, “SKIP” | “KEEP” |

| modelFilePath | 模型的文件路径 | 模型的文件路径 | String | | | null |

| outputCols | 输出结果列列名数组 | 输出结果列列名数组,可选,默认null | String[] | | | null |

| reservedCols | 算法保留列名 | 算法保留列 | String[] | | | null |

| numThreads | 组件多线程线程个数 | 组件多线程线程个数 | Integer | | | 1 |

| modelStreamFilePath | 模型流的文件路径 | 模型流的文件路径 | String | | | null |

| modelStreamScanInterval | 扫描模型路径的时间间隔 | 描模型路径的时间间隔,单位秒 | Integer | | | 10 |

| modelStreamStartTime | 模型流的起始时间 | 模型流的起始时间。默认从当前时刻开始读。使用yyyy-mm-dd hh:mm:ss.fffffffff格式,详见Timestamp.valueOf(String s) | String | | | null |

代码示例

Python 代码

  1. from pyalink.alink import *
  2. import pandas as pd
  3. useLocalEnv(1)
  4. df = pd.DataFrame([
  5. ["a", 1],
  6. ["b", 1],
  7. ["c", 1],
  8. ["e", 2],
  9. ["a", 2],
  10. ["b", 1],
  11. ["c", 2],
  12. ["d", 2],
  13. [None, 1]
  14. ])
  15. inOp = BatchOperator.fromDataframe(df, schemaStr='query string, weight long')
  16. sinOp = StreamOperator.fromDataframe(df, schemaStr='query string, weight long')
  17. # one hot train
  18. one_hot = OneHotTrainBatchOp().setSelectedCols(["query"])
  19. model = inOp.link(one_hot)
  20. model.lazyPrint(10)
  21. # batch predict
  22. predictor = OneHotPredictBatchOp().setOutputCols(["output"])
  23. predictor.linkFrom(model, inOp).print()
  24. # stream predict
  25. spredictor = OneHotPredictStreamOp(model).setOutputCols(["output"])
  26. spredictor.linkFrom(sinOp).print()
  27. StreamOperator.execute()

Java 代码

  1. import org.apache.flink.types.Row;
  2. import com.alibaba.alink.operator.batch.BatchOperator;
  3. import com.alibaba.alink.operator.batch.feature.OneHotPredictBatchOp;
  4. import com.alibaba.alink.operator.batch.feature.OneHotTrainBatchOp;
  5. import com.alibaba.alink.operator.batch.source.MemSourceBatchOp;
  6. import com.alibaba.alink.operator.stream.StreamOperator;
  7. import com.alibaba.alink.operator.stream.feature.OneHotPredictStreamOp;
  8. import com.alibaba.alink.operator.stream.source.MemSourceStreamOp;
  9. import org.junit.Test;
  10. import java.util.Arrays;
  11. import java.util.List;
  12. public class OneHotPredictStreamOpTest {
  13. @Test
  14. public void testOneHotPredictStreamOp() throws Exception {
  15. List <Row> df = Arrays.asList(
  16. Row.of("a", 1),
  17. Row.of("b", 1),
  18. Row.of("c", 1),
  19. Row.of("e", 2),
  20. Row.of("a", 2),
  21. Row.of("b", 1),
  22. Row.of("c", 2),
  23. Row.of("d", 2),
  24. Row.of(null, 1)
  25. );
  26. BatchOperator <?> inOp = new MemSourceBatchOp(df, "query string, weight int");
  27. StreamOperator <?> sinOp = new MemSourceStreamOp(df, "query string, weight int");
  28. BatchOperator <?> one_hot = new OneHotTrainBatchOp().setSelectedCols("query");
  29. BatchOperator <?> model = inOp.link(one_hot);
  30. model.lazyPrint(10);
  31. BatchOperator <?> predictor = new OneHotPredictBatchOp().setOutputCols("output");
  32. predictor.linkFrom(model, inOp).print();
  33. StreamOperator <?> spredictor = new OneHotPredictStreamOp(model).setOutputCols("output");
  34. spredictor.linkFrom(sinOp).print();
  35. StreamOperator.execute();
  36. }
  37. }

运行结果

| column_index | token | token_index | | —- | —- | —- |

| -1 | {“selectedCols”:”[“query”]”,”selectedColTypes”:”[“VARCHAR”]”,”enableElse”:”false”} | null |

| 0 | a | 0 |

| 0 | b | 1 |

| 0 | c | 2 |

| 0 | d | 3 |

| 0 | e | 4 |

| query | weight | output | | —- | —- | —- |

| a | 1 | $5$0:1.0 |

| b | 1 | $5$1:1.0 |

| c | 1 | $5$2:1.0 |

| e | 2 | |

| a | 2 | $5$0:1.0 |

| b | 1 | $5$1:1.0 |

| c | 2 | $5$2:1.0 |

| d | 2 | $5$3:1.0 |

| null | 1 | $5$4:1.0 |

| query | weight | output | | —- | —- | —- |

| a | 1 | $5$0:1.0 |

| e | 2 | |

| d | 2 | $5$3:1.0 |

| b | 1 | $5$1:1.0 |

| c | 1 | $5$2:1.0 |

| c | 2 | $5$2:1.0 |

| a | 2 | $5$0:1.0 |

| null | 1 | $5$4:1.0 |

| b | 1 | $5$1:1.0 |