Java 类名:com.alibaba.alink.operator.stream.timeseries.AutoArimaStreamOp
Python 类名:AutoArimaStreamOp

功能介绍

给定分组,对每一组的数据进行AutoArima时间序列预测,给出下一时间段的结果。

算法原理

Arima全称为自回归积分滑动平均模型(Autoregressive Integrated Moving Average Model,简记ARIMA),是由博克思(Box)和詹金斯(Jenkins)于20世纪70年代初提出的一种著名时间序列预测方法,所以又称为Box-Jenkins模型、博克思-詹金斯法。
Arima 详细介绍请见链接 https://en.wikipedia.org/wiki/Autoregressive_integrated_moving_average
AutoArima只需要指定MaxOrder,不需要指定p/d/q;算法会对每个分组分别计算出最优的参数,并给出预测结果。

使用方式

参考文档 https://www.yuque.com/pinshu/alink_guide/xbp5ky

参数说明

代码示例

Python 代码

  1. from pyalink.alink import *
  2. import pandas as pd
  3. useLocalEnv(1)
  4. import time, datetime
  5. import numpy as np
  6. import pandas as pd
  7. data = pd.DataFrame([
  8. [1, datetime.datetime.fromtimestamp(1001), 10.0],
  9. [1, datetime.datetime.fromtimestamp(1002), 11.0],
  10. [1, datetime.datetime.fromtimestamp(1003), 12.0],
  11. [1, datetime.datetime.fromtimestamp(1004), 13.0],
  12. [1, datetime.datetime.fromtimestamp(1005), 14.0],
  13. [1, datetime.datetime.fromtimestamp(1006), 15.0],
  14. [1, datetime.datetime.fromtimestamp(1007), 16.0],
  15. [1, datetime.datetime.fromtimestamp(1008), 17.0],
  16. [1, datetime.datetime.fromtimestamp(1009), 18.0],
  17. [1, datetime.datetime.fromtimestamp(1010), 19.0]
  18. ])
  19. source = dataframeToOperator(data, schemaStr='id int, ts timestamp, val double', op_type='stream')
  20. source.link(
  21. OverCountWindowStreamOp()
  22. .setGroupCols(["id"])
  23. .setTimeCol("ts")
  24. .setPrecedingRows(5)
  25. .setClause("mtable_agg_preceding(ts, val) as data")
  26. ).link(
  27. AutoArimaStreamOp()
  28. .setValueCol("data")
  29. .setPredictionCol("predict")
  30. .setMaxOrder(1)
  31. .setPredictNum(4)
  32. ).link(
  33. LookupValueInTimeSeriesStreamOp()
  34. .setTimeCol("ts")
  35. .setTimeSeriesCol("predict")
  36. .setOutputCol("out")
  37. ).print()
  38. StreamOperator.execute()

Java 代码

  1. package com.alibaba.alink.operator.stream.timeseries;
  2. import org.apache.flink.types.Row;
  3. import com.alibaba.alink.operator.stream.StreamOperator;
  4. import com.alibaba.alink.operator.stream.feature.OverCountWindowStreamOp;
  5. import com.alibaba.alink.operator.stream.source.MemSourceStreamOp;
  6. import com.alibaba.alink.testutil.AlinkTestBase;
  7. import org.junit.Test;
  8. import java.sql.Timestamp;
  9. import java.util.Arrays;
  10. import java.util.List;
  11. public class AutoArimaStreamOpTest extends AlinkTestBase {
  12. @Test
  13. public void test() throws Exception {
  14. List <Row> mTableData = Arrays.asList(
  15. Row.of(1, new Timestamp(1), 10.0),
  16. Row.of(1, new Timestamp(2), 11.0),
  17. Row.of(1, new Timestamp(3), 12.0),
  18. Row.of(1, new Timestamp(4), 13.0),
  19. Row.of(1, new Timestamp(5), 14.0),
  20. Row.of(1, new Timestamp(6), 15.0),
  21. Row.of(1, new Timestamp(7), 16.0),
  22. Row.of(1, new Timestamp(8), 17.0),
  23. Row.of(1, new Timestamp(9), 18.0),
  24. Row.of(1, new Timestamp(10), 19.0)
  25. );
  26. MemSourceStreamOp source = new MemSourceStreamOp(mTableData, new String[] {"id", "ts", "val"});
  27. source.link(
  28. new OverCountWindowStreamOp()
  29. .setGroupCols("id")
  30. .setTimeCol("ts")
  31. .setPrecedingRows(5)
  32. .setClause("mtable_agg(ts, val) as data")
  33. ).link(
  34. new AutoArimaStreamOp()
  35. .setGroupCol("id")
  36. .setValueCol("data")
  37. .setPredictionCol("predict")
  38. .setPredictNum(12)
  39. ).link(
  40. new LookupValueInTimeSeriesStreamOp()
  41. .setTimeCol("ts")
  42. .setTimeSeriesCol("predict")
  43. .setOutputCol("out")
  44. ).print();
  45. StreamOperator.execute();
  46. }
  47. }

运行结果

| id | ts | val | data | predict | out |
| --- | --- | --- | --- | --- | --- |
| 1 | 1970-01-01 08:00:00.001 | 10.0000 | {"data":{"ts":["1970-01-01 08:00:00.001"],"val":[10.0]},"schema":"ts TIMESTAMP,val DOUBLE"} | null | null |
| 1 | 1970-01-01 08:00:00.002 | 11.0000 | {"data":{"ts":["1970-01-01 08:00:00.001","1970-01-01 08:00:00.002"],"val":[10.0,11.0]},"schema":"ts TIMESTAMP,val DOUBLE"} | {"data":{"ts":["1970-01-01 08:00:00.003","1970-01-01 08:00:00.004","1970-01-01 08:00:00.005","1970-01-01 08:00:00.006","1970-01-01 08:00:00.007","1970-01-01 08:00:00.008","1970-01-01 08:00:00.009","1970-01-01 08:00:00.01","1970-01-01 08:00:00.011","1970-01-01 08:00:00.012","1970-01-01 08:00:00.013","1970-01-01 08:00:00.014"],"val":[11.0,11.0,11.0,11.0,11.0,11.0,11.0,11.0,11.0,11.0,11.0,11.0]},"schema":"ts TIMESTAMP,val DOUBLE"} | null |
| 1 | 1970-01-01 08:00:00.003 | 12.0000 | {"data":{"ts":["1970-01-01 08:00:00.001","1970-01-01 08:00:00.002","1970-01-01 08:00:00.003"],"val":[10.0,11.0,12.0]},"schema":"ts TIMESTAMP,val DOUBLE"} | null | null |
| 1 | 1970-01-01 08:00:00.004 | 13.0000 | {"data":{"ts":["1970-01-01 08:00:00.001","1970-01-01 08:00:00.002","1970-01-01 08:00:00.003","1970-01-01 08:00:00.004"],"val":[10.0,11.0,12.0,13.0]},"schema":"ts TIMESTAMP,val DOUBLE"} | null | null |
| 1 | 1970-01-01 08:00:00.005 | 14.0000 | {"data":{"ts":["1970-01-01 08:00:00.001","1970-01-01 08:00:00.002","1970-01-01 08:00:00.003","1970-01-01 08:00:00.004","1970-01-01 08:00:00.005"],"val":[10.0,11.0,12.0,13.0,14.0]},"schema":"ts TIMESTAMP,val DOUBLE"} | null | null |
| 1 | 1970-01-01 08:00:00.006 | 15.0000 | {"data":{"ts":["1970-01-01 08:00:00.001","1970-01-01 08:00:00.002","1970-01-01 08:00:00.003","1970-01-01 08:00:00.004","1970-01-01 08:00:00.005","1970-01-01 08:00:00.006"],"val":[10.0,11.0,12.0,13.0,14.0,15.0]},"schema":"ts TIMESTAMP,val DOUBLE"} | null | null |
| 1 | 1970-01-01 08:00:00.007 | 16.0000 | {"data":{"ts":["1970-01-01 08:00:00.002","1970-01-01 08:00:00.003","1970-01-01 08:00:00.004","1970-01-01 08:00:00.005","1970-01-01 08:00:00.006","1970-01-01 08:00:00.007"],"val":[11.0,12.0,13.0,14.0,15.0,16.0]},"schema":"ts TIMESTAMP,val DOUBLE"} | null | null |
| 1 | 1970-01-01 08:00:00.008 | 17.0000 | {"data":{"ts":["1970-01-01 08:00:00.003","1970-01-01 08:00:00.004","1970-01-01 08:00:00.005","1970-01-01 08:00:00.006","1970-01-01 08:00:00.007","1970-01-01 08:00:00.008"],"val":[12.0,13.0,14.0,15.0,16.0,17.0]},"schema":"ts TIMESTAMP,val DOUBLE"} | null | null |
| 1 | 1970-01-01 08:00:00.009 | 18.0000 | {"data":{"ts":["1970-01-01 08:00:00.004","1970-01-01 08:00:00.005","1970-01-01 08:00:00.006","1970-01-01 08:00:00.007","1970-01-01 08:00:00.008","1970-01-01 08:00:00.009"],"val":[13.0,14.0,15.0,16.0,17.0,18.0]},"schema":"ts TIMESTAMP,val DOUBLE"} | null | null |
| 1 | 1970-01-01 08:00:00.01 | 19.0000 | {"data":{"ts":["1970-01-01 08:00:00.005","1970-01-01 08:00:00.006","1970-01-01 08:00:00.007","1970-01-01 08:00:00.008","1970-01-01 08:00:00.009","1970-01-01 08:00:00.01"],"val":[14.0,15.0,16.0,17.0,18.0,19.0]},"schema":"ts TIMESTAMP,val DOUBLE"} | null | null |