3. Spark SQL

3.1. Spark SQL 概述

3.2.1.Spark SQL 是什么

image.png
Spark SQL 是 Spark 用来处理结构化数据的一个模块,它提供了一个编程抽象叫做 DataFrame 并且作为分布式 SQL 查询引擎的作用。

3.2.2.为什么要学习 Spark SQL

我们已经学习了 Hive,它是将 Hive SQL 转换成 MapReduce 然后提交到集群上执行,大大简化了编写MapReduce 的程序的复杂性,由于 MapReduce 这种计算模型执行效率比较慢。所有 Spark SQL 的应运而生,它是将 Spark SQL 转换成 RDD,然后提交到集群执行,执行效率非常快!

3.2.3.Spark SQL 的特点

  • 易整合

image.png

  • 统一的数据访问方式

image.png

  • 兼容 Hive

image.png

  • 标准的数据连接

image.png
SparkSQL 支持两种编程 API

  1. SQL 方式
  2. DataFrame 的方式(DSL)

    3.3. DataFrame

    3.3.1.DataFrame 是什么

    与 RDD 类似,DataFrame 也是一个分布式数据容器。
    DataFrame 更像传统数据库的二维表格,除了数据以外,还记录数据的结构信息,即 schema。
    同时,与 Hive 类似,DataFrame 也支持嵌套数据类型(struct、array 和 map)。
    从 API 易用性的角度上 看,DataFrame API 提供的是一套高层的关系操作,比函数式的RDD API 要更加友好,门槛更低。
    由于与 R 和 Pandas 的 DataFrame 类似,Spark DataFrame 很好地继承了传统单机数据分析的开发体验。
    DataFrame 里面存放的结构化数据的描述信息,DataFrame 要有表头(表的描述信息),描述了有多少列,每一列数叫什么名字、什么类型、能不能为空
    DataFrame 是特殊的 RDD(RDD+Schema 信息就变成了 DataFrame)
    image.png

    3.4. 入门程序

    我们需要使用 Spark SQL,需要导入对应的 jar 包 ```java org.apache.spark spark-sql_2.11 ${spark.version}
  1. SparkSQL 现在有两个版本<br />SparkSQL 1.x 2.x 的编程 API 有一些变化,企业中都有使用,所以两种方式都讲
  2. <a name="OnKnM"></a>
  3. ### 3.4.1.SparkSQL1.x 的方式
  4. SQL 方式<br />创建一个 SQLContext
  5. 1. 创建 `sparkContext`,然后再创建 `SQLContext` --> SparkSQL 1.x 版本使用
  6. 1. 先创建 RDD,对数据进行清洗,然后关联 case class,**将非结构化数据转换成结构化数据**
  7. 1. 导入隐式转换,显示的调用 `toDF` 方法将 RDD 转换成 DataFrame
  8. ![image.png](https://cdn.nlark.com/yuque/0/2021/png/1774803/1634713823651-ad0c816f-2053-481c-9d03-ea51cc3a524a.png#clientId=u00035801-ec26-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=42&id=ufd63368c&margin=%5Bobject%20Object%5D&name=image.png&originHeight=84&originWidth=945&originalType=binary&ratio=1&rotation=0&showTitle=false&size=49759&status=done&style=none&taskId=u0d541141-cf23-4e1a-8915-1d550bfc833&title=&width=472.5)
  9. 4. 注册临时表(视图)
  10. 4. 执行 SQLTransformationlazy
  11. 4. 显示SQL执行结果 result.show()
  12. 4. 关闭 sc
  13. ![image.png](https://cdn.nlark.com/yuque/0/2021/png/1774803/1634713407717-f3298caa-9469-486e-87a1-4e2b0da15f7c.png#clientId=u00035801-ec26-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=109&id=u27b51070&margin=%5Bobject%20Object%5D&name=image.png&originHeight=314&originWidth=338&originalType=binary&ratio=1&rotation=0&showTitle=false&size=48206&status=done&style=shadow&taskId=u42978c5a-f308-4e91-8977-a79d5f3eb3c&title=&width=117)
  14. ```scala
  15. object SparkSQLDemo01 {
  16. def main(args: Array[String]): Unit = {
  17. val conf: SparkConf = new SparkConf().setAppName("SparkSQLDemo01").setMaster("local[4]")
  18. val sc = new SparkContext(conf)
  19. val sqlContext = new SQLContext(sc)
  20. // 读取数据,生成普通的RDD
  21. val lines: RDD[String] = sc.textFile(args(0))
  22. val userRDD: RDD[User] = lines.map(line => {
  23. val fields: Array[String] = line.split(" ")
  24. val id = fields(0).toInt
  25. val name = fields(1)
  26. val age = fields(2).toInt
  27. User(id, name, age)
  28. })
  29. //导入隐式转换,调用 toDF 将普通RDD转成DF
  30. import sqlContext.implicits._
  31. val df: DataFrame = userRDD.toDF()
  32. //注册临时表
  33. df.registerTempTable("t_user")
  34. //开始进行编程操作
  35. val result: DataFrame = sqlContext.sql("select id,name,age from t_user order by age desc")
  36. //查看结果
  37. result.show()
  38. sc.stop()
  39. }
  40. }
  41. //定义一个样例类
  42. case class User(id:Int,name:String,age:Int);

使用默认的 Row 类型

  • 不用自定义样例类,直接使用 sparkSQL提供的Row类型来装处理好的结构化数据
  • 这种方式需要定义一个 schema
  • 将 schema 和 row 关联起来
  • 通过 createDataFrame 来创建 DF,而不是使用 toDF ```scala object SparkSQLDemo02 { def main(args: Array[String]): Unit = { val conf: SparkConf =\ new SparkConf().setAppName(“SparkSQLDemo02”).setMaster(“local[4]”) val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val lines: RDD[String] = sc.textFile(args(0)) val userRDD: RDD[Row] = lines.map(line => {

    1. val fields: Array[String] = line.split(" ")
    2. val id = fields(0).toInt
    3. val name = fields(1)
    4. val age = fields(2).toInt
    5. Row(id, name, age)

    })

    //结果类型,其实就是表头,用于描述 DataFrame val schema: StructType = StructType(List(

    1. StructField("id", IntegerType),
    2. StructField("name", StringType),
    3. StructField("age", IntegerType)

    ))

    //将 RowRDD 关联 Schema val df: DataFrame = sqlContext.createDataFrame(userRDD,schema)

    //把 DataFrame 先注册临时表 df.registerTempTable(“t_user”) //书写 SQL(SQL 方法应其实是 Transformation) val result: DataFrame = sqlContext.sql(“SELECT * FROM t_user ORDER BY age asc”) //查看结果(触发 Action) result.show() sc.stop() } }

  1. DSL 方式
  2. 1. 创建 sparkContext,然后再创建 SQLContext
  3. 1. 先创建 RDD,对数据进行整理,然后关联 Row,将非结构化数据转换成结构化数据
  4. 1. 定义 schema
  5. 1. 调用 sqlContext createDataFrame 方法
  6. 1. 调用 DSL 的方法 --> 类似于函数式,拼接成SQL语句
  7. 1. 执行 Action
  8. 注意:
  9. - 不用创建临时表
  10. ```scala
  11. package day06.cn.wolfcode.spark.sql
  12. import org.apache.spark.rdd.RDD
  13. import org.apache.spark.sql.types._
  14. import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext}
  15. import org.apache.spark.{SparkConf, SparkContext}
  16. object SparkSQLDemo03 {
  17. def main(args: Array[String]): Unit = {
  18. val conf: SparkConf = new SparkConf().setAppName("SparkSQLDemo03").setMaster("local[4]")
  19. val sc = new SparkContext(conf)
  20. val sqlContext = new SQLContext(sc)
  21. val lines: RDD[String] = sc.textFile(args(0))
  22. val userRDD: RDD[Row] = lines.map(line => {
  23. val fields: Array[String] = line.split(" ")
  24. val id = fields(0).toInt
  25. val name = fields(1)
  26. val age = fields(2).toInt
  27. Row(id, name, age)
  28. })
  29. //结果类型,其实就是表头,用于描述 DataFrame
  30. val schema: StructType = StructType(List(
  31. StructField("id", IntegerType),
  32. StructField("name", StringType),
  33. StructField("age", IntegerType)
  34. ))
  35. //将 RowRDD 关联 Schema
  36. val df: DataFrame = sqlContext.createDataFrame(userRDD,schema)
  37. // DSL
  38. val df1: DataFrame = df.select("id","name","age")
  39. import sqlContext.implicits._
  40. val df2: Dataset[Row] = df1.sort($"age" desc)
  41. //查看结果(触发 Action)
  42. df2.show()
  43. sc.stop()
  44. }
  45. }

3.4.2.SparkSQL2.x 的方式

sql 方式

  • SparkSQL2.x 使用的不是 sqlContext,而是 SparkSession
  • SparkContext 通过 SparkSession来获取 ```scala object SparkSQLTest01 { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName(“SparkSQLTest01”).setMaster(“local[4]”) val sc = new SparkContext(conf)

    //Spark SQL2.x 的执行入口 val sparkSession: SparkSession = SparkSession.builder().appName(“SparkSQLDemo01”).master(“local[4”).getOrCreate() val lines: RDD[String] = sparkSession.sparkContext.textFile(args(0)) //将数据进行整理 val rowRDD: RDD[Row] = lines.map(line => {

    1. val fields = line.split(" ")
    2. val id = fields(0).toInt
    3. val name = fields(1)
    4. val age = fields(2).toInt
    5. Row(id, name, age)

    }) //结果类型,其实就是表头,用于描述 DataFrame val schema: StructType = StructType(List(

    1. StructField("id", IntegerType),
    2. StructField("name", StringType),
    3. StructField("age", IntegerType)

    )) val df: DataFrame = sparkSession.createDataFrame(rowRDD,schema) df.createTempView(“v_user”)

    val df2: DataFrame = sparkSession.sql(“select id,name,age from v_user where age > 20 order by age asc”) df2.show() //关闭资源 sparkSession.stop() } }

  1. DSL 方式
  2. ```scala
  3. object SparkSQLTest01 {
  4. def main(args: Array[String]): Unit = {
  5. val conf: SparkConf = new SparkConf().setAppName("SparkSQLTest01").setMaster("local[4]")
  6. val sc = new SparkContext(conf)
  7. //Spark SQL2.x 的执行入口
  8. val sparkSession: SparkSession = SparkSession.builder().appName("SparkSQLDemo01").master("local[4]").getOrCreate()
  9. val lines: RDD[String] = sparkSession.sparkContext.textFile(args(0))
  10. //将数据进行整理
  11. val rowRDD: RDD[Row] = lines.map(line => {
  12. val fields = line.split(" ")
  13. val id = fields(0).toInt
  14. val name = fields(1)
  15. val age = fields(2).toInt
  16. Row(id, name, age)
  17. })
  18. //结果类型,其实就是表头,用于描述 DataFrame
  19. val schema: StructType = StructType(List(
  20. StructField("id", IntegerType),
  21. StructField("name", StringType),
  22. StructField("age", IntegerType)
  23. ))
  24. val df: DataFrame = sparkSession.createDataFrame(rowRDD,schema)
  25. val df1: DataFrame = df.select("id","name","age")
  26. //导入隐式转换
  27. import sparkSession.implicits._
  28. val rdd2: Dataset[Row] = df1.where($"age" > 20).orderBy($"age" desc)
  29. rdd2.show()
  30. //关闭资源
  31. sparkSession.stop()
  32. }
  33. }

4. Spark SQL 的 WordCount

SparkSQL 2.x

  1. object DataSetWordCount {
  2. def main(args: Array[String]): Unit = {
  3. val sparkSession = SparkSession.builder().appName("sparkSQL2WordCount").master("local[2]").getOrCreate()
  4. val words: RDD[String] = sparkSession.sparkContext.textFile("h:/wordCount/wordCount.txt")
  5. val rdd2 = words.flatMap(_.split(" ")).map(word => {
  6. Row(word)
  7. })
  8. val schema: StructType = StructType(List(
  9. new StructType("value", StringType)
  10. ))
  11. val df = sparkSession.createDataFrame(radd2, schema)
  12. // 建临时表
  13. df.createTempView("v_word")
  14. // 写sql并执行
  15. val df2 = sparkSession.sql("select value, count(value) as ct from v_word group by value order by ct desc")
  16. df2.show
  17. sparkSession.stop()
  18. }
  19. }

DSL

  1. object DataSetWordCount {
  2. def main(args: Array[String]): Unit = {
  3. val sparkSession = SparkSession.builder().appName("sparkSQL2WordCount").master("local[2]").getOrCreate()
  4. val words: RDD[String] = sparkSession.sparkContext.textFile("h:/wordCount/wordCount.txt")
  5. val rdd2 = words.flatMap(_.split(" ")).map(word => {
  6. Row(word)
  7. })
  8. val schema: StructType = StructType(List(
  9. new StructType("value", StringType)
  10. ))
  11. val df = sparkSession.createDataFrame(radd2, schema)
  12. //导入隐式转换
  13. import sparkSession.implicits._
  14. val df2 = df.select($"value").groupBy($"value").count().sort($"count" desc)
  15. df2.show
  16. sparkSession.stop()
  17. }
  18. }

1. Spark SQL 连接查询

1.1. 等值 join:商品的连接查询

现在有一个文件存放的是商品信息,另外有一个文件中存放的是商品类别信息,现在需要读取两个文件的信息,然后关联显示其中的所有的数据

  • 分别加载两个文件
  • 分别生成两个临时表
  • 进行join ```scala object JoinDemo01 { def main(args: Array[String]): Unit = { val sqlSession: SparkSession = SparkSession.builder()

    1. .appName("JoinDemo01")
    2. .master("local[2]")
    3. .getOrCreate()

    // 加载商品类别信息 val ds: Dataset[String] = sqlSession.read.textFile(“D:/sparkdata/good/class.txt”) //导入隐式转换,ds 可以调用 RDD 的算子方法 import sqlSession.implicits._ val ds2: Dataset[(String, String)] = ds.map(line => {

    1. val fields: Array[String] = line.split(",")
    2. val classSn = fields(0)
    3. val className = fields(1)
    4. (classSn, className)

    }) //把 Dataset 类型的数据转换为 DataFrame 的类型 val classDF: DataFrame = ds2.toDF(“classSn”,”className”)

    //加载商品信息表 val dsItem: Dataset[String] = sqlSession.read.textFile(“D:/sparkdata/good/item.txt”) val ds3: Dataset[(String, String, String, Int)] = dsItem.map(line => {

    1. val fields: Array[String] = line.split(",")
    2. val itemSn = fields(0)
    3. val itemName = fields(1)
    4. val classId = fields(2)
    5. val price = fields(3).toInt
    6. (itemSn, itemName, classId, price)

    }) val itemDF: DataFrame = ds3.toDF(“itemSn”,”itemName”,”classId”,”price”)

    //方式一: 通过使用 sql 的方式 classDF.createTempView(“v_class”) itemDF.createTempView(“v_item”) val resutl: DataFrame = sqlSession.sql(“ select * from v_class join v_item on classId = classSn”) resutl.show()

    //方式二: 通过 dataFrame 的 API import org.apache.spark.sql.functions._ val result: DataFrame = itemDF.join(classDF,$”classId” === $”classSn”,”left_outer”) result.show() sqlSession.stop() } }

  1. 注意事项:<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/1774803/1634786983486-6a260aa5-7994-4ea7-8a7f-eee5e2c85d87.png#clientId=u49156a06-2f84-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=102&id=ue082d1c6&margin=%5Bobject%20Object%5D&name=image.png&originHeight=285&originWidth=1711&originalType=binary&ratio=1&rotation=0&showTitle=false&size=187970&status=done&style=none&taskId=u1d833f61-718b-4bbb-b4a9-20feb815670&title=&width=611.5)<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/1774803/1634787084752-8e269eac-eadc-4bb1-8b9d-a70413d5509c.png#clientId=u49156a06-2f84-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=106&id=u7789c100&margin=%5Bobject%20Object%5D&name=image.png&originHeight=211&originWidth=1773&originalType=binary&ratio=1&rotation=0&showTitle=false&size=111570&status=done&style=none&taskId=ua616be98-abe0-4b1e-9f18-bb1192f8666&title=&width=886.5)<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/1774803/1634787073917-fc1fe5b3-d030-4644-97a4-2466a627bf75.png#clientId=u49156a06-2f84-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=262&id=ub941bfde&margin=%5Bobject%20Object%5D&name=image.png&originHeight=523&originWidth=1747&originalType=binary&ratio=1&rotation=0&showTitle=false&size=317611&status=done&style=none&taskId=ue257c279-aec1-4955-ad0b-bc18cbfda03&title=&width=873.5)
  2. <a name="giqAb"></a>
  3. ## 1.2. 不等值 join:利用 SQL 计算 IP 归属地
  4. <a name="KM2ek"></a>
  5. ### 1.2.1.使用 join 连接方式
  6. - join 的连接条件 on 中,可以使用不等值条件 `**ip>=startNum and ip <= endNum**`
  7. ```scala
  8. //思路
  9. // 把 ip 规则和日志信息加载到我们的表中, 通过 sql 进行转换
  10. object IPLocation01 {
  11. def main(args: Array[String]): Unit = {
  12. val sqlSession: SparkSession = SparkSession.builder()
  13. .master("local[2]")
  14. .appName("IPLocation01")
  15. .getOrCreate()
  16. import sqlSession.implicits._
  17. //加载 IP 规则
  18. val ipDS: Dataset[String] = sqlSession.read.textFile("d:/ip/ip.txt")
  19. val ipDS2: Dataset[(Long, Long, String)] = ipDS.map(line => {
  20. val fields: Array[String] = line.split("[|]")
  21. val startNum = fields(2).toLong
  22. val endNum = fields(3).toLong
  23. val province = fields(6)
  24. (startNum, endNum, province)
  25. })
  26. val ipDF: DataFrame = ipDS2.toDF("startNum","endNum","province")
  27. //ipDF.show()
  28. //加载读取日志文件
  29. val logDS: Dataset[String] = sqlSession.read.textFile("d:/ip/access.log")
  30. val logDS2: Dataset[Long] = logDS.map(line => {
  31. val fields: Array[String] = line.split("[|]")
  32. val ipAddr =MyUtils.ip2Long(fields(1))
  33. ipAddr
  34. })
  35. val logDF: DataFrame = logDS2.toDF("ip")
  36. //方式一: sql 方式
  37. ipDF.createTempView("v_rule")
  38. logDF.createTempView("v_log")
  39. val result: DataFrame = sqlSession.sql("select province, count(1) ct from v_log join v_rule on (ip>=startNum and ip <= endNum) group by province order by ct desc")
  40. result.show()
  41. //方式二:dsl:使用 df 的 api 方式
  42. import org.apache.spark.sql.functions._
  43. val result2: DataFrame = ipDF.join(logDF,$"ip" >= $"startNum" and $"ip" <= $"endNum").select($"province")
  44. .groupBy($"province").count().orderBy($"count" desc)
  45. result2.show()
  46. sqlSession.stop()
  47. }
  48. }

缺点

  • 这种方式效率比较低,因为每次都对于我们的日志解析都需要去连接 ip 规则进行查询连接

如果需要提高我们的运行效率,可以使用我们的 ip 广播的方式

1.2.2.使用 IP 规则广播 + UDF,避免 join 操作

  • 先查出 ip 规则,collect 到 Driver 端,然后广播到各个 Executor
  • 查询出 ip
  • 定义一个 udf ,从广播中拿到 IP 规则,匹配返回省份 ```scala //思路 // 把 ip 规则和日志信息加载到我们的表中, 通过 sql 进行转换 object IPLocation02 { def main(args: Array[String]): Unit = { val sqlSession: SparkSession = SparkSession.builder()

    1. .master("local[2]")
    2. .appName("IPLocation02")
    3. .getOrCreate()

    import sqlSession.implicits._

    //加载 IP 规则 val lines: RDD[String] = sqlSession.sparkContext.textFile(“d:/ip/ip.txt”) val ipDS2: RDD[(Long, Long, String)] = lines.map(line => {

    1. val fields: Array[String] = line.split("[|]")
    2. val startNum = fields(2).toLong
    3. val endNum = fields(3).toLong
    4. val province = fields(6)
    5. (startNum, endNum, province)

    }) val ipRules: Array[(Long, Long, String)] = ipDS2.collect() //广播到 executor val ipRuleRef: Broadcast[Array[(Long, Long, String)]] = sqlSession.sparkContext.broadcast(ipRules) //ipDF.show()

    //加载读取日志文件 val logDS: Dataset[String] = sqlSession.read.textFile(“d:/ip/access.log”) val logDS2: Dataset[Long] = logDS.map(line => {

    1. val fields: Array[String] = line.split("[|]")
    2. val ipAddr = MyUtils.ip2Long(fields(1))
    3. ipAddr

    }) val logDF: DataFrame = logDS2.toDF(“ip”)

    //定义一个 udf 自定义转换函数 def ip2province(ip:Long)={

    1. val value: Array[(Long, Long, String)] = ipRuleRef.value
    2. var province="未知"
    3. val index: Int = MyUtils.binarySearch(value,ip)
    4. if(index != -1){
    5. province=value(index)._3
    6. }
    7. province

    }

    //注册 udf 函数 sqlSession.udf.register(“ip2province”,ip2province _)

    //方式一: sql 方式 logDF.createTempView(“v_log”) val result: DataFrame = sqlSession.sql(“select province,count(1) ct from (select ip2province(ip) province from v_log) tmp group by province order by ct desc “) result.show()

    //方式二: dataframe 的方式 logDF.select(callUDF(“ip2province”,$”ip”) as “province”).groupBy($”province”).count().orderBy($”count” desc).show() sqlSession.stop() } }

  1. <a name="Kms1C"></a>
  2. # 2. UDF 自定义函数
  3. 对于用户自定义函数,在 Hive 中分为三种
  4. - **UDF (User Defined Funciton)**
  5. - 输入一行,返回一个结果,**一对一返回**
  6. - 方式
  7. 1. 定义一个普通函数
  8. 1. 注册
  9. ```scala
  10. //定义一个 udf 自定义转换函数
  11. def ip2province(ip:Long)={
  12. val value: Array[(Long, Long, String)] = ipRuleRef.value
  13. var province="未知"
  14. val index: Int = MyUtils.binarySearch(value,ip)
  15. if(index != -1){
  16. province=value(index)._3
  17. }
  18. province
  19. }
  20. //注册 udf 函数
  21. sqlSession.udf.register("ip2province",ip2province _)
  • UDTF
    • 输入一行,返回多行数据
    • Spark 中没有 UDTF 函数
    • Spark 自带的 flatMap 即可实现该功能
  • UDAF(User Defined Aggregate Function)
    • 输入多行,返回一行数据,为聚合函数
    • 主要包括 aggregate(聚合)、count、sum
    • 复杂的业务,我们需要指定 UDAF 函数
    • 方式
      • 定一个类,继承 UserDefinedAggregateFunction
      • 覆写方法
    • 使用
      • 创建该类对象
      • 和使用 UDF 一样使用

定义一个函数,用于计算几何平均数

  1. package day07.cn.wolfcode.spark.join
  2. import java.lang.Long
  3. import org.apache.spark.sql.{Dataset, Row, SparkSession}
  4. import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
  5. import org.apache.spark.sql.types._
  6. object UdafDemo {
  7. def main(args: Array[String]): Unit = {
  8. val sqlSession = SparkSession
  9. .builder()
  10. .appName("JoinTest")
  11. .master("local[*]")
  12. .getOrCreate()
  13. //创建一个 udaf 的对象实例
  14. val geomean = new GeoMean
  15. //生成一个 1 到 10 的序列 Dataset
  16. val range: Dataset[Long] = sqlSession.range(1,11)
  17. //方式一: sql 方式
  18. //注册函数
  19. sqlSession.udf.register("geomean",geomean)
  20. //将 range 这个 Dataset[Long]注册成视图
  21. range.createTempView("v_range")
  22. val result = sqlSession.sql("SELECT geomean(id) result FROM v_range")
  23. result.show()
  24. //方式二: dataFrame 的 API
  25. import org.apache.spark.sql.functions._
  26. import sqlSession.implicits._
  27. range.select(callUDF("geomean",$"id")).show()
  28. range.agg(geomean($"id").as("gm"))
  29. sqlSession.stop()
  30. }
  31. }
  32. class GeoMean extends UserDefinedAggregateFunction{
  33. //输入数据的类型
  34. override def inputSchema: StructType = StructType(List(new StructField("value",DoubleType)))
  35. //产生中间结果的数据类型
  36. override def bufferSchema: StructType = StructType(List(
  37. //相乘之后返回的积
  38. new StructField("product",DoubleType),
  39. //参与运算的数字个数
  40. new StructField("count",LongType)
  41. ))
  42. //最终发返回的结果类型
  43. override def dataType: DataType = DoubleType
  44. //确保数据一致性
  45. override def deterministic: Boolean = true
  46. //指定初始值
  47. override def initialize(buffer: MutableAggregationBuffer): Unit = {
  48. //相乘的初始值
  49. buffer(0)=1.0
  50. //参与运算的数字的个数的初始值
  51. buffer(1)=0L
  52. }
  53. //每有一条数据参与运算就更新一下中间结果(update 相当于在每一个分区中的运算)
  54. override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
  55. //每有一个数字参与运算就进行相乘(包含中间结果)
  56. buffer(0) = buffer.getDouble(0) * input.getDouble(0)
  57. //参与运算数据的个数也有更新
  58. buffer(1) = buffer.getLong(1) + 1L
  59. }
  60. //全局聚合
  61. override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
  62. //每个分区计算的结果进行相乘
  63. buffer1(0) = buffer1.getDouble(0) * buffer2.getDouble(0)
  64. //每个分区参与预算的中间结果进行相加
  65. buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  66. }
  67. //计算最终的结果
  68. override def evaluate(buffer: Row): Any = {
  69. math.pow(buffer.getDouble(0), 1.toDouble / buffer.getLong(1))
  70. }
  71. }

3. Dataset 和 DataFrame

Dateset 是 spark1.6 以后推出的新的 API,也是一个分布式数据集,于 RDD 相比,保存了跟多的描述信息,概念上等同于关系型数据库中的二维表,基于保存了跟多的描述信息,spark 在运行时可以被优化。
Dateset 里面对应的的数据是强类型的,并且可以使用功能更加丰富的 lambda 表达式,弥补了函数式编程的一些缺点,使用起来更方便
在 scala 中,DataFrame 其实就是 Dateset[Row]

3.1. Dataset 的特点

  • 一系列分区
  • 每个切片上会有对应的函数
  • 依赖关系
  • kv 类型 shuffle 也会有分区器
  • 如果读取 hdfs 中的数据会感知最优位置
  • 会优化执行计划,在执行真正的 task 任务之前会把程序调整到最优的状态
  • 支持更加智能的数据源(jdbc,cvs,json,parquet)

    3.2. Dataset 和 DateFrame 之间的转换

    在我们通过读取到一个 Dataset 的数据类型的时候,我们可以通过方法 toDF("name", "age") 转换为一个 DataFrame 对象
    对于 DataFrame 对象,我们具有比较好的一个数据格式和 Scheme 信息
    对于 Dataset来说,一般就是使用一个列名 (value,id 等)来表示,如果 Dataset 存放的是一个元组或者集合,那么有可能通过 _1,_2 这样的格式来表示列名
    image.png
    image.png

4. SparkSQL 的数据处理

4.1. SparkSQL 的多种格式的输出

我们可以使用 SparkSQL 的 DataFrameWriter 对象完成对 JDBC,cvs,json,text,parquet 等多种格式的持久化操作
jdbc 持久化

  • 第一种 ```scala // 追加写入 val props = new Properties() props.put(“user”,”root”) props.put(“password”,”admin”) itemDF.write.mode(“append”).jdbc(“jdbc:mysql://localhost:3306/bigdata”, “item”, props)
  1. - 第二种
  2. ```scala
  3. // 覆盖写入
  4. df.write.mode("overwrite").format("jdbc").options(Map(
  5. "url"->"jdbc:mysql://lab202:3306/bigdata?characterEncoding=UTF-8",
  6. "dbtable"->"item",
  7. "user"->"root",
  8. "password"->"admin"
  9. )).save()

json 格式

  • 会保留 schema 信息

    1. itemDF.write.json("d:/sparkdata/json")

    cvs 格式

  • 不会保留 schema 信息

    1. itemDF.write.csv("d:/sparkdata/csv")

    parquet 格式

    1. reslut.write.parquet("d:/sparkdata/parquet")

    4.2. SparkSQK 的多种格式的输入

    jdbc 格式

    1. val item: DataFrame = spark.read.format("jdbc").options(
    2. Map("url" -> "jdbc:mysql://localhost:3306/bigdata",
    3. "driver" -> "com.mysql.jdbc.Driver",
    4. "dbtable" -> "logs",
    5. "user" -> "root",
    6. "password" -> "admin")
    7. ).load()

    json 格式

    1. val jsons: DataFrame = spark.read.json("d:/sparkdata/json")

    parquet 格式

    1. val parquetLine: DataFrame = spark.read.parquet("d:/sparkdata/parquet")

    1. SparkSQL 补充

    1.1. 分窗函数的案例:各学科最受欢迎的老师

    ```scala object FavTeacher { def main(args: Array[String]): Unit = { val sqlSession: SparkSession = SparkSession.builder().appName(“FavTeacher”).master(“local[4]”).getOrCreate() val lines: Dataset[String] = sqlSession.read.textFile(“D:/sparkdata/topn/teacher.txt”) import sqlSession.implicits._ val subTeacherRDD: Dataset[(String, String)] = lines.map(line => {

    1. // 字符串切割, x 是读取到的一行的内容
    2. val words: Array[String] = line.split("/")
    3. // 获取到每个教师的姓名
    4. val teacher = words(words.length - 1)
    5. // 获取到学科的信息
    6. val subject = words(words.length - 2)
    7. //返回一个元组信息(教师,1)
    8. (subject, teacher)

    }) val df: DataFrame = subTeacherRDD.toDF(“sub”,”teacher”) df.createTempView(“v_sub_teacher”) val tempDf: DataFrame = sqlSession.sql(“select sub,teacher,count(1) ct from v_sub_teacher group by sub,teacher”) tempDf.createTempView(“v_sub_teacher_tmp”) sqlSession.sql(“select sub, teacher, ct, row_number() over(partition by sub order by ct desc ) sub_rk from v_sub_teacher_tmp “).show() sqlSession.sql(“select sub, teacher, ct, row_number() over(partition by sub order by ct desc ) sub_rk, row_number() over(order by ct desc) g_rk from v_sub_teacher_tmp “).show() sqlSession.stop() } }

  1. 分窗函数
  2. - over():对数据进行分窗,括号内的为分窗条件
  3. - partition by:分窗条件,如果为 partition by sub order by ct desc,表示按照学科进行分窗(分区),每个分区的数据按照 ct 字段进行降序排序
  4. - row_number():对每个分区内的数据进行编号,从1开始
  5. ![image.png](https://cdn.nlark.com/yuque/0/2021/png/1774803/1634802343259-ccc8caf5-21ca-4a2b-ba51-f3026e1885e4.png#clientId=ucf2b6191-dde0-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=403&id=u4fb33535&margin=%5Bobject%20Object%5D&name=image.png&originHeight=1028&originWidth=1916&originalType=binary&ratio=1&rotation=0&showTitle=false&size=513354&status=done&style=none&taskId=ueaf2b287-fad1-449b-bce3-6fecb7a64e1&title=&width=752)
  6. <a name="GI7q5"></a>
  7. ## 1.2. Join 操作的细节
  8. 目前我们的 SparkSQL 支持的 Join 的方式有三种
  9. - Broadcast Hash Join(很小表+大表)
  10. - Shuffle Hash(小表+大表)
  11. - Sort-Merge Join
  12. 默认情况下,这三种 join 的方式是有 SparkSQL 自己根据表的数据大小以及当前的运行环境进行自由调配
  13. <a name="Gz702"></a>
  14. ### 1.2.1.Hash Join 的概念
  15. 先来看看这样一条 SQL 语句:`select * from order,item where item.id = order.i_id`<br />很简单一个 Join 节点,参与 join 的两张表是 item orderjoin key 分别是 item.id 以及 order.i_id。<br />现在假设这个 Join 采用的是 hash join 算法,整个过程会经历三步:
  16. 1. 确定 Build Table 以及 Probe Table:这个概念比较重要,Build Table使用 join key构建Hash Table,而 Probe Table 使用 join key 进行探测,探测成功就可以 join 在一起。通常情况下,小表会作为 Build Table,大表作为 Probe Table。此事例中 item Build Tableorder Probe Table
  17. 1. 构建 Hash Table:依次读取 Build Tableitem)的数据,对于每一行数据根据 join keyitem.id)进行 hashhash 到对应的 Bucket,生成 hash table 中的一条记录。数据缓存在内存中,如果内存放不下需要 dump 到外存。
  18. - Hash Table 可以理解为 Build Table 的索引,
  19. 3. 探测:再依次扫描 Probe Tableorder)的数据,**对 Probe Table join key 使用相同的 hash 函数映射 Hash Table 中的记录**,映射成功之后再检查 join 条件(item.id = order.i_id)(存在不同值的 hash 值相同,因此需要额外检查一下值),如果匹配成功就可以将两者 join 在一起。
  20. ![image.png](https://cdn.nlark.com/yuque/0/2021/png/1774803/1634800775419-615bb9fa-99e3-4586-8105-0528b1a1bf0a.png#clientId=ucf2b6191-dde0-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=480&id=ue24df9ec&margin=%5Bobject%20Object%5D&name=image.png&originHeight=1216&originWidth=2115&originalType=binary&ratio=1&rotation=0&showTitle=false&size=1094410&status=done&style=shadow&taskId=u2891644c-97d7-4ed1-ae03-40ff26cc754&title=&width=834)<br />这里有两个小问题需要关注
  21. 1. hash join 性能如何?很显然,hash join 基本都只扫描两表一次,可以认为 o(a+b),较之最极端的笛卡尔集运算 a*b,不知甩了多少条街
  22. 1. 为什么 Build Table 选择小表?道理很简单,因为构建的 Hash Table 最好能全部加载在内存,效率最高;这也决定了 hash join 算法只适合至少一个小表的 join 场景,对于两个大表的 join 场景并不适用;
  23. 上文说过,**hash join 是传统数据库中的单机 join 算法**,在分布式环境下需要经过一定的**分布式改造**,说到底就是尽可能利用分布式计算资源进行并行化计算,提高总体效率。<br />hash join 分布式改造一般有两种经典方案:
  24. 1. **broadcast hash join**:将其中一张小表广播分发到另一张大表所在的分区节点上,分别并发地与其上的分区记录进行 hash joinbroadcast 适用于小表很小,可以直接广播的场景。
  25. 1. **shuffler hash join**:一旦小表数据量较大,此时就不再适合进行广播分发。这种情况下,可以根据 join key 相同必然分区相同的原理,将两张表分别按照 join key 进行重新组织分区,这样就可以将 join 分而治之,划分为很多小 join,充分利用集群资源并行化。
  26. <a name="y7OTw"></a>
  27. ### 1.2.2.Broadcast Hash Join
  28. 1. broadcast 阶段:将小表广播分发到大表所在的所有主机。广播算法可以有很多,最简单的是先发给 driverdriver 再统一分发给所有 executor;要不就是基于 bittorrete p2p 思路;
  29. 1. hash join 阶段:在每个 executor 上执行单机版 hash join,小表映射,大表试探;
  30. 默认情况下如果对于小表的大小不超过 10M,那么就会采用广播的方式, 其中默认的配置参数为 `spark.sql.autoBroadcastJoinThreshold` 默认值为 10M, 如果设置为-1,则表示禁用广播<br />总结:
  31. - Probe Table 根据分区,分到各个 Task
  32. - Build Table 整个表广播到各个 Executor
  33. - 没有涉及到 shuffle 的操作
  34. ![image.png](https://cdn.nlark.com/yuque/0/2021/png/1774803/1634805361730-e33f04f7-9795-4bae-a176-a19e96bb9283.png#clientId=ucf2b6191-dde0-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=295&id=ubfacfcea&margin=%5Bobject%20Object%5D&name=image.png&originHeight=590&originWidth=1221&originalType=binary&ratio=1&rotation=0&showTitle=false&size=384592&status=done&style=none&taskId=u6d6896ce-6779-424c-a4b7-7c74aa74055&title=&width=610.5)
  35. <a name="ZOppE"></a>
  36. ### 1.2.3.Shuffle Hash Join
  37. 在大数据条件下如果一张表很小,执行 join 操作最优的选择无疑是 broadcast hash join,效率最高。<br />但是一旦**小表数据量增大(但依旧是小表)**,广播所需内存、带宽等资源必然就会太大,broadcast hash join 就不再是最优方案。此时可以按照 join key 进行分区,根据 key 相同必然分区相同的原理,就可以将大表 join 分而治之,划分为很多小表的 join,充分利用集群资源并行化。<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/1774803/1634806504025-1acdfb35-a293-46bf-9c36-ec18186b08cc.png#clientId=ucf2b6191-dde0-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=395&id=u761c8fd0&margin=%5Bobject%20Object%5D&name=image.png&originHeight=790&originWidth=1455&originalType=binary&ratio=1&rotation=0&showTitle=false&size=524200&status=done&style=shadow&taskId=ucc39a7d2-c789-4981-92c1-7f24f524b59&title=&width=727.5)<br />如下图所示,shuffle hash join 也可以分为两步:
  38. 1. shuffle 阶段:分别**将两个表按照 join key 进行分区**,将**相同 join key 的记录重分布到同一节点**,两张表的数据会被重分布到集群中所有节点。这个过程称为 shuffle
  39. 1. hash join 阶段:每个分区节点上的数据单独执行**单机 hash join 算法**
  40. 总结:
  41. - 两个表分别按照分区,分发到各个Task
  42. - **进行shuffle**,两个表依据 join key,重新进行分发(类似于 reduceByKeyshuffle过程,使用的是 hashPartitioner),相同 hash 值的份到一个 Executor
  43. - Executor 上执行单机 hash join 算法
  44. 配置使用 Shuffle Hash Join<br />`spark.sql.autoBroadcastJoinThreshold` 的值配置为`-1` 禁用广播
  45. - `sqlSession.config("spark.sql.autoBroadcastJoinThreshold", "-1")`
  46. `spark.sql.join.preferSortMergeJoin` 的值配置为 `false`,禁用排序的合并的方式
  47. - `sqlSession.config("spark.sql.join.preferSortMergeJoin", "false")`
  48. <a name="uVt5O"></a>
  49. ### 1.2.4.Sort-Merge Join
  50. SparkSQL 对**两张大表 join **采用了全新的算法-**sort-merge join**,如下图所示,整个过程分为三个步骤
  51. 1. shuffle 阶段:将两张大表**根据 join key 进行重新分区**,两张表数据会分布到整个集群,以便分布式并行处理(类似于 shuffle Hash join 操作)
  52. 1. **sort 阶段**:对单个分区节点的两表数据,分别进行排序,**写入本地磁盘**
  53. 1. **merge 阶段**:对排好序的两张分区表数据执行 join 操作。join 操作很简单,**分别遍历两个有序序列,碰到相同 join key merge 输出,否则取更小一边**
  54. 1. 排序后的数据就比较方便了
  55. 配置使用 Sort-Merge Join<br />`spark.sql.autoBroadcastJoinThreshold` 的值配置为 `-1` 禁用广播
  56. - `sqlSession.config("spark.sql.autoBroadcastJoinThreshold", "1")`
  57. <a name="HeINu"></a>
  58. # 2. SparkSQL 整合 Hive
  59. 启动 Hive:
  60. - bin/spark-sql
  61. <a name="fioCk"></a>
  62. ## 2.1. 基本操作
  63. <a name="qQmys"></a>
  64. ### 2.1.1.创建表
  65. 内部表:
  66. ```sql
  67. create table t2(id int,name string,age int,gender string)
  68. row format delimited fields terminated by ",";

外部表

  1. create external table t3(id int,name string,age int,gender string)
  2. row format delimited fields terminated by ","
  3. location '/aa/bb/';

2.1.2.导入数据到表中

  1. load data local inpath '/root/data/user.data.2' into table t1;

2.2. Spark 整合 Hive

2.2.1. 安装 MySQL 数据库,并且配置远程登录用户

  1. GRANT ALL PRIVILEGES ON *.* TO 'root'@'%'IDENTIFIED BY 'admin' WITH GRANT OPTION;
  2. FLUSH PRIVILEGES;

2.2.2. 添加 Hive 的配置文件 hive-site.xml

/root/apps/spark-2.1.1/conf 目录下面

  1. <configuration>
  2. <property>
  3. <name>javax.jdo.option.ConnectionURL</name>
  4. <value>jdbc:mysql://lab153:3306/hive?createDatabaseIfNotExist=true</value>
  5. </property>
  6. <property>
  7. <name>javax.jdo.option.ConnectionDriverName</name>
  8. <value>com.mysql.jdbc.Driver</value>
  9. </property>
  10. <property>
  11. <name>javax.jdo.option.ConnectionUserName</name>
  12. <value>root</value>
  13. </property>
  14. <property>
  15. <name>javax.jdo.option.ConnectionPassword</name>
  16. <value>admin</value>
  17. </property>
  18. </configuration>

2.2.3.配置 HADOOP_HOME 环境变量

  1. vi /etc/profile.d/hadoop.sh
  2. export HADOOP_HOME=/root/apps/hadoop-2.7.3
  3. export HADOOP_CONF_DIR=/root/apps/hadoop-2.7.3/etc/hadoop
  4. export PATH=$PATH:$HADOOP_HOME/bin
  5. source /etc/profile

2.2.4.上传 mysql 的驱动包启动程序

启动命令

  1. bin/spark-sql --master spark://lab150:7077
  2. --driver-class-path /root/mysql-connector-java-5.1.39

启动以后需要手动修改元数据中的一个表 DBS 中的 DB_LOCATION_URL 为

  1. hdfs://lab150:9000/user/hive/spark-warehouse

2.3. 通过文件的方式使用 hive

  1. bin/spark-sql -e show databases;”
  2. bin/spark-sql -f “/root/hive.sql

2.4. Spark 程序整合 Hive

2.4.1.在项目中导入对应的 pom 依赖

  1. <dependency>
  2. <groupId>org.apache.spark</groupId>
  3. <artifactId>spark-hive_2.11</artifactId>
  4. <version>${spark.version}</version>
  5. </dependency>

2.4.2.在项目中添加对应的配置文件

把 core-site.xml hdfs-site.xml hive-site.xml 这三个配置文件拷贝到项目的 resource 目录,配置好元数据的目录信息

2.4.3. 启用对 hive 的支持

在程序中调用 sqlSession 的 enableHiveSupport()启用对 hive 的支持

  1. object HiveDemo {
  2. def main(args: Array[String]): Unit = {
  3. val sqlSession: SparkSession = SparkSession.builder()
  4. .appName("JoinDemo01")
  5. .master("local[2]")
  6. .enableHiveSupport()
  7. .getOrCreate()
  8. sqlSession.sql("select * from t1 ").show()
  9. sqlSession.stop()
  10. }
  11. }