本篇幅介绍Flink Table/SQL中如何自定义一个表函数(TableFunction),介绍其基本用法以及与源码结合分析其调用流程。

基本使用

表函数TableFunction相对标量函数ScalarFunction一对一,它是一个一对多的情况,通常使用TableFunction来完成列转行的一个操作。先通过一个实际案例了解其用法:终端设备上报数据,数据类型包含温度、耗电量等,上报方式是以多条方式上报,例如:
Flink SQL 中TableFunction使用分析 - 图1
现在希望得到如下数据格式:
Flink SQL 中TableFunction使用分析 - 图2
这是一个典型的列转行或者一行转多行的场景,需要将data列进行拆分成为多行多列,先看下代码实现:

  1. public class MyUDTF extends TableFunction<Row>{
  2. public void eval(String s){
  3. JSONArray jsonArray =JSONArray.parseArray(s);
  4. for(int i =0; i < jsonArray.size(); i++){
  5. JSONObject jsonObject = jsonArray.getJSONObject(i);
  6. String type = jsonObject.getString("type");
  7. String value = jsonObject.getString("value");
  8. collector.collect(Row.of(type, value));
  9. }
  10. }
  11. @Overridepublic TypeInformation<Row> getResultType(){
  12. returnTypes.ROW(Types.STRING(),Types.STRING());
  13. }
  14. }

在MyUDTF中继承了TableFunction<T>, 所有的自定义表函数都必须继承该抽象类,其中T表示返回的数据类型,通常如果是原子类型则直接指定例如String, 如果是复合类型通常会选择Row, FlinkSQL 通过类型提取可以自动识别返回的类型,如果识别不了需要重载其getResultType方法,指定其返回的TypeInformation,重点看下eval 方法定义:

  • eval 方法, 处理数据的方法,必须声明为public/not static,并且该方法可以重载,会自动根据不同的输入参数选择对应的eval, 在eval方法里面可以使用collector对象将数据发送出去,该对象是从TableFunction继承过来的。

调用如下:

  1. def main(args:Array[String]):Unit={
  2. val env =StreamExecutionEnvironment.getExecutionEnvironment
  3. val tabEnv =TableEnvironment.getTableEnvironment(env)
  4. tabEnv.registerFunction("udtf",newMyUDTF)
  5. val kafkaConfig =newProperties();
  6. kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
  7. kafkaConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"test1");
  8. val consumer =newFlinkKafkaConsumer[String]("topic1",newSimpleStringSchema(), kafkaConfig);
  9. val ds:DataStream[(String, java.lang.Long,String)]= env.addSource(consumer)
  10. .map(x =>{
  11. val obj = JSON.parseObject(x, classOf[RawData])
  12. Tuple3.apply(obj.devId, obj.time, obj.data)
  13. })
  14. tabEnv.registerDataStream("tbl1", ds,'devId, 'time,'data)
  15. val rsTab = tabEnv.sqlQuery("select devId,`time`,`type`,`value` from tbl1 , LATERAL TABLE(udtf(data)) as t(`type`,`value`) ")
  16. .writeToSink(new PaulRetractStreamTableSink)
  17. env.execute()
  18. }

测试数据:

  1. {"devid":"dev01","time":1574944573000,"data":[{"type":"temperature","value":"10"},{"type":"battery","value":"1"}]}

得到结果:

  1. 3>(true,dev01,1574944573000,temperature,10)
  2. 3>(true,dev01,1574944573000,battery,1)

至此拿到了符合要求的数据。在Flink SQL中使用TableFunction需要搭配LATERAL TABLE一起使用,将其认为是一张虚拟的表,整个过程就是一个Join with Table Function过程,左表(tbl1) 会join 右表(t1) 的每一条记录。但是也存在另外一种情况右表(t1)没有输出但是也需要左表输出那么可以使用LEFT JOIN LATERAL TABLE,用法如下:

  1. SELECT users, tag
  2. FROM Orders LEFT JOIN LATERAL TABLE(unnest_udtf(tags)) t AS tag ON TRUE

对于右表没有输出会自动补上null。

源码分析

在介绍源码分析之前先安利一个小技巧,很多时候比较难找到Flink SQL解析之后的任务具体执行过程,这个时候可以通过先打印其执行计划,使用方式:

  1. println(tabEnv.explain(rsTab))

就可以得到其抽象语法树、逻辑执行计划、物理执行计划:

  1. ==AbstractSyntaxTree==
  2. LogicalProject(devId=[$0], time=[$1], type=[$3], value=[$4])
  3. LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{2}])
  4. LogicalTableScan(table=[[tbl1]])
  5. LogicalTableFunctionScan(invocation=[udtf($cor0.data)], rowType=[RecordType(VARCHAR(65536) f0, VARCHAR(65536) f1)], elementType=[class[Ljava.lang.Object;])
  6. ==OptimizedLogicalPlan==
  7. DataStreamCalc(select=[devId, time, f0 AS type, f1 AS value])
  8. DataStreamCorrelate(invocation=[udtf($cor0.data)], correlate=[table(udtf($cor0.data))],select=[devId, time, data, f0, f1], rowType=[RecordType(VARCHAR(65536) devId, BIGINT time, VARCHAR(65536) data, VARCHAR(65536) f0, VARCHAR(65536) f1)], joinType=[INNER])
  9. DataStreamScan(table=[[tbl1]])
  10. ==PhysicalExecutionPlan==
  11. Stage1:DataSource
  12. content : collect elements withCollectionInputFormat
  13. Stage2:Operator
  14. content :Map
  15. ship_strategy : FORWARD
  16. Stage3:Operator
  17. content :from:(devId, time, data)
  18. ship_strategy : FORWARD
  19. ...........

可以从逻辑执行计划入手,Table Function Join 对应DataStreamCorrelate,重点在于其translateToPlan方法:

  • generateFunction 调用,生成一个ProcessFunction函数,内部封装用户自定义的TableFunction, 在该ProcessFunction里面会调用TableFunction的eval方法,由于该Function是动态生成的,可以通过debug方法查看,这里感受一下在processElement里面调用eval的代码:
  1. function_udf$MyUDTF$086f769e79e46e52752c8500480e4b32.eval(isNull$21 ?null:(java.lang.String) result$20);
  • generateCollector调用,生成的是一个TableFunctionCollector 类型的collector,这部分也是动态生成的
  • CRowCorrelateProcessRunner 也是一个ProcessFunction, 内部包含了generateFunction生成的function 与generateCollector生成的collector, 在其初始化open的时候会将该collector赋给function

接下来从CRowCorrelateProcessRunner的processElement方法看整个调用流程:

  1. cRowWrapper.out=out
  2. cRowWrapper.setChange(in.change)
  3. collector.setCollector(cRowWrapper)
  4. collector.setInput(in.row)//重点input 信息设置到动态生成的collector
  5. collector.reset()
  6. function.processElement(
  7. in.row,
  8. ctx.asInstanceOf[ProcessFunction[Row,Row]#Context],
  9. cRowWrapper)

这步调用动态生成的function, 在其processElement里面调用eval方法,eval 会调用动态生成的collector,这个步骤就可以理解为是一个join过程, 最终输出组合数据。