Java 类名:com.alibaba.alink.operator.stream.feature.SessionTimeWindowStreamOp
Python 类名:SessionTimeWindowStreamOp

功能介绍

会话窗口是GroupWindow的一种,基于GroupWindow,使用聚合函数进行计算,输出窗口内的统计量,,特征生成方式由clause(表达式决定)。

  • clause语句的形式,通过聚合函数进行操作。其中clause语法和flink sql一致,计算逻辑也和flink overwindow一致。
  • 依据指定列进行groupBy,在用户指定的窗口区间内,按照clause指定的方式进行计算。

    Clause

    clause当前支持全部flink支持的聚合函数,并在此基础上额外支持了一系列聚合函数。
    详细用法请参考 https://www.yuque.com/pinshu/alink_tutorial/list_aggregate_function

    窗口

    Alink支持的窗口, 其中Group窗口是输出窗口聚合统计量,OVER窗口是给定一行数据,将窗口特征追加到数据后面,输出带特征的一行数据。 特征构造: 会话窗口 (SessionTimeWindowStreamOp) - 图1各窗口的详细用法请参考 https://www.yuque.com/pinshu/alink_guide/dffffm

    参数说明

    | 名称 | 中文名称 | 描述 | 类型 | 是否必须? | 取值范围 | 默认值 | | —- | —- | —- | —- | —- | —- | —- |

| clause | 运算语句 | 运算语句 | String | ✓ | | |

| sessionGapTime | 会话窗口间隔大小 | 会话窗口间隔大小 | String | ✓ | | |

| timeCol | 时间戳列(TimeStamp) | 时间戳列(TimeStamp) | String | ✓ | 所选列类型为 [TIMESTAMP] | |

| groupCols | 分组列名数组 | 分组列名,多列,可选,默认不选 | String[] | | | null |

| latency | 水位线的延迟 | 水位线的延迟,默认0.0 | Double | | | 0.0 |

| watermarkType | 水位线的类别 | 水位线的类别 | String | | “PERIOD”, “PUNCTUATED” | “PERIOD” |

代码示例

Python 代码

  1. from pyalink.alink import *
  2. import pandas as pd
  3. useLocalEnv(1)
  4. sourceFrame = pd.DataFrame([
  5. [0, 0, 0, 1],
  6. [0, 2, 0, 2],
  7. [0, 1, 1, 3],
  8. [0, 3, 1, 4],
  9. [0, 3, 3, 5],
  10. [0, 0, 3, 6],
  11. [0, 0, 4, 7],
  12. [0, 3, 4, 8],
  13. [0, 1, 2, 9],
  14. [0, 2, 2, 10],
  15. ])
  16. streamSource = StreamOperator.fromDataframe(sourceFrame,schemaStr="user int, device long, ip long, timeCol long")
  17. op = SessionTimeWindowStreamOp()\
  18. .setTimeCol("timeCol")\
  19. .setSessionGapTime(60)\
  20. .setLatency(180)\
  21. .setGroupCols(["user"])\
  22. .setClause("count_preceding(ip) as countip")
  23. streamSource.select('user, device, ip, to_timestamp(timeCol) as timeCol').link(op).print()
  24. StreamOperator.execute()

Java 代码

  1. package com.alibaba.alink.operator.stream.feature;
  2. import org.apache.flink.types.Row;
  3. import com.alibaba.alink.operator.stream.StreamOperator;
  4. import com.alibaba.alink.operator.stream.source.MemSourceStreamOp;
  5. import com.alibaba.alink.testutil.AlinkTestBase;
  6. import org.junit.Test;
  7. import java.sql.Timestamp;
  8. import java.util.Arrays;
  9. import java.util.List;
  10. public class SessionTimeWindowStreamOpTest extends AlinkTestBase {
  11. @Test
  12. public void test() throws Exception {
  13. List <Row> sourceFrame = Arrays.asList(
  14. Row.of(0, 0, 0, new Timestamp(1000L)),
  15. Row.of(0, 2, 0, new Timestamp(2000L)),
  16. Row.of(0, 1, 1, new Timestamp(3000L)),
  17. Row.of(0, 3, 1, new Timestamp(4000L)),
  18. Row.of(0, 3, 3, new Timestamp(5000L)),
  19. Row.of(0, 0, 3, new Timestamp(7000L)),
  20. Row.of(0, 0, 4, new Timestamp(8000L)),
  21. Row.of(0, 3, 4, new Timestamp(9000L)),
  22. Row.of(0, 1, 2, new Timestamp(10000L))
  23. );
  24. StreamOperator <?> source = new MemSourceStreamOp(
  25. sourceFrame, new String[] {"user", "device", "ip", "ts"});
  26. source.link(
  27. new SessionTimeWindowStreamOp()
  28. .setTimeCol("ts")
  29. .setSessionGapTime("2s")
  30. .setGroupCols("user")
  31. .setClause("SESSION_START() as start_time, SESSION_END() as end_time, count_preceding(ip) as count_ip")
  32. ).print();
  33. StreamOperator.execute();
  34. }
  35. }

运行结果

| user | countip | | —- | —- |

| 0 | 9 |