UDF

  1. spark内置的函数: [http://spark.apache.org/docs/latest/sql-ref-functions-builtin.html](http://spark.apache.org/docs/latest/sql-ref-functions-builtin.html)
  2. 允许自定义UDF <br />①编写函数<br /> ②在spark中注册函数<br /> ③使用函数
  1. @Test
  2. def testCustomUDF():Unit={
  3. //①编写函数
  4. def addAge(age:Int) :Int={
  5. age + 10
  6. }
  7. // ②在spark中注册函数 addAge _ 获取当前函数的引用,
  8. 其中,"addAge10"随便写,addAge _ 根据编写的函数写
  9. //session.udf.register("addAge10",addAge _)
  10. //使用匿名函数
  11. session.udf.register("addAge10",(age:Int) => age + 10)
  12. val dataFrame1: DataFrame = session.read.json("input/people.json")
  13. dataFrame1.createTempView("emps")
  14. //帮我将age + 10
  15. session.sql("select name,addAge10(age) from emps").show()
  16. }

UDAF

  1. org.apache.spark.sql.expressions.Aggregator[-IN, BUF, OUT]<br /> UDAF 输入: NN列,输出 11列<br /> IN: 输入的类型<br /> BUF: 缓冲区的类型<br /> OUT :函数输出的类型<br /> IN age:Int<br /> BUF: 1)使用元祖 ( sum:Double , count:Int ) <br /> 2)使用样例类封装需要保存的字段:MyBuf(sum:Double , count:Int)<br /> OUT avgAge:double
  2. Aggregator[IN, BUF, OUT] should now be registered as a UDF" + " via the functions.udaf(agg) method

定义聚合函数类继承 Aggregator[IN, BUF, OUT]

  1. package com.tcode
  2. import org.apache.spark.sql.expressions.Aggregator
  3. import org.apache.spark.sql.{Encoder, Encoders}
  4. /**
  5. * Created by Smexy on 2021/6/7
  6. */
  7. class MyAvg extends Aggregator[Int,MyBuf,Double]{
  8. // 初始化(构造)缓冲区
  9. override def zero: MyBuf = MyBuf(0.0 , 0)
  10. // 在Spark RDD的一个分区中,对这个分区要计算的累,进行累加,累加到缓冲区
  11. override def reduce(buffer: MyBuf, age: Int): MyBuf = {
  12. buffer.sum += age
  13. buffer.count += 1
  14. buffer
  15. }
  16. // 在Spark 不同分区的缓冲区进行合并,得到最终的缓冲区
  17. override def merge(b1: MyBuf, b2: MyBuf): MyBuf = {
  18. b1.sum += b2.sum
  19. b1.count += b2.count
  20. b1
  21. }
  22. // 返回最后的结果
  23. override def finish(reduction: MyBuf): Double = {
  24. reduction.sum / reduction.count
  25. }
  26. //提供缓冲区的Encoder
  27. override def bufferEncoder: Encoder[MyBuf] = Encoders.product[MyBuf]
  28. // //提供返回值类型的Encoder
  29. override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
  30. }
  31. case class MyBuf(var sum:Double , var count:Int)

调用UDAF

  1. @Test
  2. def testUDAF():Unit={
  3. //①定义函数
  4. val myAvg = new MyAvg()
  5. // ②注册函数
  6. session.udf.register("myavg",functions.udaf(myAvg))
  7. // ③使用函数
  8. val dataFrame1: DataFrame = session.read.json("input/people.json")
  9. dataFrame1.createTempView("emps")
  10. session.sql("select myavg(age) from emps").show()
  11. }