Spark SQL, DataFrames and Datasets Guide

Overview

Spark SQL 是 Spark 处理结构化数据的一个模块。与基础的 Spark RDD API 不同,Spark SQL 提供了查询结构化数据及计算结果等信息的接口。在内部,Spark SQL 使用这个额外的信息去执行额外的优化。有几种方式可以跟 Spark SQL 进行交互,包括 SQL 和 Dataset API。当使用相同执行引擎进行计算时,无论使用哪种 API / 语言都可以快速的计算。这种统一意味着开发人员能够在基于提供最自然的方式来表达一个给定的 transformation API 之间实现轻松的来回切换不同的。

该页面所有例子使用的示例数据都包含在 Spark 的发布中,并且可以使用 spark-shellpyspark shell,或者 sparkR shell来运行。

SQL

Spark SQL 的功能之一是执行 SQL 查询。Spark SQL 也能够被用于从已存在的 Hive 环境中读取数据。更多关于如何配置这个特性的信息,请参考 Hive 表 这部分。当以另外的编程语言运行SQL 时,查询结果将以 Dataset/DataFrame的形式返回。您也可以使用 命令行或者通过 JDBC/ODBC与 SQL 接口交互。

Datasets and DataFrames

一个 Dataset 是一个分布式的数据集合 Dataset 是在 Spark 1.6 中被添加的新接口,它提供了 RDD 的优点(强类型化,能够使用强大的 lambda 函数)与Spark SQL执行引擎的优点。一个 Dataset 可以从 JVM 对象来 构造 并且使用转换功能(map,flatMap,filter,等等)。Dataset API 在ScalaJava是可用的。Python 不支持 Dataset API。但是由于 Python 的动态特性,许多 Dataset API 的优点已经可用了(也就是说,你可能通过 name 天生的row.columnName属性访问一行中的字段)。这种情况和 R 相似。

一个 DataFrame 是一个 Dataset 组成的指定列。它的概念与一个在关系型数据库或者在 R/Python 中的表是相等的,但是有很多优化。DataFrames 可以从大量的 sources 中构造出来,比如:结构化的文本文件,Hive中的表,外部数据库,或者已经存在的 RDDs。DataFrame API 可以在 Scala,Java,Python,和 R中实现。在 Scala 和 Java中,DataFrame 由 DataSet 中的 RowS(多个 Row)来表示。在 the Scala API中,DataFrame 仅仅是一个 Dataset[Row]类型的别名。然而,在 Java API中,用户需要去使用 Dataset<Row> 去代表一个 DataFrame

在此文档中,我们将常常会引用 Scala/Java Datasets 的 Rows 作为 DataFrames。

开始入门

起始点:SparkSession

Spark SQL中所有功能的入口点是 SparkSession 类。要创建一个 SparkSession,仅使用 SparkSession.builder()就可以了:

  1. import org.apache.spark.sql.SparkSession
  2. val spark = SparkSession
  3. .builder()
  4. .appName("Spark SQL basic example")
  5. .config("spark.some.config.option", "some-value")
  6. .getOrCreate()
  7. // For implicit conversions like converting RDDs to DataFrames
  8. import spark.implicits._

Find full example code at “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” in the Spark repo.

Spark SQL中所有功能的入口点是 SparkSession 类。要创建一个 SparkSession,仅使用 SparkSession.builder()就可以了:

  1. import org.apache.spark.sql.SparkSession;
  2. SparkSession spark = SparkSession
  3. .builder()
  4. .appName("Java Spark SQL basic example")
  5. .config("spark.some.config.option", "some-value")
  6. .getOrCreate();

Find full example code at “examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java” in the Spark repo.

Spark SQL中所有功能的入口点是 SparkSession 类。要穿件一个 SparkSession,仅使用 SparkSession.builder就可以了:

  1. from pyspark.sql import SparkSession
  2. spark = SparkSession \
  3. .builder \
  4. .appName("Python Spark SQL basic example") \
  5. .config("spark.some.config.option", "some-value") \
  6. .getOrCreate()

Find full example code at “examples/src/main/python/sql/basic.py” in the Spark repo.

Spark SQL中所有功能的入口点是 SparkSession 类。要初始化一个基本的 SparkSession,仅调用 sparkR.session()即可:

  1. sparkR.session(appName = "R Spark SQL basic example", sparkConfig = list(spark.some.config.option = "some-value"))

Find full example code at “examples/src/main/r/RSparkSQLExample.R” in the Spark repo.

注意第一次调用时,sparkR.session() 初始化一个全局的 SparkSession 单实例,并且总是返回一个引用此实例,可以连续的调用。通过这种方式,用户仅需要创建一次 SparkSession,然后像 read.df SparkR函数就能够立即获取全局的实例,用户不需要再 SparkSession 之间进行实例的传递。

Spark 2.0 中的SparkSession 为 Hive 特性提供了内嵌的支持,包括使用 HiveQL 编写查询的能力,访问 Hive UDF,以及从 Hive 表中读取数据的能力。为了使用这些特性,你不需要去有一个已存在的 Hive 设置。

创建 DataFrames

在一个 SparkSession中,应用程序可以从一个 已经存在的 RDD,从hive表,或者从 Spark数据源中创建一个DataFrames。

举个例子,下面就是基于一个JSON文件创建一个DataFrame:

  1. val df = spark.read.json("examples/src/main/resources/people.json")
  2. // Displays the content of the DataFrame to stdout
  3. df.show()
  4. // +----+-------+
  5. // | age| name|
  6. // +----+-------+
  7. // |null|Michael|
  8. // | 30| Andy|
  9. // | 19| Justin|
  10. // +----+-------+

Find full example code at “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” in the Spark repo.

在一个 SparkSession中,应用程序可以从一个 已经存在的 RDD,从hive表,或者从 Spark数据源中创建一个DataFrames。

举个例子,下面就是基于一个JSON文件创建一个DataFrame:

  1. import org.apache.spark.sql.Dataset;
  2. import org.apache.spark.sql.Row;
  3. Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");
  4. // Displays the content of the DataFrame to stdout
  5. df.show();
  6. // +----+-------+
  7. // | age| name|
  8. // +----+-------+
  9. // |null|Michael|
  10. // | 30| Andy|
  11. // | 19| Justin|
  12. // +----+-------+

Find full example code at “examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java” in the Spark repo.

在一个 SparkSession中,应用程序可以从一个 已经存在的 RDD,从hive表,或者从 Spark数据源中创建一个DataFrames。

举个例子,下面就是基于一个JSON文件创建一个DataFrame:

  1. # spark is an existing SparkSession
  2. df = spark.read.json("examples/src/main/resources/people.json")
  3. # Displays the content of the DataFrame to stdout
  4. df.show()
  5. # +----+-------+
  6. # | age| name|
  7. # +----+-------+
  8. # |null|Michael|
  9. # | 30| Andy|
  10. # | 19| Justin|
  11. # +----+-------+

Find full example code at “examples/src/main/python/sql/basic.py” in the Spark repo.

在一个 SparkSession中,应用程序可以从一个本地的R frame 数据,从hive表,或者从Spark数据源

举个例子,下面就是基于一个JSON文件创建一个DataFrame:

  1. df <- read.json("examples/src/main/resources/people.json")
  2. # Displays the content of the DataFrame
  3. head(df)
  4. ## age name
  5. ## 1 NA Michael
  6. ## 2 30 Andy
  7. ## 3 19 Justin
  8. # Another method to print the first few rows and optionally truncate the printing of long values
  9. showDF(df)
  10. ## +----+-------+
  11. ## | age| name|
  12. ## +----+-------+
  13. ## |null|Michael|
  14. ## | 30| Andy|
  15. ## | 19| Justin|
  16. ## +----+-------+

Find full example code at “examples/src/main/r/RSparkSQLExample.R” in the Spark repo.

无类型的 Dataset 操作(aka DataFrame 操作)

DataFrames 提供了一个特定的语法用在 ScalaJavaPython and R中机构化数据的操作。

正如上面提到的一样,Spark 2.0 中,DataFrames 在 Scala 和 Java API 中,仅仅是多个 Rows 的 Dataset。这些操作也参考了与强类型的 Scala/Java Datasets 中的 “类型转换” 对应的 “无类型转换”。

这里包括一些使用 Dataset 进行结构化数据处理的示例 :

  1. // This import is needed to use the $-notation
  2. import spark.implicits._
  3. // Print the schema in a tree format
  4. df.printSchema()
  5. // root
  6. // |-- age: long (nullable = true)
  7. // |-- name: string (nullable = true)
  8. // Select only the "name" column
  9. df.select("name").show()
  10. // +-------+
  11. // | name|
  12. // +-------+
  13. // |Michael|
  14. // | Andy|
  15. // | Justin|
  16. // +-------+
  17. // Select everybody, but increment the age by 1
  18. df.select($"name", $"age" + 1).show()
  19. // +-------+---------+
  20. // | name|(age + 1)|
  21. // +-------+---------+
  22. // |Michael| null|
  23. // | Andy| 31|
  24. // | Justin| 20|
  25. // +-------+---------+
  26. // Select people older than 21
  27. df.filter($"age" > 21).show()
  28. // +---+----+
  29. // |age|name|
  30. // +---+----+
  31. // | 30|Andy|
  32. // +---+----+
  33. // Count people by age
  34. df.groupBy("age").count().show()
  35. // +----+-----+
  36. // | age|count|
  37. // +----+-----+
  38. // | 19| 1|
  39. // |null| 1|
  40. // | 30| 1|
  41. // +----+-----+

Find full example code at “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” in the Spark repo.

能够在 DataFrame 上被执行的操作类型的完整列表请参考 API 文档

除了简单的列引用和表达式之外,DataFrame 也有丰富的函数库,包括 string 操作,date 算术,常见的 math 操作以及更多。可用的完整列表请参考 DataFrame 函数指南

  1. // col("...") is preferable to df.col("...")
  2. import static org.apache.spark.sql.functions.col;
  3. // Print the schema in a tree format
  4. df.printSchema();
  5. // root
  6. // |-- age: long (nullable = true)
  7. // |-- name: string (nullable = true)
  8. // Select only the "name" column
  9. df.select("name").show();
  10. // +-------+
  11. // | name|
  12. // +-------+
  13. // |Michael|
  14. // | Andy|
  15. // | Justin|
  16. // +-------+
  17. // Select everybody, but increment the age by 1
  18. df.select(col("name"), col("age").plus(1)).show();
  19. // +-------+---------+
  20. // | name|(age + 1)|
  21. // +-------+---------+
  22. // |Michael| null|
  23. // | Andy| 31|
  24. // | Justin| 20|
  25. // +-------+---------+
  26. // Select people older than 21
  27. df.filter(col("age").gt(21)).show();
  28. // +---+----+
  29. // |age|name|
  30. // +---+----+
  31. // | 30|Andy|
  32. // +---+----+
  33. // Count people by age
  34. df.groupBy("age").count().show();
  35. // +----+-----+
  36. // | age|count|
  37. // +----+-----+
  38. // | 19| 1|
  39. // |null| 1|
  40. // | 30| 1|
  41. // +----+-----+

Find full example code at “examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java” in the Spark repo.

为了能够在 DataFrame 上被执行的操作类型的完整列表请参考 API 文档

除了简单的列引用和表达式之外,DataFrame 也有丰富的函数库,包括 string 操作,date 算术,常见的 math 操作以及更多。可用的完整列表请参考 DataFrame 函数指南

在Python中,可以通过(df.age) 或者(df['age'])来获取DataFrame的列。虽然前者便于交互式操作,但是还是建议用户使用后者,这样不会破坏列名,也能引用DataFrame的类。

  1. # spark, df are from the previous example
  2. # Print the schema in a tree format
  3. df.printSchema()
  4. # root
  5. # |-- age: long (nullable = true)
  6. # |-- name: string (nullable = true)
  7. # Select only the "name" column
  8. df.select("name").show()
  9. # +-------+
  10. # | name|
  11. # +-------+
  12. # |Michael|
  13. # | Andy|
  14. # | Justin|
  15. # +-------+
  16. # Select everybody, but increment the age by 1
  17. df.select(df['name'], df['age'] + 1).show()
  18. # +-------+---------+
  19. # | name|(age + 1)|
  20. # +-------+---------+
  21. # |Michael| null|
  22. # | Andy| 31|
  23. # | Justin| 20|
  24. # +-------+---------+
  25. # Select people older than 21
  26. df.filter(df['age'] > 21).show()
  27. # +---+----+
  28. # |age|name|
  29. # +---+----+
  30. # | 30|Andy|
  31. # +---+----+
  32. # Count people by age
  33. df.groupBy("age").count().show()
  34. # +----+-----+
  35. # | age|count|
  36. # +----+-----+
  37. # | 19| 1|
  38. # |null| 1|
  39. # | 30| 1|
  40. # +----+-----+

Find full example code at “examples/src/main/python/sql/basic.py” in the Spark repo.

为了能够在 DataFrame 上被执行的操作类型的完整列表请参考 API 文档

除了简单的列引用和表达式之外,DataFrame 也有丰富的函数库,包括 string 操作,date 算术,常见的 math 操作以及更多。可用的完整列表请参考 DataFrame 函数指南

  1. # Create the DataFrame
  2. df <- read.json("examples/src/main/resources/people.json")
  3. # Show the content of the DataFrame
  4. head(df)
  5. ## age name
  6. ## 1 NA Michael
  7. ## 2 30 Andy
  8. ## 3 19 Justin
  9. # Print the schema in a tree format
  10. printSchema(df)
  11. ## root
  12. ## |-- age: long (nullable = true)
  13. ## |-- name: string (nullable = true)
  14. # Select only the "name" column
  15. head(select(df, "name"))
  16. ## name
  17. ## 1 Michael
  18. ## 2 Andy
  19. ## 3 Justin
  20. # Select everybody, but increment the age by 1
  21. head(select(df, df$name, df$age + 1))
  22. ## name (age + 1.0)
  23. ## 1 Michael NA
  24. ## 2 Andy 31
  25. ## 3 Justin 20
  26. # Select people older than 21
  27. head(where(df, df$age > 21))
  28. ## age name
  29. ## 1 30 Andy
  30. # Count people by age
  31. head(count(groupBy(df, "age")))
  32. ## age count
  33. ## 1 19 1
  34. ## 2 NA 1
  35. ## 3 30 1

Find full example code at “examples/src/main/r/RSparkSQLExample.R” in the Spark repo.

为了能够在 DataFrame 上被执行的操作类型的完整列表请参考 API 文档

除了简单的列引用和表达式之外,DataFrame 也有丰富的函数库,包括 string 操作,date 算术,常见的 math 操作以及更多。可用的完整列表请参考 DataFrame 函数指南

Running SQL Queries Programmatically

SparkSessionsql 函数可以让应用程序以编程的方式运行 SQL 查询,并将结果作为一个 DataFrame 返回。

  1. // Register the DataFrame as a SQL temporary view
  2. df.createOrReplaceTempView("people")
  3. val sqlDF = spark.sql("SELECT * FROM people")
  4. sqlDF.show()
  5. // +----+-------+
  6. // | age| name|
  7. // +----+-------+
  8. // |null|Michael|
  9. // | 30| Andy|
  10. // | 19| Justin|
  11. // +----+-------+

Find full example code at “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” in the Spark repo.

SparkSessionsql 函数可以让应用程序以编程的方式运行 SQL 查询,并将结果作为一个 Dataset&lt;Row&gt; 返回。

  1. import org.apache.spark.sql.Dataset;
  2. import org.apache.spark.sql.Row;
  3. // Register the DataFrame as a SQL temporary view
  4. df.createOrReplaceTempView("people");
  5. Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
  6. sqlDF.show();
  7. // +----+-------+
  8. // | age| name|
  9. // +----+-------+
  10. // |null|Michael|
  11. // | 30| Andy|
  12. // | 19| Justin|
  13. // +----+-------+

Find full example code at “examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java” in the Spark repo.

SparkSessionsql 函数可以让应用程序以编程的方式运行 SQL 查询,并将结果作为一个 DataFrame 返回。

  1. # Register the DataFrame as a SQL temporary view
  2. df.createOrReplaceTempView("people")
  3. sqlDF = spark.sql("SELECT * FROM people")
  4. sqlDF.show()
  5. # +----+-------+
  6. # | age| name|
  7. # +----+-------+
  8. # |null|Michael|
  9. # | 30| Andy|
  10. # | 19| Justin|
  11. # +----+-------+

Find full example code at “examples/src/main/python/sql/basic.py” in the Spark repo.

SparkSessionsql 函数可以让应用程序以编程的方式运行 SQL 查询,并将结果作为一个 DataFrame 返回。

  1. df <- sql("SELECT * FROM table")

Find full example code at “examples/src/main/r/RSparkSQLExample.R” in the Spark repo.

全局临时视图

Spark SQL中的临时视图是session级别的,也就是会随着session的消失而消失。如果你想让一个临时视图在所有session中相互传递并且可用,直到Spark 应用退出,你可以建立一个全局的临时视图。全局的临时视图存在于系统数据库 global_temp中,我们必须加上库名去引用它,比如。 SELECT * FROM global_temp.view1

  1. // Register the DataFrame as a global temporary view
  2. df.createGlobalTempView("people")
  3. // Global temporary view is tied to a system preserved database `global_temp`
  4. spark.sql("SELECT * FROM global_temp.people").show()
  5. // +----+-------+
  6. // | age| name|
  7. // +----+-------+
  8. // |null|Michael|
  9. // | 30| Andy|
  10. // | 19| Justin|
  11. // +----+-------+
  12. // Global temporary view is cross-session
  13. spark.newSession().sql("SELECT * FROM global_temp.people").show()
  14. // +----+-------+
  15. // | age| name|
  16. // +----+-------+
  17. // |null|Michael|
  18. // | 30| Andy|
  19. // | 19| Justin|
  20. // +----+-------+

Find full example code at “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” in the Spark repo.

  1. // Register the DataFrame as a global temporary view
  2. df.createGlobalTempView("people");
  3. // Global temporary view is tied to a system preserved database `global_temp`
  4. spark.sql("SELECT * FROM global_temp.people").show();
  5. // +----+-------+
  6. // | age| name|
  7. // +----+-------+
  8. // |null|Michael|
  9. // | 30| Andy|
  10. // | 19| Justin|
  11. // +----+-------+
  12. // Global temporary view is cross-session
  13. spark.newSession().sql("SELECT * FROM global_temp.people").show();
  14. // +----+-------+
  15. // | age| name|
  16. // +----+-------+
  17. // |null|Michael|
  18. // | 30| Andy|
  19. // | 19| Justin|
  20. // +----+-------+

Find full example code at “examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java” in the Spark repo.

  1. # Register the DataFrame as a global temporary view
  2. df.createGlobalTempView("people")
  3. # Global temporary view is tied to a system preserved database `global_temp`
  4. spark.sql("SELECT * FROM global_temp.people").show()
  5. # +----+-------+
  6. # | age| name|
  7. # +----+-------+
  8. # |null|Michael|
  9. # | 30| Andy|
  10. # | 19| Justin|
  11. # +----+-------+
  12. # Global temporary view is cross-session
  13. spark.newSession().sql("SELECT * FROM global_temp.people").show()
  14. # +----+-------+
  15. # | age| name|
  16. # +----+-------+
  17. # |null|Michael|
  18. # | 30| Andy|
  19. # | 19| Justin|
  20. # +----+-------+

Find full example code at “examples/src/main/python/sql/basic.py” in the Spark repo.

  1. CREATE GLOBAL TEMPORARY VIEW temp_view AS SELECT a + 1, b * 2 FROM tbl
  2. SELECT * FROM global_temp.temp_view

创建Datasets

Dataset 与 RDD 相似,然而,并不是使用 Java 序列化或者 Kryo 编码器 来序列化用于处理或者通过网络进行传输的对象。虽然编码器和标准的序列化都负责将一个对象序列化成字节,编码器是动态生成的代码,并且使用了一种允许 Spark 去执行许多像 filtering,sorting 以及 hashing 这样的操作,不需要将字节反序列化成对象的格式。

  1. // Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
  2. // you can use custom classes that implement the Product interface
  3. case class Person(name: String, age: Long)
  4. // Encoders are created for case classes
  5. val caseClassDS = Seq(Person("Andy", 32)).toDS()
  6. caseClassDS.show()
  7. // +----+---+
  8. // |name|age|
  9. // +----+---+
  10. // |Andy| 32|
  11. // +----+---+
  12. // Encoders for most common types are automatically provided by importing spark.implicits._
  13. val primitiveDS = Seq(1, 2, 3).toDS()
  14. primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)
  15. // DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
  16. val path = "examples/src/main/resources/people.json"
  17. val peopleDS = spark.read.json(path).as[Person]
  18. peopleDS.show()
  19. // +----+-------+
  20. // | age| name|
  21. // +----+-------+
  22. // |null|Michael|
  23. // | 30| Andy|
  24. // | 19| Justin|
  25. // +----+-------+

Find full example code at “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” in the Spark repo.

  1. import java.util.Arrays;
  2. import java.util.Collections;
  3. import java.io.Serializable;
  4. import org.apache.spark.api.java.function.MapFunction;
  5. import org.apache.spark.sql.Dataset;
  6. import org.apache.spark.sql.Row;
  7. import org.apache.spark.sql.Encoder;
  8. import org.apache.spark.sql.Encoders;
  9. public static class Person implements Serializable {
  10. private String name;
  11. private int age;
  12. public String getName() {
  13. return name;
  14. }
  15. public void setName(String name) {
  16. this.name = name;
  17. }
  18. public int getAge() {
  19. return age;
  20. }
  21. public void setAge(int age) {
  22. this.age = age;
  23. }
  24. }
  25. // Create an instance of a Bean class
  26. Person person = new Person();
  27. person.setName("Andy");
  28. person.setAge(32);
  29. // Encoders are created for Java beans
  30. Encoder<Person> personEncoder = Encoders.bean(Person.class);
  31. Dataset<Person> javaBeanDS = spark.createDataset(
  32. Collections.singletonList(person),
  33. personEncoder
  34. );
  35. javaBeanDS.show();
  36. // +---+----+
  37. // |age|name|
  38. // +---+----+
  39. // | 32|Andy|
  40. // +---+----+
  41. // Encoders for most common types are provided in class Encoders
  42. Encoder<Integer> integerEncoder = Encoders.INT();
  43. Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder);
  44. Dataset<Integer> transformedDS = primitiveDS.map(
  45. (MapFunction<Integer, Integer>) value -> value + 1,
  46. integerEncoder);
  47. transformedDS.collect(); // Returns [2, 3, 4]
  48. // DataFrames can be converted to a Dataset by providing a class. Mapping based on name
  49. String path = "examples/src/main/resources/people.json";
  50. Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder);
  51. peopleDS.show();
  52. // +----+-------+
  53. // | age| name|
  54. // +----+-------+
  55. // |null|Michael|
  56. // | 30| Andy|
  57. // | 19| Justin|
  58. // +----+-------+

Find full example code at “examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java” in the Spark repo.

RDD的互操作性

Spark SQL 支持两种不同的方法用于转换已存在的 RDD 成为 Dataset。第一种方法是使用反射去推断一个包含指定的对象类型的 RDD 的 Schema。在你的 Spark 应用程序中当你已知 Schema 时这个基于方法的反射可以让你的代码更简洁。

第二种用于创建 Dataset 的方法是通过一个允许你构造一个 Schema 然后把它应用到一个已存在的 RDD 的编程接口。然而这种方法更繁琐,当列和它们的类型知道运行时都是未知时它允许你去构造 Dataset。

使用反射推断Schema

Spark SQL 的 Scala 接口支持自动转换一个包含 case classes 的 RDD 为 DataFrame。Case class 定义了表的 Schema。Case class 的参数名使用反射读取并且成为了列名。Case class 也可以是嵌套的或者包含像 Seq 或者 Array 这样的复杂类型。这个 RDD 能够被隐式转换成一个 DataFrame 然后被注册为一个表。表可以用于后续的 SQL 语句。

  1. // For implicit conversions from RDDs to DataFrames
  2. import spark.implicits._
  3. // Create an RDD of Person objects from a text file, convert it to a Dataframe
  4. val peopleDF = spark.sparkContext
  5. .textFile("examples/src/main/resources/people.txt")
  6. .map(_.split(","))
  7. .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
  8. .toDF()
  9. // Register the DataFrame as a temporary view
  10. peopleDF.createOrReplaceTempView("people")
  11. // SQL statements can be run by using the sql methods provided by Spark
  12. val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
  13. // The columns of a row in the result can be accessed by field index
  14. teenagersDF.map(teenager => "Name: " + teenager(0)).show()
  15. // +------------+
  16. // | value|
  17. // +------------+
  18. // |Name: Justin|
  19. // +------------+
  20. // or by field name
  21. teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
  22. // +------------+
  23. // | value|
  24. // +------------+
  25. // |Name: Justin|
  26. // +------------+
  27. // No pre-defined encoders for Dataset[Map[K,V]], define explicitly
  28. implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
  29. // Primitive types and case classes can be also defined as
  30. // implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()
  31. // row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
  32. teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
  33. // Array(Map("name" -> "Justin", "age" -> 19))

Find full example code at “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” in the Spark repo.

Spark SQL 支持一个[JavaBeans]的RDD(http://stackoverflow.com/questions/3295496/what-is-a-javabean-exactly)自动转换为一个DataFrame。`BeanInfo`利用反射定义表的schema。目前Spark SQL不支持含有Map的JavaBeans。但是支持嵌套List或者 ArrayJavaBeans。你可以通过创建一个有getters和setters的序列化的类来创建一个JavaBean。

  1. import org.apache.spark.api.java.JavaRDD;
  2. import org.apache.spark.api.java.function.Function;
  3. import org.apache.spark.api.java.function.MapFunction;
  4. import org.apache.spark.sql.Dataset;
  5. import org.apache.spark.sql.Row;
  6. import org.apache.spark.sql.Encoder;
  7. import org.apache.spark.sql.Encoders;
  8. // Create an RDD of Person objects from a text file
  9. JavaRDD<Person> peopleRDD = spark.read()
  10. .textFile("examples/src/main/resources/people.txt")
  11. .javaRDD()
  12. .map(line -> {
  13. String[] parts = line.split(",");
  14. Person person = new Person();
  15. person.setName(parts[0]);
  16. person.setAge(Integer.parseInt(parts[1].trim()));
  17. return person;
  18. });
  19. // Apply a schema to an RDD of JavaBeans to get a DataFrame
  20. Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);
  21. // Register the DataFrame as a temporary view
  22. peopleDF.createOrReplaceTempView("people");
  23. // SQL statements can be run by using the sql methods provided by spark
  24. Dataset<Row> teenagersDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");
  25. // The columns of a row in the result can be accessed by field index
  26. Encoder<String> stringEncoder = Encoders.STRING();
  27. Dataset<String> teenagerNamesByIndexDF = teenagersDF.map(
  28. (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
  29. stringEncoder);
  30. teenagerNamesByIndexDF.show();
  31. // +------------+
  32. // | value|
  33. // +------------+
  34. // |Name: Justin|
  35. // +------------+
  36. // or by field name
  37. Dataset<String> teenagerNamesByFieldDF = teenagersDF.map(
  38. (MapFunction<Row, String>) row -> "Name: " + row.<String>getAs("name"),
  39. stringEncoder);
  40. teenagerNamesByFieldDF.show();
  41. // +------------+
  42. // | value|
  43. // +------------+
  44. // |Name: Justin|
  45. // +------------+

Find full example code at “examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java” in the Spark repo.

Spark SQL能够把RDD 转换为一个DataFrame,并推断其类型。这些行由一系列key/value键值对组成。key值代表了表的列名,类型按抽样推断整个数据集,同样的也适用于JSON文件。

  1. from pyspark.sql import Row
  2. sc = spark.sparkContext
  3. # Load a text file and convert each line to a Row.
  4. lines = sc.textFile("examples/src/main/resources/people.txt")
  5. parts = lines.map(lambda l: l.split(","))
  6. people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
  7. # Infer the schema, and register the DataFrame as a table.
  8. schemaPeople = spark.createDataFrame(people)
  9. schemaPeople.createOrReplaceTempView("people")
  10. # SQL can be run over DataFrames that have been registered as a table.
  11. teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
  12. # The results of SQL queries are Dataframe objects.
  13. # rdd returns the content as an :class:`pyspark.RDD` of :class:`Row`.
  14. teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
  15. for name in teenNames:
  16. print(name)
  17. # Name: Justin

Find full example code at “examples/src/main/python/sql/basic.py” in the Spark repo.

以编程的方式指定Schema

当 case class 不能够在执行之前被定义(例如,records 记录的结构在一个 string 字符串中被编码了,或者一个 text 文本 dataset 将被解析并且不同的用户投影的字段是不一样的)。一个 DataFrame 可以使用下面的三步以编程的方式来创建。

  1. 从原始的 RDD 创建 RDD 的 Row(行)。
  2. Step 1 被创建后,创建 Schema 表示一个 StructType 匹配 RDD 中的 Row(行)的结构。
  3. 通过 SparkSession 提供的 createDataFrame 方法应用 Schema 到 RDD 的 RowS(行)。

例如:

  1. import org.apache.spark.sql.types._
  2. // Create an RDD
  3. val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")
  4. // The schema is encoded in a string
  5. val schemaString = "name age"
  6. // Generate the schema based on the string of schema
  7. val fields = schemaString.split(" ")
  8. .map(fieldName => StructField(fieldName, StringType, nullable = true))
  9. val schema = StructType(fields)
  10. // Convert records of the RDD (people) to Rows
  11. val rowRDD = peopleRDD
  12. .map(_.split(","))
  13. .map(attributes => Row(attributes(0), attributes(1).trim))
  14. // Apply the schema to the RDD
  15. val peopleDF = spark.createDataFrame(rowRDD, schema)
  16. // Creates a temporary view using the DataFrame
  17. peopleDF.createOrReplaceTempView("people")
  18. // SQL can be run over a temporary view created using DataFrames
  19. val results = spark.sql("SELECT name FROM people")
  20. // The results of SQL queries are DataFrames and support all the normal RDD operations
  21. // The columns of a row in the result can be accessed by field index or by field name
  22. results.map(attributes => "Name: " + attributes(0)).show()
  23. // +-------------+
  24. // | value|
  25. // +-------------+
  26. // |Name: Michael|
  27. // | Name: Andy|
  28. // | Name: Justin|
  29. // +-------------+

Find full example code at “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” in the Spark repo.

When JavaBean classes cannot be defined ahead of time (for example, the structure of records is encoded in a string, or a text dataset will be parsed and fields will be projected differently for different users), a Dataset&lt;Row&gt; can be created programmatically with three steps.

  1. Create an RDD of Rows from the original RDD;
  2. Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1.
  3. Apply the schema to the RDD of Rows via createDataFrame method provided by SparkSession.

For example:

  1. import java.util.ArrayList;
  2. import java.util.List;
  3. import org.apache.spark.api.java.JavaRDD;
  4. import org.apache.spark.api.java.function.Function;
  5. import org.apache.spark.sql.Dataset;
  6. import org.apache.spark.sql.Row;
  7. import org.apache.spark.sql.types.DataTypes;
  8. import org.apache.spark.sql.types.StructField;
  9. import org.apache.spark.sql.types.StructType;
  10. // Create an RDD
  11. JavaRDD<String> peopleRDD = spark.sparkContext()
  12. .textFile("examples/src/main/resources/people.txt", 1)
  13. .toJavaRDD();
  14. // The schema is encoded in a string
  15. String schemaString = "name age";
  16. // Generate the schema based on the string of schema
  17. List<StructField> fields = new ArrayList<>();
  18. for (String fieldName : schemaString.split(" ")) {
  19. StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
  20. fields.add(field);
  21. }
  22. StructType schema = DataTypes.createStructType(fields);
  23. // Convert records of the RDD (people) to Rows
  24. JavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) record -> {
  25. String[] attributes = record.split(",");
  26. return RowFactory.create(attributes[0], attributes[1].trim());
  27. });
  28. // Apply the schema to the RDD
  29. Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);
  30. // Creates a temporary view using the DataFrame
  31. peopleDataFrame.createOrReplaceTempView("people");
  32. // SQL can be run over a temporary view created using DataFrames
  33. Dataset<Row> results = spark.sql("SELECT name FROM people");
  34. // The results of SQL queries are DataFrames and support all the normal RDD operations
  35. // The columns of a row in the result can be accessed by field index or by field name
  36. Dataset<String> namesDS = results.map(
  37. (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
  38. Encoders.STRING());
  39. namesDS.show();
  40. // +-------------+
  41. // | value|
  42. // +-------------+
  43. // |Name: Michael|
  44. // | Name: Andy|
  45. // | Name: Justin|
  46. // +-------------+

Find full example code at “examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java” in the Spark repo.

当一个字典不能被提前定义(例如,记录的结构是在一个字符串中,抑或一个文本中解析,被不同的用户所属),一个 DataFrame 可以通过以下 3 步来创建。

  1. RDD 从原始的 RDD 穿件一个 RDD 的 toples 或者一个列表;
  2. Step 1 被创建后,创建 Schema 表示一个 StructType 匹配 RDD 中的结构。
  3. 通过 SparkSession 提供的 createDataFrame 方法应用 Schema 到 RDD。

For example:

  1. # Import data types
  2. from pyspark.sql.types import *
  3. sc = spark.sparkContext
  4. # Load a text file and convert each line to a Row.
  5. lines = sc.textFile("examples/src/main/resources/people.txt")
  6. parts = lines.map(lambda l: l.split(","))
  7. # Each line is converted to a tuple.
  8. people = parts.map(lambda p: (p[0], p[1].strip()))
  9. # The schema is encoded in a string.
  10. schemaString = "name age"
  11. fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
  12. schema = StructType(fields)
  13. # Apply the schema to the RDD.
  14. schemaPeople = spark.createDataFrame(people, schema)
  15. # Creates a temporary view using the DataFrame
  16. schemaPeople.createOrReplaceTempView("people")
  17. # SQL can be run over DataFrames that have been registered as a table.
  18. results = spark.sql("SELECT name FROM people")
  19. results.show()
  20. # +-------+
  21. # | name|
  22. # +-------+
  23. # |Michael|
  24. # | Andy|
  25. # | Justin|
  26. # +-------+

Find full example code at “examples/src/main/python/sql/basic.py” in the Spark repo.

Aggregations

The built-in DataFrames functions provide common aggregations such as count(), countDistinct(), avg(), max(), min(), etc. While those functions are designed for DataFrames, Spark SQL also has type-safe versions for some of them in Scala and Java to work with strongly typed Datasets. Moreover, users are not limited to the predefined aggregate functions and can create their own.

Untyped User-Defined Aggregate Functions

Users have to extend the UserDefinedAggregateFunction abstract class to implement a custom untyped aggregate function. For example, a user-defined average can look like:

  1. import org.apache.spark.sql.expressions.MutableAggregationBuffer
  2. import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
  3. import org.apache.spark.sql.types._
  4. import org.apache.spark.sql.Row
  5. import org.apache.spark.sql.SparkSession
  6. object MyAverage extends UserDefinedAggregateFunction {
  7. // Data types of input arguments of this aggregate function
  8. def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)
  9. // Data types of values in the aggregation buffer
  10. def bufferSchema: StructType = {
  11. StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
  12. }
  13. // The data type of the returned value
  14. def dataType: DataType = DoubleType
  15. // Whether this function always returns the same output on the identical input
  16. def deterministic: Boolean = true
  17. // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to
  18. // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides
  19. // the opportunity to update its values. Note that arrays and maps inside the buffer are still
  20. // immutable.
  21. def initialize(buffer: MutableAggregationBuffer): Unit = {
  22. buffer(0) = 0L
  23. buffer(1) = 0L
  24. }
  25. // Updates the given aggregation buffer `buffer` with new input data from `input`
  26. def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
  27. if (!input.isNullAt(0)) {
  28. buffer(0) = buffer.getLong(0) + input.getLong(0)
  29. buffer(1) = buffer.getLong(1) + 1
  30. }
  31. }
  32. // Merges two aggregation buffers and stores the updated buffer values back to `buffer1`
  33. def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
  34. buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
  35. buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  36. }
  37. // Calculates the final result
  38. def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
  39. }
  40. // Register the function to access it
  41. spark.udf.register("myAverage", MyAverage)
  42. val df = spark.read.json("examples/src/main/resources/employees.json")
  43. df.createOrReplaceTempView("employees")
  44. df.show()
  45. // +-------+------+
  46. // | name|salary|
  47. // +-------+------+
  48. // |Michael| 3000|
  49. // | Andy| 4500|
  50. // | Justin| 3500|
  51. // | Berta| 4000|
  52. // +-------+------+
  53. val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
  54. result.show()
  55. // +--------------+
  56. // |average_salary|
  57. // +--------------+
  58. // | 3750.0|
  59. // +--------------+

Find full example code at “examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedUntypedAggregation.scala” in the Spark repo.

  1. import java.util.ArrayList;
  2. import java.util.List;
  3. import org.apache.spark.sql.Dataset;
  4. import org.apache.spark.sql.Row;
  5. import org.apache.spark.sql.SparkSession;
  6. import org.apache.spark.sql.expressions.MutableAggregationBuffer;
  7. import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
  8. import org.apache.spark.sql.types.DataType;
  9. import org.apache.spark.sql.types.DataTypes;
  10. import org.apache.spark.sql.types.StructField;
  11. import org.apache.spark.sql.types.StructType;
  12. public static class MyAverage extends UserDefinedAggregateFunction {
  13. private StructType inputSchema;
  14. private StructType bufferSchema;
  15. public MyAverage() {
  16. List<StructField> inputFields = new ArrayList<>();
  17. inputFields.add(DataTypes.createStructField("inputColumn", DataTypes.LongType, true));
  18. inputSchema = DataTypes.createStructType(inputFields);
  19. List<StructField> bufferFields = new ArrayList<>();
  20. bufferFields.add(DataTypes.createStructField("sum", DataTypes.LongType, true));
  21. bufferFields.add(DataTypes.createStructField("count", DataTypes.LongType, true));
  22. bufferSchema = DataTypes.createStructType(bufferFields);
  23. }
  24. // Data types of input arguments of this aggregate function
  25. public StructType inputSchema() {
  26. return inputSchema;
  27. }
  28. // Data types of values in the aggregation buffer
  29. public StructType bufferSchema() {
  30. return bufferSchema;
  31. }
  32. // The data type of the returned value
  33. public DataType dataType() {
  34. return DataTypes.DoubleType;
  35. }
  36. // Whether this function always returns the same output on the identical input
  37. public boolean deterministic() {
  38. return true;
  39. }
  40. // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to
  41. // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides
  42. // the opportunity to update its values. Note that arrays and maps inside the buffer are still
  43. // immutable.
  44. public void initialize(MutableAggregationBuffer buffer) {
  45. buffer.update(0, 0L);
  46. buffer.update(1, 0L);
  47. }
  48. // Updates the given aggregation buffer `buffer` with new input data from `input`
  49. public void update(MutableAggregationBuffer buffer, Row input) {
  50. if (!input.isNullAt(0)) {
  51. long updatedSum = buffer.getLong(0) + input.getLong(0);
  52. long updatedCount = buffer.getLong(1) + 1;
  53. buffer.update(0, updatedSum);
  54. buffer.update(1, updatedCount);
  55. }
  56. }
  57. // Merges two aggregation buffers and stores the updated buffer values back to `buffer1`
  58. public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
  59. long mergedSum = buffer1.getLong(0) + buffer2.getLong(0);
  60. long mergedCount = buffer1.getLong(1) + buffer2.getLong(1);
  61. buffer1.update(0, mergedSum);
  62. buffer1.update(1, mergedCount);
  63. }
  64. // Calculates the final result
  65. public Double evaluate(Row buffer) {
  66. return ((double) buffer.getLong(0)) / buffer.getLong(1);
  67. }
  68. }
  69. // Register the function to access it
  70. spark.udf().register("myAverage", new MyAverage());
  71. Dataset<Row> df = spark.read().json("examples/src/main/resources/employees.json");
  72. df.createOrReplaceTempView("employees");
  73. df.show();
  74. // +-------+------+
  75. // | name|salary|
  76. // +-------+------+
  77. // |Michael| 3000|
  78. // | Andy| 4500|
  79. // | Justin| 3500|
  80. // | Berta| 4000|
  81. // +-------+------+
  82. Dataset<Row> result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees");
  83. result.show();
  84. // +--------------+
  85. // |average_salary|
  86. // +--------------+
  87. // | 3750.0|
  88. // +--------------+

Find full example code at “examples/src/main/java/org/apache/spark/examples/sql/JavaUserDefinedUntypedAggregation.java” in the Spark repo.

Type-Safe User-Defined Aggregate Functions

User-defined aggregations for strongly typed Datasets revolve around the Aggregator abstract class. For example, a type-safe user-defined average can look like:

  1. import org.apache.spark.sql.expressions.Aggregator
  2. import org.apache.spark.sql.Encoder
  3. import org.apache.spark.sql.Encoders
  4. import org.apache.spark.sql.SparkSession
  5. case class Employee(name: String, salary: Long)
  6. case class Average(var sum: Long, var count: Long)
  7. object MyAverage extends Aggregator[Employee, Average, Double] {
  8. // A zero value for this aggregation. Should satisfy the property that any b + zero = b
  9. def zero: Average = Average(0L, 0L)
  10. // Combine two values to produce a new value. For performance, the function may modify `buffer`
  11. // and return it instead of constructing a new object
  12. def reduce(buffer: Average, employee: Employee): Average = {
  13. buffer.sum += employee.salary
  14. buffer.count += 1
  15. buffer
  16. }
  17. // Merge two intermediate values
  18. def merge(b1: Average, b2: Average): Average = {
  19. b1.sum += b2.sum
  20. b1.count += b2.count
  21. b1
  22. }
  23. // Transform the output of the reduction
  24. def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
  25. // Specifies the Encoder for the intermediate value type
  26. def bufferEncoder: Encoder[Average] = Encoders.product
  27. // Specifies the Encoder for the final output value type
  28. def outputEncoder: Encoder[Double] = Encoders.scalaDouble
  29. }
  30. val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
  31. ds.show()
  32. // +-------+------+
  33. // | name|salary|
  34. // +-------+------+
  35. // |Michael| 3000|
  36. // | Andy| 4500|
  37. // | Justin| 3500|
  38. // | Berta| 4000|
  39. // +-------+------+
  40. // Convert the function to a `TypedColumn` and give it a name
  41. val averageSalary = MyAverage.toColumn.name("average_salary")
  42. val result = ds.select(averageSalary)
  43. result.show()
  44. // +--------------+
  45. // |average_salary|
  46. // +--------------+
  47. // | 3750.0|
  48. // +--------------+

Find full example code at “examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedTypedAggregation.scala” in the Spark repo.

  1. import java.io.Serializable;
  2. import org.apache.spark.sql.Dataset;
  3. import org.apache.spark.sql.Encoder;
  4. import org.apache.spark.sql.Encoders;
  5. import org.apache.spark.sql.SparkSession;
  6. import org.apache.spark.sql.TypedColumn;
  7. import org.apache.spark.sql.expressions.Aggregator;
  8. public static class Employee implements Serializable {
  9. private String name;
  10. private long salary;
  11. // Constructors, getters, setters...
  12. }
  13. public static class Average implements Serializable {
  14. private long sum;
  15. private long count;
  16. // Constructors, getters, setters...
  17. }
  18. public static class MyAverage extends Aggregator<Employee, Average, Double> {
  19. // A zero value for this aggregation. Should satisfy the property that any b + zero = b
  20. public Average zero() {
  21. return new Average(0L, 0L);
  22. }
  23. // Combine two values to produce a new value. For performance, the function may modify `buffer`
  24. // and return it instead of constructing a new object
  25. public Average reduce(Average buffer, Employee employee) {
  26. long newSum = buffer.getSum() + employee.getSalary();
  27. long newCount = buffer.getCount() + 1;
  28. buffer.setSum(newSum);
  29. buffer.setCount(newCount);
  30. return buffer;
  31. }
  32. // Merge two intermediate values
  33. public Average merge(Average b1, Average b2) {
  34. long mergedSum = b1.getSum() + b2.getSum();
  35. long mergedCount = b1.getCount() + b2.getCount();
  36. b1.setSum(mergedSum);
  37. b1.setCount(mergedCount);
  38. return b1;
  39. }
  40. // Transform the output of the reduction
  41. public Double finish(Average reduction) {
  42. return ((double) reduction.getSum()) / reduction.getCount();
  43. }
  44. // Specifies the Encoder for the intermediate value type
  45. public Encoder<Average> bufferEncoder() {
  46. return Encoders.bean(Average.class);
  47. }
  48. // Specifies the Encoder for the final output value type
  49. public Encoder<Double> outputEncoder() {
  50. return Encoders.DOUBLE();
  51. }
  52. }
  53. Encoder<Employee> employeeEncoder = Encoders.bean(Employee.class);
  54. String path = "examples/src/main/resources/employees.json";
  55. Dataset<Employee> ds = spark.read().json(path).as(employeeEncoder);
  56. ds.show();
  57. // +-------+------+
  58. // | name|salary|
  59. // +-------+------+
  60. // |Michael| 3000|
  61. // | Andy| 4500|
  62. // | Justin| 3500|
  63. // | Berta| 4000|
  64. // +-------+------+
  65. MyAverage myAverage = new MyAverage();
  66. // Convert the function to a `TypedColumn` and give it a name
  67. TypedColumn<Employee, Double> averageSalary = myAverage.toColumn().name("average_salary");
  68. Dataset<Double> result = ds.select(averageSalary);
  69. result.show();
  70. // +--------------+
  71. // |average_salary|
  72. // +--------------+
  73. // | 3750.0|
  74. // +--------------+

Find full example code at “examples/src/main/java/org/apache/spark/examples/sql/JavaUserDefinedTypedAggregation.java” in the Spark repo.

Data Sources(数据源)

Spark SQL 支持通过 DataFrame 接口对各种 data sources(数据源)进行操作。DataFrame 可以使用 relational transformations(关系转换)操作,也可用于创建 temporary view(临时视图)。将 DataFrame 注册为 temporary view(临时视图)允许您对其数据运行 SQL 查询。本节 描述了使用 Spark Data Sources 加载和保存数据的一般方法,然后涉及可用于 built-in data sources(内置数据源)的 specific options(特定选项)。

Generic Load/Save Functions(通用 加载/保存 功能)

在最简单的形式中,默认数据源(parquet,除非另有配置 spark.sql.sources.default)将用于所有操作。

  1. val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
  2. usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

Find full example code at “examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala” in the Spark repo.

  1. Dataset<Row> usersDF = spark.read().load("examples/src/main/resources/users.parquet");
  2. usersDF.select("name", "favorite_color").write().save("namesAndFavColors.parquet");

Find full example code at “examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java” in the Spark repo.

  1. df = spark.read.load("examples/src/main/resources/users.parquet")
  2. df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

Find full example code at “examples/src/main/python/sql/datasource.py” in the Spark repo.

  1. df <- read.df("examples/src/main/resources/users.parquet")
  2. write.df(select(df, "name", "favorite_color"), "namesAndFavColors.parquet")

Find full example code at “examples/src/main/r/RSparkSQLExample.R” in the Spark repo.

Manually Specifying Options(手动指定选项)

您还可以 manually specify(手动指定)将与任何你想传递给 data source 的其他选项一起使用的 data source。Data sources 由其 fully qualified name(完全限定名称)(即 org.apache.spark.sql.parquet),但是对于 built-in sources(内置的源),你也可以使用它们的 shortnames(短名称)(jsonparquetjdbcorclibsvmcsvtext)。从任何 data source type(数据源类型)加载 DataFrames 可以使用此 syntax(语法)转换为其他类型。

  1. val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
  2. peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")

Find full example code at “examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala” in the Spark repo.

  1. Dataset<Row> peopleDF =
  2. spark.read().format("json").load("examples/src/main/resources/people.json");
  3. peopleDF.select("name", "age").write().format("parquet").save("namesAndAges.parquet");

Find full example code at “examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java” in the Spark repo.

  1. df = spark.read.load("examples/src/main/resources/people.json", format="json")
  2. df.select("name", "age").write.save("namesAndAges.parquet", format="parquet")

Find full example code at “examples/src/main/python/sql/datasource.py” in the Spark repo.

  1. df <- read.df("examples/src/main/resources/people.json", "json")
  2. namesAndAges <- select(df, "name", "age")
  3. write.df(namesAndAges, "namesAndAges.parquet", "parquet")

Find full example code at “examples/src/main/r/RSparkSQLExample.R” in the Spark repo.

Run SQL on files directly(直接在文件上运行 SQL)

不使用读取 API 将文件加载到 DataFrame 并进行查询,也可以直接用 SQL 查询该文件.

  1. val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

Find full example code at “examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala” in the Spark repo.

  1. Dataset<Row> sqlDF =
  2. spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`");

Find full example code at “examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java” in the Spark repo.

  1. df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

Find full example code at “examples/src/main/python/sql/datasource.py” in the Spark repo.

  1. df <- sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

Find full example code at “examples/src/main/r/RSparkSQLExample.R” in the Spark repo.

Save Modes(保存模式)

Save operations(保存操作)可以选择使用 SaveMode,它指定如何处理现有数据如果存在的话。重要的是要意识到,这些 save modes(保存模式)不使用任何 locking(锁定)并且不是 atomic(原子)。另外,当执行 Overwrite 时,数据将在新数据写出之前被删除。

Scala/Java Any Language Meaning
SaveMode.ErrorIfExists (default) "error" (default) 将 DataFrame 保存到 data source(数据源)时,如果数据已经存在,则会抛出异常。
SaveMode.Append "append" 将 DataFrame 保存到 data source(数据源)时,如果 data/table 已存在,则 DataFrame 的内容将被 append(附加)到现有数据中。
SaveMode.Overwrite "overwrite" Overwrite mode(覆盖模式)意味着将 DataFrame 保存到 data source(数据源)时,如果 data/table 已经存在,则预期 DataFrame 的内容将 overwritten(覆盖)现有数据。
SaveMode.Ignore "ignore" Ignore mode(忽略模式)意味着当将 DataFrame 保存到 data source(数据源)时,如果数据已经存在,则保存操作预期不会保存 DataFrame 的内容,并且不更改现有数据。这与 SQL 中的 CREATE TABLE IF NOT EXISTS 类似。

Saving to Persistent Tables(保存到持久表)

DataFrames 也可以使用 saveAsTable 命令作为 persistent tables(持久表)保存到 Hive metastore 中。请注意,existing Hive deployment(现有的 Hive 部署)不需要使用此功能。Spark 将为您创建默认的 local Hive metastore(本地 Hive metastore)(使用 Derby)。与 createOrReplaceTempView 命令不同,saveAsTable 将 materialize(实现)DataFrame 的内容,并创建一个指向 Hive metastore 中数据的指针。即使您的 Spark 程序重新启动,Persistent tables(持久性表)仍然存在,因为您保持与同一个 metastore 的连接。可以通过使用表的名称在 SparkSession 上调用 table 方法来创建 persistent tabl(持久表)的 DataFrame。

对于 file-based(基于文件)的 data source(数据源),例如 text,parquet,json等,您可以通过 path 选项指定 custom table path(自定义表路径),例如 df.write.option("path", "/some/path").saveAsTable("t")。当表被 dropped(删除)时,custom table path(自定义表路径)将不会被删除,并且表数据仍然存在。如果未指定自定义表路径,Spark 将把数据写入 warehouse directory(仓库目录)下的默认表路径。当表被删除时,默认的表路径也将被删除。

从 Spark 2.1 开始,persistent datasource tables(持久性数据源表)将 per-partition metadata(每个分区元数据)存储在 Hive metastore 中。这带来了几个好处:

  • 由于 metastore 只能返回查询的必要 partitions(分区),因此不再需要将第一个查询上的所有 partitions discovering 到表中。
  • Hive DDLs 如 ALTER TABLE PARTITION ... SET LOCATION 现在可用于使用 Datasource API 创建的表。

请注意,创建 external datasource tables(外部数据源表)(带有 path 选项)的表时,默认情况下不会收集 partition information(分区信息)。要 sync(同步)metastore 中的分区信息,可以调用 MSCK REPAIR TABLE

Bucketing, Sorting and Partitioning(分桶,排序和分区)

对于 file-based data source(基于文件的数据源),也可以对 output(输出)进行 bucket 和 sort 或者 partition。Bucketing 和 sorting 仅适用于 persistent tables :

  1. peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")

Find full example code at “examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala” in the Spark repo.

  1. peopleDF.write().bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed");

Find full example code at “examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java” in the Spark repo.

  1. df.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")

Find full example code at “examples/src/main/python/sql/datasource.py” in the Spark repo.

  1. CREATE TABLE users_bucketed_by_name(
  2. name STRING,
  3. favorite_color STRING,
  4. favorite_numbers array<integer>
  5. ) USING parquet
  6. CLUSTERED BY(name) INTO 42 BUCKETS;

在使用 Dataset API 时,partitioning 可以同时与 savesaveAsTable 一起使用.

  1. usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")

Find full example code at “examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala” in the Spark repo.

  1. usersDF
  2. .write()
  3. .partitionBy("favorite_color")
  4. .format("parquet")
  5. .save("namesPartByColor.parquet");

Find full example code at “examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java” in the Spark repo.

  1. df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")

Find full example code at “examples/src/main/python/sql/datasource.py” in the Spark repo.

  1. CREATE TABLE users_by_favorite_color(
  2. name STRING,
  3. favorite_color STRING,
  4. favorite_numbers array<integer>
  5. ) USING csv PARTITIONED BY(favorite_color);

可以为 single table(单个表)使用 partitioning 和 bucketing:

  1. peopleDF
  2. .write
  3. .partitionBy("favorite_color")
  4. .bucketBy(42, "name")
  5. .saveAsTable("people_partitioned_bucketed")

Find full example code at “examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala” in the Spark repo.

  1. peopleDF
  2. .write()
  3. .partitionBy("favorite_color")
  4. .bucketBy(42, "name")
  5. .saveAsTable("people_partitioned_bucketed");

Find full example code at “examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java” in the Spark repo.

  1. df = spark.read.parquet("examples/src/main/resources/users.parquet")
  2. (df
  3. .write
  4. .partitionBy("favorite_color")
  5. .bucketBy(42, "name")
  6. .saveAsTable("people_partitioned_bucketed"))

Find full example code at “examples/src/main/python/sql/datasource.py” in the Spark repo.

  1. CREATE TABLE users_bucketed_and_partitioned(
  2. name STRING,
  3. favorite_color STRING,
  4. favorite_numbers array<integer>
  5. ) USING parquet
  6. PARTITIONED BY (favorite_color)
  7. CLUSTERED BY(name) SORTED BY (favorite_numbers) INTO 42 BUCKETS;

partitionBy 创建一个 directory structure(目录结构),如 Partition Discovery 部分所述。因此,对 cardinality(基数)较高的 columns 的适用性有限。相反,bucketBy 可以在固定数量的 buckets 中分配数据,并且可以在 a number of unique values is unbounded(多个唯一值无界时)使用数据。

Parquet Files

Parquet 是许多其他数据处理系统支持的 columnar format(柱状格式)。Spark SQL 支持读写 Parquet 文件,可自动保留 schema of the original data(原始数据的模式)。当编写 Parquet 文件时,出于兼容性原因,所有 columns 都将自动转换为可空。

Loading Data Programmatically(以编程的方式加载数据)

使用上面例子中的数据:

  1. // Encoders for most common types are automatically provided by importing spark.implicits._
  2. import spark.implicits._
  3. val peopleDF = spark.read.json("examples/src/main/resources/people.json")
  4. // DataFrames can be saved as Parquet files, maintaining the schema information
  5. peopleDF.write.parquet("people.parquet")
  6. // Read in the parquet file created above
  7. // Parquet files are self-describing so the schema is preserved
  8. // The result of loading a Parquet file is also a DataFrame
  9. val parquetFileDF = spark.read.parquet("people.parquet")
  10. // Parquet files can also be used to create a temporary view and then used in SQL statements
  11. parquetFileDF.createOrReplaceTempView("parquetFile")
  12. val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
  13. namesDF.map(attributes => "Name: " + attributes(0)).show()
  14. // +------------+
  15. // | value|
  16. // +------------+
  17. // |Name: Justin|
  18. // +------------+

Find full example code at “examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala” in the Spark repo.

  1. import org.apache.spark.api.java.function.MapFunction;
  2. import org.apache.spark.sql.Encoders;
  3. import org.apache.spark.sql.Dataset;
  4. import org.apache.spark.sql.Row;
  5. Dataset<Row> peopleDF = spark.read().json("examples/src/main/resources/people.json");
  6. // DataFrames can be saved as Parquet files, maintaining the schema information
  7. peopleDF.write().parquet("people.parquet");
  8. // Read in the Parquet file created above.
  9. // Parquet files are self-describing so the schema is preserved
  10. // The result of loading a parquet file is also a DataFrame
  11. Dataset<Row> parquetFileDF = spark.read().parquet("people.parquet");
  12. // Parquet files can also be used to create a temporary view and then used in SQL statements
  13. parquetFileDF.createOrReplaceTempView("parquetFile");
  14. Dataset<Row> namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19");
  15. Dataset<String> namesDS = namesDF.map(
  16. (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
  17. Encoders.STRING());
  18. namesDS.show();
  19. // +------------+
  20. // | value|
  21. // +------------+
  22. // |Name: Justin|
  23. // +------------+

Find full example code at “examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java” in the Spark repo.

  1. peopleDF = spark.read.json("examples/src/main/resources/people.json")
  2. # DataFrames can be saved as Parquet files, maintaining the schema information.
  3. peopleDF.write.parquet("people.parquet")
  4. # Read in the Parquet file created above.
  5. # Parquet files are self-describing so the schema is preserved.
  6. # The result of loading a parquet file is also a DataFrame.
  7. parquetFile = spark.read.parquet("people.parquet")
  8. # Parquet files can also be used to create a temporary view and then used in SQL statements.
  9. parquetFile.createOrReplaceTempView("parquetFile")
  10. teenagers = spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
  11. teenagers.show()
  12. # +------+
  13. # | name|
  14. # +------+
  15. # |Justin|
  16. # +------+

Find full example code at “examples/src/main/python/sql/datasource.py” in the Spark repo.

  1. df <- read.df("examples/src/main/resources/people.json", "json")
  2. # SparkDataFrame can be saved as Parquet files, maintaining the schema information.
  3. write.parquet(df, "people.parquet")
  4. # Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved.
  5. # The result of loading a parquet file is also a DataFrame.
  6. parquetFile <- read.parquet("people.parquet")
  7. # Parquet files can also be used to create a temporary view and then used in SQL statements.
  8. createOrReplaceTempView(parquetFile, "parquetFile")
  9. teenagers <- sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
  10. head(teenagers)
  11. ## name
  12. ## 1 Justin
  13. # We can also run custom R-UDFs on Spark DataFrames. Here we prefix all the names with "Name:"
  14. schema <- structType(structField("name", "string"))
  15. teenNames <- dapply(df, function(p) { cbind(paste("Name:", p$name)) }, schema)
  16. for (teenName in collect(teenNames)$name) {
  17. cat(teenName, "\n")
  18. }
  19. ## Name: Michael
  20. ## Name: Andy
  21. ## Name: Justin

Find full example code at “examples/src/main/r/RSparkSQLExample.R” in the Spark repo.

  1. CREATE TEMPORARY VIEW parquetTable
  2. USING org.apache.spark.sql.parquet
  3. OPTIONS (
  4. path "examples/src/main/resources/people.parquet"
  5. )
  6. SELECT * FROM parquetTable

Partition Discovery(分区发现)

Table partitioning(表分区)是在像 Hive 这样的系统中使用的常见的优化方法。在 partitioned table(分区表)中,数据通常存储在不同的目录中,partitioning column values encoded(分区列值编码)在每个 partition directory(分区目录)的路径中。Parquet data source(Parquet 数据源)现在可以自动 discover(发现)和 infer(推断)分区信息。例如,我们可以使用以下 directory structure(目录结构)将所有以前使用的 population data(人口数据)存储到 partitioned table(分区表)中,其中有两个额外的列 gendercountry 作为 partitioning columns(分区列):

  1. path
  2. └── to
  3. └── table
  4. ├── gender=male
  5. ├── ...
  6. ├── country=US
  7. └── data.parquet
  8. ├── country=CN
  9. └── data.parquet
  10. └── ...
  11. └── gender=female
  12. ├── ...
  13. ├── country=US
  14. └── data.parquet
  15. ├── country=CN
  16. └── data.parquet
  17. └── ...

通过将 path/to/table 传递给 SparkSession.read.parquetSparkSession.read.load,Spark SQL 将自动从路径中提取 partitioning information(分区信息)。现在返回的 DataFrame 的 schema(模式)变成:

  1. root
  2. |-- name: string (nullable = true)
  3. |-- age: long (nullable = true)
  4. |-- gender: string (nullable = true)
  5. |-- country: string (nullable = true)

请注意,会自动 inferred(推断)partitioning columns(分区列)的 data types(数据类型)。目前,支持 numeric data types(数字数据类型)和 string type(字符串类型)。有些用户可能不想自动推断 partitioning columns(分区列)的数据类型。对于这些用例,automatic type inference(自动类型推断)可以由 spark.sql.sources.partitionColumnTypeInference.enabled 配置,默认为 true。当禁用 type inference(类型推断)时,string type(字符串类型)将用于 partitioning columns(分区列)。

从 Spark 1.6.0 开始,默认情况下,partition discovery(分区发现)只能找到给定路径下的 partitions(分区)。对于上述示例,如果用户将 path/to/table/gender=male 传递给 SparkSession.read.parquetSparkSession.read.load,则 gender 将不被视为 partitioning column(分区列)。如果用户需要指定 partition discovery(分区发现)应该开始的基本路径,则可以在数据源选项中设置 basePath。例如,当 path/to/table/gender=male 是数据的路径并且用户将 basePath 设置为 path/to/table/gender 将是一个 partitioning column(分区列)。

Schema Merging(模式合并)

像 ProtocolBuffer,Avro 和 Thrift 一样,Parquet 也支持 schema evolution(模式演进)。用户可以从一个 simple schema(简单的架构)开始,并根据需要逐渐向 schema 添加更多的 columns(列)。以这种方式,用户可能会使用不同但相互兼容的 schemas 的 multiple Parquet files(多个 Parquet 文件)。Parquet data source(Parquet 数据源)现在能够自动检测这种情况并 merge(合并)所有这些文件的 schemas。

由于 schema merging(模式合并)是一个 expensive operation(相对昂贵的操作),并且在大多数情况下不是必需的,所以默认情况下从 1.5.0 开始。你可以按照如下的方式启用它:

  1. 读取 Parquet 文件时,将 data source option(数据源选项)mergeSchema 设置为 true(如下面的例子所示),或
  2. 将 global SQL option(全局 SQL 选项)spark.sql.parquet.mergeSchema 设置为 true
  1. // This is used to implicitly convert an RDD to a DataFrame.
  2. import spark.implicits._
  3. // Create a simple DataFrame, store into a partition directory
  4. val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
  5. squaresDF.write.parquet("data/test_table/key=1")
  6. // Create another DataFrame in a new partition directory,
  7. // adding a new column and dropping an existing column
  8. val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
  9. cubesDF.write.parquet("data/test_table/key=2")
  10. // Read the partitioned table
  11. val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
  12. mergedDF.printSchema()
  13. // The final schema consists of all 3 columns in the Parquet files together
  14. // with the partitioning column appeared in the partition directory paths
  15. // root
  16. // |-- value: int (nullable = true)
  17. // |-- square: int (nullable = true)
  18. // |-- cube: int (nullable = true)
  19. // |-- key: int (nullable = true)

Find full example code at “examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala” in the Spark repo.

  1. import java.io.Serializable;
  2. import java.util.ArrayList;
  3. import java.util.Arrays;
  4. import java.util.List;
  5. import org.apache.spark.sql.Dataset;
  6. import org.apache.spark.sql.Row;
  7. public static class Square implements Serializable {
  8. private int value;
  9. private int square;
  10. // Getters and setters...
  11. }
  12. public static class Cube implements Serializable {
  13. private int value;
  14. private int cube;
  15. // Getters and setters...
  16. }
  17. List<Square> squares = new ArrayList<>();
  18. for (int value = 1; value <= 5; value++) {
  19. Square square = new Square();
  20. square.setValue(value);
  21. square.setSquare(value * value);
  22. squares.add(square);
  23. }
  24. // Create a simple DataFrame, store into a partition directory
  25. Dataset<Row> squaresDF = spark.createDataFrame(squares, Square.class);
  26. squaresDF.write().parquet("data/test_table/key=1");
  27. List<Cube> cubes = new ArrayList<>();
  28. for (int value = 6; value <= 10; value++) {
  29. Cube cube = new Cube();
  30. cube.setValue(value);
  31. cube.setCube(value * value * value);
  32. cubes.add(cube);
  33. }
  34. // Create another DataFrame in a new partition directory,
  35. // adding a new column and dropping an existing column
  36. Dataset<Row> cubesDF = spark.createDataFrame(cubes, Cube.class);
  37. cubesDF.write().parquet("data/test_table/key=2");
  38. // Read the partitioned table
  39. Dataset<Row> mergedDF = spark.read().option("mergeSchema", true).parquet("data/test_table");
  40. mergedDF.printSchema();
  41. // The final schema consists of all 3 columns in the Parquet files together
  42. // with the partitioning column appeared in the partition directory paths
  43. // root
  44. // |-- value: int (nullable = true)
  45. // |-- square: int (nullable = true)
  46. // |-- cube: int (nullable = true)
  47. // |-- key: int (nullable = true)

Find full example code at “examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java” in the Spark repo.

  1. from pyspark.sql import Row
  2. # spark is from the previous example.
  3. # Create a simple DataFrame, stored into a partition directory
  4. sc = spark.sparkContext
  5. squaresDF = spark.createDataFrame(sc.parallelize(range(1, 6))
  6. .map(lambda i: Row(single=i, double=i ** 2)))
  7. squaresDF.write.parquet("data/test_table/key=1")
  8. # Create another DataFrame in a new partition directory,
  9. # adding a new column and dropping an existing column
  10. cubesDF = spark.createDataFrame(sc.parallelize(range(6, 11))
  11. .map(lambda i: Row(single=i, triple=i ** 3)))
  12. cubesDF.write.parquet("data/test_table/key=2")
  13. # Read the partitioned table
  14. mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
  15. mergedDF.printSchema()
  16. # The final schema consists of all 3 columns in the Parquet files together
  17. # with the partitioning column appeared in the partition directory paths.
  18. # root
  19. # |-- double: long (nullable = true)
  20. # |-- single: long (nullable = true)
  21. # |-- triple: long (nullable = true)
  22. # |-- key: integer (nullable = true)

Find full example code at “examples/src/main/python/sql/datasource.py” in the Spark repo.

  1. df1 <- createDataFrame(data.frame(single=c(12, 29), double=c(19, 23)))
  2. df2 <- createDataFrame(data.frame(double=c(19, 23), triple=c(23, 18)))
  3. # Create a simple DataFrame, stored into a partition directory
  4. write.df(df1, "data/test_table/key=1", "parquet", "overwrite")
  5. # Create another DataFrame in a new partition directory,
  6. # adding a new column and dropping an existing column
  7. write.df(df2, "data/test_table/key=2", "parquet", "overwrite")
  8. # Read the partitioned table
  9. df3 <- read.df("data/test_table", "parquet", mergeSchema = "true")
  10. printSchema(df3)
  11. # The final schema consists of all 3 columns in the Parquet files together
  12. # with the partitioning column appeared in the partition directory paths
  13. ## root
  14. ## |-- single: double (nullable = true)
  15. ## |-- double: double (nullable = true)
  16. ## |-- triple: double (nullable = true)
  17. ## |-- key: integer (nullable = true)

Find full example code at “examples/src/main/r/RSparkSQLExample.R” in the Spark repo.

Hive metastore Parquet table conversion(Hive metastore Parquet table 转换)

当读取和写入 Hive metastore Parquet 表时,Spark SQL 将尝试使用自己的 Parquet support(Parquet 支持),而不是 Hive SerDe 来获得更好的性能。此 behavior(行为)由 spark.sql.hive.convertMetastoreParquet 配置控制,默认情况下 turned on(打开)。

Hive/Parquet Schema Reconciliation

从 table schema processing(表格模式处理)的角度来说,Hive 和 Parquet 之间有两个关键的区别。

  1. Hive 不区分大小写,而 Parquet 不是
  2. Hive 认为所有 columns(列)都可以为空,而 Parquet 中的可空性是 significant(重要)的。

由于这个原因,当将 Hive metastore Parquet 表转换为 Spark SQL Parquet 表时,我们必须调整 metastore schema 与 Parquet schema。reconciliation 规则是:

  1. 在两个 schema 中具有 same name(相同名称)的 Fields(字段)必须具有 same data type(相同的数据类型),而不管 nullability(可空性)。reconciled field 应具有 Parquet 的数据类型,以便 nullability(可空性)得到尊重。

  2. reconciled schema(调和模式)正好包含 Hive metastore schema 中定义的那些字段。

    • 只出现在 Parquet schema 中的任何字段将被 dropped(删除)在 reconciled schema 中。
    • 仅在 Hive metastore schema 中出现的任何字段在 reconciled schema 中作为 nullable field(可空字段)添加。

Metadata Refreshing(元数据刷新)

Spark SQL 缓存 Parquet metadata 以获得更好的性能。当启用 Hive metastore Parquet table conversion(转换)时,这些 converted tables(转换表)的 metadata(元数据)也被 cached(缓存)。如果这些表由 Hive 或其他外部工具更新,则需要手动刷新以确保 consistent metadata(一致的元数据)。

  1. // spark is an existing SparkSession
  2. spark.catalog.refreshTable("my_table")
  1. // spark is an existing SparkSession
  2. spark.catalog().refreshTable("my_table");
  1. # spark is an existing SparkSession
  2. spark.catalog.refreshTable("my_table")
  1. REFRESH TABLE my_table;

Configuration(配置)

可以使用 SparkSession 上的 setConf 方法或使用 SQL 运行 SET key = value 命令来完成 Parquet 的配置.

Property Name(参数名称) Default(默认) Meaning(含义)
spark.sql.parquet.binaryAsString false 一些其他 Parquet-producing systems(Parquet 生产系统),特别是 Impala,Hive 和旧版本的 Spark SQL,在 writing out(写出)Parquet schema 时,不区分 binary data(二进制数据)和 strings(字符串)。该 flag 告诉 Spark SQL 将 binary data(二进制数据)解释为 string(字符串)以提供与这些系统的兼容性。
spark.sql.parquet.int96AsTimestamp true 一些 Parquet-producing systems,特别是 Impala 和 Hive,将 Timestamp 存入INT96。该 flag 告诉 Spark SQL 将 INT96 数据解析为 timestamp 以提供与这些系统的兼容性。
spark.sql.parquet.cacheMetadata true 打开 Parquet schema metadata 的缓存。可以加快查询静态数据。
spark.sql.parquet.compression.codec snappy 在编写 Parquet 文件时设置 compression codec(压缩编解码器)的使用。可接受的值包括:uncompressed,snappy,gzip,lzo。
spark.sql.parquet.filterPushdown true 设置为 true 时启用 Parquet filter push-down optimization。
spark.sql.hive.convertMetastoreParquet true 当设置为 false 时,Spark SQL 将使用 Hive SerDe 作为 parquet tables,而不是内置的支持。
spark.sql.parquet.mergeSchema false 当为 true 时,Parquet data source(Parquet 数据源)merges(合并)从所有 data files(数据文件)收集的 schemas,否则如果没有可用的 summary file,则从 summary file 或 random data file 中挑选 schema。
spark.sql.optimizer.metadataOnly true 如果为 true,则启用使用表的 metadata 的 metadata-only query optimization 来生成 partition columns(分区列)而不是 table scans(表扫描)。当 scanned(扫描)的所有 columns(列)都是 partition columns(分区列)并且 query(查询)具有满足 distinct semantics(不同语义)的 aggregate operator(聚合运算符)时,它将适用。

JSON Datasets(JSON 数据集)

Spark SQL 可以 automatically infer(自动推断)JSON dataset 的 schema,并将其作为 Dataset[Row] 加载。这个 conversion(转换)可以在 Dataset[String] 上使用 SparkSession.read.json() 来完成,或 JSON 文件。

请注意,以 a json file 提供的文件不是典型的 JSON 文件。每行必须包含一个 separate(单独的),self-contained valid(独立的有效的)JSON 对象。有关更多信息,请参阅 JSON Lines text format, also called newline-delimited JSON

对于 regular multi-line JSON file(常规的多行 JSON 文件),将 multiLine 选项设置为 true

  1. // Primitive types (Int, String, etc) and Product types (case classes) encoders are
  2. // supported by importing this when creating a Dataset.
  3. import spark.implicits._
  4. // A JSON dataset is pointed to by path.
  5. // The path can be either a single text file or a directory storing text files
  6. val path = "examples/src/main/resources/people.json"
  7. val peopleDF = spark.read.json(path)
  8. // The inferred schema can be visualized using the printSchema() method
  9. peopleDF.printSchema()
  10. // root
  11. // |-- age: long (nullable = true)
  12. // |-- name: string (nullable = true)
  13. // Creates a temporary view using the DataFrame
  14. peopleDF.createOrReplaceTempView("people")
  15. // SQL statements can be run by using the sql methods provided by spark
  16. val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
  17. teenagerNamesDF.show()
  18. // +------+
  19. // | name|
  20. // +------+
  21. // |Justin|
  22. // +------+
  23. // Alternatively, a DataFrame can be created for a JSON dataset represented by
  24. // a Dataset[String] storing one JSON object per string
  25. val otherPeopleDataset = spark.createDataset(
  26. """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
  27. val otherPeople = spark.read.json(otherPeopleDataset)
  28. otherPeople.show()
  29. // +---------------+----+
  30. // | address|name|
  31. // +---------------+----+
  32. // |[Columbus,Ohio]| Yin|
  33. // +---------------+----+

Find full example code at “examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala” in the Spark repo.

Spark SQL 可以 automatically infer(自动推断)JSON dataset 的 schema,并将其作为 Dataset&lt;Row&gt; 加载。这个 conversion(转换)可以在 Dataset&lt;String&gt; 上使用 SparkSession.read.json() 来完成,或 JSON 文件。

请注意,以 a json file 提供的文件不是典型的 JSON 文件。每行必须包含一个 separate(单独的),self-contained valid(独立的有效的)JSON 对象。有关更多信息,请参阅 JSON Lines text format, also called newline-delimited JSON

对于 regular multi-line JSON file(常规的多行 JSON 文件),将 multiLine 选项设置为 true

  1. import org.apache.spark.sql.Dataset;
  2. import org.apache.spark.sql.Row;
  3. // A JSON dataset is pointed to by path.
  4. // The path can be either a single text file or a directory storing text files
  5. Dataset<Row> people = spark.read().json("examples/src/main/resources/people.json");
  6. // The inferred schema can be visualized using the printSchema() method
  7. people.printSchema();
  8. // root
  9. // |-- age: long (nullable = true)
  10. // |-- name: string (nullable = true)
  11. // Creates a temporary view using the DataFrame
  12. people.createOrReplaceTempView("people");
  13. // SQL statements can be run by using the sql methods provided by spark
  14. Dataset<Row> namesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");
  15. namesDF.show();
  16. // +------+
  17. // | name|
  18. // +------+
  19. // |Justin|
  20. // +------+
  21. // Alternatively, a DataFrame can be created for a JSON dataset represented by
  22. // a Dataset<String> storing one JSON object per string.
  23. List<String> jsonData = Arrays.asList(
  24. "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
  25. Dataset<String> anotherPeopleDataset = spark.createDataset(jsonData, Encoders.STRING());
  26. Dataset<Row> anotherPeople = spark.read().json(anotherPeopleDataset);
  27. anotherPeople.show();
  28. // +---------------+----+
  29. // | address|name|
  30. // +---------------+----+
  31. // |[Columbus,Ohio]| Yin|
  32. // +---------------+----+

Find full example code at “examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java” in the Spark repo.

Spark SQL 可以 automatically infer(自动推断)JSON dataset 的 schema,并将其作为 DataFrame 加载。可以使用 JSON 文件中的 SparkSession.read.json 进行此 conversion(转换)。

请注意,以 a json file 提供的文件不是典型的 JSON 文件。每行必须包含一个 separate(单独的),self-contained valid(独立的有效的)JSON 对象。有关更多信息,请参阅 JSON Lines text format, also called newline-delimited JSON

对于 regular multi-line JSON file(常规的多行 JSON 文件),将 multiLine 选项设置为 true

  1. # spark is from the previous example.
  2. sc = spark.sparkContext
  3. # A JSON dataset is pointed to by path.
  4. # The path can be either a single text file or a directory storing text files
  5. path = "examples/src/main/resources/people.json"
  6. peopleDF = spark.read.json(path)
  7. # The inferred schema can be visualized using the printSchema() method
  8. peopleDF.printSchema()
  9. # root
  10. # |-- age: long (nullable = true)
  11. # |-- name: string (nullable = true)
  12. # Creates a temporary view using the DataFrame
  13. peopleDF.createOrReplaceTempView("people")
  14. # SQL statements can be run by using the sql methods provided by spark
  15. teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
  16. teenagerNamesDF.show()
  17. # +------+
  18. # | name|
  19. # +------+
  20. # |Justin|
  21. # +------+
  22. # Alternatively, a DataFrame can be created for a JSON dataset represented by
  23. # an RDD[String] storing one JSON object per string
  24. jsonStrings = ['{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}']
  25. otherPeopleRDD = sc.parallelize(jsonStrings)
  26. otherPeople = spark.read.json(otherPeopleRDD)
  27. otherPeople.show()
  28. # +---------------+----+
  29. # | address|name|
  30. # +---------------+----+
  31. # |[Columbus,Ohio]| Yin|
  32. # +---------------+----+

Find full example code at “examples/src/main/python/sql/datasource.py” in the Spark repo.

Spark SQL 可以 automatically infer(自动推断)JSON dataset 的 schema,并将其作为 DataFrame 加载。使用 read.json() 函数,它从 JSON 文件的目录中加载数据,其中每一行文件都是一个 JSON 对象。

请注意,以 a json file 提供的文件不是典型的 JSON 文件。每行必须包含一个 separate(单独的),self-contained valid(独立的有效的)JSON 对象。有关更多信息,请参阅 JSON Lines text format, also called newline-delimited JSON

对于 regular multi-line JSON file(常规的多行 JSON 文件),将 multiLine 选项设置为 true

  1. # A JSON dataset is pointed to by path.
  2. # The path can be either a single text file or a directory storing text files.
  3. path <- "examples/src/main/resources/people.json"
  4. # Create a DataFrame from the file(s) pointed to by path
  5. people <- read.json(path)
  6. # The inferred schema can be visualized using the printSchema() method.
  7. printSchema(people)
  8. ## root
  9. ## |-- age: long (nullable = true)
  10. ## |-- name: string (nullable = true)
  11. # Register this DataFrame as a table.
  12. createOrReplaceTempView(people, "people")
  13. # SQL statements can be run by using the sql methods.
  14. teenagers <- sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
  15. head(teenagers)
  16. ## name
  17. ## 1 Justin

Find full example code at “examples/src/main/r/RSparkSQLExample.R” in the Spark repo.

  1. CREATE TEMPORARY VIEW jsonTable
  2. USING org.apache.spark.sql.json
  3. OPTIONS (
  4. path "examples/src/main/resources/people.json"
  5. )
  6. SELECT * FROM jsonTable

Hive 表

Spark SQL 还支持读取和写入存储在 Apache Hive 中的数据。但是,由于 Hive 具有大量依赖关系,因此这些依赖关系不包含在默认 Spark 分发中。如果在类路径中找到 Hive 依赖项,Spark 将自动加载它们。请注意,这些 Hive 依赖关系也必须存在于所有工作节点上,因为它们将需要访问 Hive 序列化和反序列化库(SerDes),以访问存储在 Hive 中的数据。

通过将 hive-site.xmlcore-site.xml(用于安全配置)和 hdfs-site.xml(用于 HDFS 配置)文件放在 conf/ 中来完成配置。

当使用 Hive 时,必须用 Hive 支持实例化 SparkSession,包括连接到持续的 Hive 转移,支持 Hive serdes 和 Hive 用户定义的功能。没有现有 Hive 部署的用户仍然可以启用 Hive 支持。当 hive-site.xml 未配置时,上下文会自动在当前目录中创建 metastore_db,并创建由 spark.sql.warehouse.dir 配置的目录,该目录默认为Spark应用程序当前目录中的 spark-warehouse 目录 开始了 请注意,自从2.0.0以来,hive-site.xml 中的 hive.metastore.warehouse.dir 属性已被弃用。而是使用 spark.sql.warehouse.dir 来指定仓库中数据库的默认位置。您可能需要向启动 Spark 应用程序的用户授予写权限。å

  1. import java.io.File
  2. import org.apache.spark.sql.Row
  3. import org.apache.spark.sql.SparkSession
  4. case class Record(key: Int, value: String)
  5. // warehouseLocation points to the default location for managed databases and tables
  6. val warehouseLocation = new File("spark-warehouse").getAbsolutePath
  7. val spark = SparkSession
  8. .builder()
  9. .appName("Spark Hive Example")
  10. .config("spark.sql.warehouse.dir", warehouseLocation)
  11. .enableHiveSupport()
  12. .getOrCreate()
  13. import spark.implicits._
  14. import spark.sql
  15. sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
  16. sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
  17. // Queries are expressed in HiveQL
  18. sql("SELECT * FROM src").show()
  19. // +---+-------+
  20. // |key| value|
  21. // +---+-------+
  22. // |238|val_238|
  23. // | 86| val_86|
  24. // |311|val_311|
  25. // ...
  26. // Aggregation queries are also supported.
  27. sql("SELECT COUNT(*) FROM src").show()
  28. // +--------+
  29. // |count(1)|
  30. // +--------+
  31. // | 500 |
  32. // +--------+
  33. // The results of SQL queries are themselves DataFrames and support all normal functions.
  34. val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
  35. // The items in DataFrames are of type Row, which allows you to access each column by ordinal.
  36. val stringsDS = sqlDF.map {
  37. case Row(key: Int, value: String) => s"Key: $key, Value: $value"
  38. }
  39. stringsDS.show()
  40. // +--------------------+
  41. // | value|
  42. // +--------------------+
  43. // |Key: 0, Value: val_0|
  44. // |Key: 0, Value: val_0|
  45. // |Key: 0, Value: val_0|
  46. // ...
  47. // You can also use DataFrames to create temporary views within a SparkSession.
  48. val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
  49. recordsDF.createOrReplaceTempView("records")
  50. // Queries can then join DataFrame data with data stored in Hive.
  51. sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
  52. // +---+------+---+------+
  53. // |key| value|key| value|
  54. // +---+------+---+------+
  55. // | 2| val_2| 2| val_2|
  56. // | 4| val_4| 4| val_4|
  57. // | 5| val_5| 5| val_5|
  58. // ...

Find full example code at “examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala” in the Spark repo.

  1. import java.io.File;
  2. import java.io.Serializable;
  3. import java.util.ArrayList;
  4. import java.util.List;
  5. import org.apache.spark.api.java.function.MapFunction;
  6. import org.apache.spark.sql.Dataset;
  7. import org.apache.spark.sql.Encoders;
  8. import org.apache.spark.sql.Row;
  9. import org.apache.spark.sql.SparkSession;
  10. public static class Record implements Serializable {
  11. private int key;
  12. private String value;
  13. public int getKey() {
  14. return key;
  15. }
  16. public void setKey(int key) {
  17. this.key = key;
  18. }
  19. public String getValue() {
  20. return value;
  21. }
  22. public void setValue(String value) {
  23. this.value = value;
  24. }
  25. }
  26. // warehouseLocation points to the default location for managed databases and tables
  27. String warehouseLocation = new File("spark-warehouse").getAbsolutePath();
  28. SparkSession spark = SparkSession
  29. .builder()
  30. .appName("Java Spark Hive Example")
  31. .config("spark.sql.warehouse.dir", warehouseLocation)
  32. .enableHiveSupport()
  33. .getOrCreate();
  34. spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive");
  35. spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");
  36. // Queries are expressed in HiveQL
  37. spark.sql("SELECT * FROM src").show();
  38. // +---+-------+
  39. // |key| value|
  40. // +---+-------+
  41. // |238|val_238|
  42. // | 86| val_86|
  43. // |311|val_311|
  44. // ...
  45. // Aggregation queries are also supported.
  46. spark.sql("SELECT COUNT(*) FROM src").show();
  47. // +--------+
  48. // |count(1)|
  49. // +--------+
  50. // | 500 |
  51. // +--------+
  52. // The results of SQL queries are themselves DataFrames and support all normal functions.
  53. Dataset<Row> sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key");
  54. // The items in DataFrames are of type Row, which lets you to access each column by ordinal.
  55. Dataset<String> stringsDS = sqlDF.map(
  56. (MapFunction<Row, String>) row -> "Key: " + row.get(0) + ", Value: " + row.get(1),
  57. Encoders.STRING());
  58. stringsDS.show();
  59. // +--------------------+
  60. // | value|
  61. // +--------------------+
  62. // |Key: 0, Value: val_0|
  63. // |Key: 0, Value: val_0|
  64. // |Key: 0, Value: val_0|
  65. // ...
  66. // You can also use DataFrames to create temporary views within a SparkSession.
  67. List<Record> records = new ArrayList<>();
  68. for (int key = 1; key < 100; key++) {
  69. Record record = new Record();
  70. record.setKey(key);
  71. record.setValue("val_" + key);
  72. records.add(record);
  73. }
  74. Dataset<Row> recordsDF = spark.createDataFrame(records, Record.class);
  75. recordsDF.createOrReplaceTempView("records");
  76. // Queries can then join DataFrames data with data stored in Hive.
  77. spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show();
  78. // +---+------+---+------+
  79. // |key| value|key| value|
  80. // +---+------+---+------+
  81. // | 2| val_2| 2| val_2|
  82. // | 2| val_2| 2| val_2|
  83. // | 4| val_4| 4| val_4|
  84. // ...

Find full example code at “examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java” in the Spark repo.

  1. from os.path import expanduser, join, abspath
  2. from pyspark.sql import SparkSession
  3. from pyspark.sql import Row
  4. # warehouse_location points to the default location for managed databases and tables
  5. warehouse_location = abspath('spark-warehouse')
  6. spark = SparkSession \
  7. .builder \
  8. .appName("Python Spark SQL Hive integration example") \
  9. .config("spark.sql.warehouse.dir", warehouse_location) \
  10. .enableHiveSupport() \
  11. .getOrCreate()
  12. # spark is an existing SparkSession
  13. spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
  14. spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
  15. # Queries are expressed in HiveQL
  16. spark.sql("SELECT * FROM src").show()
  17. # +---+-------+
  18. # |key| value|
  19. # +---+-------+
  20. # |238|val_238|
  21. # | 86| val_86|
  22. # |311|val_311|
  23. # ...
  24. # Aggregation queries are also supported.
  25. spark.sql("SELECT COUNT(*) FROM src").show()
  26. # +--------+
  27. # |count(1)|
  28. # +--------+
  29. # | 500 |
  30. # +--------+
  31. # The results of SQL queries are themselves DataFrames and support all normal functions.
  32. sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
  33. # The items in DataFrames are of type Row, which allows you to access each column by ordinal.
  34. stringsDS = sqlDF.rdd.map(lambda row: "Key: %d, Value: %s" % (row.key, row.value))
  35. for record in stringsDS.collect():
  36. print(record)
  37. # Key: 0, Value: val_0
  38. # Key: 0, Value: val_0
  39. # Key: 0, Value: val_0
  40. # ...
  41. # You can also use DataFrames to create temporary views within a SparkSession.
  42. Record = Row("key", "value")
  43. recordsDF = spark.createDataFrame([Record(i, "val_" + str(i)) for i in range(1, 101)])
  44. recordsDF.createOrReplaceTempView("records")
  45. # Queries can then join DataFrame data with data stored in Hive.
  46. spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
  47. # +---+------+---+------+
  48. # |key| value|key| value|
  49. # +---+------+---+------+
  50. # | 2| val_2| 2| val_2|
  51. # | 4| val_4| 4| val_4|
  52. # | 5| val_5| 5| val_5|
  53. # ...

Find full example code at “examples/src/main/python/sql/hive.py” in the Spark repo.

当使用 Hive 时,必须使用 Hive 支持实例化 SparkSession。这个增加了在 MetaStore 中查找表并使用 HiveQL 编写查询的支持。

  1. # enableHiveSupport defaults to TRUE
  2. sparkR.session(enableHiveSupport = TRUE)
  3. sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
  4. sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
  5. # Queries can be expressed in HiveQL.
  6. results <- collect(sql("FROM src SELECT key, value"))

Find full example code at “examples/src/main/r/RSparkSQLExample.R” in the Spark repo.

指定 Hive 表的存储格式

创建 Hive 表时,需要定义如何 从/向 文件系统 read/write 数据,即 “输入格式” 和 “输出格式”。您还需要定义该表如何将数据反序列化为行,或将行序列化为数据,即 “serde”。以下选项可用于指定存储格式(“serde”, “input format”, “output format”),例如,CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet')。默认情况下,我们将以纯文本形式读取表格文件。请注意,Hive 存储处理程序在创建表时不受支持,您可以使用 Hive 端的存储处理程序创建一个表,并使用 Spark SQL 来读取它。

Property Name Meaning
fileFormat fileFormat是一种存储格式规范的包,包括 “serde”,”input format” 和 “output format”。目前我们支持6个文件格式:’sequencefile’,’rcfile’,’orc’,’parquet’,’textfile’和’avro’。
inputFormat, outputFormat 这两个选项将相应的 “InputFormat” 和 “OutputFormat” 类的名称指定为字符串文字,例如:org.apache.hadoop.hive.ql.io.orc.OrcInputFormat。这两个选项必须成对出现,如果您已经指定了 “fileFormat” 选项,则无法指定它们。
serde 此选项指定 serde 类的名称。当指定 fileFormat 选项时,如果给定的 fileFormat 已经包含 serde 的信息,那么不要指定这个选项。目前的 “sequencefile”,”textfile” 和 “rcfile” 不包含 serde 信息,你可以使用这3个文件格式的这个选项。
fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim 这些选项只能与 “textfile” 文件格式一起使用。它们定义如何将分隔的文件读入行。

使用 OPTIONS 定义的所有其他属性将被视为 Hive serde 属性。

与不同版本的 Hive Metastore 进行交互

Spark SQL 的 Hive 支持的最重要的部分之一是与 Hive metastore 进行交互,这使得 Spark SQL 能够访问 Hive 表的元数据。从 Spark 1.4.0 开始,使用 Spark SQL 的单一二进制构建可以使用下面所述的配置来查询不同版本的 Hive 转移。请注意,独立于用于与转移点通信的 Hive 版本,内部 Spark SQL 将针对 Hive 1.2.1 进行编译,并使用这些类进行内部执行(serdes,UDF,UDAF等)。

以下选项可用于配置用于检索元数据的 Hive 版本:

属性名称 默认值 含义
spark.sql.hive.metastore.version 1.2.1 Hive metastore 版本。可用选项为 0.12.01.2.1
spark.sql.hive.metastore.jars builtin 当启用 -Phive 时,使用 Hive 1.2.1,它与 Spark 程序集捆绑在一起。选择此选项时,spark.sql.hive.metastore.version 必须为 1.2.1 或未定义。行家 使用从Maven存储库下载的指定版本的Hive jar。通常不建议在生产部署中使用此配置。* 应用于实例化 HiveMetastoreClient 的 jar 的位置。该属性可以是三个选项之一:
  1. builtin

-Phive

spark.sql.hive.metastore.version

1.2.1

  1. maven

  2. JVM 的标准格式的 classpath。该类路径必须包含所有 Hive 及其依赖项,包括正确版本的 Hadoop。这些罐只需要存在于 driver 程序中,但如果您正在运行在 yarn 集群模式,那么您必须确保它们与应用程序一起打包。

    | | spark.sql.hive.metastore.sharedPrefixes | com.mysql.jdbc, org.postgresql, com.microsoft.sqlserver, oracle.jdbc | 使用逗号分隔的类前缀列表,应使用在 Spark SQL 和特定版本的 Hive 之间共享的类加载器来加载。一个共享类的示例就是用来访问 Hive metastore 的 JDBC driver。其它需要共享的类,是需要与已经共享的类进行交互的。例如,log4j 使用的自定义 appender。 | | spark.sql.hive.metastore.barrierPrefixes | (empty) | 一个逗号分隔的类前缀列表,应该明确地为 Spark SQL 正在通信的 Hive 的每个版本重新加载。例如,在通常将被共享的前缀中声明的 Hive UDF(即: org.apache.spark.*)。 |

JDBC 连接其它数据库

Spark SQL 还包括可以使用 JDBC 从其他数据库读取数据的数据源。此功能应优于使用 JdbcRDD。这是因为结果作为 DataFrame 返回,并且可以轻松地在 Spark SQL 中处理或与其他数据源连接。JDBC 数据源也更容易从 Java 或 Python 使用,因为它不需要用户提供 ClassTag。(请注意,这不同于 Spark SQL JDBC 服务器,允许其他应用程序使用 Spark SQL 运行查询)。

要开始使用,您需要在 Spark 类路径中包含特定数据库的 JDBC driver 程序。例如,要从 Spark Shell 连接到 postgres,您将运行以下命令:

  1. bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar

可以使用 Data Sources API 将来自远程数据库的表作为 DataFrame 或 Spark SQL 临时视图进行加载。用户可以在数据源选项中指定 JDBC 连接属性。用户密码通常作为登录数据源的连接属性提供。除了连接属性外,Spark 还支持以下不区分大小写的选项:

属性名称 含义
url 要连接的JDBC URL。源特定的连接属性可以在URL中指定。例如jdbc:jdbc:postgresql://localhost/test?user=fred&password=secret
dbtable 应该读取的 JDBC 表。请注意,可以使用在SQL查询的 FROM 子句中有效的任何内容。例如,您可以使用括号中的子查询代替完整表。
driver 用于连接到此 URL 的 JDBC driver 程序的类名。
partitionColumn, lowerBound, upperBound 如果指定了这些选项,则必须指定这些选项。另外,必须指定 numPartitions。他们描述如何从多个 worker 并行读取数据时将表给分区。partitionColumn 必须是有问题的表中的数字列。请注意,lowerBoundupperBound 仅用于决定分区的大小,而不是用于过滤表中的行。因此,表中的所有行将被分区并返回。此选项仅适用于读操作。
numPartitions 在表读写中可以用于并行度的最大分区数。这也确定并发JDBC连接的最大数量。如果要写入的分区数超过此限制,则在写入之前通过调用 coalesce(numPartitions) 将其减少到此限制。
fetchsize JDBC 抓取的大小,用于确定每次数据往返传递的行数。这有利于提升 JDBC driver 的性能,它们的默认值较小(例如:Oracle 是 10 行)。该选项仅适用于读取操作。
batchsize JDBC 批处理的大小,用于确定每次数据往返传递的行数。这有利于提升 JDBC driver 的性能。该选项仅适用于写操作。默认值为 1000
isolationLevel 事务隔离级别,适用于当前连接。它可以是 NONEREAD_COMMITTEDREAD_UNCOMMITTEDREPEATABLE_READ,或 SERIALIZABLE 之一,对应于 JDBC 连接对象定义的标准事务隔离级别,默认为 READ_UNCOMMITTED。此选项仅适用于写操作。请参考 java.sql.Connection 中的文档。
truncate 这是一个与 JDBC 相关的选项。启用 SaveMode.Overwrite 时,此选项会导致 Spark 截断现有表,而不是删除并重新创建。这可以更有效,并且防止表元数据(例如,索引)被移除。但是,在某些情况下,例如当新数据具有不同的模式时,它将无法工作。它默认为 false。此选项仅适用于写操作。
createTableOptions 这是一个与JDBC相关的选项。如果指定,此选项允许在创建表时设置特定于数据库的表和分区选项(例如:CREATE TABLE t (name string) ENGINE=InnoDB.)。此选项仅适用于写操作。
createTableColumnTypes 使用数据库列数据类型而不是默认值,创建表时。数据类型信息应以与 CREATE TABLE 列语法相同的格式指定(例如:"name CHAR(64), comments VARCHAR(1024)")。指定的类型应该是有效的 spark sql 数据类型。此选项仅适用于写操作。
  1. // Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
  2. // Loading data from a JDBC source
  3. val jdbcDF = spark.read
  4. .format("jdbc")
  5. .option("url", "jdbc:postgresql:dbserver")
  6. .option("dbtable", "schema.tablename")
  7. .option("user", "username")
  8. .option("password", "password")
  9. .load()
  10. val connectionProperties = new Properties()
  11. connectionProperties.put("user", "username")
  12. connectionProperties.put("password", "password")
  13. val jdbcDF2 = spark.read
  14. .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
  15. // Saving data to a JDBC source
  16. jdbcDF.write
  17. .format("jdbc")
  18. .option("url", "jdbc:postgresql:dbserver")
  19. .option("dbtable", "schema.tablename")
  20. .option("user", "username")
  21. .option("password", "password")
  22. .save()
  23. jdbcDF2.write
  24. .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
  25. // Specifying create table column data types on write
  26. jdbcDF.write
  27. .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
  28. .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

Find full example code at “examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala” in the Spark repo.

  1. // Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
  2. // Loading data from a JDBC source
  3. Dataset<Row> jdbcDF = spark.read()
  4. .format("jdbc")
  5. .option("url", "jdbc:postgresql:dbserver")
  6. .option("dbtable", "schema.tablename")
  7. .option("user", "username")
  8. .option("password", "password")
  9. .load();
  10. Properties connectionProperties = new Properties();
  11. connectionProperties.put("user", "username");
  12. connectionProperties.put("password", "password");
  13. Dataset<Row> jdbcDF2 = spark.read()
  14. .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
  15. // Saving data to a JDBC source
  16. jdbcDF.write()
  17. .format("jdbc")
  18. .option("url", "jdbc:postgresql:dbserver")
  19. .option("dbtable", "schema.tablename")
  20. .option("user", "username")
  21. .option("password", "password")
  22. .save();
  23. jdbcDF2.write()
  24. .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
  25. // Specifying create table column data types on write
  26. jdbcDF.write()
  27. .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
  28. .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

Find full example code at “examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java” in the Spark repo.

  1. # Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
  2. # Loading data from a JDBC source
  3. jdbcDF = spark.read \
  4. .format("jdbc") \
  5. .option("url", "jdbc:postgresql:dbserver") \
  6. .option("dbtable", "schema.tablename") \
  7. .option("user", "username") \
  8. .option("password", "password") \
  9. .load()
  10. jdbcDF2 = spark.read \
  11. .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
  12. properties={"user": "username", "password": "password"})
  13. # Saving data to a JDBC source
  14. jdbcDF.write \
  15. .format("jdbc") \
  16. .option("url", "jdbc:postgresql:dbserver") \
  17. .option("dbtable", "schema.tablename") \
  18. .option("user", "username") \
  19. .option("password", "password") \
  20. .save()
  21. jdbcDF2.write \
  22. .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
  23. properties={"user": "username", "password": "password"})
  24. # Specifying create table column data types on write
  25. jdbcDF.write \
  26. .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") \
  27. .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
  28. properties={"user": "username", "password": "password"})

Find full example code at “examples/src/main/python/sql/datasource.py” in the Spark repo.

  1. # Loading data from a JDBC source
  2. df <- read.jdbc("jdbc:postgresql:dbserver", "schema.tablename", user = "username", password = "password")
  3. # Saving data to a JDBC source
  4. write.jdbc(df, "jdbc:postgresql:dbserver", "schema.tablename", user = "username", password = "password")

Find full example code at “examples/src/main/r/RSparkSQLExample.R” in the Spark repo.

  1. CREATE TEMPORARY VIEW jdbcTable
  2. USING org.apache.spark.sql.jdbc
  3. OPTIONS (
  4. url "jdbc:postgresql:dbserver",
  5. dbtable "schema.tablename",
  6. user 'username',
  7. password 'password'
  8. )
  9. INSERT INTO TABLE jdbcTable
  10. SELECT * FROM resultTable

故障排除

  • JDBC driver 程序类必须对客户端会话和所有执行程序上的原始类加载器可见。这是因为 Java 的 DriverManager 类执行安全检查,导致它忽略原始类加载器不可见的所有 driver 程序,当打开连接时。一个方便的方法是修改所有工作节点上的compute_classpath.sh 以包含您的 driver 程序 JAR。
  • 一些数据库,例如 H2,将所有名称转换为大写。您需要使用大写字母来引用 Spark SQL 中的这些名称。

性能调优

对于某些工作负载,可以通过缓存内存中的数据或打开一些实验选项来提高性能。

在内存中缓存数据

Spark SQL 可以通过调用 spark.catalog.cacheTable("tableName")dataFrame.cache() 来使用内存中的列格式来缓存表。然后,Spark SQL 将只扫描所需的列,并将自动调整压缩以最小化内存使用量和 GC 压力。您可以调用 spark.catalog.uncacheTable("tableName") 从内存中删除该表。

内存缓存的配置可以使用 SparkSession 上的 setConf 方法或使用 SQL 运行 SET key=value 命令来完成。

属性名称 默认 含义
spark.sql.inMemoryColumnarStorage.compressed true 当设置为 true 时,Spark SQL 将根据数据的统计信息为每个列自动选择一个压缩编解码器。
spark.sql.inMemoryColumnarStorage.batchSize 10000 控制批量的柱状缓存的大小。更大的批量大小可以提高内存利用率和压缩率,但是在缓存数据时会冒出 OOM 风险。

其他配置选项

以下选项也可用于调整查询执行的性能。这些选项可能会在将来的版本中被废弃,因为更多的优化是自动执行的。

属性名称 默认值 含义
spark.sql.files.maxPartitionBytes 134217728 (128 MB) 在读取文件时,将单个分区打包的最大字节数。
spark.sql.files.openCostInBytes 4194304 (4 MB) 按照字节数来衡量的打开文件的估计费用可以在同一时间进行扫描。将多个文件放入分区时使用。最好过度估计,那么具有小文件的分区将比具有较大文件的分区(首先计划的)更快。
spark.sql.broadcastTimeout 300 广播连接中的广播等待时间超时(秒)
spark.sql.autoBroadcastJoinThreshold 10485760 (10 MB) 配置执行连接时将广播给所有工作节点的表的最大大小(以字节为单位)。通过将此值设置为-1可以禁用广播。请注意,目前的统计信息仅支持 Hive Metastore 表,其中已运行命令 ANALYZE TABLE &lt;tableName&gt; COMPUTE STATISTICS noscan
spark.sql.shuffle.partitions 200 Configures the number of partitions to use when shuffling data for joins or aggregations。

分布式 SQL 引擎

Spark SQL 也可以充当使用其 JDBC/ODBC 或命令行界面的分布式查询引擎。在这种模式下,最终用户或应用程序可以直接与 Spark SQL 交互运行 SQL 查询,而不需要编写任何代码。

运行 Thrift JDBC/ODBC 服务器

这里实现的 Thrift JDBC/ODBC 服务器对应于 Hive 1.2 中的 HiveServer2。您可以使用 Spark 或 Hive 1.2.1 附带的直线脚本测试 JDBC 服务器。

要启动 JDBC/ODBC 服务器,请在 Spark 目录中运行以下命令:

  1. ./sbin/start-thriftserver.sh

此脚本接受所有 bin/spark-submit 命令行选项,以及 --hiveconf 选项来指定 Hive 属性。您可以运行 ./sbin/start-thriftserver.sh --help 查看所有可用选项的完整列表。默认情况下,服务器监听 localhost:10000\。您可以通过环境变量覆盖此行为,即:

  1. export HIVE_SERVER2_THRIFT_PORT=<listening-port>
  2. export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>
  3. ./sbin/start-thriftserver.sh \
  4. --master <master-uri> \
  5. ...

or system properties:

  1. ./sbin/start-thriftserver.sh \
  2. --hiveconf hive.server2.thrift.port=<listening-port> \
  3. --hiveconf hive.server2.thrift.bind.host=<listening-host> \
  4. --master <master-uri>
  5. ...

现在,您可以使用 beeline 来测试 Thrift JDBC/ODBC 服务器:

  1. ./bin/beeline

使用 beeline 方式连接到 JDBC/ODBC 服务器:

  1. beeline> !connect jdbc:hive2://localhost:10000

Beeline 将要求您输入用户名和密码。在非安全模式下,只需输入机器上的用户名和空白密码即可。对于安全模式,请按照 beeline 文档 中的说明进行操作。

配置Hive是通过将 hive-site.xml, core-site.xmlhdfs-site.xml 文件放在 conf/ 中完成的。

您也可以使用 Hive 附带的 beeline 脚本。

Thrift JDBC 服务器还支持通过 HTTP 传输发送 thrift RPC 消息。使用以下设置启用 HTTP 模式作为系统属性或在 conf/ 中的 hive-site.xml 文件中启用:

  1. hive.server2.transport.mode - Set this to value: http
  2. hive.server2.thrift.http.port - HTTP port number to listen on; default is 10001
  3. hive.server2.http.endpoint - HTTP endpoint; default is cliservice

要测试,请使用 beeline 以 http 模式连接到 JDBC/ODBC 服务器:

  1. beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>

运行 Spark SQL CLI

Spark SQL CLI 是在本地模式下运行 Hive 转移服务并执行从命令行输入的查询的方便工具。请注意,Spark SQL CLI 不能与 Thrift JDBC 服务器通信。

要启动 Spark SQL CLI,请在 Spark 目录中运行以下命令:

  1. ./bin/spark-sql

配置 Hive 是通过将 hive-site.xmlcore-site.xmlhdfs-site.xml 文件放在 conf/ 中完成的。您可以运行 ./bin/spark-sql --help 获取所有可用选项的完整列表。

迁移指南

从 Spark SQL 2.1 升级到 2.2

  • Spark 2.1.1 介绍了一个新的配置 key:spark.sql.hive.caseSensitiveInferenceMode。它的默认设置是 NEVER_INFER,其行为与 2.1.0 保持一致。但是,Spark 2.2.0 将此设置的默认值更改为 “INFER_AND_SAVE”,以恢复与底层文件 schema(模式)具有大小写混合的列名称的 Hive metastore 表的兼容性。使用 INFER_AND_SAVE 配置的 value,在第一次访问 Spark 将对其尚未保存推测 schema(模式)的任何 Hive metastore 表执行 schema inference(模式推断)。请注意,对于具有数千个 partitions(分区)的表,模式推断可能是非常耗时的操作。如果不兼容大小写混合的列名,您可以安全地将spark.sql.hive.caseSensitiveInferenceMode 设置为 NEVER_INFER,以避免模式推断的初始开销。请注意,使用新的默认INFER_AND_SAVE 设置,模式推理的结果被保存为 metastore key 以供将来使用。因此,初始模式推断仅发生在表的第一次访问。

从 Spark SQL 2.0 升级到 2.1

  • Datasource tables(数据源表)现在存储了 Hive metastore 中的 partition metadata(分区元数据)。这意味着诸如 ALTER TABLE PARTITION ... SET LOCATION 这样的 Hive DDLs 现在使用 Datasource API 可用于创建 tables(表)。
    • 遗留的数据源表可以通过 MSCK REPAIR TABLE 命令迁移到这种格式。建议迁移遗留表利用 Hive DDL 的支持和提供的计划性能。
    • 要确定表是否已迁移,当在表上发出 DESCRIBE FORMATTED 命令时请查找 PartitionProvider: Catalog 属性。
  • Datasource tables(数据源表)的 INSERT OVERWRITE TABLE ... PARTITION ... 行为的更改。
    • 在以前的 Spark 版本中,INSERT OVERWRITE 覆盖了整个 Datasource table,即使给出一个指定的 partition。现在只有匹配规范的 partition 被覆盖。
    • 请注意,这仍然与 Hive 表的行为不同,Hive 表仅覆盖与新插入数据重叠的分区。

从 Spark SQL 1.6 升级到 2.0

  • SparkSession 现在是 Spark 新的切入点,它替代了老的 SQLContextHiveContext。注意:为了向下兼容,老的 SQLContext 和 HiveContext 仍然保留。可以从 SparkSession 获取一个新的 catalog 接口 — 现有的访问数据库和表的 API,如 listTablescreateExternalTabledropTempViewcacheTable 都被移到该接口。

  • Dataset API 和 DataFrame API 进行了统一。在 Scala 中,DataFrame 变成了 Dataset[Row] 类型的一个别名,而 Java API 使用者必须将 DataFrame 替换成 Dataset&lt;Row&gt;。Dataset 类既提供了强类型转换操作(如 mapfilter 以及 groupByKey)也提供了非强类型转换操作(如 selectgroupBy)。由于编译期的类型安全不是 Python 和 R 语言的一个特性,Dataset 的概念并不适用于这些语言的 API。相反,DataFrame 仍然是最基本的编程抽象,就类似于这些语言中单节点 data frame 的概念。

  • Dataset 和 DataFrame API 中 unionAll 已经过时并且由 union 替代。

  • Dataset 和 DataFrame API 中 explode 已经过时,作为选择,可以结合 select 或 flatMap 使用 functions.explode()
  • Dataset 和 DataFrame API 中 registerTempTable 已经过时并且由 createOrReplaceTempView 替代。

  • 对 Hive tables CREATE TABLE ... LOCATION 行为的更改。

    • 从 Spark 2.0 开始,CREATE TABLE ... LOCATIONCREATE EXTERNAL TABLE ... LOCATION 是相同的,以防止意外丢弃用户提供的 locations(位置)中的现有数据。这意味着,在用户指定位置的 Spark SQL 中创建的 Hive 表始终是 Hive 外部表。删除外部表将不会删除数据。用户不能指定 Hive managed tables(管理表)的位置。请注意,这与Hive行为不同。
    • 因此,这些表上的 “DROP TABLE” 语句不会删除数据。

从 Spark SQL 1.5 升级到 1.6

  • 从 Spark 1.6 开始,默认情况下服务器在多 session(会话)模式下运行。这意味着每个 JDBC/ODBC 连接拥有一份自己的 SQL 配置和临时函数注册。缓存表仍在并共享。如果您希望以旧的单会话模式运行 Thrift server,请设置选项 spark.sql.hive.thriftServer.singleSessiontrue。您既可以将此选项添加到 spark-defaults.conf,或者通过 --conf 将它传递给 start-thriftserver.sh
  1. ./sbin/start-thriftserver.sh \
  2. --conf spark.sql.hive.thriftServer.singleSession=true \
  3. ...
  • 从 1.6.1 开始,在 sparkR 中 withColumn 方法支持添加一个新列或更换 DataFrame 同名的现有列。

  • 从 Spark 1.6 开始,LongType 强制转换为 TimestampType 期望是秒,而不是微秒。这种更改是为了匹配 Hive 1.2 的行为,以便从 numeric(数值)类型进行更一致的类型转换到 TimestampType。更多详情请参阅 SPARK-11724

从 Spark SQL 1.4 升级到 1.5

  • 使用手动管理的内存优化执行,现在是默认启用的,以及代码生成表达式求值。这些功能既可以通过设置 spark.sql.tungsten.enabledfalse 来禁止使用。
  • Parquet 的模式合并默认情况下不再启用。它可以通过设置 spark.sql.parquet.mergeSchematrue 以重新启用。
  • 字符串在 Python 列的 columns(列)现在支持使用点(.)来限定列或访问嵌套值。例如 df['table.column.nestedField']。但是,这意味着如果你的列名中包含任何圆点,你现在必须避免使用反引号(如 table.column.with.dots.nested)。
  • 在内存中的列存储分区修剪默认是开启的。它可以通过设置 spark.sql.inMemoryColumnarStorage.partitionPruningfalse 来禁用。
  • 无限精度的小数列不再支持,而不是 Spark SQL 最大精度为 38。当从 BigDecimal 对象推断模式时,现在使用(38,18)。在 DDL 没有指定精度时,则默认保留 Decimal(10, 0)
  • 时间戳现在存储在 1 微秒的精度,而不是 1 纳秒的。
  • 在 sql 语句中,floating point(浮点数)现在解析为 decimal。HiveQL 解析保持不变。
  • SQL / DataFrame 函数的规范名称现在是小写(例如 sum vs SUM)。
  • JSON 数据源不会自动加载由其他应用程序(未通过 Spark SQL 插入到数据集的文件)创建的新文件。对于 JSON 持久表(即表的元数据存储在 Hive Metastore),用户可以使用 REFRESH TABLE SQL 命令或 HiveContextrefreshTable 方法,把那些新文件列入到表中。对于代表一个 JSON dataset 的 DataFrame,用户需要重新创建 DataFrame,同时 DataFrame 中将包括新的文件。
  • PySpark 中 DataFrame 的 withColumn 方法支持添加新的列或替换现有的同名列。

从 Spark SQL 1.3 升级到 1.4

DataFrame data reader/writer interface

基于用户反馈,我们创建了一个新的更流畅的 API,用于读取(SQLContext.read)中的数据并写入数据(DataFrame.write),并且旧的 API 将过时(例如,SQLContext.parquetFileSQLContext.jsonFile)。

针对 SQLContext.readScalaJava),Python) 和 DataFrame.writeScalaJava),Python)的更多细节,请看 API 文档。

DataFrame.groupBy 保留 grouping columns(分组的列)

根据用户的反馈,我们更改了 DataFrame.groupBy().agg() 的默认行为以保留 DataFrame 结果中的 grouping columns(分组列)。为了在 1.3 中保持该行为,请设置 spark.sql.retainGroupColumnsfalse

  1. // In 1.3.x, in order for the grouping column "department" to show up,
  2. // it must be included explicitly as part of the agg function call.
  3. df.groupBy("department").agg($"department", max("age"), sum("expense"))
  4. // In 1.4+, grouping column "department" is included automatically.
  5. df.groupBy("department").agg(max("age"), sum("expense"))
  6. // Revert to 1.3 behavior (not retaining grouping column) by:
  7. sqlContext.setConf("spark.sql.retainGroupColumns", "false")
  1. // In 1.3.x, in order for the grouping column "department" to show up,
  2. // it must be included explicitly as part of the agg function call.
  3. df.groupBy("department").agg(col("department"), max("age"), sum("expense"));
  4. // In 1.4+, grouping column "department" is included automatically.
  5. df.groupBy("department").agg(max("age"), sum("expense"));
  6. // Revert to 1.3 behavior (not retaining grouping column) by:
  7. sqlContext.setConf("spark.sql.retainGroupColumns", "false");
  1. import pyspark.sql.functions as func
  2. # In 1.3.x, in order for the grouping column "department" to show up,
  3. # it must be included explicitly as part of the agg function call.
  4. df.groupBy("department").agg(df["department"], func.max("age"), func.sum("expense"))
  5. # In 1.4+, grouping column "department" is included automatically.
  6. df.groupBy("department").agg(func.max("age"), func.sum("expense"))
  7. # Revert to 1.3.x behavior (not retaining grouping column) by:
  8. sqlContext.setConf("spark.sql.retainGroupColumns", "false")

DataFrame.withColumn 上的行为更改

之前 1.4 版本中,DataFrame.withColumn() 只支持添加列。该列将始终在 DateFrame 结果中被加入作为新的列,即使现有的列可能存在相同的名称。从 1.4 版本开始,DataFrame.withColumn() 支持添加与所有现有列的名称不同的列或替换现有的同名列。

请注意,这一变化仅适用于 Scala API,并不适用于 PySpark 和 SparkR。

从 Spark SQL 1.0-1.2 升级到 1.3

在 Spark 1.3 中,我们从 Spark SQL 中删除了 “Alpha” 的标签,作为一部分已经清理过的可用的 API。从 Spark 1.3 版本以上,Spark SQL 将提供在 1.X 系列的其他版本的二进制兼容性。这种兼容性保证不包括被明确标记为不稳定的(即 DeveloperApi 类或 Experimental)API。

重命名 DataFrame 的 SchemaRDD

升级到 Spark SQL 1.3 版本时,用户会发现最大的变化是,SchemaRDD 已更名为 DataFrame。这主要是因为 DataFrames 不再从 RDD 直接继承,而是由 RDDS 自己来实现这些功能。DataFrames 仍然可以通过调用 .rdd 方法转换为 RDDS。

在 Scala 中,有一个从 SchemaRDDDataFrame 类型别名,可以为一些情况提供源代码兼容性。它仍然建议用户更新他们的代码以使用 DataFrame 来代替。Java 和 Python 用户需要更新他们的代码。

Java 和 Scala APIs 的统一

此前 Spark 1.3 有单独的Java兼容类(JavaSQLContextJavaSchemaRDD),借鉴于 Scala API。在 Spark 1.3 中,Java API 和 Scala API 已经统一。两种语言的用户可以使用 SQLContextDataFrame。一般来说论文类尝试使用两种语言的共有类型(如 Array 替代了一些特定集合)。在某些情况下不通用的类型情况下,(例如,passing in closures 或 Maps)使用函数重载代替。

此外,该 Java 的特定类型的 API 已被删除。Scala 和 Java 的用户可以使用存在于 org.apache.spark.sql.types 类来描述编程模式。

隔离隐式转换和删除 dsl 包(仅Scala)

许多 Spark 1.3 版本以前的代码示例都以 import sqlContext._ 开始,这提供了从 sqlContext 范围的所有功能。在 Spark 1.3 中,我们移除了从 RDDs 到 DateFrame 再到 SQLContext 内部对象的隐式转换。用户现在应该写成 import sqlContext.implicits._.

此外,隐式转换现在只能使用方法 toDF 来增加由 Product(即 case classes or tuples)构成的 RDD,而不是自动应用。

当使用 DSL 内部的函数时(现在使用 DataFrame API 来替换),用户习惯导入 org.apache.spark.sql.catalyst.dsl。相反,应该使用公共的 dataframe 函数 API:import org.apache.spark.sql.functions._.

针对 DataType 删除在 org.apache.spark.sql 包中的一些类型别名(仅限于 Scala)

Spark 1.3 移除存在于基本 SQL 包的 DataType 类型别名。开发人员应改为导入类 org.apache.spark.sql.types

UDF 注册迁移到 sqlContext.udf 中(Java & Scala)

用于注册 UDF 的函数,不管是 DataFrame DSL 还是 SQL 中用到的,都被迁移到 SQLContext 中的 udf 对象中。

  1. sqlContext.udf.register("strLen", (s: String) => s.length())
  1. sqlContext.udf().register("strLen", (String s) -> s.length(), DataTypes.IntegerType);

Python UDF 注册保持不变。

Python DataTypes 不再是 Singletons(单例的)

在 Python 中使用 DataTypes 时,你需要先构造它们(如:StringType()),而不是引用一个单例对象。

与 Apache Hive 的兼容

Spark SQL 在设计时就考虑到了和 Hive metastore,SerDes 以及 UDF 之间的兼容性。目前 Hive SerDes 和 UDF 都是基于 Hive 1.2.1 版本,并且Spark SQL 可以连接到不同版本的Hive metastore(从 0.12.0 到 1.2.1,可以参考 与不同版本的 Hive Metastore 交互))

在现有的 Hive Warehouses 中部署

Spark SQL Thrift JDBC server 采用了开箱即用的设计以兼容已有的 Hive 安装版本。你不需要修改现有的 Hive Metastore,或者改变数据的位置和表的分区。

所支持的 Hive 特性

Spark SQL 支持绝大部分的 Hive 功能,如:

  • Hive query(查询)语句,包括:
    • SELECT
    • GROUP BY
    • ORDER BY
    • CLUSTER BY
    • SORT BY
  • 所有 Hive 操作,包括:
    • 关系运算符(===&lt;&gt;&lt;&gt;&gt;=&lt;=,等等)
    • 算术运算符(+-*/%,等等)
    • 逻辑运算符(AND&&OR||,等等)
    • 复杂类型的构造
    • 数学函数(signlncos,等等)
    • String 函数(instrlengthprintf,等等)
  • 用户定义函数(UDF)
  • 用户定义聚合函数(UDAF)
  • 用户定义 serialization formats(SerDes)
  • 窗口函数
  • Joins
    • JOIN
    • {LEFT|RIGHT|FULL} OUTER JOIN
    • LEFT SEMI JOIN
    • CROSS JOIN
  • Unions
  • Sub-queries(子查询)
    • SELECT col FROM (SELECT a + b AS col from t1) t2
  • Sampling
  • Explain
  • Partitioned tables including dynamic partition insertion
  • View
  • 所有的 Hive DDL 函数,包括:
    • CREATE TABLE
    • CREATE TABLE AS SELECT
    • ALTER TABLE
  • 大部分的 Hive Data types(数据类型),包括:
    • TINYINT
    • SMALLINT
    • INT
    • BIGINT
    • BOOLEAN
    • FLOAT
    • DOUBLE
    • STRING
    • BINARY
    • TIMESTAMP
    • DATE
    • ARRAY&lt;&gt;
    • MAP&lt;&gt;
    • STRUCT&lt;&gt;

未支持的 Hive 函数

以下是目前还不支持的 Hive 函数列表。在 Hive 部署中这些功能大部分都用不到。

主要的 Hive 功能

  • Tables 使用 buckets 的 Tables:bucket 是 Hive table partition 中的 hash partitioning。Spark SQL 还不支持 buckets.

Esoteric Hive 功能

  • UNION 类型
  • Unique join
  • Column 统计信息的收集:Spark SQL does not piggyback scans to collect column statistics at the moment and only supports populating the sizeInBytes field of the hive metastore.

Hive Input/Output Formats

  • File format for CLI: For results showing back to the CLI, Spark SQL only supports TextOutputFormat.
  • Hadoop archive

Hive 优化

有少数 Hive 优化还没有包含在 Spark 中。其中一些(比如 indexes 索引)由于 Spark SQL 的这种内存计算模型而显得不那么重要。另外一些在 Spark SQL 未来的版本中会持续跟踪。

  • Block 级别的 bitmap indexes 和虚拟 columns(用于构建 indexes)
  • 自动为 join 和 groupBy 计算 reducer 个数:目前在 Spark SQL 中,你需要使用 “SET spark.sql.shuffle.partitions=[num_tasks];” 来控制 post-shuffle 的并行度。
  • 仅 Meta-data 的 query:对于只使用 metadata 就能回答的查询,Spark SQL 仍然会启动计算结果的任务。
  • Skew data flag:Spark SQL 不遵循 Hive 中 skew 数据的标记。
  • STREAMTABLE hint in join:Spark SQL 不遵循 STREAMTABLE hint。
  • 对于查询结果合并多个小文件:如果输出的结果包括多个小文件,Hive 可以可选的合并小文件到一些大文件中去,以避免溢出 HDFS metadata。Spark SQL 还不支持这样。

参考

数据类型

Spark SQL 和 DataFrames 支持下面的数据类型:

  • Numeric types
    • ByteType: Represents 1-byte signed integer numbers. The range of numbers is from -128 to 127.
    • ShortType: Represents 2-byte signed integer numbers. The range of numbers is from -32768 to 32767.
    • IntegerType: Represents 4-byte signed integer numbers. The range of numbers is from -2147483648 to 2147483647.
    • LongType: Represents 8-byte signed integer numbers. The range of numbers is from -9223372036854775808 to 9223372036854775807.
    • FloatType: Represents 4-byte single-precision floating point numbers.
    • DoubleType: Represents 8-byte double-precision floating point numbers.
    • DecimalType: Represents arbitrary-precision signed decimal numbers. Backed internally by java.math.BigDecimal. A BigDecimal consists of an arbitrary precision integer unscaled value and a 32-bit integer scale.
  • String type
    • StringType: Represents character string values.
  • Binary type
    • BinaryType: Represents byte sequence values.
  • Boolean type
    • BooleanType: Represents boolean values.
  • Datetime type
    • TimestampType: Represents values comprising values of fields year, month, day, hour, minute, and second.
    • DateType: Represents values comprising values of fields year, month, day.
  • Complex types
    • ArrayType(elementType, containsNull): Represents values comprising a sequence of elements with the type of elementType. containsNull is used to indicate if elements in a ArrayType value can have null values.
    • MapType(keyType, valueType, valueContainsNull): Represents values comprising a set of key-value pairs. The data type of keys are described by keyType and the data type of values are described by valueType. For a MapType value, keys are not allowed to have null values. valueContainsNull is used to indicate if values of a MapType value can have null values.
    • StructType(fields): Represents values with the structure described by a sequence of StructFields (fields).
      • StructField(name, dataType, nullable): Represents a field in a StructType. The name of a field is indicated by name. The data type of a field is indicated by dataType. nullable is used to indicate if values of this fields can have null values.

Spark SQL 的所有数据类型都在包 org.apache.spark.sql.types 中。你可以用下示例示例来访问它们.

  1. import org.apache.spark.sql.types._

Find full example code at “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” in the Spark repo.

Data type(数据类型) Scala 中的 Value 类型 访问或创建数据类型的 API
ByteType Byte ByteType
ShortType Short ShortType
IntegerType Int IntegerType
LongType Long LongType
FloatType Float FloatType
DoubleType Double DoubleType
DecimalType java.math.BigDecimal DecimalType
StringType String StringType
BinaryType Array[Byte] BinaryType
BooleanType Boolean BooleanType
TimestampType java.sql.Timestamp TimestampType
DateType java.sql.Date DateType
ArrayType scala.collection.Seq ArrayType(elementType, [containsNull])

Note(注意): containsNull 的默认值是 true。 | MapType | scala.collection.Map | MapType(keyType, valueType, [valueContainsNull]) Note(注意): valueContainsNull 的默认值是 true。 | StructType | org.apache.spark.sql.Row | StructType(fields) Note(注意): fields 是 StructFields 的 Seq。所有,两个 fields 拥有相同的名称是不被允许的。 | StructField | 该 field(字段)数据类型的 Scala 中的 value 类型(例如,数据类型为 IntegerType 的 StructField 是 Int) | StructField(name, dataType, [nullable]) Note: nullable 的默认值是 true

Spark SQL 的所有数据类型都在 org.apache.spark.sql.types 的包中。要访问或者创建一个数据类型,请使用 org.apache.spark.sql.types.DataTypes 中提供的 factory 方法.

Data type Value type in Java API to access or create a data type
ByteType byte or Byte DataTypes.ByteType
ShortType short or Short DataTypes.ShortType
IntegerType int or Integer DataTypes.IntegerType
LongType long or Long DataTypes.LongType
FloatType float or Float DataTypes.FloatType
DoubleType double or Double DataTypes.DoubleType
DecimalType java.math.BigDecimal DataTypes.createDecimalType()

DataTypes.createDecimalType(precision, scale). | | StringType | String | DataTypes.StringType | | BinaryType | byte[] | DataTypes.BinaryType | | BooleanType | boolean or Boolean | DataTypes.BooleanType | | TimestampType | java.sql.Timestamp | DataTypes.TimestampType | | DateType | java.sql.Date | DataTypes.DateType | | ArrayType | java.util.List | DataTypes.createArrayType(elementType) Note: The value of containsNull will be true DataTypes.createArrayType(elementType, containsNull). | | MapType | java.util.Map | DataTypes.createMapType(keyType, valueType) Note: The value of valueContainsNull will be true. DataTypes.createMapType(keyType, valueType, valueContainsNull) | | StructType | org.apache.spark.sql.Row | DataTypes.createStructType(fields) Note: fields is a List or an array of StructFields. Also, two fields with the same name are not allowed. | | StructField | The value type in Java of the data type of this field (For example, int for a StructField with the data type IntegerType) | DataTypes.createStructField(name, dataType, nullable) |

Spark SQL 的所有数据类型都在 pyspark.sql.types 的包中。你可以通过如下方式来访问它们.

  1. from pyspark.sql.types import *
Data type Value type in Python API to access or create a data type
ByteType int or long

Note: Numbers will be converted to 1-byte signed integer numbers at runtime. Please make sure that numbers are within the range of -128 to 127. | ByteType() | | ShortType | int or long Note: Numbers will be converted to 2-byte signed integer numbers at runtime. Please make sure that numbers are within the range of -32768 to 32767. | ShortType() | | IntegerType | int or long | IntegerType() | | LongType | long Note: Numbers will be converted to 8-byte signed integer numbers at runtime. Please make sure that numbers are within the range of -9223372036854775808 to 9223372036854775807. Otherwise, please convert data to decimal.Decimal and use DecimalType. | LongType() | | FloatType | float Note: Numbers will be converted to 4-byte single-precision floating point numbers at runtime. | FloatType() | | DoubleType | float | DoubleType() | | DecimalType | decimal.Decimal | DecimalType() | | StringType | string | StringType() | | BinaryType | bytearray | BinaryType() | | BooleanType | bool | BooleanType() | | TimestampType | datetime.datetime | TimestampType() | | DateType | datetime.date | DateType() | | ArrayType | list, tuple, or array | ArrayType(elementType, [containsNull]) Note: The default value of containsNull is True. | | MapType | dict | MapType(keyType, valueType, [valueContainsNull]) Note: The default value of valueContainsNull is True. | | StructType | list or tuple | StructType(fields) Note: fields is a Seq of StructFields. Also, two fields with the same name are not allowed. | | StructField | The value type in Python of the data type of this field (For example, Int for a StructField with the data type IntegerType) | StructField(name, dataType, [nullable]) Note: The default value of nullable is True. |

Data type Value type in R API to access or create a data type
ByteType integer

Note: Numbers will be converted to 1-byte signed integer numbers at runtime. Please make sure that numbers are within the range of -128 to 127. | “byte” | | ShortType | integer Note: Numbers will be converted to 2-byte signed integer numbers at runtime. Please make sure that numbers are within the range of -32768 to 32767. | “short” | | IntegerType | integer | “integer” | | LongType | integer Note: Numbers will be converted to 8-byte signed integer numbers at runtime. Please make sure that numbers are within the range of -9223372036854775808 to 9223372036854775807. Otherwise, please convert data to decimal.Decimal and use DecimalType. | “long” | | FloatType | numeric Note: Numbers will be converted to 4-byte single-precision floating point numbers at runtime. | “float” | | DoubleType | numeric | “double” | | DecimalType | Not supported | Not supported | | StringType | character | “string” | | BinaryType | raw | “binary” | | BooleanType | logical | “bool” | | TimestampType | POSIXct | “timestamp” | | DateType | Date | “date” | | ArrayType | vector or list | list(type=”array”, elementType=elementType, containsNull=[containsNull]) Note: The default value of containsNull is TRUE. | | MapType | environment | list(type=”map”, keyType=keyType, valueType=valueType, valueContainsNull=[valueContainsNull]) Note: The default value of valueContainsNull is TRUE. | | StructType | named list | list(type=”struct”, fields=fields) Note: fields is a Seq of StructFields. Also, two fields with the same name are not allowed. | | StructField | The value type in R of the data type of this field (For example, integer for a StructField with the data type IntegerType) | list(name=name, type=dataType, nullable=[nullable]) Note: The default value of nullable is TRUE. |

NaN Semantics

当处理一些不符合标准浮点数语义的 floatdouble 类型时,对于 Not-a-Number(NaN) 需要做一些特殊处理。具体如下:

  • NaN = NaN 返回 true。
  • 在 aggregations(聚合)操作中,所有的 NaN values 将被分到同一个组中。
  • 在 join key 中 NaN 可以当做一个普通的值。
  • NaN 值在升序排序中排到最后,比任何其他数值都大。