Java 类名:com.alibaba.alink.operator.stream.outlier.IForestOutlierStreamOp
Python 类名:IForestOutlierStreamOp

功能介绍

iForest 可以识别数据中异常点,在异常检测领域有比较好的效果。算法使用 sub-sampling 方法,降低了算法的计算复杂度。

文献或出处

  1. Isolation Forest

    参数说明

    | 名称 | 中文名称 | 描述 | 类型 | 是否必须? | 取值范围 | 默认值 | | —- | —- | —- | —- | —- | —- | —- |

| precedingTime | 时间窗口大小 | 时间窗口大小 | String | ✓ | | |

| predictionCol | 预测结果列名 | 预测结果列名 | String | ✓ | | |

| featureCols | 特征列名数组 | 特征列名数组,默认全选 | String[] | | 所选列类型为 [BIGDECIMAL, BIGINTEGER, BYTE, DOUBLE, FLOAT, INTEGER, LONG, SHORT] | null |

| groupCols | 分组列名数组 | 分组列名,多列,可选,默认不选 | String[] | | | null |

| numTrees | 模型中树的棵数 | 模型中树的棵数 | Integer | | | 100 |

| outlierThreshold | 异常评分阈值 | 只有评分大于该阈值才会被认为是异常点 | Double | | | |

| precedingRows | 数据窗口大小 | 数据窗口大小 | Integer | | | null |

| predictionDetailCol | 预测详细信息列名 | 预测详细信息列名 | String | | | |

| subsamplingSize | 每棵树的样本采样行数 | 每棵树的样本采样行数,默认 256 ,最小 2 ,最大 100000 . | Integer | | [1, 100000] | 256 |

| tensorCol | tensor列 | tensor列 | String | | 所选列类型为 [BOOL_TENSOR, BYTE_TENSOR, DOUBLE_TENSOR, FLOAT_TENSOR, INT_TENSOR, LONG_TENSOR, STRING, STRING_TENSOR, TENSOR, UBYTE_TENSOR] | null |

| timeCol | 时间戳列(TimeStamp) | 时间戳列(TimeStamp) | String | | | null |

| vectorCol | 向量列名 | 向量列对应的列名,默认值是null | String | | 所选列类型为 [DENSE_VECTOR, SPARSE_VECTOR, STRING, VECTOR] | null |

| numThreads | 组件多线程线程个数 | 组件多线程线程个数 | Integer | | | 1 |

代码示例

Python 代码

  1. import time, datetime
  2. import numpy as np
  3. import pandas as pd
  4. data = pd.DataFrame([
  5. [1, datetime.datetime.fromtimestamp(1), 10.0, 0],
  6. [1, datetime.datetime.fromtimestamp(2), 11.0, 0],
  7. [1, datetime.datetime.fromtimestamp(3), 12.0, 0],
  8. [1, datetime.datetime.fromtimestamp(4), 13.0, 0],
  9. [1, datetime.datetime.fromtimestamp(5), 14.0, 0],
  10. [1, datetime.datetime.fromtimestamp(6), 15.0, 0],
  11. [1, datetime.datetime.fromtimestamp(7), 16.0, 0],
  12. [1, datetime.datetime.fromtimestamp(8), 17.0, 0],
  13. [1, datetime.datetime.fromtimestamp(9), 18.0, 0],
  14. [1, datetime.datetime.fromtimestamp(10), 19.0, 0]
  15. ])
  16. dataOp = dataframeToOperator(data, schemaStr='id int, ts timestamp, val double, label int', op_type='stream')
  17. outlierOp = IForestOutlierStreamOp()\
  18. .setGroupCols(["id"])\
  19. .setTimeCol("ts")\
  20. .setPrecedingRows(3)\
  21. .setFeatureCols(["val"])\
  22. .setPredictionCol("pred")\
  23. .setPredictionDetailCol("pred_detail")
  24. dataOp.link(outlierOp).print()
  25. StreamOperator.execute()

Java 代码

  1. package com.alibaba.alink.operator.stream.outlier;
  2. import org.apache.flink.types.Row;
  3. import com.alibaba.alink.operator.stream.StreamOperator;
  4. import com.alibaba.alink.operator.stream.source.MemSourceStreamOp;
  5. import com.alibaba.alink.testutil.AlinkTestBase;
  6. import org.junit.Test;
  7. import java.sql.Timestamp;
  8. import java.util.Arrays;
  9. import java.util.List;
  10. public class IForestOutlierStreamOpTest extends AlinkTestBase {
  11. @Test
  12. public void test() throws Exception {
  13. List <Row> mTableData = Arrays.asList(
  14. Row.of(1, new Timestamp(1), 10.0, 0),
  15. Row.of(1, new Timestamp(2), 11.0, 0),
  16. Row.of(1, new Timestamp(3), 12.0, 0),
  17. Row.of(1, new Timestamp(4), 13.0, 0),
  18. Row.of(1, new Timestamp(5), 14.0, 0),
  19. Row.of(1, new Timestamp(6), 15.0, 0),
  20. Row.of(1, new Timestamp(7), 16.0, 0),
  21. Row.of(1, new Timestamp(8), 17.0, 0),
  22. Row.of(1, new Timestamp(9), 18.0, 0),
  23. Row.of(1, new Timestamp(10), 19.0, 0)
  24. );
  25. MemSourceStreamOp dataOp = new MemSourceStreamOp(mTableData, new String[] {"id", "ts", "val", "label"});
  26. IForestOutlierStreamOp outlierOp = new IForestOutlierStreamOp()
  27. .setGroupCols("id")
  28. .setTimeCol("ts")
  29. .setPrecedingRows(3)
  30. .setFeatureCols("val")
  31. .setPredictionCol("pred")
  32. .setPredictionDetailCol("pred_detail");
  33. dataOp.link(outlierOp).print();
  34. StreamOperator.execute();
  35. }
  36. }

运行结果