Java 类名:com.alibaba.alink.operator.stream.feature.SessionTimeWindowStreamOp
Python 类名:SessionTimeWindowStreamOp
功能介绍
会话窗口是GroupWindow的一种,基于GroupWindow,使用聚合函数进行计算,输出窗口内的统计量,,特征生成方式由clause(表达式决定)。
- clause语句的形式,通过聚合函数进行操作。其中clause语法和flink sql一致,计算逻辑也和flink overwindow一致。
- 依据指定列进行groupBy,在用户指定的窗口区间内,按照clause指定的方式进行计算。
Clause
clause当前支持全部flink支持的聚合函数,并在此基础上额外支持了一系列聚合函数。
详细用法请参考 https://www.yuque.com/pinshu/alink_tutorial/list_aggregate_function窗口
Alink支持的窗口, 其中Group窗口是输出窗口聚合统计量,OVER窗口是给定一行数据,将窗口特征追加到数据后面,输出带特征的一行数据。 各窗口的详细用法请参考 https://www.yuque.com/pinshu/alink_guide/dffffm参数说明
| 名称 | 中文名称 | 描述 | 类型 | 是否必须? | 取值范围 | 默认值 | | —- | —- | —- | —- | —- | —- | —- |
| clause | 运算语句 | 运算语句 | String | ✓ | | |
| sessionGapTime | 会话窗口间隔大小 | 会话窗口间隔大小 | String | ✓ | | |
| timeCol | 时间戳列(TimeStamp) | 时间戳列(TimeStamp) | String | ✓ | 所选列类型为 [TIMESTAMP] | |
| groupCols | 分组列名数组 | 分组列名,多列,可选,默认不选 | String[] | | | null |
| latency | 水位线的延迟 | 水位线的延迟,默认0.0 | Double | | | 0.0 |
| watermarkType | 水位线的类别 | 水位线的类别 | String | | “PERIOD”, “PUNCTUATED” | “PERIOD” |
代码示例
Python 代码
from pyalink.alink import *
import pandas as pd
useLocalEnv(1)
sourceFrame = pd.DataFrame([
[0, 0, 0, 1],
[0, 2, 0, 2],
[0, 1, 1, 3],
[0, 3, 1, 4],
[0, 3, 3, 5],
[0, 0, 3, 6],
[0, 0, 4, 7],
[0, 3, 4, 8],
[0, 1, 2, 9],
[0, 2, 2, 10],
])
streamSource = StreamOperator.fromDataframe(sourceFrame,schemaStr="user int, device long, ip long, timeCol long")
op = SessionTimeWindowStreamOp()\
.setTimeCol("timeCol")\
.setSessionGapTime(60)\
.setLatency(180)\
.setGroupCols(["user"])\
.setClause("count_preceding(ip) as countip")
streamSource.select('user, device, ip, to_timestamp(timeCol) as timeCol').link(op).print()
StreamOperator.execute()
Java 代码
package com.alibaba.alink.operator.stream.feature;
import org.apache.flink.types.Row;
import com.alibaba.alink.operator.stream.StreamOperator;
import com.alibaba.alink.operator.stream.source.MemSourceStreamOp;
import com.alibaba.alink.testutil.AlinkTestBase;
import org.junit.Test;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.List;
public class SessionTimeWindowStreamOpTest extends AlinkTestBase {
@Test
public void test() throws Exception {
List <Row> sourceFrame = Arrays.asList(
Row.of(0, 0, 0, new Timestamp(1000L)),
Row.of(0, 2, 0, new Timestamp(2000L)),
Row.of(0, 1, 1, new Timestamp(3000L)),
Row.of(0, 3, 1, new Timestamp(4000L)),
Row.of(0, 3, 3, new Timestamp(5000L)),
Row.of(0, 0, 3, new Timestamp(7000L)),
Row.of(0, 0, 4, new Timestamp(8000L)),
Row.of(0, 3, 4, new Timestamp(9000L)),
Row.of(0, 1, 2, new Timestamp(10000L))
);
StreamOperator <?> source = new MemSourceStreamOp(
sourceFrame, new String[] {"user", "device", "ip", "ts"});
source.link(
new SessionTimeWindowStreamOp()
.setTimeCol("ts")
.setSessionGapTime("2s")
.setGroupCols("user")
.setClause("SESSION_START() as start_time, SESSION_END() as end_time, count_preceding(ip) as count_ip")
).print();
StreamOperator.execute();
}
}
运行结果
| user | countip | | —- | —- |
| 0 | 9 |