内置函数
参考官网: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/systemFunctions.html
自定义函数
自定义函数分类:
1) 标量函数(Scalar functions) 将标量值转换成一个新标量值;
2) 表值函数(Table functions) 将标量值转换成新的行数据;
3) 聚合函数(Aggregate functions) 将多行数据里的标量值转换成一个新标量值;
4) 表值聚合函数(Table aggregate) 将多行数据里的标量值转换成新的行数据;
5) 异步表值函数(Async table functions) 是异步查询外部数据系统的特殊函数。
标量函数
TableAPI
// 1. 在tableApi中使用自定义函数// 1.1 内联方式方式使用table.select($("f0"), call(MyUpperCase.class, $("f0")).as("u")).execute().print();// 1.2 注册之后使用tEnv.createTemporaryFunction("upper", MyUpperCase.class);table.select($("f0"), call("upper", $("f0")).as("u")).execute().print();
SQLAPI
// 2. 在sql中使用// 只能先注册再使用tEnv.createTemporaryFunction("upper", MyUpperCase.class);tEnv.sqlQuery("select f0, upper(f0) u from " + table).execute().print();
package com.atguigu.chapter11_func_tableapi_sql.function;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import org.apache.flink.table.functions.ScalarFunction;/*** @Author lizhenchao@atguigu.cn* @Date 2021/7/26 9:11*/public class Flink01_Function_Scala_01 {public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port", 20000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);Table table = tEnv.fromValues("hello", "atguigu", "abc");// 1. 在tableApi中使用自定义函数// 1.1 内联方式方式使用/*table.select($("f0"), call(MyUpperCase.class, $("f0")).as("u")).execute().print();*/// 1.2 注册之后使用/*tEnv.createTemporaryFunction("upper", MyUpperCase.class);table.select($("f0"), call("upper", $("f0")).as("u")).execute().print();*/// 2. 在sql中使用// 只能先注册再使用tEnv.createTemporaryFunction("upper", MyUpperCase.class);tEnv.sqlQuery("select f0, upper(f0) u from " + table).execute().print();}// 函数public static class MyUpperCase extends ScalarFunction {public String eval(String s) {if (s == null) {return null;}return s.toUpperCase();}}}
表值函数
函数
// row 是弱类型,需要使用注释告诉row内有什么类型@FunctionHint(output = @DataTypeHint("row<w string, len int>"))public static class SplitAndLen extends TableFunction<Row> {public void eval(String s) {if (s.contains("hello")) return(打断程序,不让collect写出则为null);// 如果是内连接,只有abc a b c,如果是左外连接 ,左边f0全包含,右边有hello的项都不包含String[] words = s.split(" ");for (String word : words) {// 调用几次就生成几行collect(Row.of(word, word.length()));}}}
TableAPI
// 1. table// 内联table//.joinLateral(call(SplitAndLen.class, $("f0"))) // 内连接.leftOuterJoinLateral(call(SplitAndLen.class, $("f0"))) // 左外连接.select($("f0"), $("w"), $("len")).execute().print();-----------------------------------------------------------------------// 注册tEnv.createTemporaryFunction("split", SplitAndLen.class);table// .joinLateral(call("split", $("f0"))) // 内连接.leftOuterJoinLateral(call(SplitAndLen.class, $("f0"))) // 左外连接.select($("f0"), $("w"), $("len")).execute().print();
SQLAPI
// 2. sqltEnv.createTemporaryView("t1", table);tEnv.createTemporaryFunction("split", SplitAndLen.class);// 内连接的写法1:tEnv.sqlQuery("select " +" f0," +" w," +" len " +"from t1 " +"join lateral table(split(f0)) on true")、//这个on是因为有join所以一定要有on// "left join lateral table(split(f0)) on true")// 下面的写法不能left join.execute().print();------------------------------------------------------------// select ... from a,b where a.id=b.id; ==== select .. from a join b on a.id=b.id// 内连接的写法2:tEnv.sqlQuery("select " +" f0," +" w1," +" len1 " +"from t1, " +" lateral table(split(f0)) as t(w1,len1)") //不能用外连接,as t(w1,len1)可以不写.execute().print();
结果
聚合函数
比较特别的地方是:聚合函数需要重写 getValue 和 createAccumulator 两个方法,并额外提供一个 accumulate 方法;调用时和普通聚合函数一样,配合 group by 使用即可
函数
public static class MyAcc{public Integer sum = 0;}public static class MySum extends AggregateFunction<Integer, MyAcc>{// 返回最终聚合的结果@Overridepublic Integer getValue(MyAcc acc) {return acc.sum;}// 初始化累加器@Overridepublic MyAcc createAccumulator() {return new MyAcc();}// 累加public void accumulate(MyAcc acc, Integer v){acc.sum += v;}}
TableAPI
tEnv.createTemporaryFunction("my_sum", MySum.class);table.groupBy($("id")).select($("id"), call("my_sum", $("vc")).as("vc_sum")).execute().print();
SQLAPI
tEnv.createTemporaryFunction("my_sum", MySum.class);tEnv.sqlQuery("select id, my_sum(vc) sum_vc from " + table + " group by id").execute().print();
表值聚合函数
函数
public static class FirstSecond {public Integer first = 0;public Integer second = 0;}// 普通单值类型demopublic static class Top2 extends TableAggregateFunction<String, FirstSecond> {@Overridepublic FirstSecond createAccumulator() {return new FirstSecond();}// 聚合函数public void accumulate(FirstSecond acc, Integer v) {if (v > acc.first) {acc.second = acc.first;acc.first = v;} else if (v > acc.second) {acc.second = v;}}// 指标public void emitValue(FirstSecond acc, Collector<String> collector) {collector.collect("最大水位: " + acc.first); // 调用几次就有几行collector.collect("第二大水位: " + acc.second); // 调用几次就有几行}}-------------------------------------// 元祖类型输出demo(pojo也是多列)public static class Top22 extends TableAggregateFunction<Tuple2<Integer, Integer>, FirstSecond> {@Overridepublic FirstSecond createAccumulator() {return new FirstSecond();}// 聚合函数public void accumulate(FirstSecond acc, Integer v) {if (v > acc.first) {acc.second = acc.first;acc.first = v;} else if (v > acc.second) {acc.second = v;}}// 制表public void emitValue(FirstSecond acc, Collector<Tuple2<Integer, Integer>> collector) {collector.collect(Tuple2.of(acc.first, 1)); // 调用几次就有几行if (acc.second > 0)collector.collect(Tuple2.of(acc.second, 2)); // 调用几次就有几行}}
TableAPI
public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port", 20000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);DataStreamSource<WaterSensor> stream = env.fromElements(new WaterSensor("sensor_1", 1000L, 10),new WaterSensor("sensor_1", 2000L, 20),new WaterSensor("sensor_2", 3000L, 30),new WaterSensor("sensor_1", 4000L, 40),new WaterSensor("sensor_1", 5000L, 50),new WaterSensor("sensor_2", 6000L, 60));StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);Table table = tEnv.fromDataStream(stream);// 接下来我们写一个自定义TableAggregateFunction,用来提取每个sensor最高的两个温度值(两行)。table.groupBy($("id"))// .flatAggregate(call(Top2.class, $("vc")).as("value")).flatAggregate(call(Top22.class, $("vc")).as("v", "rank")).select($("id"), $("v"), $("rank")).execute().print();}
SQLAPI
// 单值类型tEnv.createTemporaryFunction("top2",Top2.class);tEnv.sqlQuery("select id,top2(vc) to2_vc from "+ table + " group by id").execute().print();// 元祖或pojo目前还不支持!!!!
结果:
元组类型会自动输出为多列,但是需要用 as 指定列名(两个元素对应两个列名):
