UDF/UDAF/UDTF 简介

第一种:UDF(User-Defined-Function) 函数
一对一的关系,输入一个值经过函数以后输出一个值;

第二种:UDAF(User-Defined Aggregation Function) 聚合函数
多对一的关系,输入多个值输出一个值,通常与groupBy联合使用;

第三种:UDTF(User-Defined Table-Generating Functions) 函数
一对多的关系,输入一个值输出多个值(一行变为多行);用户自定义生成函数,有点像flatMap;

Spark版本对函数的支持变化:

Spark版本 Spark SQL UDF(Python,Java,Scala) Spark SQL UDAF(Java、Scala) Spark SQL UDF(R) Hive UDF、UDAF、UDTF
1.1 - 1.4 ✔️

✔️
1.5 ✔️ experimental
✔️
1.6 ✔️ ✔️
✔️
2.0 ✔️ ✔️ ✔️ ✔️

UDF

Spark UDF

我们从源码知道spark提供了23个UDF相关的接口。如下图所示:
image.png
其实它们之间区别就与接口中定义参数的多少,这些udf能支持的传入的参数的个数从[0,22]分别对应每个UDF函数。

  1. /**
  2. * A Spark SQL UDF that has 0 arguments.
  3. * R:返回类型
  4. */
  5. @InterfaceStability.Stable
  6. public interface UDF0<R> extends Serializable {
  7. R call() throws Exception;
  8. }
  9. /**
  10. * A Spark SQL UDF that has 1 arguments.
  11. */
  12. @InterfaceStability.Stable
  13. public interface UDF1<T1, R> extends Serializable {
  14. R call(T1 t1) throws Exception;
  15. }
  16. /**
  17. * A Spark SQL UDF that has 2 arguments.
  18. */
  19. @InterfaceStability.Stable
  20. public interface UDF2<T1, T2, R> extends Serializable {
  21. R call(T1 t1, T2 t2) throws Exception;
  22. }

实现方式

自定义udf的方式有两种:

  • 代码内
    • SQLContext.udf.register(),SQL方式
    • 创建UserDefinedFunction,DSL方式
  • 三方jar包
    1. sparkSession.udf.register("smallToBigger", new UDF1[String,String]() { //自定义函数的名称
    2. @throws[Exception]
    3. override def call(t1: String): String = {
    4. t1.toUpperCase()
    5. }
    6. }, DataTypes.StringType) //返回值的类型
    ```scala val smallToBigger: UserDefinedFunction = udf((str: String) => t1.toUpperCase()) //使用UDF函数 sparkSession.sql(“select line, smallToBigger(line) as biggerLine from small_table”).show()
  1. ```java
  2. package udf;
  3. import org.apache.commons.lang3.StringUtils;
  4. import org.apache.spark.sql.api.java.UDF2;
  5. public class ObjectEqUtil implements UDF2<String,Object,Boolean> {
  6. @Override
  7. public Boolean call(String str,Object value) throws Exception {
  8. try {
  9. return ...;
  10. } catch (Exception var3) {
  11. return null;
  12. }
  13. }
  14. }

Hive UDF

实现方式

  1. 继承org.apache.hadoop.hive.ql.exec.UDF
  2. 实现evaluate方法(注意:该方法必须有返回值,可以为null)
  3. 将jar包上传到hdfs /xx/xx/udf/hive-udf.jar将jar包重命名为hive-udf.jar
  4. 注册成【临时/永久】函数 ```java public class CalDate extends UDF {

    @Description(name = “cal_date”,

    1. value = "_FUNC_(intervalDays) Return to dateStr",
    2. extended = "select cal_date(2)")

    public String evaluate(Integer intervalDays) {

    1. DateTime dateTime = DateUtil.offsetDay(DateUtil.date(), -intervalDays);
    2. return DateUtil.format(dateTime, "yyyyMMdd");

    } }

// 永久注册 // create function cal_date as ‘com.zcy.bigdata.history.cal.CalDate’ using jar ‘hdfs:///xx/xx/udf/hive-udf.jar’; ```

UDAF

UDAF已经过时

UDTF

其实Hive官方为我们提供了很多的UDTF函数如:explodejson_tupleget_splits等。实际上UDTF就是把一列数据转换为多列多行。
在Spark的源码中其实是没有定义UDTF相关接口的,这让我知道Spark的UDTF函数其实来自于Hive

其实现方式,等用到在整理