Spark SQL: Relational Data Processing in Spark

Abstract

Spark SQL makes two main additions:

  • much tighter integration between relational and procedural processing
  • a highly extensible optimizer, Catalyst

    1.Introduction

    Why Spark SQL was created: it bridges the gap between the relational and procedural models

  • First, Spark SQL provides a DataFrame API that can perform relational operations on both external data sources and Spark’s built-in distributed collections

  • Second, Spark SQL introduces a novel extensible optimizer called Catalyst. Catalyst makes it easy to add data sources, optimization rules, and data types for domains such as machine learning

    DataFrame

    Definition

    DataFrames are

  • collections of structured records that can be manipulated using Spark’s procedural API

  • can be created directly from Spark’s built-in distributed collections of Java/Python objects

    Advantages

  1. Make it easy to compute multiple aggregates in one pass using a SQL statement, something that is difficult to express in traditional functional APIs (see the sketch after this list)
  2. Automatically store data in a columnar format that is more compact than Java/Python objects
  3. SQL queries go through a relational optimizer, Catalyst, which offers a general framework for transforming trees and is used to perform analysis, planning, and runtime code generation. It also supports semi-structured data sources and user-defined types.
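
A minimal sketch of point 1, assuming a hypothetical users DataFrame with columns dept, name, and age (not the paper's code):

    import org.apache.spark.sql.functions.{count, avg, max}

    // One groupBy/agg pass computes several aggregates at once; a purely
    // functional API would need multiple passes or a hand-written combiner.
    val stats = users.groupBy(users("dept"))
      .agg(count("name"), avg("age"), max("age"))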

    2.Background and Goals

    RDD

  • A Resilient Distributed Dataset (RDD) is a collection of Java or Python objects partitioned across a cluster.
  • can be manipulated through operations like map, filter, and reduce, which take functions in the programming language and ship them to nodes on the cluster (a sketch follows this list)
  • evaluated lazily
  • the engine does not understand the structure of the data in RDDs
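
A minimal RDD sketch of the points above, assuming an existing SparkContext sc (not the paper's code):

    val nums = sc.parallelize(1 to 1000)   // distributed collection of Ints
    val evens = nums.filter(_ % 2 == 0)    // lazy: only records the lineage
    val doubled = evens.map(_ * 2)         // still lazy, nothing has run yet
    println(doubled.reduce(_ + _))         // reduce is an action: the job executes here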

    Goals for Spark SQL

  1. Support relational processing both within Spark programs (on native RDDs) and on external data sources using a programmer-friendly API.
  2. Provide high performance using established DBMS techniques.
  3. Easily support new data sources, including semi-structured data and external databases amenable to query federation.
  4. Enable extension with advanced analytics algorithms such as graph processing and machine learning.

    3.Programming Interfaces

    DataFrame API

  • Distributed collection of rows with the same schema
  • Each DataFrame object represents a logical plan to compute a dataset, but no execution occurs until an “output operation” is triggered

Example:

    ctx = new HiveContext()
    users = ctx.table("users")
    young = users.where(users("age") < 21)
    println(young.count())

count is an “output operation”, so only at this point does Spark SQL build a physical plan to compute the final result. This might include optimizations such as only scanning the “age” column of the data if its storage format is columnar, or even using an index in the data source to count the matching rows.
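
One way to check what Catalyst actually planned is DataFrame.explain (a quick sketch; exact output varies with the Spark version and data source):

    // Prints the physical plan; passing true also prints the parsed,
    // analyzed, and optimized logical plans.
    young.explain(true)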

Data model

  • supports nested data types: maps, arrays, and structs
  • supports user-defined types
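
A toy sketch of nested types (hypothetical classes, not the paper's code): the nested case class becomes a struct column and the Seq becomes an array column. It assumes the SQL context's implicits are in scope so toDF works, as in the reflection example later in these notes:

    case class Address(city: String, zip: String)
    case class Customer(name: String, address: Address, tags: Seq[String])

    val customersDF = sc.parallelize(Seq(
      Customer("Alice", Address("Berlin", "10115"), Seq("premium")),
      Customer("Bob", Address("Paris", "75001"), Seq()))).toDF
    customersDF.printSchema()   // shows the nested struct and array fields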

    DataFrame Operations

    Relational DSL

    employees.join(dept, employees("deptId") === dept("id"))
    .where(employees("gender") === "female")
    .groupBy(dept("id"), dept("name"))
    .agg(count("name"))
    
    All of these operators build up an abstract syntax tree (AST) of the expression, which is then passed to Catalyst for optimization

    Temporary table

    users.where(users("age") < 21).registerTempTable("young")
    ctx.sql("SELECT count(*), avg(age) FROM young")
    

    Querying Native Datasets

  1. DataFrames can be created directly from RDDs. Spark SQL can automatically infer the schema of these objects using reflection

    case class User(name: String, age: Int)
    // Create an RDD of User objects
    usersRDD = spark.parallelize(
      List(User("Alice", 22), User("Bob", 19)))
    // View the RDD as a DataFrame
    usersDF = usersRDD.toDF
    
  2. Internally, Spark SQL creates a logical data scan operator that points to the RDDs. This is compiled into a physical operator that accesses fields of the native objects.

  3. Unlike a traditional ORM, which translates an entire object into a different format, Spark SQL accesses the native objects in-place, extracting only the fields used in each query.

    In-Memory Caching

  • Spark’s native cache: stores JVM objects
  • Columnar cache: applies columnar compression schemes such as dictionary encoding and run-length encoding (see the sketch below)
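
Continuing the earlier young DataFrame, a small sketch of columnar caching:

    young.cache()            // marks the DataFrame for columnar in-memory caching
    println(young.count())   // first action materializes the columnar cache
    println(young.count())   // later actions read the compressed in-memory columns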

    Catalyst optimizer

    Extensible Design:
  1. makes it easy to add new optimization techniques and features to Spark SQL
  2. enables external developers to extend the optimizer

Catalyst makes heavy use of Scala’s pattern matching to express transformation rules

trees

The main data type in Catalyst is a tree composed of node objects
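
A toy sketch of such a tree, following the paper's x + (1 + 2) example (Catalyst's real node classes carry much more information):

    abstract class TreeNode
    case class Literal(value: Int) extends TreeNode
    case class Attribute(name: String) extends TreeNode
    case class Add(left: TreeNode, right: TreeNode) extends TreeNode

    // The expression x + (1 + 2) as a tree of node objects:
    val expr = Add(Attribute("x"), Add(Literal(1), Literal(2)))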

rules

Rules are functions that transform one tree into another, typically expressed using pattern matching

  • Catalyst tests which parts of a tree a given rule applies to and skips over subtrees that do not match (see the constant-folding sketch after this list)
  • Rules do not need to be modified as new types of operators are added to the system
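
Continuing the toy TreeNode sketch above: a minimal transform helper (an assumption, not Catalyst's real implementation) plus the paper's constant-folding rule expressed as a pattern-matching partial function:

    // Applies the rule to every node, children first; nodes the rule
    // does not match are left unchanged.
    def transform(node: TreeNode)(rule: PartialFunction[TreeNode, TreeNode]): TreeNode = {
      val newNode = node match {
        case Add(l, r) => Add(transform(l)(rule), transform(r)(rule))
        case other     => other
      }
      if (rule.isDefinedAt(newNode)) rule(newNode) else newNode
    }

    // Constant folding: x + (1 + 2) becomes x + 3.
    val folded = transform(expr) {
      case Add(Literal(c1), Literal(c2)) => Literal(c1 + c2)
    }
    // folded == Add(Attribute("x"), Literal(3))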

    Catalyst’s Tree Transformation Framework

  1. analyzing a logical plan to resolve references
  2. logical plan optimization
  3. Physical planning
  4. Code generation

    analysis

    1000 lines of code
    Spark SQL begins with a relation to be computed from either
  • an AST returned by a SQL parser, or
  • a DataFrame object constructed using the API

It starts by building an “unresolved logical plan” tree with unbound attributes and data types, then applies rules that do the following:

  • Looking up relations by name from the catalog.
  • Mapping named attributes, such as col, to the input provided by an operator’s children.
  • Determining which attributes refer to the same value to give them a unique ID (which later allows optimization of expressions such as col = col).
  • Propagating and coercing types through expressions: for example, we cannot know the type of 1 + col until we have resolved col and possibly cast its subexpressions to compatible types.
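
A toy sketch of the name-resolution step (hypothetical classes, not Spark's analyzer): an unresolved column reference is bound against a relation's schema and given a unique ID:

    case class UnresolvedAttr(name: String)
    case class ResolvedAttr(name: String, dataType: String, id: Long)

    val nextId = new java.util.concurrent.atomic.AtomicLong()

    def resolve(attr: UnresolvedAttr, schema: Map[String, String]): ResolvedAttr =
      schema.get(attr.name) match {
        case Some(dataType) => ResolvedAttr(attr.name, dataType, nextId.getAndIncrement())
        case None           => sys.error(s"cannot resolve column '${attr.name}'")
      }

    // e.g. resolve(UnresolvedAttr("age"), Map("name" -> "string", "age" -> "int"))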

    Logical Optimization

    800 lines of code

  • constant folding
  • predicate pushdown
  • projection pruning
  • null propagation
  • boolean expression simplification
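
A hedged illustration of what some of these rules do, using a hypothetical events table with columns ts (int) and level (string): Catalyst folds 60 * 60 * 24 into a single literal at planning time and can push the predicates down into the data source scan:

    val recentErrors = ctx.sql("""
      SELECT ts FROM events
      WHERE ts > 1400000000 - 60 * 60 * 24
        AND level = 'ERROR'""")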

    Physical Planning

    500 lines of code
    Generates one or more physical plans from the optimized logical plan and selects one using a cost model

  • cost based: For relations that are known to be small, Spark SQL uses a broadcast join

  • rule based: pipelining projections or filters into one Spark map operation
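
A sketch of the broadcast-join case, reusing the employees/dept DataFrames from the DSL example (the explicit broadcast hint below is from later Spark releases; in the paper the planner picks it automatically from size estimates):

    import org.apache.spark.sql.functions.broadcast

    // dept is small, so it is shipped to every node and the large
    // employees table is not shuffled.
    val joined = employees.join(broadcast(dept),
      employees("deptId") === dept("id"))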

    Code Generation

    Generates Java bytecode to run on each machine
  1. Catalyst uses Scala quasiquotes to construct the generated code directly as a Scala AST, which is fed to the Scala compiler at runtime

    def compile(node: Node): AST = node match {
      case Literal(value) => q"$value"
      case Attribute(name) => q"row.get($name)"
      case Add(left, right) =>
        q"${compile(left)} + ${compile(right)}"
    }
    

    The strings beginning with q are quasiquotes, meaning that although they look like strings, they are parsed by the Scala compiler at compile time and represent ASTs for the code within. Quasiquotes can have variables or other ASTs spliced into them, indicated using $ notation. For example, Literal(1) would become the Scala AST for 1, while Attribute("x") becomes row.get("x"). In the end, a tree like Add(Literal(1), Attribute("x")) becomes an AST for a Scala expression like 1 + row.get("x").
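
Assumed definitions under which the compile function above type-checks (a sketch for the REPL or an enclosing object; Catalyst's real node classes are much richer). AST is aliased to scala-reflect's Tree, and the q"..." interpolator comes from the universe import:

    import scala.reflect.runtime.universe._

    type AST = Tree
    sealed trait Node
    case class Literal(value: Int) extends Node
    case class Attribute(name: String) extends Node
    case class Add(left: Node, right: Node) extends Node

    // With these in scope, compile(Add(Literal(1), Attribute("x"))) yields
    // the Scala AST for 1 + row.get("x"), which can then be compiled to bytecode.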

  2. The generated code accesses the required fields directly, instead of having to copy the object into a Spark SQL Row and use the Row’s accessor methods.