Java 类名:com.alibaba.alink.operator.batch.timeseries.ProphetBatchOp
Python 类名:ProphetBatchOp

功能介绍

对每一行的MTable数据, 进行Prophet时间序列预测,给出下一时间段的预测结果。

使用方式

参考文档 https://www.yuque.com/pinshu/alink_guide/xbp5ky

算法原理

Prophet是facebook开源的一个时间序列预测算法, github地址:https://github.com/facebook/prophet.
Prophet适用于具有明显的内在规律的数据, 例如:

  • 有一定的历史数据,有至少几个月的每小时、每天或每周观察的历史数据
  • 有较强的季节性趋势:每周的一些天,每年的一些时间
  • 有已知的以不定期的间隔发生的重要节假日(比如国庆节)
  • 缺失的历史数据或较大的异常数据的数量在合理范围内
  • 对于数据中蕴含的非线性增长的趋势都有一个自然极限或饱和状态

    使用方式

  • 第一步,将每组数据(时间列和数据列) 聚合成MTable.``` GroupByBatchOp() .setGroupByPredicate(“id”) .setSelectClause(“id, mtable_agg(ts, val) as data”)

  1. - 第二步,使用时间序列方法进行预测,预测结果也是MTable
  2. - 第三步,使用FlattenMTableBatchOp,将MTable转换成列,

FlattenMTableBatchOp() .setReservedCols([“id”, “predict”]) .setSelectedCol(“predict”) .setSchemaStr(“ts timestamp, val double”)

  1. ## 参数说明
  2. |
  3. 名称
  4. | 中文名称
  5. | 描述
  6. | 类型
  7. | 是否必须?
  8. | 取值范围
  9. | 默认值
  10. |
  11. | --- | --- | --- | --- | --- | --- | --- |
  12. |
  13. predictionCol
  14. | 预测结果列名
  15. | 预测结果列名
  16. | String
  17. |
  18. |
  19. |
  20. |
  21. |
  22. valueCol
  23. | value列,类型为MTable
  24. | value列,类型为MTable
  25. | String
  26. |
  27. | 所选列类型为 [M_TABLE]
  28. |
  29. |
  30. |
  31. predictNum
  32. | 预测条数
  33. | 预测条数
  34. | Integer
  35. |
  36. |
  37. | 1
  38. |
  39. |
  40. predictionDetailCol
  41. | 预测详细信息列名
  42. | 预测详细信息列名
  43. | String
  44. |
  45. |
  46. |
  47. |
  48. |
  49. pythonEnv
  50. | Python 环境路径
  51. | Python 环境路径,一般情况下不需要填写。如果是压缩文件,需要解压后得到一个目录,且目录名与压缩文件主文件名一致,可以使用 http://, https://, oss://, hdfs:// 等路径;如果是目录,那么只能使用本地路径,即 file://。
  52. | String
  53. |
  54. |
  55. | ""
  56. |
  57. |
  58. reservedCols
  59. | 算法保留列名
  60. | 算法保留列
  61. | String[]
  62. |
  63. |
  64. | null
  65. |
  66. |
  67. stanInit
  68. | 初始值
  69. | 初始值
  70. | String
  71. |
  72. |
  73. | null
  74. |
  75. |
  76. uncertaintySamples
  77. | 用来计算指标的采样数目
  78. | 用来计算指标的采样数目,设置成0,不计算指标。
  79. | Integer
  80. |
  81. |
  82. | 1000
  83. |
  84. |
  85. numThreads
  86. | 组件多线程线程个数
  87. | 组件多线程线程个数
  88. | Integer
  89. |
  90. |
  91. | 1
  92. |
  93. ## 代码示例
  94. ### Python 代码

from pyalink.alink import *

import pandas as pd

useLocalEnv(1)

import time, datetime import numpy as np import pandas as pd

downloader = AlinkGlobalConfiguration.getPluginDownloader() downloader.downloadPlugin(‘tf115_python_env_linux’)

data = pd.DataFrame([ [1, datetime.datetime.fromtimestamp(1), 10.0], [1, datetime.datetime.fromtimestamp(2), 11.0], [1, datetime.datetime.fromtimestamp(3), 12.0], [1, datetime.datetime.fromtimestamp(4), 13.0], [1, datetime.datetime.fromtimestamp(5), 14.0], [1, datetime.datetime.fromtimestamp(6), 15.0], [1, datetime.datetime.fromtimestamp(7), 16.0], [1, datetime.datetime.fromtimestamp(8), 17.0], [1, datetime.datetime.fromtimestamp(9), 18.0], [1, datetime.datetime.fromtimestamp(10), 19.0] ])

source = dataframeToOperator(data, schemaStr=’id int, ts timestamp, val double’, op_type=’batch’)

source.link( GroupByBatchOp() .setGroupByPredicate(“id”) .setSelectClause(“id, mtable_agg(ts, val) as data”) ).link(ProphetBatchOp() .setValueCol(“data”) .setPredictNum(4) .setPredictionCol(“pred”) ).print()

  1. ### Java 代码

package com.alibaba.alink.operator.batch.timeseries;

import org.apache.flink.types.Row;

import com.alibaba.alink.operator.batch.sql.GroupByBatchOp; import com.alibaba.alink.operator.batch.source.MemSourceBatchOp; import com.alibaba.alink.testutil.AlinkTestBase; import org.junit.Test;

import java.sql.Timestamp; import java.util.Arrays; import java.util.List;

public class ArimaBatchOpTest extends AlinkTestBase {

  1. @Test
  2. public void test() throws Exception {
  3. List <Row> mTableData = Arrays.asList(
  4. Row.of(1, new Timestamp(1), 10.0),
  5. Row.of(1, new Timestamp(2), 11.0),
  6. Row.of(1, new Timestamp(3), 12.0),
  7. Row.of(1, new Timestamp(4), 13.0),
  8. Row.of(1, new Timestamp(5), 14.0),
  9. Row.of(1, new Timestamp(6), 15.0),
  10. Row.of(1, new Timestamp(7), 16.0),
  11. Row.of(1, new Timestamp(8), 17.0),
  12. Row.of(1, new Timestamp(9), 18.0),
  13. Row.of(1, new Timestamp(10), 19.0)
  14. );
  15. MemSourceBatchOp source = new MemSourceBatchOp(mTableData, new String[] {"id", "ts", "val"});
  16. source.link(
  17. new GroupByBatchOp()
  18. .setGroupByPredicate("id")
  19. .setSelectClause("mtable_agg(ts, val) as data")
  20. ).link(new ProphetBatchOp()
  21. .setValueCol("data")
  22. .setPredictNum(4)
  23. .setPredictionCol("pred")
  24. ).print();
  25. }

}

```

运行结果

| id | data | predict | | —- | —- | —- |

| 1 | {“data”:{“ts”:[“1970-01-01 08:00:00.001”,”1970-01-01 08:00:00.002”,”1970-01-01 08:00:00.003”,”1970-01-01 08:00:00.004”,”1970-01-01 08:00:00.005”,”1970-01-01 08:00:00.006”,”1970-01-01 08:00:00.007”,”1970-01-01 08:00:00.008”,”1970-01-01 08:00:00.009”,”1970-01-01 08:00:00.01”],”val”:[10.0,11.0,12.0,13.0,14.0,15.0,16.0,17.0,18.0,19.0]},”schema”:”ts TIMESTAMP,val DOUBLE”} | {“data”:{“ts”:[“1970-01-01 08:00:00.011”,”1970-01-01 08:00:00.012”,”1970-01-01 08:00:00.013”,”1970-01-01 08:00:00.014”,”1970-01-01 08:00:00.015”,”1970-01-01 08:00:00.016”,”1970-01-01 08:00:00.017”,”1970-01-01 08:00:00.018”,”1970-01-01 08:00:00.019”,”1970-01-01 08:00:00.02”,”1970-01-01 08:00:00.021”,”1970-01-01 08:00:00.022”],”val”:[20.0,21.0,22.0,23.0,24.0,25.0,26.0,27.0,28.0,29.0,30.0,31.0]},”schema”:”ts TIMESTAMP,val DOUBLE”} |

t