- Official documentation: http://spark.apache.org/docs/latest/sql-getting-started.html

# 2.1 SparkSession

- Before Spark 2.0:

   - SQLContext was the entry point for creating DataFrames and executing SQL
   - HiveContext operated on Hive data through Hive SQL and was compatible with Hive; HiveContext extends SQLContext
- Since Spark 2.0:
   - these entry points are unified in SparkSession, which wraps both SQLContext and HiveContext;
   - SparkSession provides all of the functionality of SQLContext and HiveContext;
   - the SparkContext can be obtained from the SparkSession (see the short sketch after the code block below).
```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._
```
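As noted above, the SparkContext (and with it the RDD API) is reachable from the SparkSession; a minimal sketch:

```scala
// the underlying SparkContext is exposed by the session
val sc = spark.sparkContext
sc.setLogLevel("WARN")

// SQLContext-style and HiveContext-style work now goes through the same object
spark.sql("select 1 as id").show()
```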

# 2.2 Creating DataFrames & Datasets

- Keep DF and DS clearly distinct: a DataFrame is a special kind of Dataset (DataFrame = Dataset[Row]); ds.transformation => df (see the short sketch after the code block below)

## 1 Creating a Dataset with range

```scala
val numDS = spark.range(5, 100, 5)
// orderBy is a transformation; desc is a function; show is an action
numDS.orderBy(desc("id")).show(5)
// summary statistics
numDS.describe().show
// print the schema
numDS.printSchema
// the same computation with the underlying RDD
numDS.rdd.map(_.toInt).stats
// number of partitions
numDS.rdd.getNumPartitions
```
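To make the bullet above concrete, a minimal sketch using the `numDS` just created: converting the typed Dataset gives a DataFrame, which is simply `Dataset[Row]`.

```scala
// Dataset[java.lang.Long] -> DataFrame (i.e. Dataset[Row]); the column keeps its name "id"
val numDF: org.apache.spark.sql.DataFrame = numDS.toDF()
numDF.printSchema

// back to a typed Dataset with as[...] (needs the implicit encoders)
import spark.implicits._
val numDS2 = numDF.as[Long]
```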

## 2 Creating a Dataset from a collection

- Dataset = RDD[case class]
```scala
case class Person(name: String, age: Int, height: Int)

// note the element type of the Seq
val seq1 = Seq(Person("Jack", 28, 184), Person("Tom", 10, 144), Person("Andy", 16, 165))
val ds1 = spark.createDataset(seq1)
// print the schema
ds1.printSchema
ds1.show

val seq2 = Seq(("Jack", 28, 184), ("Tom", 10, 144), ("Andy", 16, 165))
val ds2 = spark.createDataset(seq2)
ds2.show
```

<a name="aIe7T"></a>
## 3 Creating a DataFrame from a collection

- DataFrame = RDD[Row] + Schema
```scala
val lst = List(("Jack", 28, 184), ("Tom", 10, 144), ("Andy", 16, 165))
val df1 = spark.createDataFrame(lst).
  // convenient when renaming individual columns
  withColumnRenamed("_1", "name1").
  withColumnRenamed("_2", "age1").
  withColumnRenamed("_3", "height1")

df1.orderBy("age1").show(10)

// desc is a function; in IDEA it has to be imported explicitly
import org.apache.spark.sql.functions._
df1.orderBy(desc("age1")).show(10)

// rename all columns of the DF at once
val df2 = spark.createDataFrame(lst).toDF("name", "age", "height")
```

## 4 Converting an RDD to a DataFrame

- DataFrame = RDD[Row] + Schema
```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val arr = Array(("Jack", 28, 184), ("Tom", 10, 144), ("Andy", 16, 165))
val rdd1 = sc.makeRDD(arr).map(f => Row(f._1, f._2, f._3))
val schema = StructType(
  StructField("name", StringType, false) ::
  StructField("age", IntegerType, false) ::
  StructField("height", IntegerType, false) :: Nil)

val schema1 = (new StructType).
  add("name", "string", false).
  add("age", "int", false).
  add("height", "int", false)

// RDD => DataFrame: the schema must be given explicitly
val rddToDF = spark.createDataFrame(rdd1, schema)
rddToDF.orderBy(desc("name")).show(false)
```

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val arr1 = Array(("Jack", 28, null), ("Tom", 10, 144), ("Andy", 16, 165))
val rdd1 = sc.makeRDD(arr1).map(f=>Row(f._1, f._2, f._3))
val structType = StructType(
  StructField("name", StringType, false) ::
  StructField("age", IntegerType, false) ::
  StructField("height", IntegerType, false) :: Nil)

// false means the field is not nullable
val schema1 = structType
val df1 = spark.createDataFrame(rdd1, schema1)

// the next statement fails because of the null value
df1.show

// true allows nulls in that field, so the statement runs fine
val schema2 = StructType( 
  StructField("name", StringType, false) ::
  StructField("age", IntegerType, false) ::
  StructField("height", IntegerType, true) :: Nil)

val df2 = spark.createDataFrame(rdd1, schema2)

df2.show

// required in IDEA, not needed in spark-shell
import spark.implicits._
val arr2 = Array(("Jack", 28, 150), ("Tom", 10, 144), ("Andy", 16, 165))
val rddToDF = sc.makeRDD(arr2).toDF("name", "age", "height")
case class Person(name: String, age: Int, height: Int)

import org.apache.spark.rdd.RDD
val rdd2: RDD[Person] = spark.sparkContext.makeRDD(arr2).map(f => Person(f._1, f._2, f._3))
val ds2 = rdd2.toDS()  // schema inferred by reflection from the case class definition
val df2 = rdd2.toDF()  // inferred by reflection
ds2.printSchema
df2.printSchema
ds2.orderBy(desc("name")).show(10)
df2.orderBy(desc("name")).show(10)
```

## 5 Converting an RDD to a Dataset

- Dataset = RDD[case class]
- DataFrame = RDD[Row] + Schema
```scala
val ds3 = spark.createDataset(rdd2)
ds3.show(10)
```

## 6 Creating a DataFrame from a file (CSV as an example)

```scala
val df1 = spark.read.csv("data/people1.csv")
df1.printSchema()
df1.show()

val df2 = spark.read.csv("data/people2.csv")
df2.printSchema()
df2.show()

// with explicit options
// since Spark 2.3.0 the schema can be given as a DDL string
val schema = "name string, age int, job string"
val df3 = spark.read
  .options(Map(("delimiter", ";"), ("header", "true")))
  .schema(schema)
  .csv("data/people2.csv")
df3.printSchema()
df3.show

// automatic type inference
val df4 = spark.read
  .option("delimiter", ";")
  .option("header", "true")
  .option("inferschema", "true")
  .csv("data/people2.csv")
df4.printSchema()
df4.show
```

<a name="geX8p"></a>
# 2.3 三者的转换
![image.png](https://cdn.nlark.com/yuque/0/2021/png/3013578/1611840252507-3f819556-8c73-4b68-9132-552813153697.png#align=left&display=inline&height=296&margin=%5Bobject%20Object%5D&name=image.png&originHeight=493&originWidth=1043&size=52303&status=done&style=none&width=627)

- Spark SQL provides a domain-specific language (DSL) for working with structured data conveniently. The underlying idea is still SQL; the difference is only one of syntax.
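A compact sketch of the conversions in the figure, assuming a simple `Person` case class like the one used in 2.2 (the data here is made up):

```scala
import spark.implicits._

case class Person(name: String, age: Int, height: Int)
val rdd = spark.sparkContext.makeRDD(Seq(Person("Jack", 28, 184), Person("Tom", 10, 144)))

// RDD -> DataFrame / Dataset
val df = rdd.toDF()
val ds = rdd.toDS()

// DataFrame <-> Dataset
val dsFromDF = df.as[Person]   // needs an encoder, provided by the implicits
val dfFromDS = ds.toDF()

// back to RDDs
val rowRDD    = df.rdd         // RDD[Row]
val personRDD = ds.rdd         // RDD[Person]
```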
<a name="QI5tZ"></a>
# 2.4 Actions

- Operations similar to those on RDDs
   - `show`, `collect`, `collectAsList`, `head`, `first`, `count`, `take`, `takeAsList`, `reduce`
- Structure-related
   - `printSchema`, `explain`, `columns`, `dtypes`, `col`

Sample data (`data/emp.dat`):

```
EMPNO,ENAME,JOB,MGR,HIREDATE,SAL,COMM,DEPTNO
7369,SMITH,CLERK,7902,2001-01-02 22:12:13,800,,20
7499,ALLEN,SALESMAN,7698,2002-01-02 22:12:13,1600,300,30
7521,WARD,SALESMAN,7698,2003-01-02 22:12:13,1250,500,30
7566,JONES,MANAGER,7839,2004-01-02 22:12:13,2975,,20
7654,MARTIN,SALESMAN,7698,2005-01-02 22:12:13,1250,1400,30
7698,BLAKE,MANAGER,7839,2005-04-02 22:12:13,2850,,30
7782,CLARK,MANAGER,7839,2006-03-02 22:12:13,2450,,10
7788,SCOTT,ANALYST,7566,2007-03-02 22:12:13,3000,,20
7839,KING,PRESIDENT,,2006-03-02 22:12:13,5000,,10
7844,TURNER,SALESMAN,7698,2009-07-02 22:12:13,1500,0,30
7876,ADAMS,CLERK,7788,2010-05-02 22:12:13,1100,,20
7900,JAMES,CLERK,7698,2011-06-02 22:12:13,950,,30
7902,FORD,ANALYST,7566,2011-07-02 22:12:13,3000,,20
7934,MILLER,CLERK,7782,2012-11-02 22:12:13,1300,,10
```

```scala
// the header line is consumed and supplies the column names;
// with automatic type inference the values come out mostly right
val df1 = spark.read.
  option("header", "true").
  option("inferschema", "true").
  csv("data/emp.dat")
df1.count

// show displays 20 rows by default
df1.union(df1).show()

// show 2 rows
df1.show(2)

// do not truncate long strings
df1.toJSON.show(false)

// show 10 rows without truncation
df1.toJSON.show(10, false)
spark.catalog.listFunctions.show(10000, false)

// collect returns an Array[org.apache.spark.sql.Row]
val c1 = df1.collect()

// collectAsList returns a java.util.List[org.apache.spark.sql.Row]
val c2 = df1.collectAsList()

// both return org.apache.spark.sql.Row
val h1 = df1.head()
val f1 = df1.first()

// both return Array[org.apache.spark.sql.Row] of length 3
val h2 = df1.head(3)
val f2 = df1.take(3)

// returns List[org.apache.spark.sql.Row] of length 2
val t2 = df1.takeAsList(2)

// structure-related members
df1.columns       // column names
df1.dtypes        // column names and types
df1.explain()     // execution plan
df1.col("ename")  // get a single column
df1.printSchema   // commonly used
```

# 2.5 Transformations

- Operations similar to those on RDDs
   - `map`, `filter`, `flatMap`, `mapPartitions`, `sample`, `randomSplit`, `limit`, `distinct`, `dropDuplicates`, `describe`
```scala
df1.map(row => row.getAs[Int](0)).show

// randomSplit (as with RDDs, splits the DF/DS into several parts by the given weights)
val df2 = df1.randomSplit(Array(0.5, 0.6, 0.7))
df2(0).count
df2(1).count
df2(2).count

// take 10 rows into a new Dataset
val df2 = df1.limit(10)

// distinct: remove duplicate rows
val df2 = df1.union(df1)
df2.distinct.count

// dropDuplicates: deduplicate by column values
df2.dropDuplicates.show
df2.dropDuplicates("mgr", "deptno").show
df2.dropDuplicates("mgr").show
df2.dropDuplicates("deptno").show

// statistics for all columns (count, mean, stddev, min, max)
df1.describe().show

// statistics for selected columns
df1.describe("sal").show
df1.describe("sal", "comm").show
```


- Storage-related
   - `cacheTable`, `persist`, `checkpoint`, `unpersist`, `cache`
   - Note: the default storage level for a Dataset is **MEMORY_AND_DISK** (verified in the short sketch after the code block below)
```scala
import org.apache.spark.storage.StorageLevel
spark.sparkContext.setCheckpointDir("hdfs://linux121:9000/checkpoint")

df1.show()
df1.checkpoint()
df1.cache()
df1.persist(StorageLevel.MEMORY_ONLY)
df1.count()
df1.unpersist(true)

df1.createOrReplaceTempView("t1")
spark.catalog.cacheTable("t1")
spark.catalog.uncacheTable("t1")
```
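To check the default storage level mentioned above, `Dataset.storageLevel` can be inspected after caching; a small sketch:

```scala
df1.cache()
// expect MEMORY_AND_DISK for a cached Dataset (RDD.cache defaults to MEMORY_ONLY)
println(df1.storageLevel)
df1.unpersist()
```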


- select-related
   - multiple ways to refer to columns; `select`, `selectExpr`
   - `drop`, `withColumn`, `withColumnRenamed`, `cast` (built-in function)
```scala
// several ways to express a column: "name", $"name", 'name, col("name"), ds("name")
// note: don't mix them carelessly; import spark.implicits._ where needed;
// not every notation is valid everywhere
df1.select($"ename", $"hiredate", $"sal").show
df1.select("ename", "hiredate", "sal").show
df1.select('ename, 'hiredate, 'sal).show
df1.select(col("ename"), col("hiredate"), col("sal")).show
df1.select(df1("ename"), df1("hiredate"), df1("sal")).show

// the two lines below are invalid (they do not compute sal+100); the other notations work
df1.select("ename", "hiredate", "sal"+100).show
df1.select("ename", "hiredate", "sal+100").show
// this is the correct syntax
df1.select($"ename", $"hiredate", $"sal"+100).show
df1.select('ename, 'hiredate, 'sal+100).show

// expr expressions can also be used (only plain strings inside expr)
df1.select(expr("comm+100"), expr("sal+100"), expr("ename")).show
df1.selectExpr("ename as name").show
df1.selectExpr("power(sal, 2)", "sal").show
df1.selectExpr("round(sal, -3) as newsal", "sal", "ename").show

// drop, withColumn, withColumnRenamed, cast
// drop: remove one or more columns and get a new DF
df1.drop("mgr")
df1.drop("empno", "mgr")

// withColumn: modify a column's values
val df2 = df1.withColumn("sal", $"sal"+1000)
df2.show

// withColumnRenamed: rename a column
df1.withColumnRenamed("sal", "newsal")

// note: drop, withColumn and withColumnRenamed all return a DF

// cast: type conversion
df1.selectExpr("cast(empno as string)").printSchema

import org.apache.spark.sql.types._
df1.select('empno.cast(StringType)).printSchema
```


- where-related
   - where == filter
```scala
// filter
df1.filter("sal>1000").show
df1.filter("sal>1000 and job=='MANAGER'").show

// where
df1.where("sal>1000").show
df1.where("sal>1000 and job=='MANAGER'").show
```
- groupBy-related
   - `groupBy`, `agg`, `max`, `min`, `avg`, `sum`, `count` (the last five are built-in functions)
```scala
// groupBy, max, min, avg, sum, count (not the same as df1.count)
df1.groupBy("Job").sum("sal").show
df1.groupBy("Job").max("sal").show
df1.groupBy("Job").min("sal").show
df1.groupBy("Job").avg("sal").show
df1.groupBy("Job").count.show

// similar to a HAVING clause
df1.groupBy("Job").avg("sal").where("avg(sal) > 2000").show
df1.groupBy("Job").avg("sal").where($"avg(sal)" > 2000).show

// agg
df1.groupBy("Job").agg("sal"->"max", "sal"->"min", "sal"->"avg", "sal"->"sum", "sal"->"count").show
df1.groupBy("deptno").agg("sal"->"max", "sal"->"min", "sal"->"avg", "sal"->"sum", "sal"->"count").show

// this form is easier to read
df1.groupBy("Job").agg(max("sal"), min("sal"), avg("sal"), sum("sal"), count("sal")).show

// aliasing a result column
df1.groupBy("Job").agg(max("sal"), min("sal"), avg("sal"), sum("sal"), count("sal")).withColumnRenamed("min(sal)", "min1").show

// aliasing, the simplest way
df1.groupBy("Job").agg(max("sal").as("max1"), min("sal").as("min2"), avg("sal").as("avg3"), sum("sal").as("sum4"), count("sal").as("count5")).show
```


- orderBy-related
   - orderBy == sort
```scala
// orderBy (ascending by default)
df1.orderBy("sal").show
df1.orderBy($"sal").show
df1.orderBy($"sal".asc).show
df1.orderBy('sal).show
df1.orderBy(col("sal")).show
df1.orderBy(df1("sal")).show

// descending
df1.orderBy(-$"sal").show
df1.orderBy($"sal".desc).show
df1.orderBy(-'sal).show
df1.orderBy(-'deptno, -'sal).show

// sort: the following are equivalent to orderBy
df1.sort("sal").show
df1.sort($"sal").show
df1.sort($"sal".asc).show
df1.sort('sal).show
df1.sort(col("sal")).show
df1.sort(df1("sal")).show
df1.sort($"sal".desc).show
df1.sort(-'sal).show
df1.sort(-'deptno, -'sal).show
```
- join-related
```scala
// 1. Cartesian product
df1.crossJoin(df1).count

// 2. equi-join on a single column (the join column empno appears only once in the result)
df1.join(df1, "empno").count

// 3. equi-join on multiple columns (empno and ename appear only once in the result)
df1.join(df1, Seq("empno", "ename")).show

// define the first dataset
case class StudentAge(sno: Int, name: String, age: Int)
val lst = List(StudentAge(1, "Alice", 18), StudentAge(2, "Andy", 19), StudentAge(3, "Bob", 17), StudentAge(4, "Justin", 21), StudentAge(5, "Cindy", 20))
val ds1 = spark.createDataset(lst)
ds1.show()

// define the second dataset
case class StudentHeight(sname: String, height: Int)
val rdd = sc.makeRDD(List(StudentHeight("Alice", 160), StudentHeight("Andy", 159), StudentHeight("Bob", 170), StudentHeight("Cindy", 165), StudentHeight("Rose", 160)))
val ds2 = rdd.toDS

// note: a plain double-quoted string (without $) does not work here, and the comparison operator is ===
ds1.join(ds2, $"name" === $"sname").show
ds1.join(ds2, 'name === 'sname).show
ds1.join(ds2, ds1("name") === ds2("sname")).show
ds1.join(ds2, ds1("name") === ds2("sname"), "inner").show

// the different join types
ds1.join(ds2, $"name" === $"sname").show
ds1.join(ds2, $"name" === $"sname", "inner").show
ds1.join(ds2, $"name" === $"sname", "left").show
ds1.join(ds2, $"name" === $"sname", "left_outer").show
ds1.join(ds2, $"name" === $"sname", "right").show
ds1.join(ds2, $"name" === $"sname", "right_outer").show
ds1.join(ds2, $"name" === $"sname", "outer").show
ds1.join(ds2, $"name" === $"sname", "full").show
ds1.join(ds2, $"name" === $"sname", "full_outer").show

// note: after a join, the Dataset becomes a DataFrame
```
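If the typed result matters, `joinWith` keeps both sides as their case classes instead of collapsing to a DataFrame; a small sketch reusing the `ds1` and `ds2` defined above:

```scala
// Dataset[(StudentAge, StudentHeight)] rather than a DataFrame
val joined = ds1.joinWith(ds2, ds1("name") === ds2("sname"), "inner")
joined.show(false)
```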


- Set operations
   - `union` == `unionAll` (deprecated), `intersect`, `except`
```scala
// union, unionAll, intersect, except: set union, intersection and difference
val ds3 = ds1.select("name")
val ds4 = ds2.select("sname")

// union: set union, duplicates are kept
ds3.union(ds4).show

// unionAll is equivalent to union; it is deprecated, so prefer union
ds3.unionAll(ds4).show

// intersect: set intersection
ds3.intersect(ds4).show

// except: set difference
ds3.except(ds4).show
```
- Handling null values
   - `na.fill`, `na.drop`
```scala
// NaN (Not a Number)
math.sqrt(-1.0)
math.sqrt(-1.0).isNaN()
df1.show

// drop rows containing null or NaN in any column
df1.na.drop.show

// drop rows containing null or NaN in the given column
df1.na.drop(Array("mgr")).show

// fill all columns; fill one column; fill several columns with different values
df1.na.fill(1000).show
df1.na.fill(1000, Array("comm")).show
df1.na.fill(Map("mgr" -> 2000, "comm" -> 1000)).show

// replace specific values
df1.na.replace("comm" :: "deptno" :: Nil, Map(0 -> 100, 10 -> 100)).show

// query null / non-null columns; isNull and isNotNull are built in
df1.filter("comm is null").show
df1.filter($"comm".isNull).show
df1.filter(col("comm").isNull).show

df1.filter("comm is not null").show
df1.filter(col("comm").isNotNull).show
```


- Window functions
   - In most cases it is more convenient to express window functions directly in SQL rather than through the DSL (see the SQL sketch after the code block below)
   - See the sources Window.scala and (mainly) WindowSpec.scala
```scala
import org.apache.spark.sql.expressions.Window 
val w1 = Window.partitionBy("cookieid").orderBy("createtime") 
val w2 = Window.partitionBy("cookieid").orderBy("pv") 
val w3 = w1.rowsBetween(Window.unboundedPreceding, Window.currentRow) 
val w4 = w1.rowsBetween(-1, 1) 

// aggregate functions over a window (using the analytic-function sample dataset)
df.select($"cookieid", $"pv", sum("pv").over(w1).alias("pv1")).show
df.select($"cookieid", $"pv", sum("pv").over(w3).alias("pv1")).show
df.select($"cookieid", $"pv", sum("pv").over(w4).as("pv1")).show

// ranking
df.select($"cookieid", $"pv", rank().over(w2).alias("rank")).show
df.select($"cookieid", $"pv", dense_rank().over(w2).alias("denserank")).show
df.select($"cookieid", $"pv", row_number().over(w2).alias("rownumber")).show

// lag, lead
df.select($"cookieid", $"pv", lag("pv", 2).over(w2).alias("rownumber")).show
df.select($"cookieid", $"pv", lag("pv", -2).over(w2).alias("rownumber")).show
```

# 2.6 SQL Statements

```scala
package cn.lagou.sparksql
import org.apache.spark.rdd.RDD 
import org.apache.spark.sql.{Dataset, SparkSession} 

case class Info(id: String, tags: String) 

object SQLDemo { 
  def main(args: Array[String]): Unit = { 
    val spark = SparkSession 
    .builder() 
    .appName("Demo1") 
    .master("local[*]")
    .getOrCreate() 

    val sc = spark.sparkContext 
    sc.setLogLevel("warn") 

    import spark.implicits._ 
    val arr = Array("1 1,2,3", "2 2,3", "3 1,2") 
    val rdd: RDD[Info] = sc.makeRDD(arr).map { 
      line => val fields: Array[String] = line.split("\\s+") 
      Info(fields(0), fields(1))
    }

    val ds: Dataset[Info] = spark.createDataset(rdd) 
    ds.createOrReplaceTempView("t1") 
    spark.sql( """|select id, tag | from t1 | lateral view explode(split(tags, ",")) t2 as tag |""".stripMargin).show

    spark.sql( """|select id, explode(split(tags, ",")) | from t1 |""".stripMargin).show

    spark.close() 
  } 
}
```

# 2.7 Input and Output

- Built-in data sources supported by Spark SQL include: Parquet, JSON, CSV, Avro, Images and BinaryFiles (Spark 3.0). Parquet is the default data source.
```scala
// used internally
DataFrameReader.format(args)
  .option("key", "value")
  .schema(args)
  .load()

// public API
SparkSession.read
```

![image.png](https://cdn.nlark.com/yuque/0/2021/png/3013578/1612017155389-55ae9e80-79c1-4bdc-a70b-d314604642b8.png#align=left&display=inline&height=185&margin=%5Bobject%20Object%5D&name=image.png&originHeight=369&originWidth=974&size=27250&status=done&style=none&width=487)
```scala
val df1 = spark.read.format("parquet").load("data/users.parquet") 

// Use Parquet; you can omit format("parquet") if you wish as it's the default 
val df2 = spark.read.load("data/users.parquet")

// Use CSV 
val df3 = spark.read.format("csv") 
.option("inferSchema", "true") 
.option("header", "true") 
.load("data/people1.csv") 

// Use JSON 
val df4 = spark.read.format("json") 
.load("data/emp.json")



// 内部使用 
DataFrameWriter.format(args) 
.option(args) 
.bucketBy(args) 
.partitionBy(args)
.save(path) 

// 开发API
DataFrame.write
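`partitionBy` and `bucketBy` from the writer API above, in action; a small sketch that writes out the `df3` read from CSV earlier, assuming it has the name/age/job columns used in the CSV examples (the output path and table name are just examples):

```scala
// one subdirectory per distinct value of the partition column
df3.write
  .mode("overwrite")
  .partitionBy("job")
  .parquet("data/parquet_partitioned")

// bucketBy requires saveAsTable (the bucketing is recorded in the metastore)
df3.write
  .mode("overwrite")
  .bucketBy(4, "name")
  .sortBy("age")
  .saveAsTable("people_bucketed")
```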
- Parquet files:
```scala
spark.sql(
  """
    |CREATE OR REPLACE TEMPORARY VIEW users
    |USING parquet
    |OPTIONS (path "data/users.parquet")
    |""".stripMargin)

spark.sql("select * from users").show

df.write.format("parquet")
  .mode("overwrite")
  .option("compression", "snappy")
  .save("data/parquet")
```


- JSON files:
```scala
val fileJson = "data/emp.json" 
val df6 = spark.read.format("json").load(fileJson) 

spark.sql("""
            |CREATE OR REPLACE TEMPORARY VIEW emp 
      | USING json 
      | options(path "data/emp.json") 
      |""".stripMargin) 

spark.sql("SELECT * FROM emp").show() 
spark.sql("SELECT * FROM emp").write 
.format("json") 
.mode("overwrite") 
.save("data/json")


- CSV files:
```scala
// CSV
val fileCSV = "data/people1.csv"

val df = spark.read.format("csv")
  .option("header", "true")
  .option("inferschema", "true")
  .load(fileCSV)

spark.sql(
  """
    |CREATE OR REPLACE TEMPORARY VIEW people
    |  USING csv
    |  options(path "data/people1.csv",
    |          header "true",
    |          inferschema "true")
    |""".stripMargin)

spark.sql("select * from people")
  .write
  .format("csv")
  .mode("overwrite")
  .save("data/csv")
```

<a name="1327ba4c"></a>
### ![image.png](https://cdn.nlark.com/yuque/0/2021/png/3013578/1612089239220-96cdbfbc-5cab-45cb-a79b-132b4d0adef6.png#align=left&display=inline&height=344&margin=%5Bobject%20Object%5D&name=image.png&originHeight=526&originWidth=1033&size=218584&status=done&style=none&width=676)

- Spark SQL can also connect to external data sources over JDBC:
```scala
val jdbcDF = spark
.read
.format("jdbc")
.option("url", "jdbc:mysql://linux123:3306/ebiz?useSSL=false")
// optionally append &useUnicode=true to the URL
.option("driver", "com.mysql.jdbc.Driver")
.option("dbtable", "lagou_product_info")
.option("user", "hive")
.option("password", "12345678")
.load()
jdbcDF.show()
jdbcDF.write
.format("jdbc")
.option("url", "jdbc:mysql://linux123:3306/ebiz?useSSL=false&characterEncoding=utf8")
.option("user", "hive")
.option("password", "12345678")
.option("driver", "com.mysql.jdbc.Driver")
.option("dbtable", "lagou_product_info_back")
.mode("append")
.save()
```
- Note: if the data contains Chinese text, pay attention to the table's character set, otherwise the stored data will be garbled
- Available save modes (set with `.mode(...)`; a sketch follows the SQL block below):
   - SaveMode.ErrorIfExists (default): if the table exists, an exception is thrown and nothing is written
   - SaveMode.Append: if the table exists, the data is appended to it; if not, the table is created first and the data inserted
   - SaveMode.Overwrite: the existing table and its data are dropped, the table is recreated, and the new data is inserted
   - SaveMode.Ignore: if the table does not exist, it is created and the data is written; if it exists, the write is skipped silently with no error
```sql
-- create the table
create table lagou_product_info_back as select * from lagou_product_info;

-- check the tables' character sets
show create table lagou_product_info_back;
show create table lagou_product_info;

-- change the table's character set
alter table lagou_product_info_back convert to character set utf8;
```
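The save mode can be set either with the `SaveMode` enum or with the equivalent string; a small sketch based on the JDBC write above:

```scala
import org.apache.spark.sql.SaveMode

jdbcDF.write
  .format("jdbc")
  .option("url", "jdbc:mysql://linux123:3306/ebiz?useSSL=false&characterEncoding=utf8")
  .option("user", "hive")
  .option("password", "12345678")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "lagou_product_info_back")
  .mode(SaveMode.Overwrite)   // same as .mode("overwrite")
  .save()
```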

![image.png](https://cdn.nlark.com/yuque/0/2021/png/3013578/1612093482188-04769369-17c4-43e9-a6c3-a562857684fe.png#align=left&display=inline&height=264&margin=%5Bobject%20Object%5D&name=image.png&originHeight=311&originWidth=828&size=61610&status=done&style=none&width=703)<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/3013578/1612093508850-589e2578-a507-4c35-b08d-853e9facb78c.png#align=left&display=inline&height=199&margin=%5Bobject%20Object%5D&name=image.png&originHeight=229&originWidth=827&size=52393&status=done&style=none&width=720)
<a name="LlOay"></a>
# 2.8 UDF & UDAF
<a name="VGVNy"></a>
## 1 UDF

- UDF (User Defined Function): a user-defined function whose input and output are both a single record, similar to the ordinary math or string functions in Spark SQL. In terms of implementation it is just a regular Scala function.
- UDAF (User Defined Aggregation Function): a user-defined aggregate function. It operates on a collection of rows and customizes the aggregation (many rows in, one row out), similar to sum or avg used after a group by.
- A UDF written in Scala is almost indistinguishable from an ordinary Scala function; the only extra step is registering it with the SQLContext/SparkSession.
```scala
def len(bookTitle: String):Int = bookTitle.length
spark.udf.register("len", len _)
val booksWithLongTitle = spark.sql("select title, author from books where len(title) > 10")
```
- A registered UDF can appear in the field list of a SQL statement, and also inside where, group by or having clauses.
- A UDF call can also take constants rather than column names. Slightly modifying the function above so that the length 10 is passed in as a parameter:

    def lengthLongerThan(bookTitle: String, length: Int): Boolean = bookTitle.length > length
    spark.udf.register("longLength", lengthLongerThan _)
    val booksWithLongTitle = spark.sql("select title, author from books where longLength(title,10)")
    
- With the DataFrame API, the UDF is passed in as a string:

    val booksWithLongTitle = dataFrame.filter("longLength(title, 10)")
    
- The DataFrame API can also take Column objects; the `$` prefix (an implicit conversion defined in the SQLImplicits object) turns a string into a Column. In that case the UDF is defined differently: instead of a plain Scala function, it has to be wrapped with the udf method defined in org.apache.spark.sql.functions. Defined this way it does not need to be registered:

    import org.apache.spark.sql.functions._
    val longLength = udf((bookTitle: String, length: Int) => bookTitle.length > length)
    import spark.implicits._
    val booksWithLongTitle = dataFrame.filter(longLength($"title", lit(10)))
    
- Complete example:
```scala
package cn.lagou.sparksql

import org.apache.spark.sql.{Row, SparkSession}

object UDF {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName(this.getClass.getCanonicalName)
      .master("local[*]")
      .getOrCreate()

    spark.sparkContext.setLogLevel("WARN")

    val data = List(("scala", "author1"), ("spark", "author2"), ("hadoop", "author3"), ("hive", "author4"), ("strom", "author5"), ("kafka", "author6"))
    val df = spark.createDataFrame(data).toDF("title", "author")
    df.createTempView("books")

    // define the function and register it
    def len1(bookTitle: String): Int = bookTitle.length
    spark.udf.register("len1", len1 _)

    // the UDF can be used in select, where and other clauses
    spark.sql("select title, author, len1(title) from books").show
    spark.sql("select title, author from books where len1(title)>5").show

    // the UDF can also be used through the DataFrame / Dataset API
    import spark.implicits._
    df.filter("len1(title)>5").show

    // does not compile
    // df.filter(len1($"title")>5).show
    // compiles, but fails at runtime
    // df.select("len1(title)").show
    // does not compile
    // df.select(len1($"title")).show

    // to use $"..." as a Column in the DSL, wrap the function with the udf method; no registration needed
    import org.apache.spark.sql.functions._
    // val len2 = udf((bookTitle: String) => bookTitle.length)

    // val a: (String) => Int = (bookTitle: String) => bookTitle.length
    // val len2 = udf(a)

    val len2 = udf(len1 _)
    df.filter(len2($"title") > 5).show
    df.select($"title", $"author", len2($"title")).show

    // without a UDF
    df.map { case Row(title: String, author: String) => (title, author, title.length) }.show

    spark.stop()
  }
}
```

<a name="wyEs7"></a>
## 2 UDAF

- Sample data:
```
id, name, sales, discount, state, saleDate
1, "Widget Co", 1000.00, 0.00, "AZ", "2019-01-01"
2, "Acme Widgets", 2000.00, 500.00, "CA", "2019-02-01"
3, "Widgetry", 1000.00, 200.00, "CA", "2020-01-11"
4, "Widgets R Us", 2000.00, 0.0, "CA", "2020-02-19"
5, "Ye Olde Widgete", 3000.00, 0.0, "MA", "2020-02-28"
```

- The result to compute is: (2020 total – 2019 total) / 2019 total = (6000 - 3000) / 3000 = 1
- Executing `select userFunc(sales, saleDate) from table1;` yields this result; the computation logic is implemented in userFunc.
- An ordinary UDF cannot aggregate data. For example, computing the year-over-year change of sales requires summing the current year's and the previous year's sales separately and then applying the formula. This is a job for a UDAF. Spark defines a parent class, UserDefinedAggregateFunction, for all (untyped) UDAFs; subclasses must implement its abstract methods:
   - inputSchema defines the input schema, i.e. which DataFrame columns feed the function
   - bufferSchema defines the schema of the intermediate results produced during aggregation
   - dataType is the return type of the UDAF
   - deterministic is a boolean flag stating whether the UDAF always produces the same result for the same input
   - initialize initializes the intermediate aggregation buffer
   - update is where the core computation of the UDAF happens; its first argument is the buffer, indexed by the fields of bufferSchema (starting at 0); its second argument, input: Row, is not a DataFrame row but a row projected through inputSchema
   - merge combines the buffers of two partial aggregations and stores the result back into the MutableAggregationBuffer
   - evaluate computes the final result from the aggregation buffer

- Untyped UDAF:
```scala
package cn.lagou.sparksql

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Row, SparkSession, TypedColumn}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DoubleType, StringType, StructType}

class TypeUnsafeUDAF extends UserDefinedAggregateFunction {

  // input schema: the DataFrame columns the UDAF consumes
  override def inputSchema: StructType = new StructType().add("sales", DoubleType).add("saledate", StringType)

  // buffer for intermediate results
  override def bufferSchema: StructType = new StructType().add("year2019", DoubleType).add("year2020", DoubleType)

  // return type of the UDAF
  override def dataType: DataType = DoubleType

  // whether the UDAF always returns the same result for the same input; usually true
  override def deterministic: Boolean = true

  // initialize the intermediate aggregation buffer
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0.0
    buffer(1) = 0.0
  }

  // the core computation happens in update, which is called once per scanned row with
  // the buffer (intermediate results) and input (the row projected through inputSchema).
  // the buffer is indexed by the fields of bufferSchema, starting at 0;
  // here input(0) is the sales amount and input(1) the sale date.
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val salenumber = input.getAs[Double](0)
    // the date string looks like 2014-01-02; take(4) extracts the year
    input.getString(1).take(4) match {
      case "2019" => buffer(0) = buffer.getAs[Double](0) + salenumber
      case "2020" => buffer(1) = buffer.getAs[Double](1) + salenumber
      case _ => println("ERROR!")
    }
  }

  // merge the buffers of two partitions; the result is kept in buffer1
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getDouble(0) + buffer2.getDouble(0)
    buffer1(1) = buffer1.getDouble(1) + buffer2.getDouble(1)
  }

  // compute the final result from the buffered values
  override def evaluate(buffer: Row): Double = {
    println(s"evaluate : ${buffer.getDouble(0)}, ${buffer.getDouble(1)}")
    if (buffer.getDouble(0) == 0.0) 0.0
    else (buffer.getDouble(1) - buffer.getDouble(0)) / buffer.getDouble(0)
  }
}

object TypeUnsafeUDAFTest {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)

    val spark = SparkSession.builder()
      .appName(s"${this.getClass.getCanonicalName}")
      .master("local[*]")
      .getOrCreate()

    val sales = Seq(
      (1, "Widget Co", 1000.00, 0.00, "AZ", "2019-01-02"),
      (2, "Acme Widgets", 2000.00, 500.00, "CA", "2019-02-01"),
      (3, "Widgetry", 1000.00, 200.00, "CA", "2020-01-11"),
      (4, "Widgets R Us", 2000.00, 0.0, "CA", "2020-02-19"),
      (5, "Ye Olde Widgete", 3000.00, 0.0, "MA", "2020-02-28"))

    val salesDF = spark.createDataFrame(sales).toDF("id", "name", "sales", "discount", "state", "saleDate")
    salesDF.createTempView("sales")

    val userFunc = new TypeUnsafeUDAF

    spark.udf.register("userFunc", userFunc)
    spark.sql("select userFunc(sales, saleDate) as rate from sales").show()

    spark.stop()
  }
}
```


- **Type-safe UDAF**

![image.png](https://cdn.nlark.com/yuque/0/2021/png/3013578/1612094559922-467ded97-682a-406a-a924-c238e3e77b92.png#align=left&display=inline&height=385&margin=%5Bobject%20Object%5D&name=image.png&originHeight=569&originWidth=1024&size=56642&status=done&style=none&width=693)
```scala
package cn.lagou.sparksql 
import org.apache.log4j.{Level, Logger} 
import org.apache.spark.sql.{Encoder, Encoders, SparkSession, TypedColumn} 
import org.apache.spark.sql.expressions.Aggregator 

case class Sales(id: Int, name1: String, sales: Double, discount: Double, name2: String, stime: String) 

case class SalesBuffer(var sales2019: Double, var sales2020: Double) 

class TypeSafeUDAF extends Aggregator[Sales, SalesBuffer, Double]{ 
  // the initial (zero) value of the buffer
  override def zero: SalesBuffer = SalesBuffer(0, 0) 

  // merge input rows into the buffer within a partition
  override def reduce(buf: SalesBuffer, inputRow: Sales): SalesBuffer = {
    val sales: Double = inputRow.sales
    val year: String = inputRow.stime.take(4)
    year match {
      case "2019" => buf.sales2019 += sales 
      case "2020" => buf.sales2020 += sales 
      case _ => println("ERROR!") 
    }
    buf 
  }

  // merge buffers from different partitions
  override def merge(part1: SalesBuffer, part2: SalesBuffer): SalesBuffer = { 
    val sales1 = part1.sales2019 + part2.sales2019
    val sales2 = part1.sales2020 + part2.sales2020 
    SalesBuffer(sales1, sales2)
  }

  // compute the final result
  override def finish(reduction: SalesBuffer): Double = { 
    if (math.abs(reduction.sales2019) < 0.0000000001) 0 
    else (reduction.sales2020 - reduction.sales2019) / reduction.sales2019 
  }

  // encoders for the buffer and for the output
  override def bufferEncoder: Encoder[SalesBuffer] = Encoders.product 
  override def outputEncoder: Encoder[Double] = Encoders.scalaDouble 
}

object TypeSafeUDAFTest{ 
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN) 

    val spark = SparkSession.builder() 
    .appName(s"${this.getClass.getCanonicalName}") 
    .master("local[*]") 
    .getOrCreate() 

    val sales = Seq( 
      Sales(1, "Widget Co", 1000.00, 0.00, "AZ", "2019-01-02"), 
      Sales(2, "Acme Widgets", 2000.00, 500.00, "CA", "2019-02-01"), 
      Sales(3, "Widgetry", 1000.00, 200.00, "CA", "2020-01-11"), 
      Sales(4, "Widgets R Us", 2000.00, 0.0, "CA", "2020-02-19"), 
      Sales(5, "Ye Olde Widgete", 3000.00, 0.0, "MA", "2020-02-28")) 

    import spark.implicits._ 
    val ds = spark.createDataset(sales) 
    ds.show 

    // the name becomes the output column name
    val rate: TypedColumn[Sales, Double] = new TypeSafeUDAF().toColumn.name("rate")
    ds.select(rate).show 

    spark.stop() 
  } 
}
```

# 2.9 Accessing Hive

- Add the dependency to the pom file:

    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.12</artifactId>
    <version>${spark.version}</version>
    </dependency>
    
- Add a hive-site.xml file under resources with the following content:

    <configuration>
    <property>
      <name>hive.metastore.uris</name>
      <value>thrift://linux123:9083</value>
    </property>
    </configuration>
    
- Note: it is best to connect to Hive through the metastore service; when connecting to the metastore database directly, the Spark SQL program will modify Hive's version information.

- By default Spark is built against Hive 1.2.1 and ships the corresponding serdes, UDFs, UDAFs, etc.
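If the cluster runs a Hive version other than the built-in 1.2.1, the metastore client version can be pinned when building the session; a hedged sketch (the version string is only an example and must be one your Spark release supports):

```scala
val spark = SparkSession.builder()
  .appName("HiveVersionDemo")
  .master("local[*]")
  .enableHiveSupport()
  // version of the Hive metastore client to use (the built-in default is 1.2.1)
  .config("spark.sql.hive.metastore.version", "2.3.3")
  // where to load the matching Hive client jars from: "builtin", "maven", or a classpath
  .config("spark.sql.hive.metastore.jars", "maven")
  .getOrCreate()
```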
```scala
package cn.lagou.sparksql

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object AccessHive {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("Demo1")
      .master("local[*]")
      .enableHiveSupport()
      // when true, Spark writes Parquet data using the same conventions as Hive
      .config("spark.sql.parquet.writeLegacyFormat", true)
      .getOrCreate()

    val sc = spark.sparkContext
    sc.setLogLevel("warn")

    spark.sql("show databases").show
    spark.sql("select * from ods.ods_trade_product_info").show

    val df: DataFrame = spark.table("ods.ods_trade_product_info")
    df.show()

    df.write.mode(SaveMode.Append).saveAsTable("ods.ods_trade_product_info_back")
    spark.table("ods.ods_trade_product_info_back").show

    spark.close()
  }
}
```