内置函数
参考官网: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/systemFunctions.html
自定义函数
自定义函数分类:
1) 标量函数(Scalar functions) 将标量值转换成一个新标量值;
2) 表值函数(Table functions) 将标量值转换成新的行数据;
3) 聚合函数(Aggregate functions) 将多行数据里的标量值转换成一个新标量值;
4) 表值聚合函数(Table aggregate) 将多行数据里的标量值转换成新的行数据;
异步表值函数(Async table functions) 是异步查询外部数据系统的特殊函数。
标量函数
TableAPI
// 1. 在tableApi中使用自定义函数
// 1.1 内联方式方式使用
table
.select($("f0"), call(MyUpperCase.class, $("f0")).as("u"))
.execute()
.print();
// 1.2 注册之后使用
tEnv.createTemporaryFunction("upper", MyUpperCase.class);
table
.select($("f0"), call("upper", $("f0")).as("u"))
.execute()
.print();
SQLAPI
// 2. 在sql中使用
// 只能先注册再使用
tEnv.createTemporaryFunction("upper", MyUpperCase.class);
tEnv.sqlQuery("select f0, upper(f0) u from " + table).execute().print();
package com.atguigu.chapter11_func_tableapi_sql.function;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
/**
* @Author lizhenchao@atguigu.cn
* @Date 2021/7/26 9:11
*/
public class Flink01_Function_Scala_01 {
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.setInteger("rest.port", 20000);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
Table table = tEnv.fromValues("hello", "atguigu", "abc");
// 1. 在tableApi中使用自定义函数
// 1.1 内联方式方式使用
/*table
.select($("f0"), call(MyUpperCase.class, $("f0")).as("u"))
.execute()
.print();*/
// 1.2 注册之后使用
/*tEnv.createTemporaryFunction("upper", MyUpperCase.class);
table
.select($("f0"), call("upper", $("f0")).as("u"))
.execute()
.print();*/
// 2. 在sql中使用
// 只能先注册再使用
tEnv.createTemporaryFunction("upper", MyUpperCase.class);
tEnv.sqlQuery("select f0, upper(f0) u from " + table).execute().print();
}
// 函数
public static class MyUpperCase extends ScalarFunction {
public String eval(String s) {
if (s == null) {
return null;
}
return s.toUpperCase();
}
}
}
表值函数
函数
// row 是弱类型,需要使用注释告诉row内有什么类型
@FunctionHint(output = @DataTypeHint("row<w string, len int>"))
public static class SplitAndLen extends TableFunction<Row> {
public void eval(String s) {
if (s.contains("hello")) return(打断程序,不让collect写出则为null);// 如果是内连接,只有abc a b c,如果是左外连接 ,左边f0全包含,右边有hello的项都不包含
String[] words = s.split(" ");
for (String word : words) {
// 调用几次就生成几行
collect(Row.of(word, word.length()));
}
}
}
TableAPI
// 1. table
// 内联
table
//.joinLateral(call(SplitAndLen.class, $("f0"))) // 内连接
.leftOuterJoinLateral(call(SplitAndLen.class, $("f0"))) // 左外连接
.select($("f0"), $("w"), $("len"))
.execute()
.print();
-----------------------------------------------------------------------
// 注册
tEnv.createTemporaryFunction("split", SplitAndLen.class);
table
// .joinLateral(call("split", $("f0"))) // 内连接
.leftOuterJoinLateral(call(SplitAndLen.class, $("f0"))) // 左外连接
.select($("f0"), $("w"), $("len"))
.execute()
.print();
SQLAPI
// 2. sql
tEnv.createTemporaryView("t1", table);
tEnv.createTemporaryFunction("split", SplitAndLen.class);
// 内连接的写法1:
tEnv.sqlQuery("select " +
" f0," +
" w," +
" len " +
"from t1 " +
"join lateral table(split(f0)) on true")、//这个on是因为有join所以一定要有on
// "left join lateral table(split(f0)) on true")// 下面的写法不能left join
.execute()
.print();
------------------------------------------------------------
// select ... from a,b where a.id=b.id; ==== select .. from a join b on a.id=b.id
// 内连接的写法2:
tEnv.sqlQuery("select " +
" f0," +
" w1," +
" len1 " +
"from t1, " +
" lateral table(split(f0)) as t(w1,len1)") //不能用外连接,as t(w1,len1)可以不写
.execute()
.print();
结果
聚合函数
比较特别的地方是函数重写两个方法,调用时和普通聚合函数一样,group by 即可
函数
public static class MyAcc{
public Integer sum = 0;
}
public static class MySum extends AggregateFunction<Integer, MyAcc>{
// 返回最终聚合的结果
@Override
public Integer getValue(MyAcc acc) {
return acc.sum;
}
// 初始化累加器
@Override
public MyAcc createAccumulator() {
return new MyAcc();
}
// 累加
public void accumulate(MyAcc acc, Integer v){
acc.sum += v;
}
}
TableAPI
tEnv.createTemporaryFunction("my_sum", MySum.class);
table
.groupBy($("id"))
.select($("id"), call("my_sum", $("vc")).as("vc_sum"))
.execute()
.print();
SQLAPI
tEnv.createTemporaryFunction("my_sum", MySum.class);
tEnv.sqlQuery("select id, my_sum(vc) sum_vc from " + table + " group by id").execute().print();
表值聚合函数
函数
public static class FirstSecond {
public Integer first = 0;
public Integer second = 0;
}
// 普通单值类型demo
public static class Top2 extends TableAggregateFunction<String, FirstSecond> {
@Override
public FirstSecond createAccumulator() {
return new FirstSecond();
}
// 聚合函数
public void accumulate(FirstSecond acc, Integer v) {
if (v > acc.first) {
acc.second = acc.first;
acc.first = v;
} else if (v > acc.second) {
acc.second = v;
}
}
// 指标
public void emitValue(FirstSecond acc, Collector<String> collector) {
collector.collect("最大水位: " + acc.first); // 调用几次就有几行
collector.collect("第二大水位: " + acc.second); // 调用几次就有几行
}
}
-------------------------------------
// 元祖类型输出demo(pojo也是多列)
public static class Top22 extends TableAggregateFunction<Tuple2<Integer, Integer>, FirstSecond> {
@Override
public FirstSecond createAccumulator() {
return new FirstSecond();
}
// 聚合函数
public void accumulate(FirstSecond acc, Integer v) {
if (v > acc.first) {
acc.second = acc.first;
acc.first = v;
} else if (v > acc.second) {
acc.second = v;
}
}
// 制表
public void emitValue(FirstSecond acc, Collector<Tuple2<Integer, Integer>> collector) {
collector.collect(Tuple2.of(acc.first, 1)); // 调用几次就有几行
if (acc.second > 0)
collector.collect(Tuple2.of(acc.second, 2)); // 调用几次就有几行
}
}
TableAPI
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.setInteger("rest.port", 20000);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(1);
DataStreamSource<WaterSensor> stream = env.fromElements(new WaterSensor("sensor_1", 1000L, 10),
new WaterSensor("sensor_1", 2000L, 20),
new WaterSensor("sensor_2", 3000L, 30),
new WaterSensor("sensor_1", 4000L, 40),
new WaterSensor("sensor_1", 5000L, 50),
new WaterSensor("sensor_2", 6000L, 60));
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
Table table = tEnv.fromDataStream(stream);
// 接下来我们写一个自定义TableAggregateFunction,用来提取每个sensor最高的两个温度值(两行)。
table
.groupBy($("id"))
// .flatAggregate(call(Top2.class, $("vc")).as("value"))
.flatAggregate(call(Top22.class, $("vc")).as("v", "rank"))
.select($("id"), $("v"), $("rank"))
.execute()
.print();
}
SQLAPI
// 单值类型
tEnv.createTemporaryFunction("top2",Top2.class);
tEnv.sqlQuery("select id,top2(vc) to2_vc from "+ table + " group by id").execute().print();
// 元祖或pojo目前还不支持!!!!
结果:
元祖类型会自动输出为多列,但是需要指定 as (两个元素):