标量函数ScalarFunction:一对一

  • 输入一行 输出一行 ```java // 注册函数 // createTemporarySystemFunction : 在全局注册 // createTemporaryFunction : 在当前catalog+database注册 tableEnv.createTemporarySystemFunction(“MyHash”, MyHash.class);

//TableAPI tableEnv.from(“MyTable”) .select($(“user”), call(“MyHash”, $(“myField”))); //SQL tableEnv.sqlQuery(“SELECT MyHash(myField) FROM MyTable”);

// 实现自定义的ScalarFunction public static class MyHash extends ScalarFunction { // 实现eval方法 // @DataTypeHint(inputGroup = InputGroup.ANY) 表示任何类型 public int eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o){ return o.hashCode(); } }

  1. <a name="PV10P"></a>
  2. ### 表函数TableFunction:一对多
  3. - 输入一行 输出一个或多个
  4. 输入:sensor_6,123,123 <br />输出:<br />sensor_6,123,123,sensor,6<br />sensor_6,123,123,6,1
  5. ```java
  6. // 需要在环境中注册UDF
  7. tableEnv.createTemporarySystemFunction("MySplit", MySplit.class);
  8. //TableAPI
  9. Table resultTable = sensorTable
  10. .joinLateral("split(id) as (word, length)")
  11. .select("id, ts, word, length");
  12. //SQL
  13. select
  14. user,
  15. url,
  16. word,
  17. length
  18. from EventTable,
  19. LATERAL TABLE( MySplit(url) ) AS T(word, length)
  20. // 实现自定义TableFunction
  21. // 注意有泛型,这里输出的是两个字段,二元组
  22. // DataTypeHint: 为TableAPI使用而手动声明的类型
  23. @FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
  24. public static class MySplit extends TableFunction<Tuple2<String, Integer>>{ //泛型是输出类型
  25. // 必须实现一个eval方法,没有返回值,按照?分割
  26. public void eval( String str ){
  27. for( String s: str.split("\\?") ){
  28. collect(new Tuple2<>(s, s.length()));
  29. }
  30. }
  31. }

聚合函数AggregateFunction:多对一

  • 输入多行
  • AggregationFunction要求必须实现的方法:
    • createAccumulator()
    • accumulate() :与之前的 eval()类似,也是底层架构要求的,必须为 public, 方法名必须为 accumulate,且无法直接 override、只能手动实现
    • getValue()
  • AggregateFunction 的工作原理如下:
    • 首先,它需要一个累加器(Accumulator),用来保存聚合中间结果的数据结构;可以通过调用 createAccumulator() 方法创建空累加器
    • 随后,对每个输入行调用函数的 accumulate() 方法来更新累加器
    • 处理完所有行后,将调用函数的 getValue() 方法来计算并返回最终结果 ```java //TableAPI Table resultTable = sensorTable .groupBy(“id”) .aggregate(“avgTemp(temp) as avgtemp” ) .select(“id, avgtemp”); //SQL tableEnv.createTemporaryView(“sensor”, sensorTable); Table resultSqlTable = tableEnv.sqlQuery(
      1. "select " +
      2. "id, " +
      3. "avgTemp(temp) " +
      4. "from sensor " +
      5. "group by id");

// 实现自定义的AggregateFunction //> : <输出类型,累加器类型> public static class AvgTemp extends AggregateFunction>{ @Override public Tuple2 createAccumulator() { return new Tuple2<>(0.0, 0);// 温度总和,个数 }

  1. public void accumulate( Tuple2<Double, Integer> accumulator, Double temp ){
  2. accumulator.f0 += temp;
  3. accumulator.f1 += 1;
  4. }
  5. @Override
  6. public Double getValue(Tuple2<Double, Integer> accumulator) {
  7. return accumulator.f0 / accumulator.f1;
  8. }

}

  1. <a name="xnbUu"></a>
  2. ### 表聚合函数TableAggregateFunction:多对多
  3. - 输入多行 输出多行,典型应用场景TOPN
  4. - 输入:sensor_1,1547718199,35.8<br /> sensor_1,1547718207,36.3<br /> sensor_1,1547718212,37.1
  5. - 输出:1> (true,sensor_1,35.8,1)
  6. 1> (true,sensor_1,4.9E-324,2)<br />1> (false,sensor_1,35.8,1)<br />1> (false,sensor_1,4.9E-324,2)<br />1> (true,sensor_1,37.1,1)<br />1> (true,sensor_1,35.8,2)<br />1> (false,sensor_1,37.1,1)<br />1> (false,sensor_1,35.8,2)<br />1> (true,sensor_1,37.1,1)<br />1> (true,sensor_1,36.3,2)
  7. - AggregationFunction 要求必须实现的方法:
  8. - createAccumulator()
  9. - accumulate()
  10. - emitValue() :无法override 手动实现
  11. - TableAggregateFunction 的工作原理如下:
  12. - 首先,它同样需要一个累加器(Accumulator),它是保存聚合中间结果的数据结构。通过调用 createAccumulator() 方法可以创建空累加器。
  13. - 随后,对每个输入行调用函数的 accumulate() 方法来更新累加器。
  14. - 处理完所有行后,将调用函数的 emitValue() 方法来计算并返回最终结果。
  15. ```java
  16. // 创建一个表聚合函数实例
  17. Top2Temp top2Temp = new Top2Temp();
  18. tableEnv.registerFunction("top2Temp", top2Temp);
  19. // table api使用
  20. Table resultTable = sensorTable
  21. .groupBy("id")
  22. .flatAggregate("top2Temp(temperature) as (temp, rank)")
  23. .select("id, temp, rank");
  24. // SQL中没有表聚合函数的使用方式
  25. // 表聚合函数需要使用的累加器
  26. public static class Top2TempAcc {
  27. double highestTemp = Double.MIN_VALUE;
  28. double secondHighestTemp = Double.MIN_VALUE;
  29. }
  30. // 自定义表聚合函数
  31. public static class Top2Temp extends TableAggregateFunction<Tuple2<Double,Integer>, Top2TempAcc> {
  32. @Override
  33. public Top2TempAcc createAccumulator() {
  34. return new Top2TempAcc();
  35. }
  36. public void accumulate(Top2TempAcc acc, Double temp) {
  37. if (temp > acc.highestTemp) {
  38. acc.secondHighestTemp = acc.highestTemp;
  39. acc.highestTemp = temp;
  40. } else if (temp > acc.secondHighestTemp) {
  41. acc.secondHighestTemp = temp;
  42. }
  43. }
  44. public void emitValue(Top2TempAcc acc, Collector<Tuple2<Double, Integer>> out) {
  45. out.collect(new Tuple2<>(acc.highestTemp, 1));
  46. out.collect(new Tuple2<>(acc.secondHighestTemp, 2));
  47. }
  48. }