Java 类名:com.alibaba.alink.operator.stream.feature.OverCountWindowStreamOp
Python 类名:OverCountWindowStreamOp
功能介绍
OverCount窗口是Over窗口的一种,基于OverWindow,使用聚合函数进行流式特征构造。给定一行数据,将生成的特征追加在后面,输出一行数据,生成方式有clause(表达式决定)。
- clause语句的形式,通过聚合函数进行操作。其中clause语法和flink sql一致,计算逻辑也和flink overwindow一致。
- 依据指定列进行groupBy,在用户指定的窗口区间内,按照clause指定的方式进行计算。
Clause
clause当前支持全部flink支持的聚合函数,并在此基础上额外支持了一系列聚合函数。
详细用法请参考 https://www.yuque.com/pinshu/alink_tutorial/list_aggregate_function窗口
Alink支持的窗口, 其中Group窗口是输出窗口聚合统计量,OVER窗口是给定一行数据,将窗口特征追加到数据后面,输出带特征的一行数据。 各窗口的详细用法请参考 https://www.yuque.com/pinshu/alink_guide/dffffm参数说明
| 名称 | 中文名称 | 描述 | 类型 | 是否必须? | 取值范围 | 默认值 | | —- | —- | —- | —- | —- | —- | —- |
| clause | 运算语句 | 运算语句 | String | ✓ | | |
| timeCol | 时间戳列(TimeStamp) | 时间戳列(TimeStamp) | String | ✓ | 所选列类型为 [TIMESTAMP] | |
| groupCols | 分组列名数组 | 分组列名,多列,可选,默认不选 | String[] | | | null |
| latency | 水位线的延迟 | 水位线的延迟,默认0.0 | Double | | | 0.0 |
| precedingRows | 数据窗口大小 | 数据窗口大小 | Integer | | | null |
| reservedCols | 算法保留列名 | 算法保留列 | String[] | | | null |
| watermarkType | 水位线的类别 | 水位线的类别 | String | | “PERIOD”, “PUNCTUATED” | “PERIOD” |
代码示例
Python 代码
from pyalink.alink import *
import pandas as pd
useLocalEnv(1)
sourceFrame = pd.DataFrame([
[0, 0, 0, 1],
[0, 2, 0, 2],
[0, 1, 1, 3],
[0, 3, 1, 4],
[0, 3, 3, 5],
[0, 0, 3, 6],
[0, 0, 4, 7],
[0, 3, 4, 8],
[0, 1, 2, 9],
[0, 2, 2, 10],
])
source = StreamOperator.fromDataframe(sourceFrame,schemaStr="user int, device long, ip long, timeCol long")
op = OverCountWindowStreamOp().setTimeCol("timeCol").setPrecedingRows(10).setGroupCols(["user"]).setClause("count_preceding(ip) as countip")
source.select('user, device, ip, to_timestamp(timeCol) as timeCol').link(op).print()
StreamOperator.execute()
Java 代码
import org.apache.flink.types.Row;
import com.alibaba.alink.operator.stream.StreamOperator;
import com.alibaba.alink.operator.stream.feature.OverCountWindowStreamOp;
import com.alibaba.alink.operator.stream.source.MemSourceStreamOp;
import com.alibaba.alink.operator.stream.sql.SqlCmdStreamOp;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
public class OverCountWindowStreamOpTest {
@Test
public void testOverCountWindowStreamOp() throws Exception {
List <Row> sourceFrame = Arrays.asList(
Row.of(0, 0, 0, 1L),
Row.of(0, 2, 0, 2L),
Row.of(0, 1, 1, 3L),
Row.of(0, 3, 1, 4L),
Row.of(0, 3, 3, 5L),
Row.of(0, 0, 3, 6L),
Row.of(0, 0, 4, 7L),
Row.of(0, 3, 4, 8L),
Row.of(0, 1, 2, 9L),
Row.of(0, 2, 2, 10L)
);
StreamOperator <?> streamSource = new MemSourceStreamOp(sourceFrame,
"user int, device int, ip int, timeCol long");
StreamOperator <?> op = new OverCountWindowStreamOp().setTimeCol("timeCol").setPrecedingRows(10)
.setGroupCols("user").setClause("count_preceding(ip) as countip");
streamSource.select("user, device, ip, to_timestamp(timeCol) as timeCol").link(op).print();
StreamOperator.execute();
}
}
运行结果
| user | device | ip | timeCol | countip | | —- | —- | —- | —- | —- |
| 0 | 0 | 0 | 1970-01-01 08:00:00.001 | 0 |
| 0 | 2 | 0 | 1970-01-01 08:00:00.002 | 1 |
| 0 | 1 | 1 | 1970-01-01 08:00:00.003 | 2 |
| 0 | 3 | 1 | 1970-01-01 08:00:00.004 | 3 |
| 0 | 3 | 3 | 1970-01-01 08:00:00.005 | 4 |
| 0 | 0 | 3 | 1970-01-01 08:00:00.006 | 5 |
| 0 | 0 | 4 | 1970-01-01 08:00:00.007 | 6 |
| 0 | 3 | 4 | 1970-01-01 08:00:00.008 | 7 |
| 0 | 1 | 2 | 1970-01-01 08:00:00.009 | 8 |
| 0 | 2 | 2 | 1970-01-01 08:00:00.01 | 9 |