目标
创建基于scala的spark项目,编写用于spark的自定义函数(UDF),并打包成jar
环境
InteliJ IDEA 2019.1
Oracle JDK 8
1. 编译 UDF
1.1 新建项目
在InteliJ IDEA中创建新的Maven项目,在pom文件中添加如下内容
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>4.0.1</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.12</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.3</version>
</dependency>
</dependencies>
1.2 新建源码
在src/main文件夹下,创建新的文件夹scala,右键Mark Directory as -> Sources Root
在scala文件夹点右键,New -> Scala Class
这时就能新建一个scala源码文件了,比如新建一个class叫做main
import org.apache.spark.sql.api.java.UDF2
class main extends UDF2[Int,Int,Double]{
override def call(t1: Int, t2: Int): Double = {
return t1*t2*t2*3.14
}
}
object run extends App{
val o = new main()
val r = o.call(2,3)
println(r)
}
写完代码点绿三角运行,可得:
56.52
以上程序中的main,继承了UDF2,表示两个输入参数的Spark UDF函数,本例两个输入是整数,输出双精度
1.3 生成
在IntelliJ IDEA窗口右侧的Maven页中,点Lifecycle->package,即可生成项目的jar包
2 使用
2.1 scala调用
基于刚刚创建的jar文件,在项目设置中,Module,Dependencies,添加Jars,并在左侧打勾,保证输出时把引入的UDF一起打包在jar内
spark.sqlContext.udf.register("myudf",(a,b)=>new myudf().call(a,b))
或者直接在scala里写函数
spark.sqlContext.udf.register("myudf",(a:Int,b:Int)=>1)
正经的scala调用
直接把写好的继承了UDF2的类注册为udf
import org.apache.spark.sql.types.DoubleType
object myfunobj{
def myfun():UserDefinedFunction = udf(new myudf().call _);
}
spark.udf.register("myfun",myfunobj.myfun)
println(spark.sql("select myfun(1,2)").collect()(0))
2.2 pyspark调用
启动前修改启动参数
os.environ['PYSPARK_SUBMIT_ARGS']='--master local[*] --jars my_jar_file.jar pyspark-shell'
除了上面这种办法,也可以在启动session时加入此项conf,除了当前文件夹,也支持hdfs
.config('spark.jars','hdfs://clusterHA/tmp/my_jar_file.jar')\
spark.jars 支持 hdfs,只是要在启动 Session 前配置好。
其他可以考虑的办法还有,SPARK_CLASSPATH,—driver-class-path,sc.addPyFile,conf 中的 extraClassPath extraLibraryPath 等,注意有的方式只添加包不添加 CLASSPATH、
启动 Session 后使用如下语句注册函数
spark.udf.registerJavaFunction('spark_fun_name','package.class','type_of_output')
具体来说,就是
spark.udf.registerJavaFunction('my_udf','myudf',DoubleType)
其中的DoubleType可以省略,它可以根据jar自动判断
如果找不到,可能是package不对,要写完整,比如,io.github.cdarling.myudf
添加引用后,调用如下
spark.sql("select my_udf(input1,...)")
汇总
import os
os.environ['PYSPARK_SUBMIT_ARGS']='--jars hdfs://clusterHA/tmp/my_jar_file.jar pyspark-shell'
from pyspark.sql import SparkSession
spark=(SparkSession.builder.master('yarn')
.config('spark.jars','hdfs://clusterHA/tmp/my_jar_file.jar')
.appName('2.4.4').getOrCreate())
spark.udf.registerJavaFunction('my_udf','myudf',DoubleType)
df=spark.sql("select my_udf(input1,...)").collect()
A. 参考链接和阅读材料
某medium博客中基本的spark中UDF的介绍,包含与python的UDF、vector udf速度对比
databrick介绍spark2.3推出的python vector udf
cloudera对spark中UDF的介绍
某博客对hive中的UDF的介绍
某spark sql学习笔记
Add jars to a Spark Job - spark-submit SO上对一些添加 jar 方式的介绍