Java 类名:com.alibaba.alink.operator.stream.feature.OverCountWindowStreamOp
Python 类名:OverCountWindowStreamOp

功能介绍

OverCount窗口是Over窗口的一种,基于OverWindow,使用聚合函数进行流式特征构造。给定一行数据,将生成的特征追加在后面,输出一行数据,生成方式有clause(表达式决定)。

  • clause语句的形式,通过聚合函数进行操作。其中clause语法和flink sql一致,计算逻辑也和flink overwindow一致。
  • 依据指定列进行groupBy,在用户指定的窗口区间内,按照clause指定的方式进行计算。

    Clause

    clause当前支持全部flink支持的聚合函数,并在此基础上额外支持了一系列聚合函数。
    详细用法请参考 https://www.yuque.com/pinshu/alink_tutorial/list_aggregate_function

    窗口

    Alink支持的窗口, 其中Group窗口是输出窗口聚合统计量,OVER窗口是给定一行数据,将窗口特征追加到数据后面,输出带特征的一行数据。 特征构造: OverCountWindow (OverCountWindowStreamOp) - 图1各窗口的详细用法请参考 https://www.yuque.com/pinshu/alink_guide/dffffm

    参数说明

    | 名称 | 中文名称 | 描述 | 类型 | 是否必须? | 取值范围 | 默认值 | | —- | —- | —- | —- | —- | —- | —- |

| clause | 运算语句 | 运算语句 | String | ✓ | | |

| timeCol | 时间戳列(TimeStamp) | 时间戳列(TimeStamp) | String | ✓ | 所选列类型为 [TIMESTAMP] | |

| groupCols | 分组列名数组 | 分组列名,多列,可选,默认不选 | String[] | | | null |

| latency | 水位线的延迟 | 水位线的延迟,默认0.0 | Double | | | 0.0 |

| precedingRows | 数据窗口大小 | 数据窗口大小 | Integer | | | null |

| reservedCols | 算法保留列名 | 算法保留列 | String[] | | | null |

| watermarkType | 水位线的类别 | 水位线的类别 | String | | “PERIOD”, “PUNCTUATED” | “PERIOD” |

代码示例

Python 代码

  1. from pyalink.alink import *
  2. import pandas as pd
  3. useLocalEnv(1)
  4. sourceFrame = pd.DataFrame([
  5. [0, 0, 0, 1],
  6. [0, 2, 0, 2],
  7. [0, 1, 1, 3],
  8. [0, 3, 1, 4],
  9. [0, 3, 3, 5],
  10. [0, 0, 3, 6],
  11. [0, 0, 4, 7],
  12. [0, 3, 4, 8],
  13. [0, 1, 2, 9],
  14. [0, 2, 2, 10],
  15. ])
  16. source = StreamOperator.fromDataframe(sourceFrame,schemaStr="user int, device long, ip long, timeCol long")
  17. op = OverCountWindowStreamOp().setTimeCol("timeCol").setPrecedingRows(10).setGroupCols(["user"]).setClause("count_preceding(ip) as countip")
  18. source.select('user, device, ip, to_timestamp(timeCol) as timeCol').link(op).print()
  19. StreamOperator.execute()

Java 代码

  1. import org.apache.flink.types.Row;
  2. import com.alibaba.alink.operator.stream.StreamOperator;
  3. import com.alibaba.alink.operator.stream.feature.OverCountWindowStreamOp;
  4. import com.alibaba.alink.operator.stream.source.MemSourceStreamOp;
  5. import com.alibaba.alink.operator.stream.sql.SqlCmdStreamOp;
  6. import org.junit.Test;
  7. import java.util.Arrays;
  8. import java.util.List;
  9. public class OverCountWindowStreamOpTest {
  10. @Test
  11. public void testOverCountWindowStreamOp() throws Exception {
  12. List <Row> sourceFrame = Arrays.asList(
  13. Row.of(0, 0, 0, 1L),
  14. Row.of(0, 2, 0, 2L),
  15. Row.of(0, 1, 1, 3L),
  16. Row.of(0, 3, 1, 4L),
  17. Row.of(0, 3, 3, 5L),
  18. Row.of(0, 0, 3, 6L),
  19. Row.of(0, 0, 4, 7L),
  20. Row.of(0, 3, 4, 8L),
  21. Row.of(0, 1, 2, 9L),
  22. Row.of(0, 2, 2, 10L)
  23. );
  24. StreamOperator <?> streamSource = new MemSourceStreamOp(sourceFrame,
  25. "user int, device int, ip int, timeCol long");
  26. StreamOperator <?> op = new OverCountWindowStreamOp().setTimeCol("timeCol").setPrecedingRows(10)
  27. .setGroupCols("user").setClause("count_preceding(ip) as countip");
  28. streamSource.select("user, device, ip, to_timestamp(timeCol) as timeCol").link(op).print();
  29. StreamOperator.execute();
  30. }
  31. }

运行结果

| user | device | ip | timeCol | countip | | —- | —- | —- | —- | —- |

| 0 | 0 | 0 | 1970-01-01 08:00:00.001 | 0 |

| 0 | 2 | 0 | 1970-01-01 08:00:00.002 | 1 |

| 0 | 1 | 1 | 1970-01-01 08:00:00.003 | 2 |

| 0 | 3 | 1 | 1970-01-01 08:00:00.004 | 3 |

| 0 | 3 | 3 | 1970-01-01 08:00:00.005 | 4 |

| 0 | 0 | 3 | 1970-01-01 08:00:00.006 | 5 |

| 0 | 0 | 4 | 1970-01-01 08:00:00.007 | 6 |

| 0 | 3 | 4 | 1970-01-01 08:00:00.008 | 7 |

| 0 | 1 | 2 | 1970-01-01 08:00:00.009 | 8 |

| 0 | 2 | 2 | 1970-01-01 08:00:00.01 | 9 |