UDF/UDAF/UDTF 简介
第一种:UDF(User-Defined-Function) 函数
一对一的关系,输入一个值经过函数以后输出一个值;
第二种:UDAF(User-Defined Aggregation Function) 聚合函数
多对一的关系,输入多个值输出一个值,通常与groupBy联合使用;
第三种:UDTF(User-Defined Table-Generating Functions) 函数
一对多的关系,输入一个值输出多个值(一行变为多行);用户自定义生成函数,有点像flatMap;
Spark版本对函数的支持变化:
Spark版本 | Spark SQL UDF(Python,Java,Scala) | Spark SQL UDAF(Java、Scala) | Spark SQL UDF(R) | Hive UDF、UDAF、UDTF |
---|---|---|---|---|
1.1 - 1.4 | ✔️ | ✔️ | ||
1.5 | ✔️ | experimental | ✔️ | |
1.6 | ✔️ | ✔️ | ✔️ | |
2.0 | ✔️ | ✔️ | ✔️ | ✔️ |
UDF
Spark UDF
我们从源码知道spark提供了23个UDF相关的接口。如下图所示:
其实它们之间区别就与接口中定义参数的多少,这些udf能支持的传入的参数的个数从[0,22]分别对应每个UDF函数。
/**
* A Spark SQL UDF that has 0 arguments.
* R:返回类型
*/
@InterfaceStability.Stable
public interface UDF0<R> extends Serializable {
R call() throws Exception;
}
/**
* A Spark SQL UDF that has 1 arguments.
*/
@InterfaceStability.Stable
public interface UDF1<T1, R> extends Serializable {
R call(T1 t1) throws Exception;
}
/**
* A Spark SQL UDF that has 2 arguments.
*/
@InterfaceStability.Stable
public interface UDF2<T1, T2, R> extends Serializable {
R call(T1 t1, T2 t2) throws Exception;
}
实现方式
自定义udf的方式有两种:
- 代码内
- SQLContext.udf.register(),SQL方式
- 创建UserDefinedFunction,DSL方式
- 三方jar包
```scala val smallToBigger: UserDefinedFunction = udf((str: String) => t1.toUpperCase()) //使用UDF函数 sparkSession.sql(“select line, smallToBigger(line) as biggerLine from small_table”).show()sparkSession.udf.register("smallToBigger", new UDF1[String,String]() { //自定义函数的名称
@throws[Exception]
override def call(t1: String): String = {
t1.toUpperCase()
}
}, DataTypes.StringType) //返回值的类型
```java
package udf;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.sql.api.java.UDF2;
public class ObjectEqUtil implements UDF2<String,Object,Boolean> {
@Override
public Boolean call(String str,Object value) throws Exception {
try {
return ...;
} catch (Exception var3) {
return null;
}
}
}
Hive UDF
实现方式
- 继承org.apache.hadoop.hive.ql.exec.UDF
- 实现evaluate方法(注意:该方法必须有返回值,可以为null)
- 将jar包上传到hdfs /xx/xx/udf/hive-udf.jar将jar包重命名为hive-udf.jar
注册成【临时/永久】函数 ```java public class CalDate extends UDF {
@Description(name = “cal_date”,
value = "_FUNC_(intervalDays) Return to dateStr",
extended = "select cal_date(2)")
public String evaluate(Integer intervalDays) {
DateTime dateTime = DateUtil.offsetDay(DateUtil.date(), -intervalDays);
return DateUtil.format(dateTime, "yyyyMMdd");
} }
// 永久注册 // create function cal_date as ‘com.zcy.bigdata.history.cal.CalDate’ using jar ‘hdfs:///xx/xx/udf/hive-udf.jar’; ```
UDAF
UDTF
其实Hive官方为我们提供了很多的UDTF函数如:explode
、json_tuple
、get_splits
等。实际上UDTF就是把一列数据转换为多列多行。
在Spark的源码中其实是没有定义UDTF相关接口的,这让我知道Spark的UDTF函数其实来自于Hive
其实现方式,等用到在整理