Java 类名:com.alibaba.alink.operator.stream.timeseries.ShiftStreamOp
Python 类名:ShiftStreamOp
功能介绍
给定分组,对每一组的数据使用Shift进行时间序列预测,使用ShiftNum之前的数据作为预测结果。
使用方式
参考文档 https://www.yuque.com/pinshu/alink_guide/xbp5ky
参数说明
| 名称 | 中文名称 | 描述 | 类型 | 是否必须? | 取值范围 | 默认值 | | —- | —- | —- | —- | —- | —- | —- |
| predictionCol | 预测结果列名 | 预测结果列名 | String | ✓ | | |
| valueCol | value列,类型为MTable | value列,类型为MTable | String | ✓ | 所选列类型为 [M_TABLE] | |
| predictNum | 预测条数 | 预测条数 | Integer | | | 1 |
| predictionDetailCol | 预测详细信息列名 | 预测详细信息列名 | String | | | |
| reservedCols | 算法保留列名 | 算法保留列 | String[] | | | null |
| shiftNum | shift个数 | shift个数 | Integer | | | 7 |
| numThreads | 组件多线程线程个数 | 组件多线程线程个数 | Integer | | | 1 |
代码示例
Python 代码
from pyalink.alink import *import pandas as pduseLocalEnv(1)import time, datetimeimport numpy as npimport pandas as pddata = pd.DataFrame([[1, datetime.datetime.fromtimestamp(1), "10.0"],[1, datetime.datetime.fromtimestamp(2), "11.0"],[1, datetime.datetime.fromtimestamp(3), "12.0"],[1, datetime.datetime.fromtimestamp(4), "13.0"],[1, datetime.datetime.fromtimestamp(5), "14.0"],[1, datetime.datetime.fromtimestamp(6), "15.0"],[1, datetime.datetime.fromtimestamp(7), "16.0"],[1, datetime.datetime.fromtimestamp(8), "17.0"],[1, datetime.datetime.fromtimestamp(9), "18.0"],[1, datetime.datetime.fromtimestamp(10), "19.0"]])source = dataframeToOperator(data, schemaStr='id int, ts timestamp, val string', op_type='stream')source.link(OverCountWindowStreamOp().setGroupCols(["id"]).setTimeCol("ts").setPrecedingRows(5).setClause("mtable_agg_preceding(ts, val) as data")).link(ShiftStreamOp().setValueCol("data").setShiftNum(7).setPredictNum(12).setPredictionCol("predict")).link(LookupVectorInTimeSeriesStreamOp().setTimeCol("ts").setTimeSeriesCol("predict").setOutputCol("out")).print()StreamOperator.execute()
Java 代码
package com.alibaba.alink.operator.stream.timeseries;import org.apache.flink.types.Row;import com.alibaba.alink.operator.batch.source.MemSourceBatchOp;import com.alibaba.alink.operator.batch.timeseries.ShiftBatchOp;import com.alibaba.alink.operator.stream.StreamOperator;import com.alibaba.alink.operator.stream.feature.OverCountWindowStreamOp;import com.alibaba.alink.operator.stream.source.MemSourceStreamOp;import org.junit.Test;import java.sql.Timestamp;import java.util.Arrays;import java.util.List;public class ShiftStreamOpTest {@Testpublic void test() throws Exception {List <Row> mTableData = Arrays.asList(Row.of(1, new Timestamp(1), 10.0),Row.of(1, new Timestamp(2), 11.0),Row.of(1, new Timestamp(3), 12.0),Row.of(1, new Timestamp(4), 13.0),Row.of(1, new Timestamp(5), 14.0),Row.of(1, new Timestamp(6), 15.0),Row.of(1, new Timestamp(7), 16.0),Row.of(1, new Timestamp(8), 17.0),Row.of(1, new Timestamp(9), 18.0),Row.of(1, new Timestamp(10), 19.0));MemSourceStreamOp source = new MemSourceStreamOp(mTableData, new String[] {"id", "ts", "val"});source.link(new OverCountWindowStreamOp().setGroupCols("id").setTimeCol("ts").setPrecedingRows(5).setClause("mtable_agg(ts, val) as data")).link(new ShiftStreamOp().setGroupCol("id").setValueCol("data").setShiftNum(7).setPredictNum(12).setPredictionCol("predict")).link(new LookupValueInTimeSeriesStreamOp().setTimeCol("ts").setTimeSeriesCol("predict").setOutputCol("out")).print();StreamOperator.execute();}}
运行结果
| id | ts | val | data | predict | out | | —- | —- | —- | —- | —- | —- |
| 1 | 1970-01-01 08:00:00.001 | 10.0000 | {“data”:{“ts”:[“1970-01-01 08:00:00.001”],”val”:[10.0]},”schema”:”ts TIMESTAMP,val DOUBLE”} | null | null |
| 1 | 1970-01-01 08:00:00.002 | 11.0000 | {“data”:{“ts”:[“1970-01-01 08:00:00.001”,”1970-01-01 08:00:00.002”],”val”:[10.0,11.0]},”schema”:”ts TIMESTAMP,val DOUBLE”} | {“data”:{“ts”:[“1970-01-01 08:00:00.003”,”1970-01-01 08:00:00.004”,”1970-01-01 08:00:00.005”,”1970-01-01 08:00:00.006”,”1970-01-01 08:00:00.007”,”1970-01-01 08:00:00.008”,”1970-01-01 08:00:00.009”,”1970-01-01 08:00:00.01”,”1970-01-01 08:00:00.011”,”1970-01-01 08:00:00.012”,”1970-01-01 08:00:00.013”,”1970-01-01 08:00:00.014”],”val”:[10.0,11.0,10.0,11.0,10.0,11.0,10.0,11.0,10.0,11.0,10.0,11.0]},”schema”:”ts TIMESTAMP,val DOUBLE”} | null |
| 1 | 1970-01-01 08:00:00.003 | 12.0000 | {“data”:{“ts”:[“1970-01-01 08:00:00.001”,”1970-01-01 08:00:00.002”,”1970-01-01 08:00:00.003”],”val”:[10.0,11.0,12.0]},”schema”:”ts TIMESTAMP,val DOUBLE”} | {“data”:{“ts”:[“1970-01-01 08:00:00.004”,”1970-01-01 08:00:00.005”,”1970-01-01 08:00:00.006”,”1970-01-01 08:00:00.007”,”1970-01-01 08:00:00.008”,”1970-01-01 08:00:00.009”,”1970-01-01 08:00:00.01”,”1970-01-01 08:00:00.011”,”1970-01-01 08:00:00.012”,”1970-01-01 08:00:00.013”,”1970-01-01 08:00:00.014”,”1970-01-01 08:00:00.015”],”val”:[10.0,11.0,12.0,10.0,11.0,12.0,10.0,11.0,12.0,10.0,11.0,12.0]},”schema”:”ts TIMESTAMP,val DOUBLE”} | null |
| 1 | 1970-01-01 08:00:00.004 | 13.0000 | {“data”:{“ts”:[“1970-01-01 08:00:00.001”,”1970-01-01 08:00:00.002”,”1970-01-01 08:00:00.003”,”1970-01-01 08:00:00.004”],”val”:[10.0,11.0,12.0,13.0]},”schema”:”ts TIMESTAMP,val DOUBLE”} | {“data”:{“ts”:[“1970-01-01 08:00:00.005”,”1970-01-01 08:00:00.006”,”1970-01-01 08:00:00.007”,”1970-01-01 08:00:00.008”,”1970-01-01 08:00:00.009”,”1970-01-01 08:00:00.01”,”1970-01-01 08:00:00.011”,”1970-01-01 08:00:00.012”,”1970-01-01 08:00:00.013”,”1970-01-01 08:00:00.014”,”1970-01-01 08:00:00.015”,”1970-01-01 08:00:00.016”],”val”:[10.0,11.0,12.0,13.0,10.0,11.0,12.0,13.0,10.0,11.0,12.0,13.0]},”schema”:”ts TIMESTAMP,val DOUBLE”} | null |
| 1 | 1970-01-01 08:00:00.005 | 14.0000 | {“data”:{“ts”:[“1970-01-01 08:00:00.001”,”1970-01-01 08:00:00.002”,”1970-01-01 08:00:00.003”,”1970-01-01 08:00:00.004”,”1970-01-01 08:00:00.005”],”val”:[10.0,11.0,12.0,13.0,14.0]},”schema”:”ts TIMESTAMP,val DOUBLE”} | {“data”:{“ts”:[“1970-01-01 08:00:00.006”,”1970-01-01 08:00:00.007”,”1970-01-01 08:00:00.008”,”1970-01-01 08:00:00.009”,”1970-01-01 08:00:00.01”,”1970-01-01 08:00:00.011”,”1970-01-01 08:00:00.012”,”1970-01-01 08:00:00.013”,”1970-01-01 08:00:00.014”,”1970-01-01 08:00:00.015”,”1970-01-01 08:00:00.016”,”1970-01-01 08:00:00.017”],”val”:[10.0,11.0,12.0,13.0,14.0,10.0,11.0,12.0,13.0,14.0,10.0,11.0]},”schema”:”ts TIMESTAMP,val DOUBLE”} | null |
| 1 | 1970-01-01 08:00:00.006 | 15.0000 | {“data”:{“ts”:[“1970-01-01 08:00:00.001”,”1970-01-01 08:00:00.002”,”1970-01-01 08:00:00.003”,”1970-01-01 08:00:00.004”,”1970-01-01 08:00:00.005”,”1970-01-01 08:00:00.006”],”val”:[10.0,11.0,12.0,13.0,14.0,15.0]},”schema”:”ts TIMESTAMP,val DOUBLE”} | {“data”:{“ts”:[“1970-01-01 08:00:00.007”,”1970-01-01 08:00:00.008”,”1970-01-01 08:00:00.009”,”1970-01-01 08:00:00.01”,”1970-01-01 08:00:00.011”,”1970-01-01 08:00:00.012”,”1970-01-01 08:00:00.013”,”1970-01-01 08:00:00.014”,”1970-01-01 08:00:00.015”,”1970-01-01 08:00:00.016”,”1970-01-01 08:00:00.017”,”1970-01-01 08:00:00.018”],”val”:[10.0,11.0,12.0,13.0,14.0,15.0,10.0,11.0,12.0,13.0,14.0,15.0]},”schema”:”ts TIMESTAMP,val DOUBLE”} | null |
| 1 | 1970-01-01 08:00:00.007 | 16.0000 | {“data”:{“ts”:[“1970-01-01 08:00:00.002”,”1970-01-01 08:00:00.003”,”1970-01-01 08:00:00.004”,”1970-01-01 08:00:00.005”,”1970-01-01 08:00:00.006”,”1970-01-01 08:00:00.007”],”val”:[11.0,12.0,13.0,14.0,15.0,16.0]},”schema”:”ts TIMESTAMP,val DOUBLE”} | {“data”:{“ts”:[“1970-01-01 08:00:00.008”,”1970-01-01 08:00:00.009”,”1970-01-01 08:00:00.01”,”1970-01-01 08:00:00.011”,”1970-01-01 08:00:00.012”,”1970-01-01 08:00:00.013”,”1970-01-01 08:00:00.014”,”1970-01-01 08:00:00.015”,”1970-01-01 08:00:00.016”,”1970-01-01 08:00:00.017”,”1970-01-01 08:00:00.018”,”1970-01-01 08:00:00.019”],”val”:[11.0,12.0,13.0,14.0,15.0,16.0,11.0,12.0,13.0,14.0,15.0,16.0]},”schema”:”ts TIMESTAMP,val DOUBLE”} | null |
| 1 | 1970-01-01 08:00:00.008 | 17.0000 | {“data”:{“ts”:[“1970-01-01 08:00:00.003”,”1970-01-01 08:00:00.004”,”1970-01-01 08:00:00.005”,”1970-01-01 08:00:00.006”,”1970-01-01 08:00:00.007”,”1970-01-01 08:00:00.008”],”val”:[12.0,13.0,14.0,15.0,16.0,17.0]},”schema”:”ts TIMESTAMP,val DOUBLE”} | {“data”:{“ts”:[“1970-01-01 08:00:00.009”,”1970-01-01 08:00:00.01”,”1970-01-01 08:00:00.011”,”1970-01-01 08:00:00.012”,”1970-01-01 08:00:00.013”,”1970-01-01 08:00:00.014”,”1970-01-01 08:00:00.015”,”1970-01-01 08:00:00.016”,”1970-01-01 08:00:00.017”,”1970-01-01 08:00:00.018”,”1970-01-01 08:00:00.019”,”1970-01-01 08:00:00.02”],”val”:[12.0,13.0,14.0,15.0,16.0,17.0,12.0,13.0,14.0,15.0,16.0,17.0]},”schema”:”ts TIMESTAMP,val DOUBLE”} | null |
| 1 | 1970-01-01 08:00:00.009 | 18.0000 | {“data”:{“ts”:[“1970-01-01 08:00:00.004”,”1970-01-01 08:00:00.005”,”1970-01-01 08:00:00.006”,”1970-01-01 08:00:00.007”,”1970-01-01 08:00:00.008”,”1970-01-01 08:00:00.009”],”val”:[13.0,14.0,15.0,16.0,17.0,18.0]},”schema”:”ts TIMESTAMP,val DOUBLE”} | {“data”:{“ts”:[“1970-01-01 08:00:00.01”,”1970-01-01 08:00:00.011”,”1970-01-01 08:00:00.012”,”1970-01-01 08:00:00.013”,”1970-01-01 08:00:00.014”,”1970-01-01 08:00:00.015”,”1970-01-01 08:00:00.016”,”1970-01-01 08:00:00.017”,”1970-01-01 08:00:00.018”,”1970-01-01 08:00:00.019”,”1970-01-01 08:00:00.02”,”1970-01-01 08:00:00.021”],”val”:[13.0,14.0,15.0,16.0,17.0,18.0,13.0,14.0,15.0,16.0,17.0,18.0]},”schema”:”ts TIMESTAMP,val DOUBLE”} | null |
| 1 | 1970-01-01 08:00:00.01 | 19.0000 | {“data”:{“ts”:[“1970-01-01 08:00:00.005”,”1970-01-01 08:00:00.006”,”1970-01-01 08:00:00.007”,”1970-01-01 08:00:00.008”,”1970-01-01 08:00:00.009”,”1970-01-01 08:00:00.01”],”val”:[14.0,15.0,16.0,17.0,18.0,19.0]},”schema”:”ts TIMESTAMP,val DOUBLE”} | {“data”:{“ts”:[“1970-01-01 08:00:00.011”,”1970-01-01 08:00:00.012”,”1970-01-01 08:00:00.013”,”1970-01-01 08:00:00.014”,”1970-01-01 08:00:00.015”,”1970-01-01 08:00:00.016”,”1970-01-01 08:00:00.017”,”1970-01-01 08:00:00.018”,”1970-01-01 08:00:00.019”,”1970-01-01 08:00:00.02”,”1970-01-01 08:00:00.021”,”1970-01-01 08:00:00.022”],”val”:[14.0,15.0,16.0,17.0,18.0,19.0,14.0,15.0,16.0,17.0,18.0,19.0]},”schema”:”ts TIMESTAMP,val DOUBLE”} | null |
