内置函数

参考官网: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/systemFunctions.html

自定义函数

自定义函数分类:
1) 标量函数(Scalar functions) 将标量值转换成一个新标量值;
2) 表值函数(Table functions) 将标量值转换成新的行数据;
3) 聚合函数(Aggregate functions) 将多行数据里的标量值转换成一个新标量值;
4) 表值聚合函数(Table aggregate) 将多行数据里的标量值转换成新的行数据;
异步表值函数(Async table functions) 是异步查询外部数据系统的特殊函数。

标量函数

可以将0、1或多个标量值,映射到新的标量值

TableAPI

  1. // 1. 在tableApi中使用自定义函数
  2. // 1.1 内联方式方式使用
  3. table
  4. .select($("f0"), call(MyUpperCase.class, $("f0")).as("u"))
  5. .execute()
  6. .print();
  7. // 1.2 注册之后使用
  8. tEnv.createTemporaryFunction("upper", MyUpperCase.class);
  9. table
  10. .select($("f0"), call("upper", $("f0")).as("u"))
  11. .execute()
  12. .print();

SQLAPI

  1. // 2. 在sql中使用
  2. // 只能先注册再使用
  3. tEnv.createTemporaryFunction("upper", MyUpperCase.class);
  4. tEnv.sqlQuery("select f0, upper(f0) u from " + table).execute().print();
  1. package com.atguigu.chapter11_func_tableapi_sql.function;
  2. import org.apache.flink.configuration.Configuration;
  3. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  4. import org.apache.flink.table.api.Table;
  5. import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
  6. import org.apache.flink.table.functions.ScalarFunction;
  7. /**
  8. * @Author lizhenchao@atguigu.cn
  9. * @Date 2021/7/26 9:11
  10. */
  11. public class Flink01_Function_Scala_01 {
  12. public static void main(String[] args) {
  13. Configuration conf = new Configuration();
  14. conf.setInteger("rest.port", 20000);
  15. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
  16. env.setParallelism(1);
  17. StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
  18. Table table = tEnv.fromValues("hello", "atguigu", "abc");
  19. // 1. 在tableApi中使用自定义函数
  20. // 1.1 内联方式方式使用
  21. /*table
  22. .select($("f0"), call(MyUpperCase.class, $("f0")).as("u"))
  23. .execute()
  24. .print();*/
  25. // 1.2 注册之后使用
  26. /*tEnv.createTemporaryFunction("upper", MyUpperCase.class);
  27. table
  28. .select($("f0"), call("upper", $("f0")).as("u"))
  29. .execute()
  30. .print();*/
  31. // 2. 在sql中使用
  32. // 只能先注册再使用
  33. tEnv.createTemporaryFunction("upper", MyUpperCase.class);
  34. tEnv.sqlQuery("select f0, upper(f0) u from " + table).execute().print();
  35. }
  36. // 函数
  37. public static class MyUpperCase extends ScalarFunction {
  38. public String eval(String s) {
  39. if (s == null) {
  40. return null;
  41. }
  42. return s.toUpperCase();
  43. }
  44. }
  45. }

表值函数

看不懂就看视频吧

函数

  1. // row 是弱类型,需要使用注释告诉row内有什么类型
  2. @FunctionHint(output = @DataTypeHint("row<w string, len int>"))
  3. public static class SplitAndLen extends TableFunction<Row> {
  4. public void eval(String s) {
  5. if (s.contains("hello")) return(打断程序,不让collect写出则为null);// 如果是内连接,只有abc a b c,如果是左外连接 ,左边f0全包含,右边有hello的项都不包含
  6. String[] words = s.split(" ");
  7. for (String word : words) {
  8. // 调用几次就生成几行
  9. collect(Row.of(word, word.length()));
  10. }
  11. }
  12. }

TableAPI

  1. // 1. table
  2. // 内联
  3. table
  4. //.joinLateral(call(SplitAndLen.class, $("f0"))) // 内连接
  5. .leftOuterJoinLateral(call(SplitAndLen.class, $("f0"))) // 左外连接
  6. .select($("f0"), $("w"), $("len"))
  7. .execute()
  8. .print();
  9. -----------------------------------------------------------------------
  10. // 注册
  11. tEnv.createTemporaryFunction("split", SplitAndLen.class);
  12. table
  13. // .joinLateral(call("split", $("f0"))) // 内连接
  14. .leftOuterJoinLateral(call(SplitAndLen.class, $("f0"))) // 左外连接
  15. .select($("f0"), $("w"), $("len"))
  16. .execute()
  17. .print();

SQLAPI

  1. // 2. sql
  2. tEnv.createTemporaryView("t1", table);
  3. tEnv.createTemporaryFunction("split", SplitAndLen.class);
  4. // 内连接的写法1:
  5. tEnv.sqlQuery("select " +
  6. " f0," +
  7. " w," +
  8. " len " +
  9. "from t1 " +
  10. "join lateral table(split(f0)) on true")、//这个on是因为有join所以一定要有on
  11. // "left join lateral table(split(f0)) on true")// 下面的写法不能left join
  12. .execute()
  13. .print();
  14. ------------------------------------------------------------
  15. // select ... from a,b where a.id=b.id; ==== select .. from a join b on a.id=b.id
  16. // 内连接的写法2:
  17. tEnv.sqlQuery("select " +
  18. " f0," +
  19. " w1," +
  20. " len1 " +
  21. "from t1, " +
  22. " lateral table(split(f0)) as t(w1,len1)") //不能用外连接,as t(w1,len1)可以不写
  23. .execute()
  24. .print();

结果

sqlAPI 自动编码为upsert 只看最终结果即可
image.png

聚合函数

比较特别的地方是函数重写两个方法,调用时和普通聚合函数一样,group by 即可

函数

  1. public static class MyAcc{
  2. public Integer sum = 0;
  3. }
  4. public static class MySum extends AggregateFunction<Integer, MyAcc>{
  5. // 返回最终聚合的结果
  6. @Override
  7. public Integer getValue(MyAcc acc) {
  8. return acc.sum;
  9. }
  10. // 初始化累加器
  11. @Override
  12. public MyAcc createAccumulator() {
  13. return new MyAcc();
  14. }
  15. // 累加
  16. public void accumulate(MyAcc acc, Integer v){
  17. acc.sum += v;
  18. }
  19. }

TableAPI

  1. tEnv.createTemporaryFunction("my_sum", MySum.class);
  2. table
  3. .groupBy($("id"))
  4. .select($("id"), call("my_sum", $("vc")).as("vc_sum"))
  5. .execute()
  6. .print();

SQLAPI

  1. tEnv.createTemporaryFunction("my_sum", MySum.class);
  2. tEnv.sqlQuery("select id, my_sum(vc) sum_vc from " + table + " group by id").execute().print();

表值聚合函数

又有聚合,又有制表,典型例子是topN

函数

  1. public static class FirstSecond {
  2. public Integer first = 0;
  3. public Integer second = 0;
  4. }
  5. // 普通单值类型demo
  6. public static class Top2 extends TableAggregateFunction<String, FirstSecond> {
  7. @Override
  8. public FirstSecond createAccumulator() {
  9. return new FirstSecond();
  10. }
  11. // 聚合函数
  12. public void accumulate(FirstSecond acc, Integer v) {
  13. if (v > acc.first) {
  14. acc.second = acc.first;
  15. acc.first = v;
  16. } else if (v > acc.second) {
  17. acc.second = v;
  18. }
  19. }
  20. // 指标
  21. public void emitValue(FirstSecond acc, Collector<String> collector) {
  22. collector.collect("最大水位: " + acc.first); // 调用几次就有几行
  23. collector.collect("第二大水位: " + acc.second); // 调用几次就有几行
  24. }
  25. }
  26. -------------------------------------
  27. // 元祖类型输出demo(pojo也是多列)
  28. public static class Top22 extends TableAggregateFunction<Tuple2<Integer, Integer>, FirstSecond> {
  29. @Override
  30. public FirstSecond createAccumulator() {
  31. return new FirstSecond();
  32. }
  33. // 聚合函数
  34. public void accumulate(FirstSecond acc, Integer v) {
  35. if (v > acc.first) {
  36. acc.second = acc.first;
  37. acc.first = v;
  38. } else if (v > acc.second) {
  39. acc.second = v;
  40. }
  41. }
  42. // 制表
  43. public void emitValue(FirstSecond acc, Collector<Tuple2<Integer, Integer>> collector) {
  44. collector.collect(Tuple2.of(acc.first, 1)); // 调用几次就有几行
  45. if (acc.second > 0)
  46. collector.collect(Tuple2.of(acc.second, 2)); // 调用几次就有几行
  47. }
  48. }

TableAPI

  1. public static void main(String[] args) {
  2. Configuration conf = new Configuration();
  3. conf.setInteger("rest.port", 20000);
  4. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
  5. env.setParallelism(1);
  6. DataStreamSource<WaterSensor> stream = env.fromElements(new WaterSensor("sensor_1", 1000L, 10),
  7. new WaterSensor("sensor_1", 2000L, 20),
  8. new WaterSensor("sensor_2", 3000L, 30),
  9. new WaterSensor("sensor_1", 4000L, 40),
  10. new WaterSensor("sensor_1", 5000L, 50),
  11. new WaterSensor("sensor_2", 6000L, 60));
  12. StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
  13. Table table = tEnv.fromDataStream(stream);
  14. // 接下来我们写一个自定义TableAggregateFunction,用来提取每个sensor最高的两个温度值(两行)。
  15. table
  16. .groupBy($("id"))
  17. // .flatAggregate(call(Top2.class, $("vc")).as("value"))
  18. .flatAggregate(call(Top22.class, $("vc")).as("v", "rank"))
  19. .select($("id"), $("v"), $("rank"))
  20. .execute()
  21. .print();
  22. }

SQLAPI

  1. // 单值类型
  2. tEnv.createTemporaryFunction("top2",Top2.class);
  3. tEnv.sqlQuery("select id,top2(vc) to2_vc from "+ table + " group by id").execute().print();
  4. // 元祖或pojo目前还不支持!!!!

结果:

元祖类型会自动输出为多列,但是需要指定 as (两个元素):
image.png