python-udf-example
from pyspark.sql import SparkSessionspark = SparkSession.builder.appName("Python UDF example").getOrCreate()df = spark.read.json("temperatures.json")df.createOrReplaceTempView("citytemps")# Register the UDF with our SparkSessionspark.udf.register("CTOF", lambda degreesCelsius: ((degreesCelsius * 9.0 / 5.0) + 32.0))spark.sql("SELECT city, CTOF(avgLow) AS avgLowF, CTOF(avgHigh) AS avgHighF FROM citytemps").show()
性能相关
了解 Apache Spark UDF 功能的性能影响很重要。例如,Python UDF(比如上面的 CTOF 函数)会导致数据在执行器的 JVM 和运行 UDF 逻辑的 Python 解释器之间进行序列化操作;与 Java 或 Scala 中的 UDF 实现相比,大大降低了性能。缓解这种序列化瓶颈的解决方案如下:
1、从 PySpark 访问 Hive UDF。 Java UDF实现可以由执行器JVM直接访问。
2、在 PySpark 中访问在 Java 或 Scala 中实现的 UDF 的方法。
在 Python 中使用 Hive UDF
from pyspark import SparkConf, SparkContextfrom pyspark.sql import HiveContextconf = SparkConf().setAppName("Hive UDF example")sc = SparkContext(conf=conf)sqlContext = HiveContext(sc)df = sqlContext.read.json("temperatures.json")df.registerTempTable("citytemps")# Register our Hive UDFsqlContext.sql("CREATE TEMPORARY FUNCTION CTOF AS 'com.cloudera.fce.curtis.sparkudfexamples.hiveudf.CTOF'")sqlContext.sql("SELECT city, CTOF(avgLow) AS avgLowF, CTOF(avgHigh) AS avgHighF FROM citytemps").show()
注意,Hive UDF只能使用 Apache Spark 的 SQL 查询语言来调用 - 换句话说,它们不能与 Dataframe API的领域特定语言(domain-specific-language, DSL)一起使用。
