Overview

Spark SQL is a Spark module for structured data processing. Unlike the basic Spark RDD API, the interfaces provided by Spark SQL provide Spark with more information about the structure of both the data and the computation being performed. Internally, Spark SQL uses this extra information to perform extra optimizations. There are several ways to interact with Spark SQL including SQL and the Dataset API. When computing a result the same execution engine is used, independent of which API/language you are using to express the computation. This unification means that developers can easily switch back and forth between different APIs based on which provides the most natural way to express a given transformation.
All of the examples on this page use sample data included in the Spark distribution and can be run in the spark-shell, pyspark shell, or sparkR shell.




SQL

One use of Spark SQL is to execute SQL queries. Spark SQL can also be used to read data from an existing Hive installation. For more on how to configure this feature, please refer to the Hive Tables section. When running SQL from within another programming language the results will be returned as a Dataset/DataFrame. You can also interact with the SQL interface using the command-line or over JDBC/ODBC.




Datasets and DataFrames

A Dataset is a distributed collection of data. Dataset is a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine. A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.). The Dataset API is available in Scala and Java. Python does not have the support for the Dataset API. But due to Python’s dynamic nature, many of the benefits of the Dataset API are already available (i.e. you can access the field of a row by name naturally row.columnName). The case for R is similar.
A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs. The DataFrame API is available in Scala, Java, Python, and R. In Scala and Java, a DataFrame is represented by a Dataset of Rows. In the Scala API, DataFrame is simply a type alias of Dataset[Row]. While, in Java API, users need to use Dataset<Row> to represent a DataFrame.
Throughout this document, we will often refer to Scala/Java Datasets of Rows as DataFrames.



Getting Started

Starting Point: SparkSession

The entry point into all functionality in Spark is the SparkSession class. To create a basic SparkSession, just use SparkSession.builder():



  import org.apache.spark.sql.SparkSession

  val spark = SparkSession
    .builder()
    .appName("Spark SQL basic example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()

  // For implicit conversions like converting RDDs to DataFrames
  import spark.implicits._

Find full example code at “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” in the Spark repo.
SparkSession in Spark 2.0 provides builtin support for Hive features including the ability to write queries using HiveQL, access to Hive UDFs, and the ability to read data from Hive tables. To use these features, you do not need to have an existing Hive setup.


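For example, a minimal sketch of a session with Hive support enabled (the application name is a placeholder, not from the guide):

  import org.apache.spark.sql.SparkSession

  // enableHiveSupport() adds HiveQL parsing and access to Hive SerDes and UDFs;
  // no existing Hive installation is required
  val sparkWithHive = SparkSession
    .builder()
    .appName("Spark SQL Hive support example")  // placeholder application name
    .enableHiveSupport()
    .getOrCreate()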

Creating DataFrames

With a SparkSession, applications can create DataFrames from an existing RDD, from a Hive table, or from Spark data sources.
As an example, the following creates a DataFrame based on the content of a JSON file:



  val df = spark.read.json("examples/src/main/resources/people.json")

  // Displays the content of the DataFrame to stdout
  df.show()
  // +----+-------+
  // | age|   name|
  // +----+-------+
  // |null|Michael|
  // |  30|   Andy|
  // |  19| Justin|
  // +----+-------+

Find full example code at “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” in the Spark repo.



Untyped Dataset Operations (aka DataFrame Operations)


DataFrames provide a domain-specific language for structured data manipulation in Scala, Java, Python and R.
As mentioned above, in Spark 2.0, DataFrames are just Datasets of Rows in the Scala and Java APIs. These operations are also referred to as “untyped transformations”, in contrast to the “typed transformations” that come with strongly typed Scala/Java Datasets.
Here we include some basic examples of structured data processing using Datasets:



  • Scala
    // This import is needed to use the $-notation
    import spark.implicits._

    // Print the schema in a tree format
    df.printSchema()
    // root
    // |-- age: long (nullable = true)
    // |-- name: string (nullable = true)

    // Select only the "name" column
    df.select("name").show()
    // +-------+
    // |   name|
    // +-------+
    // |Michael|
    // |   Andy|
    // | Justin|
    // +-------+

    // Select everybody, but increment the age by 1
    df.select($"name", $"age" + 1).show()
    // +-------+---------+
    // |   name|(age + 1)|
    // +-------+---------+
    // |Michael|     null|
    // |   Andy|       31|
    // | Justin|       20|
    // +-------+---------+

    // Select people older than 21
    df.filter($"age" > 21).show()
    // +---+----+
    // |age|name|
    // +---+----+
    // | 30|Andy|
    // +---+----+

    // Count people by age
    df.groupBy("age").count().show()
    // +----+-----+
    // | age|count|
    // +----+-----+
    // |  19|    1|
    // |null|    1|
    // |  30|    1|
    // +----+-----+
    Find full example code at “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” in the Spark repo.
    For a complete list of the types of operations that can be performed on a Dataset refer to the API Documentation.
    In addition to simple column references and expressions, Datasets also have a rich library of functions including string manipulation, date arithmetic, common math operations and more. The complete list is available in the DataFrame Function Reference.

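As a small, hedged illustration of that function library, the following reuses the df defined earlier and combines a built-in string function with common aggregates; the output column alias is arbitrary:

  import spark.implicits._
  import org.apache.spark.sql.functions._

  // upper() is one of the built-in string functions; avg() and max() are common aggregates
  df.select(upper($"name").as("name_upper")).show()
  df.agg(avg($"age"), max($"age")).show()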

Running SQL Queries Programmatically

The sql function on a SparkSession enables applications to run SQL queries programmatically and returns the result as a DataFrame.



  // Register the DataFrame as a SQL temporary view
  df.createOrReplaceTempView("people")

  val sqlDF = spark.sql("SELECT * FROM people")
  sqlDF.show()
  // +----+-------+
  // | age|   name|
  // +----+-------+
  // |null|Michael|
  // |  30|   Andy|
  // |  19| Justin|
  // +----+-------+

Find full example code at “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” in the Spark repo.



Global Temporary View

Temporary views in Spark SQL are session-scoped and will disappear if the session that creates them terminates. If you want a temporary view that is shared among all sessions and kept alive until the Spark application terminates, you can create a global temporary view. Global temporary views are tied to a system-preserved database global_temp, and we must use the qualified name to refer to them, e.g. SELECT * FROM global_temp.view1.

  • Scala
    // Register the DataFrame as a global temporary view
    df.createGlobalTempView("people")

    // Global temporary view is tied to a system preserved database `global_temp`
    spark.sql("SELECT * FROM global_temp.people").show()
    // +----+-------+
    // | age|   name|
    // +----+-------+
    // |null|Michael|
    // |  30|   Andy|
    // |  19| Justin|
    // +----+-------+

    // Global temporary view is cross-session
    spark.newSession().sql("SELECT * FROM global_temp.people").show()
    // +----+-------+
    // | age|   name|
    // +----+-------+
    // |null|Michael|
    // |  30|   Andy|
    // |  19| Justin|
    // +----+-------+
    Find full example code at “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” in the Spark repo.


Creating Datasets

Datasets are similar to RDDs, however, instead of using Java serialization or Kryo they use a specialized Encoder to serialize the objects for processing or transmitting over the network. While both encoders and standard serialization are responsible for turning an object into bytes, encoders are code generated dynamically and use a format that allows Spark to perform many operations like filtering, sorting and hashing without deserializing the bytes back into an object.



  • Scala
    // Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
    // you can use custom classes that implement the Product interface
    case class Person(name: String, age: Long)

    // Encoders are created for case classes
    val caseClassDS = Seq(Person("Andy", 32)).toDS()
    caseClassDS.show()
    // +----+---+
    // |name|age|
    // +----+---+
    // |Andy| 32|
    // +----+---+

    // Encoders for most common types are automatically provided by importing spark.implicits._
    val primitiveDS = Seq(1, 2, 3).toDS()
    primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

    // DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
    val path = "examples/src/main/resources/people.json"
    val peopleDS = spark.read.json(path).as[Person]
    peopleDS.show()
    // +----+-------+
    // | age|   name|
    // +----+-------+
    // |null|Michael|
    // |  30|   Andy|
    // |  19| Justin|
    // +----+-------+
    Find full example code at “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” in the Spark repo.

    Interoperating with RDDs

    Spark SQL supports two different methods for converting existing RDDs into Datasets. The first method uses reflection to infer the schema of an RDD that contains specific types of objects. This reflection based approach leads to more concise code and works well when you already know the schema while writing your Spark application.
    The second method for creating Datasets is through a programmatic interface that allows you to construct a schema and then apply it to an existing RDD. While this method is more verbose, it allows you to construct Datasets when the columns and their types are not known until runtime.


Inferring the Schema Using Reflection

The Scala interface for Spark SQL supports automatically converting an RDD containing case classes to a DataFrame. The case class defines the schema of the table. The names of the arguments to the case class are read using reflection and become the names of the columns. Case classes can also be nested or contain complex types such as Seqs or Arrays. This RDD can be implicitly converted to a DataFrame and then be registered as a table. Tables can be used in subsequent SQL statements.



  // For implicit conversions from RDDs to DataFrames
  import spark.implicits._

  // Create an RDD of Person objects from a text file, convert it to a Dataframe
  val peopleDF = spark.sparkContext
    .textFile("examples/src/main/resources/people.txt")
    .map(_.split(","))
    .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
    .toDF()
  // Register the DataFrame as a temporary view
  peopleDF.createOrReplaceTempView("people")

  // SQL statements can be run by using the sql methods provided by Spark
  val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")

  // The columns of a row in the result can be accessed by field index
  teenagersDF.map(teenager => "Name: " + teenager(0)).show()
  // +------------+
  // |       value|
  // +------------+
  // |Name: Justin|
  // +------------+

  // or by field name
  teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
  // +------------+
  // |       value|
  // +------------+
  // |Name: Justin|
  // +------------+

  // No pre-defined encoders for Dataset[Map[K,V]], define explicitly
  implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
  // Primitive types and case classes can be also defined as
  // implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()

  // row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
  teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
  // Array(Map("name" -> "Justin", "age" -> 19))

Find full example code at “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” in the Spark repo.

Programmatically Specifying the Schema

When case classes cannot be defined ahead of time (for example, the structure of records is encoded in a string, or a text dataset will be parsed and fields will be projected differently for different users), a DataFrame can be created programmatically with three steps.

  1. Create an RDD of Rows from the original RDD;
  2. Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1.
  3. Apply the schema to the RDD of Rows via createDataFrame method provided by SparkSession.

For example:



  import org.apache.spark.sql.types._

  // Create an RDD
  val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")

  // The schema is encoded in a string
  val schemaString = "name age"

  // Generate the schema based on the string of schema
  val fields = schemaString.split(" ")
    .map(fieldName => StructField(fieldName, StringType, nullable = true))
  val schema = StructType(fields)

  // Convert records of the RDD (people) to Rows
  val rowRDD = peopleRDD
    .map(_.split(","))
    .map(attributes => Row(attributes(0), attributes(1).trim))

  // Apply the schema to the RDD
  val peopleDF = spark.createDataFrame(rowRDD, schema)

  // Creates a temporary view using the DataFrame
  peopleDF.createOrReplaceTempView("people")

  // SQL can be run over a temporary view created using DataFrames
  val results = spark.sql("SELECT name FROM people")

  // The results of SQL queries are DataFrames and support all the normal RDD operations
  // The columns of a row in the result can be accessed by field index or by field name
  results.map(attributes => "Name: " + attributes(0)).show()
  // +-------------+
  // |        value|
  // +-------------+
  // |Name: Michael|
  // |   Name: Andy|
  // | Name: Justin|
  // +-------------+

Find full example code at “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” in the Spark repo.

Aggregations

The built-in DataFrames functions provide common aggregations such as count(), countDistinct(), avg(), max(), min(), etc. While those functions are designed for DataFrames, Spark SQL also has type-safe versions for some of them in Scala and Java to work with strongly typed Datasets. Moreover, users are not limited to the predefined aggregate functions and can create their own.
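For illustration, a small sketch that applies several of these built-in aggregates in one pass over the people DataFrame (df) used earlier; the particular combination of columns is arbitrary:

  import org.apache.spark.sql.functions._

  // Apply several predefined aggregate functions from org.apache.spark.sql.functions
  df.agg(count("name"), countDistinct("age"), avg("age"), max("age"), min("age")).show()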

Untyped User-Defined Aggregate Functions

Users have to extend the UserDefinedAggregateFunction abstract class to implement a custom untyped aggregate function. For example, a user-defined average can look like:



  import org.apache.spark.sql.expressions.MutableAggregationBuffer
  import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
  import org.apache.spark.sql.types._
  import org.apache.spark.sql.Row
  import org.apache.spark.sql.SparkSession

  object MyAverage extends UserDefinedAggregateFunction {
    // Data types of input arguments of this aggregate function
    def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)
    // Data types of values in the aggregation buffer
    def bufferSchema: StructType = {
      StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
    }
    // The data type of the returned value
    def dataType: DataType = DoubleType
    // Whether this function always returns the same output on the identical input
    def deterministic: Boolean = true
    // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to
    // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides
    // the opportunity to update its values. Note that arrays and maps inside the buffer are still
    // immutable.
    def initialize(buffer: MutableAggregationBuffer): Unit = {
      buffer(0) = 0L
      buffer(1) = 0L
    }
    // Updates the given aggregation buffer `buffer` with new input data from `input`
    def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
      if (!input.isNullAt(0)) {
        buffer(0) = buffer.getLong(0) + input.getLong(0)
        buffer(1) = buffer.getLong(1) + 1
      }
    }
    // Merges two aggregation buffers and stores the updated buffer values back to `buffer1`
    def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
      buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
      buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
    }
    // Calculates the final result
    def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
  }

  // Register the function to access it
  spark.udf.register("myAverage", MyAverage)

  val df = spark.read.json("examples/src/main/resources/employees.json")
  df.createOrReplaceTempView("employees")
  df.show()
  // +-------+------+
  // |   name|salary|
  // +-------+------+
  // |Michael|  3000|
  // |   Andy|  4500|
  // | Justin|  3500|
  // |  Berta|  4000|
  // +-------+------+

  val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
  result.show()
  // +--------------+
  // |average_salary|
  // +--------------+
  // |        3750.0|
  // +--------------+

Find full example code at “examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedUntypedAggregation.scala” in the Spark repo.

Type-Safe User-Defined Aggregate Functions

User-defined aggregations for strongly typed Datasets revolve around the Aggregator abstract class. For example, a type-safe user-defined average can look like:


  import org.apache.spark.sql.expressions.Aggregator
  import org.apache.spark.sql.Encoder
  import org.apache.spark.sql.Encoders
  import org.apache.spark.sql.SparkSession

  case class Employee(name: String, salary: Long)
  case class Average(var sum: Long, var count: Long)

  object MyAverage extends Aggregator[Employee, Average, Double] {
    // A zero value for this aggregation. Should satisfy the property that any b + zero = b
    def zero: Average = Average(0L, 0L)
    // Combine two values to produce a new value. For performance, the function may modify `buffer`
    // and return it instead of constructing a new object
    def reduce(buffer: Average, employee: Employee): Average = {
      buffer.sum += employee.salary
      buffer.count += 1
      buffer
    }
    // Merge two intermediate values
    def merge(b1: Average, b2: Average): Average = {
      b1.sum += b2.sum
      b1.count += b2.count
      b1
    }
    // Transform the output of the reduction
    def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
    // Specifies the Encoder for the intermediate value type
    def bufferEncoder: Encoder[Average] = Encoders.product
    // Specifies the Encoder for the final output value type
    def outputEncoder: Encoder[Double] = Encoders.scalaDouble
  }

  val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
  ds.show()
  // +-------+------+
  // |   name|salary|
  // +-------+------+
  // |Michael|  3000|
  // |   Andy|  4500|
  // | Justin|  3500|
  // |  Berta|  4000|
  // +-------+------+

  // Convert the function to a `TypedColumn` and give it a name
  val averageSalary = MyAverage.toColumn.name("average_salary")
  val result = ds.select(averageSalary)
  result.show()
  // +--------------+
  // |average_salary|
  // +--------------+
  // |        3750.0|
  // +--------------+

Find full example code at “examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedTypedAggregation.scala” in the Spark repo.

Data Sources

Spark SQL supports operating on a variety of data sources through the DataFrame interface. A DataFrame can be operated on using relational transformations and can also be used to create a temporary view. Registering a DataFrame as a temporary view allows you to run SQL queries over its data. This section describes the general methods for loading and saving data using the Spark Data Sources and then goes into specific options that are available for the built-in data sources.

Generic Load/Save Functions

In the simplest form, the default data source (parquet unless otherwise configured by spark.sql.sources.default) will be used for all operations.


  val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
  usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

Find full example code at “examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala” in the Spark repo.

Manually Specifying Options

You can also manually specify the data source that will be used along with any extra options that you would like to pass to the data source. Data sources are specified by their fully qualified name (i.e., org.apache.spark.sql.parquet), but for built-in sources you can also use their short names (json, parquet, jdbc, orc, libsvm, csv, text). DataFrames loaded from any data source type can be converted into other types using this syntax.


  val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
  peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")

Find full example code at “examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala” in the Spark repo.

Run SQL on files directly

Instead of using read API to load a file into DataFrame and query it, you can also query that file directly with SQL.


  val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

Find full example code at “examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala” in the Spark repo.

Save Modes

Save operations can optionally take a SaveMode, that specifies how to handle existing data if present. It is important to realize that these save modes do not utilize any locking and are not atomic. Additionally, when performing an Overwrite, the data will be deleted before writing out the new data.

Scala/Java Any Language Meaning
SaveMode.ErrorIfExists (default) "error" (default) When saving a DataFrame to a data source, if data already exists, an exception is expected to be thrown.
SaveMode.Append "append" When saving a DataFrame to a data source, if data/table already exists, contents of the DataFrame are expected to be appended to existing data.
SaveMode.Overwrite "overwrite" Overwrite mode means that when saving a DataFrame to a data source, if data/table already exists, existing data is expected to be overwritten by the contents of the DataFrame.
SaveMode.Ignore "ignore" Ignore mode means that when saving a DataFrame to a data source, if data already exists, the save operation is expected to not save the contents of the DataFrame and to not change the existing data. This is similar to a CREATE TABLE IF NOT EXISTS in SQL.
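For example, a minimal sketch of choosing a save mode on write, reusing usersDF from the generic load/save example above and the same placeholder output path:

  import org.apache.spark.sql.SaveMode

  // Overwrite deletes any existing data at the target before writing the new data
  usersDF.select("name", "favorite_color").write.mode(SaveMode.Overwrite).save("namesAndFavColors.parquet")
  // The string form .mode("overwrite") is equivalent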

Saving to Persistent Tables

DataFrames can also be saved as persistent tables into Hive metastore using the saveAsTable command. Notice that an existing Hive deployment is not necessary to use this feature. Spark will create a default local Hive metastore (using Derby) for you. Unlike the createOrReplaceTempView command, saveAsTable will materialize the contents of the DataFrame and create a pointer to the data in the Hive metastore. Persistent tables will still exist even after your Spark program has restarted, as long as you maintain your connection to the same metastore. A DataFrame for a persistent table can be created by calling the table method on a SparkSession with the name of the table.
For file-based data source, e.g. text, parquet, json, etc. you can specify a custom table path via the path option, e.g. df.write.option("path", "/some/path").saveAsTable("t"). When the table is dropped, the custom table path will not be removed and the table data is still there. If no custom table path is specified, Spark will write data to a default table path under the warehouse directory. When the table is dropped, the default table path will be removed too.
Starting from Spark 2.1, persistent datasource tables have per-partition metadata stored in the Hive metastore. This brings several benefits:

  • Since the metastore can return only necessary partitions for a query, discovering all the partitions on the first query to the table is no longer needed.
  • Hive DDLs such as ALTER TABLE PARTITION ... SET LOCATION are now available for tables created with the Datasource API.

Note that partition information is not gathered by default when creating external datasource tables (those with a path option). To sync the partition information in the metastore, you can invoke MSCK REPAIR TABLE.


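A brief sketch of the custom table path behaviour described above, assuming the usersDF DataFrame from the earlier examples; the table name is a placeholder:

  // The table's metadata goes into the metastore, while the files are written to the custom path;
  // dropping the table later removes the metadata but leaves the files under /some/path
  usersDF.write.option("path", "/some/path").saveAsTable("users_persisted")  // placeholder table name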

Bucketing, Sorting and Partitioning

For file-based data source, it is also possible to bucket and sort or partition the output. Bucketing and sorting are applicable only to persistent tables:


  peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")

Find full example code at “examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala” in the Spark repo.
while partitioning can be used with both save and saveAsTable when using the Dataset APIs.


  usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")

Find full example code at “examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala” in the Spark repo.
It is possible to use both partitioning and bucketing for a single table:

  • Scala

    peopleDF
      .write
      .partitionBy("favorite_color")
      .bucketBy(42, "name")
      .saveAsTable("people_partitioned_bucketed")

    Find full example code at “examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala” in the Spark repo.
    partitionBy creates a directory structure as described in the Partition Discovery section. Thus, it has limited applicability to columns with high cardinality. In contrast bucketBy distributes data across a fixed number of buckets and can be used when a number of unique values is unbounded.

    Parquet Files

    Parquet is a columnar format that is supported by many other data processing systems. Spark SQL provides support for both reading and writing Parquet files that automatically preserves the schema of the original data. When writing Parquet files, all columns are automatically converted to be nullable for compatibility reasons.

    Loading Data Programmatically

    Using the data from the above example:

  • Scala

    // Encoders for most common types are automatically provided by importing spark.implicits._
    import spark.implicits._

    val peopleDF = spark.read.json("examples/src/main/resources/people.json")

    // DataFrames can be saved as Parquet files, maintaining the schema information
    peopleDF.write.parquet("people.parquet")

    // Read in the parquet file created above
    // Parquet files are self-describing so the schema is preserved
    // The result of loading a Parquet file is also a DataFrame
    val parquetFileDF = spark.read.parquet("people.parquet")

    // Parquet files can also be used to create a temporary view and then used in SQL statements
    parquetFileDF.createOrReplaceTempView("parquetFile")
    val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
    namesDF.map(attributes => "Name: " + attributes(0)).show()
    // +------------+
    // |       value|
    // +------------+
    // |Name: Justin|
    // +------------+

    Find full example code at “examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala” in the Spark repo.

    Partition Discovery

    Table partitioning is a common optimization approach used in systems like Hive. In a partitioned table, data are usually stored in different directories, with partitioning column values encoded in the path of each partition directory. The Parquet data source is now able to discover and infer partitioning information automatically. For example, we can store all our previously used population data into a partitioned table using the following directory structure, with two extra columns, gender and country as partitioning columns:

    path
    └── to
        └── table
            ├── gender=male
            │   ├── ...
            │   ├── country=US
            │   │   └── data.parquet
            │   ├── country=CN
            │   │   └── data.parquet
            │   └── ...
            └── gender=female
                ├── ...
                ├── country=US
                │   └── data.parquet
                ├── country=CN
                │   └── data.parquet
                └── ...

    By passing path/to/table to either SparkSession.read.parquet or SparkSession.read.load, Spark SQL will automatically extract the partitioning information from the paths. Now the schema of the returned DataFrame becomes:

    root
    |-- name: string (nullable = true)
    |-- age: long (nullable = true)
    |-- gender: string (nullable = true)
    |-- country: string (nullable = true)

    Notice that the data types of the partitioning columns are automatically inferred. Currently, numeric data types and string type are supported. Sometimes users may not want to automatically infer the data types of the partitioning columns. For these use cases, the automatic type inference can be configured by spark.sql.sources.partitionColumnTypeInference.enabled, which is default to true. When type inference is disabled, string type will be used for the partitioning columns.
    Starting from Spark 1.6.0, partition discovery only finds partitions under the given paths by default. For the above example, if users pass path/to/table/gender=male to either SparkSession.read.parquet or SparkSession.read.load, gender will not be considered as a partitioning column. If users need to specify the base path that partition discovery should start with, they can set basePath in the data source options. For example, when path/to/table/gender=male is the path of the data and users set basePath to path/to/table/, gender will be a partitioning column.
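    A minimal sketch of the basePath option described above, using the placeholder paths from the example:

    // Reading a single branch while pointing basePath at the table root keeps `gender`
    // as a partitioning column in the resulting schema
    val maleDF = spark.read
      .option("basePath", "path/to/table/")
      .parquet("path/to/table/gender=male")
    maleDF.printSchema()  // includes gender: string (nullable = true)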

    Schema Merging

    Like ProtocolBuffer, Avro, and Thrift, Parquet also supports schema evolution. Users can start with a simple schema, and gradually add more columns to the schema as needed. In this way, users may end up with multiple Parquet files with different but mutually compatible schemas. The Parquet data source is now able to automatically detect this case and merge schemas of all these files.
    Since schema merging is a relatively expensive operation, and is not a necessity in most cases, we turned it off by default starting from 1.5.0. You may enable it by

  1. setting data source option mergeSchema to true when reading Parquet files (as shown in the examples below), or
  2. setting the global SQL option spark.sql.parquet.mergeSchema to true.


  // This is used to implicitly convert an RDD to a DataFrame.
  import spark.implicits._

  // Create a simple DataFrame, store into a partition directory
  val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
  squaresDF.write.parquet("data/test_table/key=1")

  // Create another DataFrame in a new partition directory,
  // adding a new column and dropping an existing column
  val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
  cubesDF.write.parquet("data/test_table/key=2")

  // Read the partitioned table
  val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
  mergedDF.printSchema()
  // The final schema consists of all 3 columns in the Parquet files together
  // with the partitioning column appeared in the partition directory paths
  // root
  // |-- value: int (nullable = true)
  // |-- square: int (nullable = true)
  // |-- cube: int (nullable = true)
  // |-- key: int (nullable = true)

Find full example code at “examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala” in the Spark repo.

Hive metastore Parquet table conversion

When reading from and writing to Hive metastore Parquet tables, Spark SQL will try to use its own Parquet support instead of Hive SerDe for better performance. This behavior is controlled by the spark.sql.hive.convertMetastoreParquet configuration, and is turned on by default.

Hive/Parquet Schema Reconciliation

There are two key differences between Hive and Parquet from the perspective of table schema processing.

  1. Hive is case insensitive, while Parquet is not
  2. Hive considers all columns nullable, while nullability in Parquet is significant

Due to this reason, we must reconcile Hive metastore schema with Parquet schema when converting a Hive metastore Parquet table to a Spark SQL Parquet table. The reconciliation rules are:

  1. Fields that have the same name in both schema must have the same data type regardless of nullability. The reconciled field should have the data type of the Parquet side, so that nullability is respected.
  2. The reconciled schema contains exactly those fields defined in Hive metastore schema.
    • Any fields that only appear in the Parquet schema are dropped in the reconciled schema.
    • Any fields that only appear in the Hive metastore schema are added as nullable field in the reconciled schema.


    Metadata Refreshing

    Spark SQL caches Parquet metadata for better performance. When Hive metastore Parquet table conversion is enabled, metadata of those converted tables are also cached. If these tables are updated by Hive or other external tools, you need to refresh them manually to ensure consistent metadata.


  // spark is an existing SparkSession
  spark.catalog.refreshTable("my_table")

Configuration

Configuration of Parquet can be done using the setConf method on SparkSession or by running SET key=value commands using SQL.

Property Name Default Meaning
spark.sql.parquet.binaryAsString false Some other Parquet-producing systems, in particular Impala, Hive, and older versions of Spark SQL, do not differentiate between binary data and strings when writing out the Parquet schema. This flag tells Spark SQL to interpret binary data as a string to provide compatibility with these systems.
spark.sql.parquet.int96AsTimestamp true Some Parquet-producing systems, in particular Impala and Hive, store Timestamp into INT96. This flag tells Spark SQL to interpret INT96 data as a timestamp to provide compatibility with these systems.
spark.sql.parquet.cacheMetadata true Turns on caching of Parquet schema metadata. Can speed up querying of static data.
spark.sql.parquet.compression.codec snappy Sets the compression codec use when writing Parquet files. Acceptable values include: uncompressed, snappy, gzip, lzo.
spark.sql.parquet.filterPushdown true Enables Parquet filter push-down optimization when set to true.
spark.sql.hive.convertMetastoreParquet true When set to false, Spark SQL will use the Hive SerDe for parquet tables instead of the built in support.
spark.sql.parquet.mergeSchema false When true, the Parquet data source merges schemas collected from all data files, otherwise the schema is picked from the summary file or a random data file if no summary file is available.
spark.sql.optimizer.metadataOnly true When true, enable the metadata-only query optimization that use the table’s metadata to produce the partition columns instead of table scans. It applies when all the columns scanned are partition columns and the query has an aggregate operator that satisfies distinct semantics.
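For instance, a minimal sketch of both configuration routes mentioned above; spark.conf.set is used here as the runtime equivalent of setConf, and the values are just example settings:

  // Programmatically, through the session's runtime configuration
  spark.conf.set("spark.sql.parquet.compression.codec", "gzip")

  // Or through SQL
  spark.sql("SET spark.sql.parquet.filterPushdown=true")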

JSON Datasets

Spark SQL can automatically infer the schema of a JSON dataset and load it as a Dataset[Row]. This conversion can be done using SparkSession.read.json() on either a Dataset[String], or a JSON file.
Note that the file that is offered as a json file is not a typical JSON file. Each line must contain a separate, self-contained valid JSON object. For more information, please see JSON Lines text format, also called newline-delimited JSON.
For a regular multi-line JSON file, set the multiLine option to true.

  // Primitive types (Int, String, etc) and Product types (case classes) encoders are
  // supported by importing this when creating a Dataset.
  import spark.implicits._

  // A JSON dataset is pointed to by path.
  // The path can be either a single text file or a directory storing text files
  val path = "examples/src/main/resources/people.json"
  val peopleDF = spark.read.json(path)

  // The inferred schema can be visualized using the printSchema() method
  peopleDF.printSchema()
  // root
  // |-- age: long (nullable = true)
  // |-- name: string (nullable = true)

  // Creates a temporary view using the DataFrame
  peopleDF.createOrReplaceTempView("people")

  // SQL statements can be run by using the sql methods provided by spark
  val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
  teenagerNamesDF.show()
  // +------+
  // |  name|
  // +------+
  // |Justin|
  // +------+

  // Alternatively, a DataFrame can be created for a JSON dataset represented by
  // a Dataset[String] storing one JSON object per string
  val otherPeopleDataset = spark.createDataset(
    """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
  val otherPeople = spark.read.json(otherPeopleDataset)
  otherPeople.show()
  // +---------------+----+
  // |        address|name|
  // +---------------+----+
  // |[Columbus,Ohio]| Yin|
  // +---------------+----+

Find full example code at “examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala” in the Spark repo.

Hive Tables

Spark SQL also supports reading and writing data stored in Apache Hive. However, since Hive has a large number of dependencies, these dependencies are not included in the default Spark distribution. If Hive dependencies can be found on the classpath, Spark will load them automatically. Note that these Hive dependencies must also be present on all of the worker nodes, as they will need access to the Hive serialization and deserialization libraries (SerDes) in order to access data stored in Hive.
Configuration of Hive is done by placing your hive-site.xml, core-site.xml (for security configuration), and hdfs-site.xml (for HDFS configuration) file in conf/.
When working with Hive, one must instantiate SparkSession with Hive support, including connectivity to a persistent Hive metastore, support for Hive serdes, and Hive user-defined functions. Users who do not have an existing Hive deployment can still enable Hive support. When not configured by the hive-site.xml, the context automatically creates metastore_db in the current directory and creates a directory configured by spark.sql.warehouse.dir, which defaults to the directory spark-warehouse in the current directory that the Spark application is started. Note that the hive.metastore.warehouse.dir property in hive-site.xml is deprecated since Spark 2.0.0. Instead, use spark.sql.warehouse.dir to specify the default location of database in warehouse. You may need to grant write privilege to the user who starts the Spark application.

import java.io.File

import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

case class Record(key: Int, value: String)

// warehouseLocation points to the default location for managed databases and tables
val warehouseLocation = new File("spark-warehouse").getAbsolutePath

val spark = SparkSession
  .builder()
  .appName("Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate()

import spark.implicits._
import spark.sql

sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
sql("SELECT * FROM src").show()
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Aggregation queries are also supported.
sql("SELECT COUNT(*) FROM src").show()
// +--------+
// |count(1)|
// +--------+
// |     500|
// +--------+

// The results of SQL queries are themselves DataFrames and support all normal functions.
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

// The items in DataFrames are of type Row, which allows you to access each column by ordinal.
val stringsDS = sqlDF.map {
  case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()
// +--------------------+
// |               value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...

// You can also use DataFrames to create temporary views within a SparkSession.
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")

// Queries can then join DataFrame data with data stored in Hive.
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// |  2| val_2|  2| val_2|
// |  4| val_4|  4| val_4|
// |  5| val_5|  5| val_5|
// ...
Find full example code at “examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala” in the Spark repo.

Specifying storage format for Hive tables

When you create a Hive table, you need to define how this table should read/write data from/to the file system, i.e. the “input format” and “output format”. You also need to define how this table should deserialize the data to rows, or serialize rows to data, i.e. the “serde”. The following options can be used to specify the storage format (“serde”, “input format”, “output format”), e.g. CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet'). By default, we will read the table files as plain text. Note that the Hive storage handler is not supported yet when creating a table; you can create a table using a storage handler on the Hive side and use Spark SQL to read it.

Property Name Meaning
fileFormat A fileFormat is kind of a package of storage format specifications, including “serde”, “input format” and “output format”. Currently we support 6 fileFormats: ‘sequencefile’, ‘rcfile’, ‘orc’, ‘parquet’, ‘textfile’ and ‘avro’.
inputFormat, outputFormat These 2 options specify the name of a corresponding InputFormat and OutputFormat class as a string literal, e.g. org.apache.hadoop.hive.ql.io.orc.OrcInputFormat. These two options must appear as a pair, and you cannot specify them if you have already specified the fileFormat option.
serde This option specifies the name of a serde class. When the fileFormat option is specified, do not specify this option if the given fileFormat already include the information of serde. Currently “sequencefile”, “textfile” and “rcfile” don’t include the serde information and you can use this option with these 3 fileFormats.
fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim These options can only be used with “textfile” fileFormat. They define how to read delimited files into rows.

All other properties defined with OPTIONS will be regarded as Hive serde properties.
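A minimal sketch of these options, following the CREATE TABLE ... USING hive OPTIONS(...) form shown above; the table names are placeholders:

  // Pick a whole storage package via fileFormat
  sql("CREATE TABLE hive_parquet_example(id INT, value STRING) USING hive OPTIONS(fileFormat 'parquet')")

  // Or combine the textfile format with a field delimiter
  sql("CREATE TABLE hive_csv_example(id INT, value STRING) USING hive OPTIONS(fileFormat 'textfile', fieldDelim ',')")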

Interacting with Different Versions of Hive Metastore

One of the most important pieces of Spark SQL’s Hive support is interaction with Hive metastore, which enables Spark SQL to access metadata of Hive tables. Starting from Spark 1.4.0, a single binary build of Spark SQL can be used to query different versions of Hive metastores, using the configuration described below. Note that independent of the version of Hive that is being used to talk to the metastore, internally Spark SQL will compile against Hive 1.2.1 and use those classes for internal execution (serdes, UDFs, UDAFs, etc).
The following options can be used to configure the version of Hive that is used to retrieve metadata:

Property Name Default Meaning
spark.sql.hive.metastore.version 1.2.1 Version of the Hive metastore. Available options are 0.12.0 through 1.2.1.
spark.sql.hive.metastore.jars builtin Location of the jars that should be used to instantiate the HiveMetastoreClient. This property can be one of three options:
1. builtin: use Hive 1.2.1, which is bundled with the Spark assembly when -Phive is enabled. When this option is chosen, spark.sql.hive.metastore.version must be either 1.2.1 or not defined.
2. maven: use Hive jars of the specified version downloaded from Maven repositories. This configuration is not generally recommended for production deployments.
3. A classpath in the standard format for the JVM. This classpath must include all of Hive and its dependencies, including the correct version of Hadoop. These jars only need to be present on the driver, but if you are running in yarn cluster mode then you must ensure they are packaged with your application.
spark.sql.hive.metastore.sharedPrefixes com.mysql.jdbc, org.postgresql, com.microsoft.sqlserver, oracle.jdbc A comma separated list of class prefixes that should be loaded using the classloader that is shared between Spark SQL and a specific version of Hive. An example of classes that should be shared is JDBC drivers that are needed to talk to the metastore. Other classes that need to be shared are those that interact with classes that are already shared. For example, custom appenders that are used by log4j.
spark.sql.hive.metastore.barrierPrefixes (empty) A comma separated list of class prefixes that should explicitly be reloaded for each version of Hive that Spark SQL is communicating with. For example, Hive UDFs that are declared in a prefix that typically would be shared (i.e. org.apache.spark.*).
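As a hedged sketch (not taken from the guide itself), these properties are typically supplied when the session is built; the application name is a placeholder and the values are the defaults from the table above:

  val sparkWithMetastore = SparkSession
    .builder()
    .appName("Hive metastore example")                    // placeholder application name
    .config("spark.sql.hive.metastore.version", "1.2.1")  // version used to talk to the metastore
    .config("spark.sql.hive.metastore.jars", "builtin")   // or "maven", or an explicit classpath
    .enableHiveSupport()
    .getOrCreate()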

JDBC To Other Databases

Spark SQL also includes a data source that can read data from other databases using JDBC. This functionality should be preferred over using JdbcRDD. This is because the results are returned as a DataFrame and they can easily be processed in Spark SQL or joined with other data sources. The JDBC data source is also easier to use from Java or Python as it does not require the user to provide a ClassTag. (Note that this is different than the Spark SQL JDBC server, which allows other applications to run queries using Spark SQL).
To get started you will need to include the JDBC driver for your particular database on the Spark classpath. For example, to connect to postgres from the Spark Shell you would run the following command:

  bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar

Tables from the remote database can be loaded as a DataFrame or Spark SQL temporary view using the Data Sources API. Users can specify the JDBC connection properties in the data source options. user and password are normally provided as connection properties for logging into the data sources. In addition to the connection properties, Spark also supports the following case-insensitive options:

Property Name Meaning
url The JDBC URL to connect to. The source-specific connection properties may be specified in the URL. e.g., jdbc:postgresql://localhost/test?user=fred&password=secret
dbtable The JDBC table that should be read. Note that anything that is valid in a FROM clause of a SQL query can be used. For example, instead of a full table you could also use a subquery in parentheses.
driver The class name of the JDBC driver to use to connect to this URL.
partitionColumn, lowerBound, upperBound These options must all be specified if any of them is specified. In addition, numPartitions must be specified. They describe how to partition the table when reading in parallel from multiple workers. partitionColumn must be a numeric column from the table in question. Notice that lowerBound and upperBound are just used to decide the partition stride, not for filtering the rows in table. So all rows in the table will be partitioned and returned. This option applies only to reading.
numPartitions The maximum number of partitions that can be used for parallelism in table reading and writing. This also determines the maximum number of concurrent JDBC connections. If the number of partitions to write exceeds this limit, we decrease it to this limit by calling coalesce(numPartitions) before writing.
fetchsize The JDBC fetch size, which determines how many rows to fetch per round trip. This can help performance on JDBC drivers which default to low fetch size (eg. Oracle with 10 rows). This option applies only to reading.
batchsize The JDBC batch size, which determines how many rows to insert per round trip. This can help performance on JDBC drivers. This option applies only to writing. It defaults to 1000.
isolationLevel The transaction isolation level, which applies to current connection. It can be one of NONE, READ_COMMITTED, READ_UNCOMMITTED, REPEATABLE_READ, or SERIALIZABLE, corresponding to standard transaction isolation levels defined by JDBC’s Connection object, with default of READ_UNCOMMITTED. This option applies only to writing. Please refer the documentation in java.sql.Connection.
truncate This is a JDBC writer related option. When SaveMode.Overwrite is enabled, this option causes Spark to truncate an existing table instead of dropping and recreating it. This can be more efficient, and prevents the table metadata (e.g., indices) from being removed. However, it will not work in some cases, such as when the new data has a different schema. It defaults to false. This option applies only to writing.
createTableOptions This is a JDBC writer related option. If specified, this option allows setting of database-specific table and partition options when creating a table (e.g., CREATE TABLE t (name string) ENGINE=InnoDB.). This option applies only to writing.
createTableColumnTypes The database column data types to use instead of the defaults when creating the table. Data type information should be specified in the same format as CREATE TABLE columns syntax (e.g., "name CHAR(64), comments VARCHAR(1024)"). The specified types should be valid Spark SQL data types. This option applies only to writing.

// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load()

import java.util.Properties

val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
val jdbcDF2 = spark.read
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

// Saving data to a JDBC source
jdbcDF.write
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .save()

jdbcDF2.write
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

// Specifying create table column data types on write
jdbcDF.write
  .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
Find full example code at “examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala” in the Spark repo.
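The partitioning options described in the table above can be combined for parallel reads. A minimal sketch, assuming a numeric column named id with values roughly in the range [0, 100000) (the column name and bounds are hypothetical):

// Reading in parallel: the range [lowerBound, upperBound) is split into
// numPartitions strides on the partitioning column. Rows outside the range
// are still read; the bounds only control how the reads are split.
val partitionedJdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .option("partitionColumn", "id")
  .option("lowerBound", "0")
  .option("upperBound", "100000")
  .option("numPartitions", "10")
  .load()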

Troubleshooting

  • The JDBC driver class must be visible to the primordial class loader on the client session and on all executors. This is because Java’s DriverManager class does a security check that results in it ignoring all drivers not visible to the primordial class loader when one goes to open a connection. One convenient way to do this is to modify compute_classpath.sh on all worker nodes to include your driver JARs.
  • Some databases, such as H2, convert all names to upper case. You’ll need to use upper case to refer to those names in Spark SQL.

    Performance Tuning

    For some workloads it is possible to improve performance by either caching data in memory, or by turning on some experimental options.

    Caching Data In Memory

    Spark SQL can cache tables using an in-memory columnar format by calling spark.catalog.cacheTable("tableName") or dataFrame.cache(). Then Spark SQL will scan only required columns and will automatically tune compression to minimize memory usage and GC pressure. You can call spark.catalog.uncacheTable("tableName") to remove the table from memory.
    Configuration of in-memory caching can be done using the setConf method on SparkSession or by running SET key=value commands using SQL.
Property Name Default Meaning
spark.sql.inMemoryColumnarStorage.compressed true When set to true Spark SQL will automatically select a compression codec for each column based on statistics of the data.
spark.sql.inMemoryColumnarStorage.batchSize 10000 Controls the size of batches for columnar caching. Larger batch sizes can improve memory utilization and compression, but risk OOMs when caching data.
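As a minimal sketch of the calls and settings described above (the table name people is hypothetical and assumed to refer to an existing temporary view or catalog table):

// Cache the table in the in-memory columnar format, query it, then release it.
spark.catalog.cacheTable("people")
spark.sql("SELECT COUNT(*) FROM people").show()   // scans only the required columns from the cache
spark.catalog.uncacheTable("people")

// Adjust the columnar cache configuration programmatically...
spark.conf.set("spark.sql.inMemoryColumnarStorage.batchSize", "20000")
// ...or with SQL: SET spark.sql.inMemoryColumnarStorage.batchSize=20000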

Other Configuration Options

The following options can also be used to tune the performance of query execution. It is possible that these options will be deprecated in a future release as more optimizations are performed automatically.

Property Name Default Meaning
spark.sql.files.maxPartitionBytes 134217728 (128 MB) The maximum number of bytes to pack into a single partition when reading files.
spark.sql.files.openCostInBytes 4194304 (4 MB) The estimated cost to open a file, measured by the number of bytes that could be scanned in the same time. This is used when putting multiple files into a partition. It is better to over-estimate; then the partitions with small files will be faster than partitions with bigger files (which are scheduled first).
spark.sql.broadcastTimeout 300 Timeout in seconds for the broadcast wait time in broadcast joins.
spark.sql.autoBroadcastJoinThreshold 10485760 (10 MB) Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1 broadcasting can be disabled. Note that currently statistics are only supported for Hive Metastore tables where the command ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan has been run.
spark.sql.shuffle.partitions 200 Configures the number of partitions to use when shuffling data for joins or aggregations.
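These options can be set programmatically or through SQL SET commands; for example (the values shown are illustrative, not recommendations):

// Disable broadcast joins and reduce the number of post-shuffle partitions.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
spark.conf.set("spark.sql.shuffle.partitions", "50")

// The same setting expressed as a SQL command:
spark.sql("SET spark.sql.shuffle.partitions=50")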

Distributed SQL Engine

Spark SQL can also act as a distributed query engine using its JDBC/ODBC or command-line interface. In this mode, end-users or applications can interact with Spark SQL directly to run SQL queries, without the need to write any code.

Running the Thrift JDBC/ODBC server

The Thrift JDBC/ODBC server implemented here corresponds to the HiveServer2 in Hive 1.2.1. You can test the JDBC server with the beeline script that comes with either Spark or Hive 1.2.1.
To start the JDBC/ODBC server, run the following in the Spark directory:

  1. ./sbin/start-thriftserver.sh

This script accepts all bin/spark-submit command line options, plus a --hiveconf option to specify Hive properties. You may run ./sbin/start-thriftserver.sh --help for a complete list of all available options. By default, the server listens on localhost:10000. You may override this behaviour via either environment variables, i.e.:

  1. export HIVE_SERVER2_THRIFT_PORT=<listening-port>
  2. export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>
  3. ./sbin/start-thriftserver.sh \
  4. --master <master-uri> \
  5. ...

or system properties:

  1. ./sbin/start-thriftserver.sh \
  2. --hiveconf hive.server2.thrift.port=<listening-port> \
  3. --hiveconf hive.server2.thrift.bind.host=<listening-host> \
  4. --master <master-uri>
  5. ...

Now you can use beeline to test the Thrift JDBC/ODBC server:

  1. ./bin/beeline

Connect to the JDBC/ODBC server in beeline with:

  1. beeline> !connect jdbc:hive2://localhost:10000

Beeline will ask you for a username and password. In non-secure mode, simply enter the username on your machine and a blank password. For secure mode, please follow the instructions given in the beeline documentation.
Configuration of Hive is done by placing your hive-site.xml, core-site.xml and hdfs-site.xml files in conf/.
You may also use the beeline script that comes with Hive.
The Thrift JDBC server also supports sending Thrift RPC messages over HTTP transport. Use the following settings to enable HTTP mode as system properties or in the hive-site.xml file in conf/:

  1. hive.server2.transport.mode - Set this to value: http
  2. hive.server2.thrift.http.port - HTTP port number to listen on; default is 10001
  3. hive.server2.http.endpoint - HTTP endpoint; default is cliservice

To test, use beeline to connect to the JDBC/ODBC server in http mode with:

  1. beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>
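Besides beeline, any JDBC-capable application can query the Thrift server through the standard java.sql API. A minimal sketch, assuming the Hive JDBC driver is on the client classpath and the server is running in binary mode on localhost:10000 (the driver class name and credentials below are assumptions):

import java.sql.DriverManager

// Older driver versions may need the driver class loaded explicitly, e.g.:
// Class.forName("org.apache.hive.jdbc.HiveDriver")
val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000", "username", "")
val stmt = conn.createStatement()
val rs = stmt.executeQuery("SHOW TABLES")
while (rs.next()) {
  println(rs.getString(1))
}
rs.close()
stmt.close()
conn.close()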

Running the Spark SQL CLI

The Spark SQL CLI is a convenient tool to run the Hive metastore service in local mode and execute queries input from the command line. Note that the Spark SQL CLI cannot talk to the Thrift JDBC server.
To start the Spark SQL CLI, run the following in the Spark directory:

  1. ./bin/spark-sql

Configuration of Hive is done by placing your hive-site.xml, core-site.xml and hdfs-site.xml files in conf/. You may run ./bin/spark-sql --help for a complete list of all available options.

Migration Guide

Upgrading From Spark SQL 2.1 to 2.2

  • Spark 2.1.1 introduced a new configuration key: spark.sql.hive.caseSensitiveInferenceMode. It had a default setting of NEVER_INFER, which kept behavior identical to 2.1.0. However, Spark 2.2.0 changes this setting’s default value to INFER_AND_SAVE to restore compatibility with reading Hive metastore tables whose underlying file schema have mixed-case column names. With the INFER_AND_SAVE configuration value, on first access Spark will perform schema inference on any Hive metastore table for which it has not already saved an inferred schema. Note that schema inference can be a very time consuming operation for tables with thousands of partitions. If compatibility with mixed-case column names is not a concern, you can safely set spark.sql.hive.caseSensitiveInferenceMode to NEVER_INFER to avoid the initial overhead of schema inference. Note that with the new default INFER_AND_SAVE setting, the results of the schema inference are saved as a metastore key for future use. Therefore, the initial schema inference occurs only at a table’s first access.

    Upgrading From Spark SQL 2.0 to 2.1

  • Datasource tables now store partition metadata in the Hive metastore. This means that Hive DDLs such as ALTER TABLE PARTITION ... SET LOCATION are now available for tables created with the Datasource API.

    • Legacy datasource tables can be migrated to this format via the MSCK REPAIR TABLE command. Migrating legacy tables is recommended to take advantage of Hive DDL support and improved planning performance.
    • To determine if a table has been migrated, look for the PartitionProvider: Catalog attribute when issuing DESCRIBE FORMATTED on the table.
  • Changes to INSERT OVERWRITE TABLE ... PARTITION ... behavior for Datasource tables (see the sketch after this list).

    • In prior Spark versions INSERT OVERWRITE overwrote the entire Datasource table, even when given a partition specification. Now only partitions matching the specification are overwritten.
    • Note that this still differs from the behavior of Hive tables, which is to overwrite only partitions overlapping with newly inserted data.
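A sketch of the new behavior (the table and partition names are hypothetical, and logs is assumed to be a Datasource table partitioned by date):

// In Spark 2.1+, only the partition named in the PARTITION clause is replaced;
// in earlier versions the entire table would have been overwritten.
spark.sql("""
  INSERT OVERWRITE TABLE logs PARTITION (date = '2017-01-01')
  SELECT id, message FROM staging_logs WHERE date = '2017-01-01'
""")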

      Upgrading From Spark SQL 1.6 to 2.0

  • SparkSession is now the new entry point of Spark that replaces the old SQLContext and HiveContext. Note that the old SQLContext and HiveContext are kept for backward compatibility. A new catalog interface is accessible from SparkSession - the existing APIs for accessing databases and tables, such as listTables, createExternalTable, dropTempView, and cacheTable, are moved here.

  • Dataset API and DataFrame API are unified. In Scala, DataFrame becomes a type alias for Dataset[Row], while Java API users must replace DataFrame with Dataset<Row>. Both the typed transformations (e.g., map, filter, and groupByKey) and untyped transformations (e.g., select and groupBy) are available on the Dataset class. Since compile-time type-safety in Python and R is not a language feature, the concept of Dataset does not apply to these languages’ APIs. Instead, DataFrame remains the primary programming abstraction, which is analogous to the single-node data frame notion in these languages.
  • Dataset and DataFrame API unionAll has been deprecated and replaced by union.
  • Dataset and DataFrame API explode has been deprecated; alternatively, use functions.explode() with select or flatMap.
  • Dataset and DataFrame API registerTempTable has been deprecated and replaced by createOrReplaceTempView (a sketch of these replacements follows this list).
  • Changes to CREATE TABLE ... LOCATION behavior for Hive tables.

    • From Spark 2.0, CREATE TABLE ... LOCATION is equivalent to CREATE EXTERNAL TABLE ... LOCATION in order to prevent accidentally dropping the existing data in the user-provided locations. That means a Hive table created in Spark SQL with a user-specified location is always a Hive external table. Dropping external tables will not remove the data. Users are not allowed to specify the location for Hive managed tables. Note that this is different from the Hive behavior.
    • As a result, DROP TABLE statements on those tables will not remove the data.
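A minimal sketch of the replacements for the deprecated calls listed above (df, df1, df2 and the column items are hypothetical DataFrames/columns):

import org.apache.spark.sql.functions

// unionAll -> union
val combined = df1.union(df2)

// explode -> functions.explode() used with select
val exploded = df.select(functions.explode(df("items")).as("item"))

// registerTempTable -> createOrReplaceTempView
df.createOrReplaceTempView("people")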

      Upgrading From Spark SQL 1.5 to 1.6

  • From Spark 1.6, by default the Thrift server runs in multi-session mode, which means each JDBC/ODBC connection owns a copy of its own SQL configuration and temporary function registry. Cached tables are still shared, though. If you prefer to run the Thrift server in the old single-session mode, please set the option spark.sql.hive.thriftServer.singleSession to true. You may either add this option to spark-defaults.conf, or pass it to start-thriftserver.sh via --conf:

    1. ./sbin/start-thriftserver.sh \
    2. --conf spark.sql.hive.thriftServer.singleSession=true \
    3. ...
  • Since 1.6.1, the withColumn method in SparkR supports adding a new column to a DataFrame or replacing existing columns of the same name.

  • From Spark 1.6, LongType casts to TimestampType expect seconds instead of microseconds. This change was made to match the behavior of Hive 1.2 for more consistent type casting to TimestampType from numeric types. See SPARK-11724 for details.

    Upgrading From Spark SQL 1.4 to 1.5

  • Optimized execution using manually managed memory (Tungsten) is now enabled by default, along with code generation for expression evaluation. These features can both be disabled by setting spark.sql.tungsten.enabled to false.

  • Parquet schema merging is no longer enabled by default. It can be re-enabled by setting spark.sql.parquet.mergeSchema to true.
  • Resolution of strings to columns in python now supports using dots (.) to qualify the column or access nested values. For example df['table.column.nestedField']. However, this means that if your column name contains any dots you must now escape them using backticks (e.g., table.`column.with.dots`.nested).
  • In-memory columnar storage partition pruning is on by default. It can be disabled by setting spark.sql.inMemoryColumnarStorage.partitionPruning to false.
  • Unlimited precision decimal columns are no longer supported, instead Spark SQL enforces a maximum precision of 38. When inferring schema from BigDecimal objects, a precision of (38, 18) is now used. When no precision is specified in DDL then the default remains Decimal(10, 0).
  • Timestamps are now stored at a precision of 1us, rather than 1ns.
  • In the sql dialect, floating point numbers are now parsed as decimal. HiveQL parsing remains unchanged.
  • The canonical names of SQL/DataFrame functions are now lower case (e.g., sum vs SUM).
  • JSON data source will not automatically load new files that are created by other applications (i.e. files that are not inserted to the dataset through Spark SQL). For a JSON persistent table (i.e. the metadata of the table is stored in Hive Metastore), users can use the REFRESH TABLE SQL command or HiveContext’s refreshTable method to include those new files in the table. For a DataFrame representing a JSON dataset, users need to recreate the DataFrame and the new DataFrame will include new files.
  • The DataFrame.withColumn method in PySpark supports adding a new column or replacing existing columns of the same name.

    Upgrading from Spark SQL 1.3 to 1.4

    DataFrame data reader/writer interface

    Based on user feedback, we created a new, more fluid API for reading data in (SQLContext.read) and writing data out (DataFrame.write), and deprecated the old APIs (e.g., SQLContext.parquetFile, SQLContext.jsonFile).
    See the API docs for SQLContext.read (Scala, Java, Python) and DataFrame.write (Scala, Java, Python) for more information.

    DataFrame.groupBy retains grouping columns

    Based on user feedback, we changed the default behavior of DataFrame.groupBy().agg() to retain the grouping columns in the resulting DataFrame. To keep the behavior in 1.3, set spark.sql.retainGroupColumns to false.


    // In 1.3.x, in order for the grouping column "department" to show up,
    // it must be included explicitly as part of the agg function call.
    df.groupBy("department").agg($"department", max("age"), sum("expense"))

    // In 1.4+, grouping column "department" is included automatically.
    df.groupBy("department").agg(max("age"), sum("expense"))

    // Revert to 1.3 behavior (not retaining grouping column) by:
    sqlContext.setConf("spark.sql.retainGroupColumns", "false")

    Behavior change on DataFrame.withColumn

    Prior to 1.4, DataFrame.withColumn() supported only adding a new column: the column was always added under its specified name in the resulting DataFrame, even if an existing column already had the same name. Since 1.4, DataFrame.withColumn() supports adding a column whose name differs from the names of all existing columns, or replacing an existing column of the same name.
    Note that this change is only for Scala API, not for PySpark and SparkR.
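For example, given a hypothetical DataFrame df that already has an "age" column:

// Before 1.4 this produced a second column also named "age";
// since 1.4 it replaces the existing "age" column instead.
val updated = df.withColumn("age", df("age") + 1)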

    Upgrading from Spark SQL 1.0-1.2 to 1.3

    In Spark 1.3 we removed the “Alpha” label from Spark SQL and as part of this did a cleanup of the available APIs. From Spark 1.3 onwards, Spark SQL will provide binary compatibility with other releases in the 1.X series. This compatibility guarantee excludes APIs that are explicitly marked as unstable (i.e., DeveloperAPI or Experimental).

    Rename of SchemaRDD to DataFrame

    The largest change that users will notice when upgrading to Spark SQL 1.3 is that SchemaRDD has been renamed to DataFrame. This is primarily because DataFrames no longer inherit from RDD directly, but instead provide most of the functionality that RDDs provide through their own implementation. DataFrames can still be converted to RDDs by calling the .rdd method.
    In Scala there is a type alias from SchemaRDD to DataFrame to provide source compatibility for some use cases. It is still recommended that users update their code to use DataFrame instead. Java and Python users will need to update their code.

    Unification of the Java and Scala APIs

    Prior to Spark 1.3 there were separate Java compatible classes (JavaSQLContext and JavaSchemaRDD) that mirrored the Scala API. In Spark 1.3 the Java API and Scala API have been unified. Users of either language should use SQLContext and DataFrame. In general these classes try to use types that are usable from both languages (i.e. Array instead of language-specific collections). In some cases where no common type exists (e.g., for passing in closures or Maps) function overloading is used instead.
    Additionally, the Java-specific types API has been removed. Users of both Scala and Java should use the classes present in org.apache.spark.sql.types to describe schema programmatically.

    Isolation of Implicit Conversions and Removal of dsl Package (Scala-only)

    Many of the code examples prior to Spark 1.3 started with import sqlContext._, which brought all of the functions from sqlContext into scope. In Spark 1.3 we have isolated the implicit conversions for converting RDDs into DataFrames into an object inside of the SQLContext. Users should now write import sqlContext.implicits._.
    Additionally, the implicit conversions now only augment RDDs that are composed of Products (i.e., case classes or tuples) with a method toDF, instead of applying automatically.
    When using functions inside of the DSL (now replaced with the DataFrame API), users used to import org.apache.spark.sql.catalyst.dsl. Instead, the public DataFrame functions API should be used: import org.apache.spark.sql.functions._.
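Putting these changes together, a minimal sketch (the case class and sample data are assumptions for illustration, and sc is an existing SparkContext as in the shell):

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._

case class Person(name: String, age: Int)

val sqlContext = new SQLContext(sc)

// The implicit conversions (including rdd.toDF and the $"col" syntax)
// must now be imported explicitly.
import sqlContext.implicits._

val peopleDF = sc.parallelize(Seq(Person("Alice", 29), Person("Bob", 31))).toDF()

// DSL-style functions now come from org.apache.spark.sql.functions.
peopleDF.select(upper($"name"), $"age" + 1).show()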

    Removal of the type aliases in org.apache.spark.sql for DataType (Scala-only)

    Spark 1.3 removes the type aliases that were present in the base sql package for DataType. Users should instead import the classes in org.apache.spark.sql.types

    UDF Registration Moved to sqlContext.udf (Java & Scala)

    Functions that are used to register UDFs, either for use in the DataFrame DSL or SQL, have been moved into the udf object in SQLContext.

    sqlContext.udf.register("strLen", (s: String) => s.length())

    Python UDF registration is unchanged.
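Once registered, the UDF can be used from SQL or through a SQL expression on a DataFrame (the people table and df below are hypothetical):

// Use the registered UDF in a SQL query...
sqlContext.sql("SELECT name, strLen(name) FROM people")

// ...or in a DataFrame expression.
df.selectExpr("name", "strLen(name)")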

    Python DataTypes No Longer Singletons

    When using DataTypes in Python you will need to construct them (i.e. StringType()) instead of referencing a singleton.

    Compatibility with Apache Hive

    Spark SQL is designed to be compatible with the Hive Metastore, SerDes and UDFs. Currently Hive SerDes and UDFs are based on Hive 1.2.1, and Spark SQL can be connected to different versions of the Hive Metastore (from 0.12.0 to 2.1.1; also see Interacting with Different Versions of Hive Metastore).

    Deploying in Existing Hive Warehouses

    The Spark SQL Thrift JDBC server is designed to be “out of the box” compatible with existing Hive installations. You do not need to modify your existing Hive Metastore or change the data placement or partitioning of your tables.

    Supported Hive Features

    Spark SQL supports the vast majority of Hive features, such as:

  • Hive query statements, including:

    • SELECT
    • GROUP BY
    • ORDER BY
    • CLUSTER BY
    • SORT BY
  • All Hive operators, including:
    • Relational operators (=, <=>, ==, <>, <, >, >=, <=, etc)
    • Arithmetic operators (+, -, *, /, %, etc)
    • Logical operators (AND, &&, OR, ||, etc)
    • Complex type constructors
    • Mathematical functions (sign, ln, cos, etc)
    • String functions (instr, length, printf, etc)
  • User defined functions (UDF)
  • User defined aggregation functions (UDAF)
  • User defined serialization formats (SerDes)
  • Window functions
  • Joins
    • JOIN
    • {LEFT|RIGHT|FULL} OUTER JOIN
    • LEFT SEMI JOIN
    • CROSS JOIN
  • Unions
  • Sub-queries
    • SELECT col FROM ( SELECT a + b AS col from t1) t2
  • Sampling
  • Explain
  • Partitioned tables including dynamic partition insertion
  • View
  • All Hive DDL Functions, including:
    • CREATE TABLE
    • CREATE TABLE AS SELECT
    • ALTER TABLE
  • Most Hive Data types, including:

    • TINYINT
    • SMALLINT
    • INT
    • BIGINT
    • BOOLEAN
    • FLOAT
    • DOUBLE
    • STRING
    • BINARY
    • TIMESTAMP
    • DATE
    • ARRAY<>
    • MAP<>
    • STRUCT<>

      Unsupported Hive Functionality

      Below is a list of Hive features that we don’t support yet. Most of these features are rarely used in Hive deployments.
      Major Hive Features
  • Tables with buckets: bucket is the hash partitioning within a Hive table partition. Spark SQL doesn’t support buckets yet.

Esoteric Hive Features

  • UNION type
  • Unique join
  • Column statistics collecting: Spark SQL does not piggyback scans to collect column statistics at the moment and only supports populating the sizeInBytes field of the hive metastore.

Hive Input/Output Formats

  • File format for CLI: For results showing back to the CLI, Spark SQL only supports TextOutputFormat.
  • Hadoop archive

Hive Optimizations
A handful of Hive optimizations are not yet included in Spark. Some of these (such as indexes) are less important due to Spark SQL’s in-memory computational model. Others are slotted for future releases of Spark SQL.

  • Block level bitmap indexes and virtual columns (used to build indexes)
  • Automatically determine the number of reducers for joins and groupbys: Currently in Spark SQL, you need to control the degree of parallelism post-shuffle using “SET spark.sql.shuffle.partitions=[num_tasks];”.
  • Meta-data only query: For queries that can be answered by using only meta data, Spark SQL still launches tasks to compute the result.
  • Skew data flag: Spark SQL does not follow the skew data flags in Hive.
  • STREAMTABLE hint in join: Spark SQL does not follow the STREAMTABLE hint.
  • Merge multiple small files for query results: if the result output contains multiple small files, Hive can optionally merge the small files into fewer large files to avoid overflowing the HDFS metadata. Spark SQL does not support that.

    Reference

    Data Types

    Spark SQL and DataFrames support the following data types:

  • Numeric types

    • ByteType: Represents 1-byte signed integer numbers. The range of numbers is from -128 to 127.
    • ShortType: Represents 2-byte signed integer numbers. The range of numbers is from -32768 to 32767.
    • IntegerType: Represents 4-byte signed integer numbers. The range of numbers is from -2147483648 to 2147483647.
    • LongType: Represents 8-byte signed integer numbers. The range of numbers is from -9223372036854775808 to 9223372036854775807.
    • FloatType: Represents 4-byte single-precision floating point numbers.
    • DoubleType: Represents 8-byte double-precision floating point numbers.
    • DecimalType: Represents arbitrary-precision signed decimal numbers. Backed internally by java.math.BigDecimal. A BigDecimal consists of an arbitrary precision integer unscaled value and a 32-bit integer scale.
  • String type
    • StringType: Represents character string values.
  • Binary type
    • BinaryType: Represents byte sequence values.
  • Boolean type
    • BooleanType: Represents boolean values.
  • Datetime type
    • TimestampType: Represents values comprising values of fields year, month, day, hour, minute, and second.
    • DateType: Represents values comprising values of fields year, month, day.
  • Complex types
    • ArrayType(elementType, containsNull): Represents values comprising a sequence of elements with the type of elementType. containsNull is used to indicate if elements in an ArrayType value can have null values.
    • MapType(keyType, valueType, valueContainsNull): Represents values comprising a set of key-value pairs. The data type of keys are described by keyType and the data type of values are described by valueType. For a MapType value, keys are not allowed to have null values. valueContainsNull is used to indicate if values of a MapType value can have null values.
    • StructType(fields): Represents values with the structure described by a sequence of StructFields (fields).
      • StructField(name, dataType, nullable): Represents a field in a StructType. The name of a field is indicated by name. The data type of a field is indicated by dataType. nullable is used to indicate if values of this field can have null values.

All data types of Spark SQL are located in the package org.apache.spark.sql.types. You can access them by doing
import org.apache.spark.sql.types._
Find full example code at “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” in the Spark repo.

Data type Value type in Scala API to access or create a data type
ByteType Byte ByteType
ShortType Short ShortType
IntegerType Int IntegerType
LongType Long LongType
FloatType Float FloatType
DoubleType Double DoubleType
DecimalType java.math.BigDecimal DecimalType
StringType String StringType
BinaryType Array[Byte] BinaryType
BooleanType Boolean BooleanType
TimestampType java.sql.Timestamp TimestampType
DateType java.sql.Date DateType
ArrayType scala.collection.Seq ArrayType(elementType, [containsNull])
Note: The default value of containsNull is true.
MapType scala.collection.Map MapType(keyType, valueType, [valueContainsNull])
Note: The default value of valueContainsNull is true.
StructType org.apache.spark.sql.Row StructType(fields)
Note: fields is a Seq of StructFields. Also, two fields with the same name are not allowed.
StructField The value type in Scala of the data type of this field (For example, Int for a StructField with the data type IntegerType) StructField(name, dataType, [nullable])
Note: The default value of nullable is true.
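A short sketch of constructing a schema programmatically from these types and applying it to build a DataFrame (the field names and sample rows are hypothetical):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Describe the schema with StructType / StructField.
val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)
))

// Apply the schema to an RDD of Rows.
val rowRDD = spark.sparkContext.parallelize(Seq(Row("Alice", 29), Row("Bob", 31)))
val peopleDF = spark.createDataFrame(rowRDD, schema)
peopleDF.printSchema()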

NaN Semantics

There is special handling for not-a-number (NaN) when dealing with float or double types that does not exactly match standard floating point semantics. Specifically:

  • NaN = NaN returns true.
  • In aggregations all NaN values are grouped together.
  • NaN is treated as a normal value in join keys.
  • NaN values go last when in ascending order, larger than any other numeric value.
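A small sketch illustrating the grouping and ordering rules (assumes a SparkSession named spark, as in the examples above):

import spark.implicits._

val df = Seq(Double.NaN, 1.0, Double.NaN, 2.0).toDF("v")

// All NaN values fall into a single group...
df.groupBy("v").count().show()

// ...and NaN sorts after every other numeric value in ascending order.
df.orderBy("v").show()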