Java 类名:com.alibaba.alink.operator.batch.timeseries.ShiftBatchOp
Python 类名:ShiftBatchOp

功能介绍

给定分组,对每一组的数据使用Shift进行时间序列预测,使用ShiftNum之前的数据作为预测结果。

使用方式

使用方式

参考文档 https://www.yuque.com/pinshu/alink_guide/xbp5ky

参数说明

| 名称 | 中文名称 | 描述 | 类型 | 是否必须? | 取值范围 | 默认值 | | —- | —- | —- | —- | —- | —- | —- |

| predictionCol | 预测结果列名 | 预测结果列名 | String | ✓ | | |

| valueCol | value列,类型为MTable | value列,类型为MTable | String | ✓ | 所选列类型为 [M_TABLE] | |

| predictNum | 预测条数 | 预测条数 | Integer | | | 1 |

| predictionDetailCol | 预测详细信息列名 | 预测详细信息列名 | String | | | |

| reservedCols | 算法保留列名 | 算法保留列 | String[] | | | null |

| shiftNum | shift个数 | shift个数 | Integer | | | 7 |

| numThreads | 组件多线程线程个数 | 组件多线程线程个数 | Integer | | | 1 |

代码示例

Python 代码

  1. from pyalink.alink import *
  2. import pandas as pd
  3. useLocalEnv(1)
  4. import time, datetime
  5. import numpy as np
  6. import pandas as pd
  7. data = pd.DataFrame([
  8. [1, datetime.datetime.fromtimestamp(1), 10.0],
  9. [1, datetime.datetime.fromtimestamp(2), 11.0],
  10. [1, datetime.datetime.fromtimestamp(3), 12.0],
  11. [1, datetime.datetime.fromtimestamp(4), 13.0],
  12. [1, datetime.datetime.fromtimestamp(5), 14.0],
  13. [1, datetime.datetime.fromtimestamp(6), 15.0],
  14. [1, datetime.datetime.fromtimestamp(7), 16.0],
  15. [1, datetime.datetime.fromtimestamp(8), 17.0],
  16. [1, datetime.datetime.fromtimestamp(9), 18.0],
  17. [1, datetime.datetime.fromtimestamp(10), 19.0]
  18. ])
  19. source = dataframeToOperator(data, schemaStr='id int, ts timestamp, val double', op_type='batch')
  20. source.link(
  21. GroupByBatchOp()
  22. .setGroupByPredicate("id")
  23. .setSelectClause("id, mtable_agg(ts, val) as data")
  24. ).link(ShiftBatchOp()
  25. .setValueCol("data")
  26. .setShiftNum(7)
  27. .setPredictNum(12)
  28. .setPredictionCol("predict")
  29. ).print()

Java 代码

  1. package com.alibaba.alink.operator.batch.timeseries;
  2. import org.apache.flink.types.Row;
  3. import com.alibaba.alink.operator.batch.sql.GroupByBatchOp;
  4. import com.alibaba.alink.operator.batch.source.MemSourceBatchOp;
  5. import org.junit.Test;
  6. import java.sql.Timestamp;
  7. import java.util.Arrays;
  8. import java.util.List;
  9. public class ShiftBatchOpTest {
  10. @Test
  11. public void test() throws Exception {
  12. List <Row> mTableData = Arrays.asList(
  13. Row.of(1, new Timestamp(1), 10.0),
  14. Row.of(1, new Timestamp(2), 11.0),
  15. Row.of(1, new Timestamp(3), 12.0),
  16. Row.of(1, new Timestamp(4), 13.0),
  17. Row.of(1, new Timestamp(5), 14.0),
  18. Row.of(1, new Timestamp(6), 15.0),
  19. Row.of(1, new Timestamp(7), 16.0),
  20. Row.of(1, new Timestamp(8), 17.0),
  21. Row.of(1, new Timestamp(9), 18.0),
  22. Row.of(1, new Timestamp(10), 19.0)
  23. );
  24. MemSourceBatchOp source = new MemSourceBatchOp(mTableData, new String[] {"id", "ts", "val"});
  25. source
  26. .link(
  27. new GroupByBatchOp()
  28. .setGroupByPredicate("id")
  29. .setSelectClause("mtable_agg(ts, val) as data")
  30. )
  31. .link(
  32. new ShiftBatchOp()
  33. .setGroupCol("id")
  34. .setValueCol("data")
  35. .setShiftNum(7)
  36. .setPredictNum(12)
  37. .setPredictionCol("predict")
  38. )
  39. .print();
  40. }
  41. }

运行结果

| id | data | predict | | —- | —- | —- |

| 1 | {“data”:{“ts”:[“1970-01-01 08:00:00.001”,”1970-01-01 08:00:00.002”,”1970-01-01 08:00:00.003”,”1970-01-01 08:00:00.004”,”1970-01-01 08:00:00.005”,”1970-01-01 08:00:00.006”,”1970-01-01 08:00:00.007”,”1970-01-01 08:00:00.008”,”1970-01-01 08:00:00.009”,”1970-01-01 08:00:00.01”],”val”:[10.0,11.0,12.0,13.0,14.0,15.0,16.0,17.0,18.0,19.0]},”schema”:”ts TIMESTAMP,val DOUBLE”} | {“data”:{“ts”:[“1970-01-01 08:00:00.011”,”1970-01-01 08:00:00.012”,”1970-01-01 08:00:00.013”,”1970-01-01 08:00:00.014”,”1970-01-01 08:00:00.015”,”1970-01-01 08:00:00.016”,”1970-01-01 08:00:00.017”,”1970-01-01 08:00:00.018”,”1970-01-01 08:00:00.019”,”1970-01-01 08:00:00.02”,”1970-01-01 08:00:00.021”,”1970-01-01 08:00:00.022”],”val”:[13.0,14.0,15.0,16.0,17.0,18.0,19.0,13.0,14.0,15.0,16.0,17.0]},”schema”:”ts TIMESTAMP,val DOUBLE”} |