Java 类名:com.alibaba.alink.operator.batch.timeseries.HoltWintersBatchOp
Python 类名:HoltWintersBatchOp

功能介绍

给定分组,对每一组的数据使用HoltWinters进行时间序列预测。

使用方式

参考文档 https://www.yuque.com/pinshu/alink_guide/xbp5ky

算法原理

HoltWinters由Holt和Winters提出的三次指数平滑算法,又称holt-winters,
HoltWinters 详细介绍请见链接 https://en.wikipedia.org/wiki/Exponential_smoothing
holt-winters支持2种季节类型: additive 和 multiplicative

  • additive seasonal holt-winters

image

  • multiplicative seasonal holt_winters

image

  • 其中,

  • smoothValue(l、b、s)分别表示level,trend,seasonal

  • smoothParameter(α、β、γ)分别表示alpha,beta,gamma
  • t表示当前时刻,h表示要预测h步
  • p表示period或frequency,时间序列的周期

使用方式

  • 第一步,将每组数据(时间列和数据列) 聚合成MTable.``` GroupByBatchOp() .setGroupByPredicate(“id”) .setSelectClause(“id, mtable_agg(ts, val) as data”)
  1. - 第二步,使用时间序列方法进行预测,预测结果也是MTable
  2. - 第三步,使用FlattenMTableBatchOp,将MTable转换成列,

FlattenMTableBatchOp() .setReservedCols([“id”, “predict”]) .setSelectedCol(“predict”) .setSchemaStr(“ts timestamp, val double”)

  1. ## 参数说明
  2. |
  3. 名称
  4. | 中文名称
  5. | 描述
  6. | 类型
  7. | 是否必须?
  8. | 取值范围
  9. | 默认值
  10. |
  11. | --- | --- | --- | --- | --- | --- | --- |
  12. |
  13. predictionCol
  14. | 预测结果列名
  15. | 预测结果列名
  16. | String
  17. |
  18. |
  19. |
  20. |
  21. |
  22. valueCol
  23. | value列,类型为MTable
  24. | value列,类型为MTable
  25. | String
  26. |
  27. | 所选列类型为 [M_TABLE]
  28. |
  29. |
  30. |
  31. alpha
  32. | alpha
  33. | alpha
  34. | Double
  35. |
  36. | [0.0, 1.0]
  37. | 0.3
  38. |
  39. |
  40. beta
  41. | beta
  42. | beta
  43. | Double
  44. |
  45. | [0.0, 1.0]
  46. | 0.1
  47. |
  48. |
  49. doSeasonal
  50. | 时间是否具有季节性
  51. | 时间是否具有季节性
  52. | Boolean
  53. |
  54. |
  55. | false
  56. |
  57. |
  58. doTrend
  59. | 时间是否具有趋势性
  60. | 时间是否具有趋势性
  61. | Boolean
  62. |
  63. |
  64. | false
  65. |
  66. |
  67. frequency
  68. | 时序频率
  69. | 时序频率
  70. | Integer
  71. |
  72. | [1, +inf)
  73. | 10
  74. |
  75. |
  76. gamma
  77. | gamma
  78. | gamma
  79. | Double
  80. |
  81. | [0.0, 1.0]
  82. | 0.1
  83. |
  84. |
  85. levelStart
  86. | level初始值
  87. | level初始值
  88. | Double
  89. |
  90. |
  91. |
  92. |
  93. |
  94. predictNum
  95. | 预测条数
  96. | 预测条数
  97. | Integer
  98. |
  99. |
  100. | 1
  101. |
  102. |
  103. predictionDetailCol
  104. | 预测详细信息列名
  105. | 预测详细信息列名
  106. | String
  107. |
  108. |
  109. |
  110. |
  111. |
  112. reservedCols
  113. | 算法保留列名
  114. | 算法保留列
  115. | String[]
  116. |
  117. |
  118. | null
  119. |
  120. |
  121. seasonalStart
  122. | seasonal初始值
  123. | seasonal初始值
  124. | double[]
  125. |
  126. |
  127. |
  128. |
  129. |
  130. seasonalType
  131. | 季节类型
  132. | 季节类型
  133. | String
  134. |
  135. | "MULTIPLICATIVE", "ADDITIVE"
  136. | "ADDITIVE"
  137. |
  138. |
  139. trendStart
  140. | trend初始值
  141. | trend初始值
  142. | Double
  143. |
  144. |
  145. |
  146. |
  147. |
  148. numThreads
  149. | 组件多线程线程个数
  150. | 组件多线程线程个数
  151. | Integer
  152. |
  153. |
  154. | 1
  155. |
  156. ## 代码示例
  157. ### Python 代码

from pyalink.alink import *

import pandas as pd

useLocalEnv(1)

import time, datetime import numpy as np import pandas as pd

data = pd.DataFrame([ [1, datetime.datetime.fromtimestamp(1), 10.0], [1, datetime.datetime.fromtimestamp(2), 11.0], [1, datetime.datetime.fromtimestamp(3), 12.0], [1, datetime.datetime.fromtimestamp(4), 13.0], [1, datetime.datetime.fromtimestamp(5), 14.0], [1, datetime.datetime.fromtimestamp(6), 15.0], [1, datetime.datetime.fromtimestamp(7), 16.0], [1, datetime.datetime.fromtimestamp(8), 17.0], [1, datetime.datetime.fromtimestamp(9), 18.0], [1, datetime.datetime.fromtimestamp(10), 19.0] ])

source = dataframeToOperator(data, schemaStr=’id int, ts timestamp, val double’, op_type=’batch’)

source.link( GroupByBatchOp() .setGroupByPredicate(“id”) .setSelectClause(“id, mtable_agg(ts, val) as data”) ).link(HoltWintersBatchOp() .setValueCol(“data”) .setPredictionCol(“pred”) .setPredictNum(12) ).print()

  1. ### Java 代码

package com.alibaba.alink.operator.batch.timeseries;

import org.apache.flink.types.Row;

import com.alibaba.alink.operator.batch.source.MemSourceBatchOp; import com.alibaba.alink.operator.batch.sql.GroupByBatchOp; import org.junit.Test;

import java.sql.Timestamp; import java.util.Arrays; import java.util.List;

public class HoltWintersBatchOpTest { @Test public void test() throws Exception { List mTableData = Arrays.asList( Row.of(1, new Timestamp(1), 10.0), Row.of(1, new Timestamp(2), 11.0), Row.of(1, new Timestamp(3), 12.0), Row.of(1, new Timestamp(4), 13.0), Row.of(1, new Timestamp(5), 14.0), Row.of(1, new Timestamp(6), 15.0), Row.of(1, new Timestamp(7), 16.0), Row.of(1, new Timestamp(8), 17.0), Row.of(1, new Timestamp(9), 18.0), Row.of(1, new Timestamp(10), 19.0) );

  1. MemSourceBatchOp source = new MemSourceBatchOp(mTableData, new String[] {"id", "ts", "val"});
  2. source.link(
  3. new GroupByBatchOp()
  4. .setGroupByPredicate("id")
  5. .setSelectClause("mtable_agg(ts, val) as data")
  6. ).link(new HoltWintersBatchOp()
  7. .setValueCol("data")
  8. .setPredictionCol("pred")
  9. .setPredictNum(12)
  10. ).print();
  11. }

}

```

运行结果

| id | data | pred | | —- | —- | —- |

| 1 | {“data”:{“ts”:[“1970-01-01 08:00:00.001”,”1970-01-01 08:00:00.002”,”1970-01-01 08:00:00.003”,”1970-01-01 08:00:00.004”,”1970-01-01 08:00:00.005”,”1970-01-01 08:00:00.006”,”1970-01-01 08:00:00.007”,”1970-01-01 08:00:00.008”,”1970-01-01 08:00:00.009”,”1970-01-01 08:00:00.01”],”val”:[10.0,11.0,12.0,13.0,14.0,15.0,16.0,17.0,18.0,19.0]},”schema”:”ts TIMESTAMP,val DOUBLE”} | {“data”:{“ts”:[“1970-01-01 08:00:00.011”,”1970-01-01 08:00:00.012”,”1970-01-01 08:00:00.013”,”1970-01-01 08:00:00.014”,”1970-01-01 08:00:00.015”,”1970-01-01 08:00:00.016”,”1970-01-01 08:00:00.017”,”1970-01-01 08:00:00.018”,”1970-01-01 08:00:00.019”,”1970-01-01 08:00:00.02”,”1970-01-01 08:00:00.021”,”1970-01-01 08:00:00.022”],”val”:[19.0,19.0,19.0,19.0,19.0,19.0,19.0,19.0,19.0,19.0,19.0,19.0]},”schema”:”ts TIMESTAMP,val DOUBLE”} |