Java 类名:com.alibaba.alink.operator.stream.outlier.EsdOutlierStreamOp
Python 类名:EsdOutlierStreamOp

功能介绍

  • Esd算法又叫做箱线图算法, 是一种常用的异常检测算法.

    参数说明

    | 名称 | 中文名称 | 描述 | 类型 | 是否必须? | 取值范围 | 默认值 | | —- | —- | —- | —- | —- | —- | —- |

| precedingTime | 时间窗口大小 | 时间窗口大小 | String | ✓ | | |

| predictionCol | 预测结果列名 | 预测结果列名 | String | ✓ | | |

| alpha | 置信度 | 置信度 | Double | | | 0.05 |

| direction | Not available! | Not available! | String | | “POSITIVE”, “NEGATIVE”, “BOTH” | “BOTH” |

| featureCol | 特征列名 | 特征列名,默认选最左边的列 | String | | 所选列类型为 [BIGDECIMAL, BIGINTEGER, BYTE, DOUBLE, FLOAT, INTEGER, LONG, SHORT] | null |

| groupCols | 分组列名数组 | 分组列名,多列,可选,默认不选 | String[] | | | null |

| maxIter | 最大迭代步数 | 最大迭代步数 | Integer | | | |

| outlierThreshold | 异常评分阈值 | 只有评分大于该阈值才会被认为是异常点 | Double | | | |

| precedingRows | 数据窗口大小 | 数据窗口大小 | Integer | | | null |

| predictionDetailCol | 预测详细信息列名 | 预测详细信息列名 | String | | | |

| timeCol | 时间戳列(TimeStamp) | 时间戳列(TimeStamp) | String | | | null |

| numThreads | 组件多线程线程个数 | 组件多线程线程个数 | Integer | | | 1 |

代码示例

Python 代码

  1. from pyalink.alink import *
  2. import pandas as pd
  3. useLocalEnv(1)
  4. import time, datetime
  5. import numpy as np
  6. import pandas as pd
  7. data = pd.DataFrame([
  8. [1, datetime.datetime.fromtimestamp(1), 10.0, 0],
  9. [1, datetime.datetime.fromtimestamp(2), 11.0, 0],
  10. [1, datetime.datetime.fromtimestamp(3), 12.0, 0],
  11. [1, datetime.datetime.fromtimestamp(4), 13.0, 0],
  12. [1, datetime.datetime.fromtimestamp(5), 14.0, 0],
  13. [1, datetime.datetime.fromtimestamp(6), 15.0, 0],
  14. [1, datetime.datetime.fromtimestamp(7), 16.0, 0],
  15. [1, datetime.datetime.fromtimestamp(8), 17.0, 0],
  16. [1, datetime.datetime.fromtimestamp(9), 18.0, 0],
  17. [1, datetime.datetime.fromtimestamp(10), 19.0, 0]
  18. ])
  19. dataOp = dataframeToOperator(data, schemaStr='id int, ts timestamp, val double, label int', op_type='stream')
  20. outlierOp = EsdOutlierStreamOp()\
  21. .setGroupCols(["id"])\
  22. .setTimeCol("ts")\
  23. .setPrecedingRows(3)\
  24. .setFeatureCol("val")\
  25. .setPredictionCol("pred")\
  26. .setPredictionDetailCol("pred_detail")
  27. dataOp.link(outlierOp).print()
  28. StreamOperator.execute()

Java 代码

  1. package com.alibaba.alink.operator.stream.outlier;
  2. import org.apache.flink.types.Row;
  3. import com.alibaba.alink.operator.stream.StreamOperator;
  4. import com.alibaba.alink.operator.stream.sink.CollectSinkStreamOp;
  5. import com.alibaba.alink.operator.stream.source.MemSourceStreamOp;
  6. import junit.framework.TestCase;
  7. import org.junit.Assert;
  8. import org.junit.Test;
  9. import java.sql.Timestamp;
  10. import java.util.Arrays;
  11. import java.util.List;
  12. public class EsdOutlierStreamOpTest extends TestCase {
  13. @Test
  14. public void test() throws Exception {
  15. List <Row> mTableData = Arrays.asList(
  16. Row.of(1, new Timestamp(1), 10.0, 0),
  17. Row.of(1, new Timestamp(2), 11.0, 0),
  18. Row.of(1, new Timestamp(3), 12.0, 0),
  19. Row.of(1, new Timestamp(4), 13.0, 0),
  20. Row.of(1, new Timestamp(5), 14.0, 0),
  21. Row.of(1, new Timestamp(6), 15.0, 0),
  22. Row.of(1, new Timestamp(7), 16.0, 0),
  23. Row.of(1, new Timestamp(8), 17.0, 0),
  24. Row.of(1, new Timestamp(9), 18.0, 0),
  25. Row.of(1, new Timestamp(10), 19.0, 0)
  26. );
  27. MemSourceStreamOp dataOp = new MemSourceStreamOp(mTableData, new String[] {"id", "ts", "val", "label"});
  28. EsdOutlierStreamOp outlierOp = new EsdOutlierStreamOp()
  29. .setGroupCols("id")
  30. .setTimeCol("ts")
  31. .setPrecedingRows(3)
  32. .setFeatureCol("val")
  33. .setPredictionCol("pred")
  34. .setPredictionDetailCol("pred_detail");
  35. dataOp.link(outlierOp).print();
  36. StreamOperator.execute();
  37. }
  38. }

运行结果