目标

创建基于scala的spark项目,编写用于spark的自定义函数(UDF),并打包成jar

环境

InteliJ IDEA 2019.1
Oracle JDK 8

1. 编译 UDF

1.1 新建项目

在InteliJ IDEA中创建新的Maven项目,在pom文件中添加如下内容

  1. <build>
  2. <plugins>
  3. <plugin>
  4. <groupId>org.apache.maven.plugins</groupId>
  5. <artifactId>maven-compiler-plugin</artifactId>
  6. <version>3.8.0</version>
  7. <configuration>
  8. <source>8</source>
  9. <target>8</target>
  10. </configuration>
  11. </plugin>
  12. <plugin>
  13. <groupId>net.alchim31.maven</groupId>
  14. <artifactId>scala-maven-plugin</artifactId>
  15. <version>4.0.1</version>
  16. <executions>
  17. <execution>
  18. <goals>
  19. <goal>compile</goal>
  20. <goal>testCompile</goal>
  21. </goals>
  22. </execution>
  23. </executions>
  24. </plugin>
  25. </plugins>
  26. </build>
  27. <dependencies>
  28. <dependency>
  29. <groupId>org.scala-lang</groupId>
  30. <artifactId>scala-library</artifactId>
  31. <version>2.11.12</version>
  32. </dependency>
  33. <dependency>
  34. <groupId>org.apache.spark</groupId>
  35. <artifactId>spark-core_2.11</artifactId>
  36. <version>2.4.3</version>
  37. </dependency>
  38. <dependency>
  39. <groupId>org.apache.spark</groupId>
  40. <artifactId>spark-sql_2.11</artifactId>
  41. <version>2.4.3</version>
  42. </dependency>
  43. </dependencies>

1.2 新建源码

在src/main文件夹下,创建新的文件夹scala,右键Mark Directory as -> Sources Root
在scala文件夹点右键,New -> Scala Class
这时就能新建一个scala源码文件了,比如新建一个class叫做main

  1. import org.apache.spark.sql.api.java.UDF2
  2. class main extends UDF2[Int,Int,Double]{
  3. override def call(t1: Int, t2: Int): Double = {
  4. return t1*t2*t2*3.14
  5. }
  6. }
  7. object run extends App{
  8. val o = new main()
  9. val r = o.call(2,3)
  10. println(r)
  11. }

写完代码点绿三角运行,可得:

  1. 56.52

以上程序中的main,继承了UDF2,表示两个输入参数的Spark UDF函数,本例两个输入是整数,输出双精度

1.3 生成

在IntelliJ IDEA窗口右侧的Maven页中,点Lifecycle->package,即可生成项目的jar包

2 使用

2.1 scala调用

基于刚刚创建的jar文件,在项目设置中,Module,Dependencies,添加Jars,并在左侧打勾,保证输出时把引入的UDF一起打包在jar内

  1. spark.sqlContext.udf.register("myudf",(a,b)=>new myudf().call(a,b))

或者直接在scala里写函数

  1. spark.sqlContext.udf.register("myudf",(a:Int,b:Int)=>1)

以上调用太……不像话了

正经的scala调用

直接把写好的继承了UDF2的类注册为udf

  1. import org.apache.spark.sql.types.DoubleType
  2. object myfunobj{
  3. def myfun():UserDefinedFunction = udf(new myudf().call _);
  4. }
  5. spark.udf.register("myfun",myfunobj.myfun)
  6. println(spark.sql("select myfun(1,2)").collect()(0))

参考链接

2.2 pyspark调用

启动前修改启动参数

  1. os.environ['PYSPARK_SUBMIT_ARGS']='--master local[*] --jars my_jar_file.jar pyspark-shell'

除了上面这种办法,也可以在启动session时加入此项conf,除了当前文件夹,也支持hdfs

  1. .config('spark.jars','hdfs://clusterHA/tmp/my_jar_file.jar')\

spark.jars 支持 hdfs,只是要在启动 Session 前配置好。
其他可以考虑的办法还有,SPARK_CLASSPATH,—driver-class-path,sc.addPyFile,conf 中的 extraClassPath extraLibraryPath 等,注意有的方式只添加包不添加 CLASSPATH、

启动 Session 后使用如下语句注册函数

  1. spark.udf.registerJavaFunction('spark_fun_name','package.class','type_of_output')

具体来说,就是

  1. spark.udf.registerJavaFunction('my_udf','myudf',DoubleType)

其中的DoubleType可以省略,它可以根据jar自动判断
如果找不到,可能是package不对,要写完整,比如,io.github.cdarling.myudf
添加引用后,调用如下

  1. spark.sql("select my_udf(input1,...)")

汇总

  1. import os
  2. os.environ['PYSPARK_SUBMIT_ARGS']='--jars hdfs://clusterHA/tmp/my_jar_file.jar pyspark-shell'
  3. from pyspark.sql import SparkSession
  4. spark=(SparkSession.builder.master('yarn')
  5. .config('spark.jars','hdfs://clusterHA/tmp/my_jar_file.jar')
  6. .appName('2.4.4').getOrCreate())
  7. spark.udf.registerJavaFunction('my_udf','myudf',DoubleType)
  8. df=spark.sql("select my_udf(input1,...)").collect()

A. 参考链接和阅读材料

某medium博客中基本的spark中UDF的介绍,包含与python的UDF、vector udf速度对比
databrick介绍spark2.3推出的python vector udf
cloudera对spark中UDF的介绍
某博客对hive中的UDF的介绍
某spark sql学习笔记
Add jars to a Spark Job - spark-submit SO上对一些添加 jar 方式的介绍