Java class name: com.alibaba.alink.operator.batch.timeseries.LookupVectorInTimeSeriesBatchOp
Python class name: LookupVectorInTimeSeriesBatchOp

Overview

Looks up the vector value at a given time in a time series.

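Notes

  • The time series column is a special MTable type with one time column and one value column; see the data column in the output for an example.
  • If the lookup time does not appear in the time series, the result is interpolated from the values at the adjacent times.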
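The lookup rule described in the notes can be sketched in plain Python. This is only an illustration under stated assumptions: `lookup_vector` is a hypothetical helper, not part of Alink, and it assumes linear interpolation between the two neighbouring samples and no extrapolation outside the series. The operator's actual interpolation scheme may differ.

```python
from bisect import bisect_left
from datetime import datetime, timedelta


def lookup_vector(series, when):
    """Return the vector at `when` from a time-sorted list of (timestamp, vector) pairs.

    Hypothetical helper for illustration only; not an Alink API.
    """
    times = [t for t, _ in series]
    i = bisect_left(times, when)
    if i < len(times) and times[i] == when:
        return list(series[i][1])        # exact hit: return the stored vector
    if i == 0 or i == len(times):
        return None                      # assumption: no extrapolation outside the series
    (t0, v0), (t1, v1) = series[i - 1], series[i]
    w = (when - t0) / (t1 - t0)          # fraction of the way from t0 to t1
    return [a + w * (b - a) for a, b in zip(v0, v1)]


# Same shape as the Java example's data: samples at 1..10 ms after the epoch.
base = datetime(1970, 1, 1)
series = [(base + timedelta(milliseconds=k), [9.0 + k, 20.0 + k]) for k in range(1, 11)]

print(lookup_vector(series, base + timedelta(milliseconds=5)))    # [14.0, 25.0]
print(lookup_vector(series, base + timedelta(milliseconds=5.5)))  # [14.5, 25.5]
```

The first call hits a stored sample exactly; the second falls halfway between the samples at 5 ms and 6 ms, so under the linear assumption each component is the midpoint of its neighbours.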

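Parameters

| Name | Display name | Description | Type | Required? | Value range | Default |
| --- | --- | --- | --- | --- | --- | --- |
| outputCol | Output column name | Name of the output result column (required) | String | ✓ | | |
| timeCol | Timestamp column | Name of the timestamp column | String | ✓ | Column type must be [TIMESTAMP] | |
| timeSeriesCol | Time series column | Time series column; a special MTable type with one time column and one value column | String | ✓ | Column type must be [M_TABLE] | |
| reservedCols | Reserved columns | Columns retained by the algorithm | String[] | | | null |
| numThreads | Number of threads | Number of threads used by the operator | Integer | | | 1 |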

Code examples

Python code

    import datetime

    import pandas as pd
    from pyalink.alink import *

    useLocalEnv(1)

    # Ten samples for series id 1; each value is a 2-dimensional vector written as a string.
    data = pd.DataFrame([
        [1, datetime.datetime.fromtimestamp(1), "10.0 10.0"],
        [1, datetime.datetime.fromtimestamp(2), "11.0 11.0"],
        [1, datetime.datetime.fromtimestamp(3), "12.0 12.0"],
        [1, datetime.datetime.fromtimestamp(4), "13.0 13.0"],
        [1, datetime.datetime.fromtimestamp(5), "14.0 14.0"],
        [1, datetime.datetime.fromtimestamp(6), "15.0 15.0"],
        [1, datetime.datetime.fromtimestamp(7), "16.0 16.0"],
        [1, datetime.datetime.fromtimestamp(8), "17.0 17.0"],
        [1, datetime.datetime.fromtimestamp(9), "18.0 18.0"],
        [1, datetime.datetime.fromtimestamp(10), "19.0 19.0"]
    ])

    source = dataframeToOperator(data, schemaStr='id int, ts timestamp, val string', op_type='batch')

    source.link(
        # Aggregate each id's (ts, val) rows into a single MTable column named "data".
        GroupByBatchOp()
            .setGroupByPredicate("id")
            .setSelectClause("id, mtable_agg(ts, val) as data")
    ).link(
        # Forecast 12 points from the series and store them as an MTable in "predict".
        ShiftBatchOp()
            .setValueCol("data")
            .setShiftNum(7)
            .setPredictNum(12)
            .setPredictionCol("predict")
    ).link(
        # Expand "predict" into one row per (ts, val), keeping id and the MTable itself.
        FlattenMTableBatchOp()
            .setReservedCols(["id", "predict"])
            .setSelectedCol("predict")
            .setSchemaStr("ts timestamp, val VECTOR")
    ).link(
        # For each row, look up ts in the "predict" series and write the vector to "out".
        LookupVectorInTimeSeriesBatchOp()
            .setTimeCol("ts")
            .setTimeSeriesCol("predict")
            .setOutputCol("out")
            .setReservedCols(["id"])
    ).print()

Java code

    package com.alibaba.alink.operator.batch.timeseries;

    import org.apache.flink.types.Row;

    import com.alibaba.alink.common.MTable;
    import com.alibaba.alink.operator.batch.source.MemSourceBatchOp;
    import org.junit.Test;

    import java.sql.Timestamp;
    import java.util.Arrays;
    import java.util.List;

    import static org.junit.Assert.*;

    public class LookupVectorInTimeSeriesBatchOpTest {

        @Test
        public void test() throws Exception {
            // Time series rows: a timestamp (epoch milliseconds) and a 2-dimensional vector.
            List<Row> mTableData = Arrays.asList(
                Row.of(new Timestamp(1), "10.0 21.0"),
                Row.of(new Timestamp(2), "11.0 22.0"),
                Row.of(new Timestamp(3), "12.0 23.0"),
                Row.of(new Timestamp(4), "13.0 24.0"),
                Row.of(new Timestamp(5), "14.0 25.0"),
                Row.of(new Timestamp(6), "15.0 26.0"),
                Row.of(new Timestamp(7), "16.0 27.0"),
                Row.of(new Timestamp(8), "17.0 28.0"),
                Row.of(new Timestamp(9), "18.0 29.0"),
                Row.of(new Timestamp(10), "19.0 30.0")
            );

            MTable mtable = new MTable(mTableData, "ts timestamp, val vector");

            // One input row: the lookup time (5 ms) and the time series to search.
            MemSourceBatchOp source = new MemSourceBatchOp(
                new Object[][] {
                    {1, new Timestamp(5), mtable}
                },
                new String[] {"id", "ts", "data"});

            source
                .link(new LookupVectorInTimeSeriesBatchOp()
                    .setTimeCol("ts")
                    .setTimeSeriesCol("data")
                    .setOutputCol("out")
                )
                .print();
        }
    }

Output

| id | ts | data | out |
| --- | --- | --- | --- |
| 1 | 1970-01-01 08:00:00.005 | MTable(10,2)(ts,val)<br>1970-01-01 08:00:00.001 \| 10.0 21.0<br>1970-01-01 08:00:00.002 \| 11.0 22.0<br>1970-01-01 08:00:00.003 \| 12.0 23.0<br>1970-01-01 08:00:00.004 \| 13.0 24.0<br>1970-01-01 08:00:00.005 \| 14.0 25.0 | 14.0 25.0 |
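
The timestamps in the result read 08:00:00 rather than 00:00:00 even though the Java example builds them with `new Timestamp(5)`, which is 5 ms after the Unix epoch. That is consistent with the example having been run on a machine in a UTC+8 time zone, although the source does not say so. The sketch below illustrates the conversion under that assumption; `render` and `utc_plus_8` are hypothetical names, not Alink APIs.

```python
from datetime import datetime, timedelta, timezone

epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
utc_plus_8 = timezone(timedelta(hours=8))  # assumed zone of the machine that produced the output


def render(millis, tz):
    """Format epoch milliseconds as 'YYYY-MM-DD HH:MM:SS.mmm' in the given zone."""
    local = (epoch + timedelta(milliseconds=millis)).astimezone(tz)
    return local.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]  # trim microseconds to milliseconds


print(render(5, utc_plus_8))    # 1970-01-01 08:00:00.005
print(render(5, timezone.utc))  # 1970-01-01 00:00:00.005
```

On a machine in a different time zone the printed timestamps would shift accordingly, while the looked-up vector `14.0 25.0` would stay the same.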