Spark SQL: Relational Data Processing in Spark
Abstract
Spark SQL makes two main additions:
- much tighter integration between relational and procedural processing
- includes a highly extensible optimizer, Catalyst
1. Introduction
Why Spark SQL was created: it bridges the gap between the relational and procedural models
First, Spark SQL provides a DataFrame API that can perform relational operations on both external data sources and Spark’s built-in distributed collections
Second, Spark SQL introduces a novel extensible optimizer called Catalyst. Catalyst makes it easy to add data sources, optimization rules, and data types for domains such as machine learning.
DataFrame
Definition
DataFrames are collections of structured records that can be manipulated using Spark’s procedural API
- can be created directly from Spark’s built-in distributed collections of Java/Python objects
Advantages
- Make it easy to compute multiple aggregates in one pass using a SQL statement, something that is difficult to express in traditional functional APIs (see the sketch after this list)
- Automatically store data in a columnar format that is more compact than Java/Python objects
- SQL queries go through the relational optimizer, Catalyst, which offers a general framework for transforming trees and is used to perform analysis, planning, and runtime code generation; it can also support semi-structured data sources and user-defined data types
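A minimal sketch of the one-pass multi-aggregate point, assuming a DataFrame named users with hypothetical dept, name, age, and salary columns:
import org.apache.spark.sql.functions.{count, avg, max}
// One pass over the data computes all three aggregates per department;
// expressing this with plain map/reduce over objects takes more code or multiple passes.
val stats = users.groupBy(users("dept"))
  .agg(count("name"), avg("age"), max("salary"))
stats.show()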
2. Background and Goals
RDD
- A Resilient Distributed Dataset (RDD) is a collection of Java or Python objects partitioned across a cluster.
- RDDs can be manipulated through operations like map, filter, and reduce, which take functions in the programming language and ship them to nodes on the cluster (see the sketch after this list)
- evaluated lazily
- the engine does not understand the structure of the data in RDDs
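A small RDD sketch of the points above, assuming sc is a SparkContext:
// Plain RDD pipeline: the engine only sees opaque closures and objects,
// so it cannot optimize across these steps the way a relational optimizer could.
val words = sc.parallelize(Seq("spark", "sql", "catalyst"))
val totalLength = words
  .map(w => w.length)      // closure shipped to the worker nodes
  .filter(len => len > 3)  // still lazy: nothing has executed yet
  .reduce(_ + _)           // action: triggers evaluation of the whole chain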
Goals for Spark SQL
- Support relational processing both within Spark programs (on native RDDs) and on external data sources using a programmer-friendly API.
- Provide high performance using established DBMS techniques.
- Easily support new data sources, including semi-structured data and external databases amenable to query federation.
- Enable extension with advanced analytics algorithms such as graph processing and machine learning.
3. Programming Interfaces
DataFrame API
- Distributed collection of rows with the same schema
- Each DataFrame object represents a logical plan to compute a dataset, but no execution occurs until an “output operation” is triggered
Example:
ctx = new HiveContext()
users = ctx.table("users")
young = users.where(users("age") < 21)
println(young.count())
count is an “output operation”: when it runs, Spark SQL builds a physical plan to compute the final result. This might include optimizations such as only scanning the “age” column of the data if its storage format is columnar, or even using an index in the data source to count the matching rows.
Data model
- supports nested data types: maps, arrays, and structs
- supports user-defined types
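A rough sketch of how nested types surface through schema inference (the case classes and field names are hypothetical; ctx is the HiveContext from the earlier example and sc a SparkContext):
import ctx.implicits._   // needed for .toDF on an RDD of case classes
case class Address(city: String, zip: String)        // becomes a struct
case class Customer(name: String,
                    addresses: Seq[Address],          // becomes array<struct<...>>
                    attributes: Map[String, String])  // becomes map<string,string>
val customersRDD = sc.parallelize(Seq(
  Customer("Alice", Seq(Address("Berlin", "10115")), Map("tier" -> "gold"))))
val customersDF = customersRDD.toDF()
customersDF.printSchema()   // shows the nested schema inferred by reflection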
DataFrame Operations
Relational DSL
All of these operators build up an abstract syntax tree (AST) of the expression, which is then passed to Catalyst for optimization. For example:
employees.join(dept, employees("deptId") === dept("id"))
  .where(employees("gender") === "female")
  .groupBy(dept("id"), dept("name"))
  .agg(count("name"))
Temporary table
users.where(users("age") < 21).registerTempTable("young")
ctx.sql("SELECT count(*), avg(age) FROM young")
Querying Native Datasets
DataFrames can be created directly from RDDs. Spark SQL can automatically infer the schema of these objects using reflection.
case class User(name: String, age: Int)
// Create an RDD of User objects
usersRDD = spark.parallelize(
  List(User("Alice", 22), User("Bob", 19)))
// View the RDD as a DataFrame
usersDF = usersRDD.toDF
Internally, Spark SQL creates a logical data scan operator that points to the RDDs. This is compiled into a physical operator that accesses fields of the native objects.
- Different from traditional ORMs, which translate an entire object into a different format, Spark SQL accesses the native objects in-place, extracting only the fields used in each query.
In-Memory Caching
- Spark’s native cache: stores JVM objects
- Columnar cache: applies columnar compression schemes such as dictionary encoding and run-length encoding (see the sketch below)
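A minimal usage sketch, assuming the ctx and users table from the earlier example; cacheTable is what switches the table to the compressed columnar in-memory format:
// Mark the users table for in-memory caching in the columnar format.
ctx.cacheTable("users")
// The first query materializes the columnar cache; later scans read the
// compact column batches instead of deserialized JVM objects.
ctx.sql("SELECT count(*) FROM users WHERE age < 21").show()
ctx.sql("SELECT avg(age) FROM users").show()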
Catalyst optimizer
Extensible Design:
- make it easy to add new optimization techniques and features to Spark SQL
- enable external developers to extend the optimizer
Catalyst uses Scala’s pattern-matching features
trees
The main data type in Catalyst is a tree composed of node objects
rules
Rules are used for tree transformation (see the transform sketch after this list)
- Catalyst tests which parts of a tree a given rule applies to, automatically skipping over and descending into subtrees that do not match
- Rules do not need to be modified as new types of operators are added to the system
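The paper’s running example of a rule is constant folding of additions, written with transform and Scala pattern matching; roughly:
// Fold additions of integer literals; subtrees that do not match any case are
// left unchanged and the transform keeps descending into them.
tree.transform {
  case Add(Literal(c1), Literal(c2)) => Literal(c1 + c2)
  case Add(left, Literal(0))  => left
  case Add(Literal(0), right) => right
}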
Catalyst’s Tree Transformation Framework
- analyzing a logical plan to resolve references
- logical plan optimization
- Physical planning
- Code generation
Analysis
1000 lines of code
Spark SQL begins with a relation to be computed from either
- an AST returned by a SQL parser
- a DataFrame object constructed using the API
It starts by building an “unresolved logical plan” tree with unbound attributes and data types, then applies rules that do the following:
- Looking up relations by name from the catalog
- Mapping named attributes, such as col, to the input provided by an operator’s children
- Determining which attributes refer to the same value to give them a unique ID (which later allows optimization of expressions such as col = col)
- Propagating and coercing types through expressions: for example, we cannot know the type of 1 + col until we have resolved col and possibly cast its subexpressions to compatible types
Logical Optimization
800 lines of code
- constant folding
- predicate pushdown
- projection pruning
- null propagation
- boolean expression simplification
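A rough illustration of what projection pruning and predicate pushdown buy a user query, assuming a Parquet-backed copy of the users data (the file path is hypothetical, and ctx.read.parquet assumes Spark 1.4+):
val parquetUsers = ctx.read.parquet("/data/users.parquet")
// Only name and age are needed, so Catalyst prunes the other columns from the
// scan (projection pruning) and pushes the age predicate toward the Parquet
// reader (predicate pushdown) instead of filtering rows afterwards.
val names = parquetUsers.where(parquetUsers("age") > 21).select(parquetUsers("name"))
names.show()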
Physical Planning
500 lines of code
Selects a plan using both cost-based and rule-based techniques:
- cost based: for relations that are known to be small, Spark SQL uses a broadcast join
- rule based: pipelining projections or filters into one Spark map operation
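A hedged sketch of steering the planner toward a broadcast join, reusing the employees and dept DataFrames from the DSL example above and assuming a Spark version that exposes the broadcast hint in org.apache.spark.sql.functions:
import org.apache.spark.sql.functions.broadcast
// dept is small, so shipping a copy to every executor avoids shuffling the
// large employees relation; with size statistics Spark SQL can also choose
// a broadcast join on its own.
val joined = employees.join(broadcast(dept), employees("deptId") === dept("id"))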
Code Generation
- generates Java bytecode to run on each machine
- uses Scala quasiquotes to turn a tree of SQL expressions directly into a Scala AST, which is fed to the Scala compiler at runtime to produce bytecode
def compile(node: Node): AST = node match {
  case Literal(value) => q"$value"
  case Attribute(name) => q"row.get($name)"
  case Add(left, right) =>
    q"${compile(left)} + ${compile(right)}"
}
The strings beginning with q are quasiquotes, meaning that although they look like strings, they are parsed by the Scala compiler at compile time and represent ASTs for the code within. Quasiquotes can have variables or other ASTs spliced into them, indicated using $ notation. For example, Literal(1) would become the Scala AST for 1, while Attribute("x") becomes row.get("x"). In the end, a tree like Add(Literal(1), Attribute("x")) becomes an AST for a Scala expression like 1 + row.get("x").
- Generated code accesses the required field directly, instead of having to copy the object into a Spark SQL Row and use the Row’s accessor methods.