python-udf-example
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Python UDF example").getOrCreate()
df = spark.read.json("temperatures.json")
df.createOrReplaceTempView("citytemps")
# Register the UDF with our SparkSession
spark.udf.register("CTOF", lambda degreesCelsius: ((degreesCelsius * 9.0 / 5.0) + 32.0))
spark.sql("SELECT city, CTOF(avgLow) AS avgLowF, CTOF(avgHigh) AS avgHighF FROM citytemps").show()
性能相关
了解 Apache Spark UDF 功能的性能影响很重要。例如,Python UDF(比如上面的 CTOF 函数)会导致数据在执行器的 JVM 和运行 UDF 逻辑的 Python 解释器之间进行序列化操作;与 Java 或 Scala 中的 UDF 实现相比,大大降低了性能。缓解这种序列化瓶颈的解决方案如下:
1、从 PySpark 访问 Hive UDF。 Java UDF实现可以由执行器JVM直接访问。
2、在 PySpark 中访问在 Java 或 Scala 中实现的 UDF 的方法。
在 Python 中使用 Hive UDF
from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext
conf = SparkConf().setAppName("Hive UDF example")
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)
df = sqlContext.read.json("temperatures.json")
df.registerTempTable("citytemps")
# Register our Hive UDF
sqlContext.sql("CREATE TEMPORARY FUNCTION CTOF AS 'com.cloudera.fce.curtis.sparkudfexamples.hiveudf.CTOF'")
sqlContext.sql("SELECT city, CTOF(avgLow) AS avgLowF, CTOF(avgHigh) AS avgHighF FROM citytemps").show()
注意,Hive UDF只能使用 Apache Spark 的 SQL 查询语言来调用 - 换句话说,它们不能与 Dataframe API的领域特定语言(domain-specific-language, DSL)一起使用。