处理时间
【TableAPI】DataStream 到 Table 转换时定义
处理时间属性可以在 schema 定义的时候用 .proctime 后缀来定义。
时间属性一定不能定义在一个已有字段上,
所以它只能定义在 schema 定义的最后
| StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource
env.fromElements_(_new WaterSensor(“sensor_1”, 1000L, 10),
new WaterSensor(“sensor_1”, 2000L, 20),
new WaterSensor(“sensor_2”, 3000L, 30),
new WaterSensor(“sensor_1”, 4000L, 40),
new WaterSensor(“sensor_1”, 5000L, 50),
new WaterSensor(“sensor_2”, 6000L, 60));
// 1. 创建表的执行环境
_StreamTableEnvironment tableEnv = StreamTableEnvironment._create(env);
// 声明一个额外的字段来作为处理时间字段
_Table sensorTable = tableEnv.fromDataStream**(**waterSensorStream, $**(“id”)**, $**(“ts”)**, $**(“vc”)**, $**(“pt”).proctime()_)**;
sensorTable.print();
env.execute(); | | —- |
【SQLAPI】在创建表的 DDL 中定义
| public class Flink06TableApi_ProcessTime **{
public static void main(String[] args) throws Exception {
**StreamExecutionEnvironment env = StreamExecutionEnvironment._getExecutionEnvironment();
env.setParallelism(1);
// 1. 创建表的执行环境
_StreamTableEnvironment tableEnv = StreamTableEnvironment._create(env);
// 创建表, 声明一个额外的列作为处理时间
_tableEnv.executeSql**(“create table sensor(id string,ts bigint,vc int,pt_time as PROCTIME()) with(“
+ “‘connector’ = ‘filesystem’,”
+ “‘path’ = ‘input/sensor.txt’,”
+ “‘format’ = ‘csv’”
+ “)”)_**;
TableResult result = tableEnv.executeSql**_(_"select * from sensor"_)_**;<br /> result.print**_()_**;<br /> **_}<br />__}_** |
| —- |
事件时间
【TableAPI】DataStream 到 Table 转换时定义
事件时间属性可以用 .rowtime 后缀在定义 DataStream schema 的时候来定义。
时间戳和 watermark 在这之前一定是在 DataStream 上已经定义好了。
在从 DataStream 到 Table 转换时定义事件时间属性有两种方式。取决于用 .rowtime 后缀修饰的字段名字是否是已有字段,事件时间字段可以是:
①在 schema 的结尾追加一个新的字段
②替换一个已经存在的字段。
不管在哪种情况下,事件时间字段都表示 DataStream 中定义的事件的时间戳。
| import static org.apache.flink.table.api.Expressions.$;
public class Flink07_TableApi_EventTime **{
public static void main(String[] args) throws Exception {
**StreamExecutionEnvironment env = StreamExecutionEnvironment._getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator
.fromElements_(_new WaterSensor(“sensor_1”, 1000L, 10),
new WaterSensor(“sensor_1”, 2000L, 20),
new WaterSensor(“sensor_2”, 3000L, 30),
new WaterSensor(“sensor_1”, 4000L, 40),
new WaterSensor(“sensor_1”, 5000L, 50),
new WaterSensor(“sensor_2”, 6000L, 60))
__ .assignTimestampsAndWatermarks(
__ WatermarkStrategy
.
__ .withTimestampAssigner((element, recordTimestamp) -> element.getTs())
__ );
StreamTableEnvironment tableEnv = StreamTableEnvironment._create_**_(_**env**_)_**;<br /> Table table = tableEnv<br /> _// 用一个额外的字段作为事件时间属性_ _.fromDataStream**_(_**waterSensorStream, _$_**_(_"id"_)_**, _$_**_(_"ts"_)_**, _$_**_(_"vc"_)_**, _$_**_(_"et"_)_**.rowtime**_()__)_**;<br /> table.execute**_()_**.print**_()_**;<br /> env.execute**_()_**;<br /> **_}<br />__}_**
// 使用已有的字段作为时间属性
.fromDataStream(waterSensorStream, $(“id”), $(“ts”).rowtime(), $(“vc”)); |
| —- |
【SQLAPI】在创建表的 DDL 中定义
事件时间属性可以用 WATERMARK 语句在 CREATE TABLE DDL 中进行定义。
WATERMARK 语句在一个已有字段上定义一个 watermark 生成表达式,同时标记这个已有字段为时间属性字段.
通过:、
from_unixtime(ts/1000,’yyyy-MM-dd HH:mm:ss’)函数,将long型时间戳转化为标准时间格式,
需要指定时区: tenv.getConfig().setLocalTimeZone(_ZoneOffset._ofHours(_0));
之后通过to_timestamp将标准时间格式转化为timestamp(3) 类型_
| public class Flink07TableApi_EventTime_2 **{
public static void main(String[] args) throws Exception {
**StreamExecutionEnvironment env = StreamExecutionEnvironment._getExecutionEnvironment();
env.setParallelism(1);
// 设置东八区时区<br /> tenv.getConfig_()_.setLocalTimeZone_(_ZoneOffset._ofHours(_0_))_;StreamTableEnvironment tEnv = StreamTableEnvironment._create_**_(_**env**_)_**;<br /> _// 作为事件时间的字段必须是 timestamp 类型, 所以根据 long 类型的 ts 计算出来一个 t**(long->date->timestamp)**_ _tEnv.executeSql**_(_"create table sensor(" **+<br /> **"id string," **+<br /> **"ts bigint," **+<br /> **"vc int, " **+<br /> // 设置时间戳 ,**参数的单位要求是秒**<br /> **"t as to_timestamp(from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss'))," **+<br /> // 设置水印<br /> **"watermark for t as t - interval '5' second)" **+<br /> **"with("<br /> **+ **"'connector' = 'filesystem',"<br /> **+ **"'path' = 'input/sensor.txt',"<br /> **+ **"'format' = 'csv'"<br /> **+ **")"_)_**;tEnv.sqlQuery**_(_"select * from sensor"_)_**.execute**_()_**.print**_()_**;<br /> // 等价于tEnv.executeInsert()<br /> **_}<br />__}_** |
| —- |
说明:
1. 把一个现有的列定义为一个为表标记事件时间的属性。该列的类型必须为 TIMESTAMP(3),且是 schema 中的顶层列,它也可以是一个计算列。
2. 严格递增时间戳: WATERMARK FOR rowtime_column AS rowtime_column。
3. 递增时间戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘0.001’ SECOND。
4. 乱序时间戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘string’ timeUnit。
