python-udf-example

  1. from pyspark.sql import SparkSession
  2. spark = SparkSession.builder.appName("Python UDF example").getOrCreate()
  3. df = spark.read.json("temperatures.json")
  4. df.createOrReplaceTempView("citytemps")
  5. # Register the UDF with our SparkSession
  6. spark.udf.register("CTOF", lambda degreesCelsius: ((degreesCelsius * 9.0 / 5.0) + 32.0))
  7. spark.sql("SELECT city, CTOF(avgLow) AS avgLowF, CTOF(avgHigh) AS avgHighF FROM citytemps").show()

性能相关
了解 Apache Spark UDF 功能的性能影响很重要。例如,Python UDF(比如上面的 CTOF 函数)会导致数据在执行器的 JVM 和运行 UDF 逻辑的 Python 解释器之间进行序列化操作;与 Java 或 Scala 中的 UDF 实现相比,大大降低了性能。缓解这种序列化瓶颈的解决方案如下:
1、从 PySpark 访问 Hive UDF。 Java UDF实现可以由执行器JVM直接访问。
2、在 PySpark 中访问在 Java 或 Scala 中实现的 UDF 的方法。

在 Python 中使用 Hive UDF

  1. from pyspark import SparkConf, SparkContext
  2. from pyspark.sql import HiveContext
  3. conf = SparkConf().setAppName("Hive UDF example")
  4. sc = SparkContext(conf=conf)
  5. sqlContext = HiveContext(sc)
  6. df = sqlContext.read.json("temperatures.json")
  7. df.registerTempTable("citytemps")
  8. # Register our Hive UDF
  9. sqlContext.sql("CREATE TEMPORARY FUNCTION CTOF AS 'com.cloudera.fce.curtis.sparkudfexamples.hiveudf.CTOF'")
  10. sqlContext.sql("SELECT city, CTOF(avgLow) AS avgLowF, CTOF(avgHigh) AS avgHighF FROM citytemps").show()

注意,Hive UDF只能使用 Apache Spark 的 SQL 查询语言来调用 - 换句话说,它们不能与 Dataframe API的领域特定语言(domain-specific-language, DSL)一起使用。