Java class name: com.alibaba.alink.operator.stream.timeseries.HoltWintersStreamOp
Python class name: HoltWintersStreamOp

Overview

Given grouped data, forecasts the time series of each group using Holt-Winters.

Usage

See the documentation: https://www.yuque.com/pinshu/alink_guide/xbp5ky

Algorithm

Holt-Winters is the triple exponential smoothing algorithm proposed by Holt and Winters.
For a detailed introduction, see https://en.wikipedia.org/wiki/Exponential_smoothing
Holt-Winters supports two seasonal types: additive and multiplicative.

  • additive seasonal Holt-Winters

[image: additive seasonal Holt-Winters formulas]

  • multiplicative seasonal Holt-Winters

[image: multiplicative seasonal Holt-Winters formulas]

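where:

  • the smoothed values l, b, s denote level, trend, and seasonal respectively
  • the smoothing parameters α, β, γ denote alpha, beta, and gamma respectively
  • t is the current time step and h is the number of steps ahead to forecast
  • p is the period (or frequency) of the time series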
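For reference, the textbook form of the two models can be written with the symbols defined above. This is a sketch of the standard formulation, not a transcription of the original formulas: y_t (the observation at time t) and k = ⌊(h−1)/p⌋ are notation introduced here, and the exact variant implemented by Alink may differ in detail.

```latex
\begin{aligned}
\text{Additive:}\quad
\hat{y}_{t+h|t} &= l_t + h\,b_t + s_{t+h-p(k+1)} \\
l_t &= \alpha\,(y_t - s_{t-p}) + (1-\alpha)\,(l_{t-1} + b_{t-1}) \\
b_t &= \beta\,(l_t - l_{t-1}) + (1-\beta)\,b_{t-1} \\
s_t &= \gamma\,(y_t - l_{t-1} - b_{t-1}) + (1-\gamma)\,s_{t-p} \\[6pt]
\text{Multiplicative:}\quad
\hat{y}_{t+h|t} &= (l_t + h\,b_t)\,s_{t+h-p(k+1)} \\
l_t &= \alpha\,\frac{y_t}{s_{t-p}} + (1-\alpha)\,(l_{t-1} + b_{t-1}) \\
b_t &= \beta\,(l_t - l_{t-1}) + (1-\beta)\,b_{t-1} \\
s_t &= \gamma\,\frac{y_t}{l_{t-1} + b_{t-1}} + (1-\gamma)\,s_{t-p}
\end{aligned}
```

The two models share the trend update; they differ only in whether the seasonal term is subtracted from and added to the series, or divides and multiplies it.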

Parameters

| Name | Display name | Description | Type | Required? | Value range | Default |
| --- | --- | --- | --- | --- | --- | --- |
| predictionCol | Prediction result column name | Name of the prediction result column | String | ✓ | | |
| valueCol | Value column (type MTable) | Value column; its type is MTable | String | ✓ | Selected column type must be [M_TABLE] | |
| alpha | alpha | alpha | Double | | [0.0, 1.0] | 0.3 |
| beta | beta | beta | Double | | [0.0, 1.0] | 0.1 |
| doSeasonal | Whether the series is seasonal | Whether the series is seasonal | Boolean | | | false |
| doTrend | Whether the series has a trend | Whether the series has a trend | Boolean | | | false |
| frequency | Time series frequency | Time series frequency | Integer | | [1, +inf) | 10 |
| gamma | gamma | gamma | Double | | [0.0, 1.0] | 0.1 |
| levelStart | Initial level value | Initial level value | Double | | | |
| predictNum | Number of predictions | Number of predictions | Integer | | | 1 |
| predictionDetailCol | Prediction detail column name | Name of the prediction detail column | String | | | |
| reservedCols | Reserved column names | Columns reserved by the algorithm | String[] | | | null |
| seasonalStart | Initial seasonal values | Initial seasonal values | double[] | | | |
| seasonalType | Seasonal type | Seasonal type | String | | "MULTIPLICATIVE", "ADDITIVE" | "ADDITIVE" |
| trendStart | Initial trend value | Initial trend value | Double | | | |
| numThreads | Number of threads | Number of threads used by the component | Integer | | | 1 |
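To make the roles of these parameters concrete, here is a minimal pure-Python sketch of a Holt-Winters forecast with fixed smoothing parameters. It is not Alink's implementation: the function name `holt_winters_forecast`, the snake_case argument names, and the initialization of level, trend, and seasonal terms are all assumptions made for illustration, and the sketch does not model `levelStart`, `trendStart`, `seasonalStart`, or any parameter fitting the operator may perform.

```python
from typing import List, Sequence


def holt_winters_forecast(
    values: Sequence[float],
    alpha: float = 0.3,
    beta: float = 0.1,
    gamma: float = 0.1,
    frequency: int = 10,
    do_trend: bool = False,
    do_seasonal: bool = False,
    seasonal_type: str = "ADDITIVE",
    predict_num: int = 1,
) -> List[float]:
    """Forecast predict_num steps ahead with fixed smoothing parameters."""
    n, p = len(values), frequency
    multiplicative = seasonal_type == "MULTIPLICATIVE"
    if n < 2 or (do_seasonal and n <= p):
        raise ValueError("not enough observations for the requested model")

    if do_seasonal:
        # Assumed initialization: level = mean of the first period,
        # seasonal terms = deviation (or ratio) of each point from that mean.
        level = sum(values[:p]) / p
        season = [v / level if multiplicative else v - level for v in values[:p]]
        trend = (values[p] - values[0]) / p if do_trend else 0.0
        start = p
    else:
        level, season, start = values[0], [], 1
        trend = values[1] - values[0] if do_trend else 0.0

    for t in range(start, n):
        y, prev_level = values[t], level
        expected = prev_level + trend  # level expected before observing y
        if do_seasonal:
            s_old = season[t % p]
            deseasonalized = y / s_old if multiplicative else y - s_old
            level = alpha * deseasonalized + (1 - alpha) * expected
            s_new = y / expected if multiplicative else y - expected
            season[t % p] = gamma * s_new + (1 - gamma) * s_old
        else:
            level = alpha * y + (1 - alpha) * expected
        if do_trend:
            trend = beta * (level - prev_level) + (1 - beta) * trend

    forecasts = []
    for h in range(1, predict_num + 1):
        base = level + h * trend
        if do_seasonal:
            s = season[(n - 1 + h) % p]  # latest estimate for that phase
            base = base * s if multiplicative else base + s
        forecasts.append(base)
    return forecasts


print([round(x, 4) for x in holt_winters_forecast([10.0, 11.0], predict_num=3)])
# [10.3, 10.3, 10.3]
```

With `do_trend` and `do_seasonal` both left at their default of false, the recursion reduces to simple exponential smoothing, so every forecast step returns the same level.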

Code examples

Python code

    from pyalink.alink import *
    import datetime
    import pandas as pd

    useLocalEnv(1)

    # Ten observations for a single group (id = 1): timestamp and value.
    data = pd.DataFrame([
        [1, datetime.datetime.fromtimestamp(1), 10.0],
        [1, datetime.datetime.fromtimestamp(2), 11.0],
        [1, datetime.datetime.fromtimestamp(3), 12.0],
        [1, datetime.datetime.fromtimestamp(4), 13.0],
        [1, datetime.datetime.fromtimestamp(5), 14.0],
        [1, datetime.datetime.fromtimestamp(6), 15.0],
        [1, datetime.datetime.fromtimestamp(7), 16.0],
        [1, datetime.datetime.fromtimestamp(8), 17.0],
        [1, datetime.datetime.fromtimestamp(9), 18.0],
        [1, datetime.datetime.fromtimestamp(10), 19.0]
    ])

    source = dataframeToOperator(data, schemaStr='id int, ts timestamp, val double', op_type='stream')

    source.link(
        # Aggregate the rows of each group into an MTable column named "data".
        OverCountWindowStreamOp()
            .setGroupCols(["id"])
            .setTimeCol("ts")
            .setPrecedingRows(5)
            .setClause("mtable_agg_preceding(ts, val) as data")
    ).link(
        # Forecast 12 steps ahead from each MTable.
        HoltWintersStreamOp()
            .setValueCol("data")
            .setPredictionCol("predict")
            .setPredictNum(12)
    ).link(
        # Look up the value at "ts" in the forecast series and write it to "out".
        LookupValueInTimeSeriesStreamOp()
            .setTimeCol("ts")
            .setTimeSeriesCol("predict")
            .setOutputCol("out")
    ).print()

    StreamOperator.execute()

Java code

    package com.alibaba.alink.operator.stream.timeseries;

    import org.apache.flink.types.Row;

    import com.alibaba.alink.operator.stream.StreamOperator;
    import com.alibaba.alink.operator.stream.feature.OverCountWindowStreamOp;
    import com.alibaba.alink.operator.stream.source.MemSourceStreamOp;
    import org.junit.Test;

    import java.sql.Timestamp;
    import java.util.Arrays;
    import java.util.List;

    public class HoltWintersStreamOpTest {

        @Test
        public void test() throws Exception {
            // Ten observations for a single group (id = 1): timestamp and value.
            List<Row> mTableData = Arrays.asList(
                Row.of(1, new Timestamp(1), 10.0),
                Row.of(1, new Timestamp(2), 11.0),
                Row.of(1, new Timestamp(3), 12.0),
                Row.of(1, new Timestamp(4), 13.0),
                Row.of(1, new Timestamp(5), 14.0),
                Row.of(1, new Timestamp(6), 15.0),
                Row.of(1, new Timestamp(7), 16.0),
                Row.of(1, new Timestamp(8), 17.0),
                Row.of(1, new Timestamp(9), 18.0),
                Row.of(1, new Timestamp(10), 19.0)
            );

            MemSourceStreamOp source = new MemSourceStreamOp(mTableData, new String[] {"id", "ts", "val"});

            source.link(
                // Aggregate the rows of each group into an MTable column named "data".
                new OverCountWindowStreamOp()
                    .setGroupCols("id")
                    .setTimeCol("ts")
                    .setPrecedingRows(5)
                    .setClause("mtable_agg(ts, val) as data")
            ).link(
                // Forecast 12 steps ahead from each MTable.
                new HoltWintersStreamOp()
                    .setGroupCol("id")
                    .setValueCol("data")
                    .setPredictionCol("predict")
                    .setPredictNum(12)
            ).link(
                // Look up the value at "ts" in the forecast series and write it to "out".
                new LookupValueInTimeSeriesStreamOp()
                    .setTimeCol("ts")
                    .setTimeSeriesCol("predict")
                    .setOutputCol("out")
            ).print();

            StreamOperator.execute();
        }
    }

Results

| id | ts | val | data | predict | out |
| --- | --- | --- | --- | --- | --- |
| 1 | 1970-01-01 08:00:00.001 | 10.0000 | {"data":{"ts":["1970-01-01 08:00:00.001"],"val":[10.0]},"schema":"ts TIMESTAMP,val DOUBLE"} | null | null |
| 1 | 1970-01-01 08:00:00.002 | 11.0000 | {"data":{"ts":["1970-01-01 08:00:00.001","1970-01-01 08:00:00.002"],"val":[10.0,11.0]},"schema":"ts TIMESTAMP,val DOUBLE"} | {"data":{"ts":["1970-01-01 08:00:00.003","1970-01-01 08:00:00.004","1970-01-01 08:00:00.005","1970-01-01 08:00:00.006","1970-01-01 08:00:00.007","1970-01-01 08:00:00.008","1970-01-01 08:00:00.009","1970-01-01 08:00:00.01","1970-01-01 08:00:00.011","1970-01-01 08:00:00.012","1970-01-01 08:00:00.013","1970-01-01 08:00:00.014"],"val":[10.3,10.3,10.3,10.3,10.3,10.3,10.3,10.3,10.3,10.3,10.3,10.3]},"schema":"ts TIMESTAMP,val DOUBLE"} | null |
| 1 | 1970-01-01 08:00:00.003 | 12.0000 | {"data":{"ts":["1970-01-01 08:00:00.001","1970-01-01 08:00:00.002","1970-01-01 08:00:00.003"],"val":[10.0,11.0,12.0]},"schema":"ts TIMESTAMP,val DOUBLE"} | {"data":{"ts":["1970-01-01 08:00:00.004","1970-01-01 08:00:00.005","1970-01-01 08:00:00.006","1970-01-01 08:00:00.007","1970-01-01 08:00:00.008","1970-01-01 08:00:00.009","1970-01-01 08:00:00.01","1970-01-01 08:00:00.011","1970-01-01 08:00:00.012","1970-01-01 08:00:00.013","1970-01-01 08:00:00.014","1970-01-01 08:00:00.015"],"val":[12.0,12.0,12.0,12.0,12.0,12.0,12.0,12.0,12.0,12.0,12.0,12.0]},"schema":"ts TIMESTAMP,val DOUBLE"} | null |
| 1 | 1970-01-01 08:00:00.004 | 13.0000 | {"data":{"ts":["1970-01-01 08:00:00.001","1970-01-01 08:00:00.002","1970-01-01 08:00:00.003","1970-01-01 08:00:00.004"],"val":[10.0,11.0,12.0,13.0]},"schema":"ts TIMESTAMP,val DOUBLE"} | {"data":{"ts":["1970-01-01 08:00:00.005","1970-01-01 08:00:00.006","1970-01-01 08:00:00.007","1970-01-01 08:00:00.008","1970-01-01 08:00:00.009","1970-01-01 08:00:00.01","1970-01-01 08:00:00.011","1970-01-01 08:00:00.012","1970-01-01 08:00:00.013","1970-01-01 08:00:00.014","1970-01-01 08:00:00.015","1970-01-01 08:00:00.016"],"val":[13.0,13.0,13.0,13.0,13.0,13.0,13.0,13.0,13.0,13.0,13.0,13.0]},"schema":"ts TIMESTAMP,val DOUBLE"} | null |
| 1 | 1970-01-01 08:00:00.005 | 14.0000 | {"data":{"ts":["1970-01-01 08:00:00.001","1970-01-01 08:00:00.002","1970-01-01 08:00:00.003","1970-01-01 08:00:00.004","1970-01-01 08:00:00.005"],"val":[10.0,11.0,12.0,13.0,14.0]},"schema":"ts TIMESTAMP,val DOUBLE"} | {"data":{"ts":["1970-01-01 08:00:00.006","1970-01-01 08:00:00.007","1970-01-01 08:00:00.008","1970-01-01 08:00:00.009","1970-01-01 08:00:00.01","1970-01-01 08:00:00.011","1970-01-01 08:00:00.012","1970-01-01 08:00:00.013","1970-01-01 08:00:00.014","1970-01-01 08:00:00.015","1970-01-01 08:00:00.016","1970-01-01 08:00:00.017"],"val":[14.0,14.0,14.0,14.0,14.0,14.0,14.0,14.0,14.0,14.0,14.0,14.0]},"schema":"ts TIMESTAMP,val DOUBLE"} | null |
| 1 | 1970-01-01 08:00:00.006 | 15.0000 | {"data":{"ts":["1970-01-01 08:00:00.001","1970-01-01 08:00:00.002","1970-01-01 08:00:00.003","1970-01-01 08:00:00.004","1970-01-01 08:00:00.005","1970-01-01 08:00:00.006"],"val":[10.0,11.0,12.0,13.0,14.0,15.0]},"schema":"ts TIMESTAMP,val DOUBLE"} | {"data":{"ts":["1970-01-01 08:00:00.007","1970-01-01 08:00:00.008","1970-01-01 08:00:00.009","1970-01-01 08:00:00.01","1970-01-01 08:00:00.011","1970-01-01 08:00:00.012","1970-01-01 08:00:00.013","1970-01-01 08:00:00.014","1970-01-01 08:00:00.015","1970-01-01 08:00:00.016","1970-01-01 08:00:00.017","1970-01-01 08:00:00.018"],"val":[15.0,15.0,15.0,15.0,15.0,15.0,15.0,15.0,15.0,15.0,15.0,15.0]},"schema":"ts TIMESTAMP,val DOUBLE"} | null |
| 1 | 1970-01-01 08:00:00.007 | 16.0000 | {"data":{"ts":["1970-01-01 08:00:00.002","1970-01-01 08:00:00.003","1970-01-01 08:00:00.004","1970-01-01 08:00:00.005","1970-01-01 08:00:00.006","1970-01-01 08:00:00.007"],"val":[11.0,12.0,13.0,14.0,15.0,16.0]},"schema":"ts TIMESTAMP,val DOUBLE"} | {"data":{"ts":["1970-01-01 08:00:00.008","1970-01-01 08:00:00.009","1970-01-01 08:00:00.01","1970-01-01 08:00:00.011","1970-01-01 08:00:00.012","1970-01-01 08:00:00.013","1970-01-01 08:00:00.014","1970-01-01 08:00:00.015","1970-01-01 08:00:00.016","1970-01-01 08:00:00.017","1970-01-01 08:00:00.018","1970-01-01 08:00:00.019"],"val":[16.0,16.0,16.0,16.0,16.0,16.0,16.0,16.0,16.0,16.0,16.0,16.0]},"schema":"ts TIMESTAMP,val DOUBLE"} | null |
| 1 | 1970-01-01 08:00:00.008 | 17.0000 | {"data":{"ts":["1970-01-01 08:00:00.003","1970-01-01 08:00:00.004","1970-01-01 08:00:00.005","1970-01-01 08:00:00.006","1970-01-01 08:00:00.007","1970-01-01 08:00:00.008"],"val":[12.0,13.0,14.0,15.0,16.0,17.0]},"schema":"ts TIMESTAMP,val DOUBLE"} | {"data":{"ts":["1970-01-01 08:00:00.009","1970-01-01 08:00:00.01","1970-01-01 08:00:00.011","1970-01-01 08:00:00.012","1970-01-01 08:00:00.013","1970-01-01 08:00:00.014","1970-01-01 08:00:00.015","1970-01-01 08:00:00.016","1970-01-01 08:00:00.017","1970-01-01 08:00:00.018","1970-01-01 08:00:00.019","1970-01-01 08:00:00.02"],"val":[17.0,17.0,17.0,17.0,17.0,17.0,17.0,17.0,17.0,17.0,17.0,17.0]},"schema":"ts TIMESTAMP,val DOUBLE"} | null |
| 1 | 1970-01-01 08:00:00.009 | 18.0000 | {"data":{"ts":["1970-01-01 08:00:00.004","1970-01-01 08:00:00.005","1970-01-01 08:00:00.006","1970-01-01 08:00:00.007","1970-01-01 08:00:00.008","1970-01-01 08:00:00.009"],"val":[13.0,14.0,15.0,16.0,17.0,18.0]},"schema":"ts TIMESTAMP,val DOUBLE"} | {"data":{"ts":["1970-01-01 08:00:00.01","1970-01-01 08:00:00.011","1970-01-01 08:00:00.012","1970-01-01 08:00:00.013","1970-01-01 08:00:00.014","1970-01-01 08:00:00.015","1970-01-01 08:00:00.016","1970-01-01 08:00:00.017","1970-01-01 08:00:00.018","1970-01-01 08:00:00.019","1970-01-01 08:00:00.02","1970-01-01 08:00:00.021"],"val":[18.0,18.0,18.0,18.0,18.0,18.0,18.0,18.0,18.0,18.0,18.0,18.0]},"schema":"ts TIMESTAMP,val DOUBLE"} | null |
| 1 | 1970-01-01 08:00:00.01 | 19.0000 | {"data":{"ts":["1970-01-01 08:00:00.005","1970-01-01 08:00:00.006","1970-01-01 08:00:00.007","1970-01-01 08:00:00.008","1970-01-01 08:00:00.009","1970-01-01 08:00:00.01"],"val":[14.0,15.0,16.0,17.0,18.0,19.0]},"schema":"ts TIMESTAMP,val DOUBLE"} | {"data":{"ts":["1970-01-01 08:00:00.011","1970-01-01 08:00:00.012","1970-01-01 08:00:00.013","1970-01-01 08:00:00.014","1970-01-01 08:00:00.015","1970-01-01 08:00:00.016","1970-01-01 08:00:00.017","1970-01-01 08:00:00.018","1970-01-01 08:00:00.019","1970-01-01 08:00:00.02","1970-01-01 08:00:00.021","1970-01-01 08:00:00.022"],"val":[19.0,19.0,19.0,19.0,19.0,19.0,19.0,19.0,19.0,19.0,19.0,19.0]},"schema":"ts TIMESTAMP,val DOUBLE"} | null |
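Two features of this table can be checked with a few lines of plain Python, independent of Alink. The sketch below assumes that a `data` cell is plain JSON with straight double quotes, and the helper `count_windows` is a hypothetical name introduced here. It shows that each `data` cell in the table holds the current row plus up to five preceding rows, and that the first forecast, 10.3, equals one step of simple exponential smoothing at the default alpha of 0.3.

```python
import json
from collections import deque
from typing import List

# `data` cell of the second result row, copied from the table above.
cell = (
    '{"data":{"ts":["1970-01-01 08:00:00.001","1970-01-01 08:00:00.002"],'
    '"val":[10.0,11.0]},"schema":"ts TIMESTAMP,val DOUBLE"}'
)
window_vals = json.loads(cell)["data"]["val"]

# One pass of simple exponential smoothing with the default alpha.
alpha = 0.3
level = window_vals[0]
for y in window_vals[1:]:
    level = alpha * y + (1 - alpha) * level
print(round(level, 4))  # 10.3


def count_windows(values: List[float], preceding_rows: int) -> List[List[float]]:
    """For each row, return the current value plus up to preceding_rows earlier ones."""
    window = deque(maxlen=preceding_rows + 1)
    result = []
    for v in values:
        window.append(v)
        result.append(list(window))
    return result


windows = count_windows([10.0 + i for i in range(10)], preceding_rows=5)
print(windows[6])  # [11.0, 12.0, 13.0, 14.0, 15.0, 16.0]
```

From the third row onward the table reports forecasts equal to the last observed value (12.0, 13.0, and so on), which a fixed alpha of 0.3 would not produce. This document does not state how the operator arrives at those values, so the smoothing check above applies to the second row only.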