标量函数ScalarFunction:一对一
- 输入一行 输出一行 ```java // 注册函数 // createTemporarySystemFunction : 在全局注册 // createTemporaryFunction : 在当前catalog+database注册 tableEnv.createTemporarySystemFunction(“MyHash”, MyHash.class);
//TableAPI tableEnv.from(“MyTable”) .select($(“user”), call(“MyHash”, $(“myField”))); //SQL tableEnv.sqlQuery(“SELECT MyHash(myField) FROM MyTable”);
// 实现自定义的ScalarFunction public static class MyHash extends ScalarFunction { // 实现eval方法 // @DataTypeHint(inputGroup = InputGroup.ANY) 表示任何类型 public int eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o){ return o.hashCode(); } }
<a name="PV10P"></a>### 表函数TableFunction:一对多- 输入一行 输出一个或多个输入:sensor_6,123,123 <br />输出:<br />sensor_6,123,123,sensor,6<br />sensor_6,123,123,6,1```java// 需要在环境中注册UDFtableEnv.createTemporarySystemFunction("MySplit", MySplit.class);//TableAPITable resultTable = sensorTable.joinLateral("split(id) as (word, length)").select("id, ts, word, length");//SQLselectuser,url,word,lengthfrom EventTable,LATERAL TABLE( MySplit(url) ) AS T(word, length)// 实现自定义TableFunction// 注意有泛型,这里输出的是两个字段,二元组// DataTypeHint: 为TableAPI使用而手动声明的类型@FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))public static class MySplit extends TableFunction<Tuple2<String, Integer>>{ //泛型是输出类型// 必须实现一个eval方法,没有返回值,按照?分割public void eval( String str ){for( String s: str.split("\\?") ){collect(new Tuple2<>(s, s.length()));}}}
聚合函数AggregateFunction:多对一
- 输入多行
- AggregationFunction要求必须实现的方法:
- createAccumulator()
- accumulate() :与之前的 eval()类似,也是底层架构要求的,必须为 public, 方法名必须为 accumulate,且无法直接 override、只能手动实现
- getValue()
- AggregateFunction 的工作原理如下:
- 首先,它需要一个累加器(Accumulator),用来保存聚合中间结果的数据结构;可以通过调用 createAccumulator() 方法创建空累加器
- 随后,对每个输入行调用函数的 accumulate() 方法来更新累加器
- 处理完所有行后,将调用函数的 getValue() 方法来计算并返回最终结果
```java
//TableAPI
Table resultTable = sensorTable
.groupBy(“id”)
.aggregate(“avgTemp(temp) as avgtemp” )
.select(“id, avgtemp”);
//SQL
tableEnv.createTemporaryView(“sensor”, sensorTable);
Table resultSqlTable = tableEnv.sqlQuery(
"select " +"id, " +"avgTemp(temp) " +"from sensor " +"group by id");
// 实现自定义的AggregateFunction
//
public void accumulate( Tuple2<Double, Integer> accumulator, Double temp ){accumulator.f0 += temp;accumulator.f1 += 1;}@Overridepublic Double getValue(Tuple2<Double, Integer> accumulator) {return accumulator.f0 / accumulator.f1;}
}
<a name="xnbUu"></a>### 表聚合函数TableAggregateFunction:多对多- 输入多行 输出多行,典型应用场景TOPN- 输入:sensor_1,1547718199,35.8<br /> sensor_1,1547718207,36.3<br /> sensor_1,1547718212,37.1- 输出:1> (true,sensor_1,35.8,1)1> (true,sensor_1,4.9E-324,2)<br />1> (false,sensor_1,35.8,1)<br />1> (false,sensor_1,4.9E-324,2)<br />1> (true,sensor_1,37.1,1)<br />1> (true,sensor_1,35.8,2)<br />1> (false,sensor_1,37.1,1)<br />1> (false,sensor_1,35.8,2)<br />1> (true,sensor_1,37.1,1)<br />1> (true,sensor_1,36.3,2)- AggregationFunction 要求必须实现的方法:- createAccumulator()- accumulate()- emitValue() :无法override 手动实现- TableAggregateFunction 的工作原理如下:- 首先,它同样需要一个累加器(Accumulator),它是保存聚合中间结果的数据结构。通过调用 createAccumulator() 方法可以创建空累加器。- 随后,对每个输入行调用函数的 accumulate() 方法来更新累加器。- 处理完所有行后,将调用函数的 emitValue() 方法来计算并返回最终结果。```java// 创建一个表聚合函数实例Top2Temp top2Temp = new Top2Temp();tableEnv.registerFunction("top2Temp", top2Temp);// table api使用Table resultTable = sensorTable.groupBy("id").flatAggregate("top2Temp(temperature) as (temp, rank)").select("id, temp, rank");// SQL中没有表聚合函数的使用方式// 表聚合函数需要使用的累加器public static class Top2TempAcc {double highestTemp = Double.MIN_VALUE;double secondHighestTemp = Double.MIN_VALUE;}// 自定义表聚合函数public static class Top2Temp extends TableAggregateFunction<Tuple2<Double,Integer>, Top2TempAcc> {@Overridepublic Top2TempAcc createAccumulator() {return new Top2TempAcc();}public void accumulate(Top2TempAcc acc, Double temp) {if (temp > acc.highestTemp) {acc.secondHighestTemp = acc.highestTemp;acc.highestTemp = temp;} else if (temp > acc.secondHighestTemp) {acc.secondHighestTemp = temp;}}public void emitValue(Top2TempAcc acc, Collector<Tuple2<Double, Integer>> out) {out.collect(new Tuple2<>(acc.highestTemp, 1));out.collect(new Tuple2<>(acc.secondHighestTemp, 2));}}
