SparkSQL作为Spark生态的一员继续发展,而不再受限于Hive,只是兼容Hive;
Hive on Spark是一个Hive的发展计划,该计划将Spark作为Hive的底层引擎之一,也就是说,Hive将不再受限于一个引擎,可以采用Map-Reduce、Tez、Spark等引擎。

对于开发人员来讲,SparkSQL可以简化RDD的开发,提高开发效率,且执行效率非常快,所以实际工作中,基本上采用的就是Spark SQL。
Spark SQL为了简化RDD的开发,提高开发效率,提供了2个编程抽象,类似Spark Core中的RDD

  • DataFrame
  • DataSet

读取文件

注意:
如果从内存中获取数据,spark可以知道数据类型具体是什么。如果是数字,默认作为Int处理;
但是从文件中读取的数字,不能确定是什么类型,所以用bigint接收,可以和Long类型转换,但是和Int不能进行转换

  1. scala> val df = spark.read.json("data/user.json")
  2. df: org.apache.spark.sql.DataFrame = [age: bigintusername: string]

创建会话

Core: SparkContext
SparkContext.xxx 创建RDD


Sql: SparkSession.xxx 创建DF或DS


如何创建SparkSessoin:
* ①如果已经创建过了SparkSession,可以通过SparkSession.builder().getOrCreate()获取已经存在的SparkSession

* ② SparkSession.builder.xxxx.xxx.xxxx.getOrCreate() 创建一个新的SparkSession

// sparkSession 和 session 共用同一个SparkContext

_session.read.格式,及_session.sql(“xxxx”)

  1. //在每个测试方法之前运行
  2. @Before
  3. def init(): Unit ={
  4. session = SparkSession.builder
  5. .master("local[4]")
  6. .appName("Word Count")
  7. //.config("spark.some.config.option", "some-value") //添加spark的参数
  8. .getOrCreate()
  9. }
  10. //在每个测试方法之后运行
  11. @After
  12. def close(): Unit ={
  13. session.close()
  14. }
  1. @Test
  2. def testCreateDF1() : Unit ={
  3. //如果从内存中获取数据,spark可以知道数据类型具体是什么。如果是数字,默认作为Int处理;
  4. //但是从文件中读取的数字,不能确定是什么类型,所以用bigint接收,可以和Long类型转换,但是和Int不能进行转换
  5. val dataFrame: DataFrame = session.read.json("input/people.json")
  6. //查看
  7. dataFrame.show()
  8. //只查看schame
  9. dataFrame.printSchema()
  10. // 使用sql分析
  11. //为当前的表格起名
  12. dataFrame.createTempView("person")
  13. session.sql("select avg(age) from person where age > 20").show()
  14. }

创建视图

将数据映射为一张表

  1. @Test
  2. def testGlobalView() : Unit ={
  3. val dataFrame1: DataFrame = session.read.json("input/people.json")
  4. //创建为全局视图
  5. dataFrame1.createOrReplaceGlobalTempView("person")
  6. val sparkSession: SparkSession = session.newSession()
  7. //访问全局视图,添加global_temp名称空间,类似于库名.表名
  8. sparkSession.sql("select avg(age) from global_temp.person where age > 20").show()
  9. }
  10. /*
  11. 一个表格可以有多个视图名
  12. Temporary view 'person' already exists;
  13. 在一个session中,同一个视图名只能指向唯一表格!
  14. 默认创建的视图,不能跨越Session的,如果希望跨session操作,必须是全局视图!
  15. */
  16. @Test
  17. def testCreateTempViewDifference() : Unit ={
  18. val dataFrame1: DataFrame = session.read.json("input/people.json")
  19. val dataFrame2: DataFrame = session.read.json("input/people1.json")
  20. dataFrame1.createTempView("person")
  21. //dataFrame2.createTempView("person") 报错
  22. //存在就替换,不存在就创建
  23. dataFrame2.createOrReplaceTempView("person")
  24. println("----------------")
  25. // sparkSession 和 session 共用同一个SparkContext
  26. // session还能分裂出多一个sparkSession
  27. val sparkSession: SparkSession = session.newSession()
  28. println(sparkSession.sparkContext == session.sparkContext)
  29. val dataFrame3: DataFrame = sparkSession.read.json("input/people1.json")
  30. dataFrame3.createTempView("person")
  31. sparkSession.sql("select avg(age) from person where age > 20").show()
  32. }

创建DataFrame


/
* 基于集合创建DF

DataFrame=RDD(数据)+schema(元数据)

RDD:
def createDataFrame(rows: java.util.List[Row], schema: StructType)
Row: 代表关系型运算中的一行输出。
构造: Row(x,x,x,x)
类比为数组,通过索引定位元素
获取元素: Row(index) 返回Any(没有类型)
Row.getXxx(index) 返回带类型的数据
弱类型!
schema:
StructType: 表的结构 样例类,apply方法:StructType() 参数是一个StructField的list或array
StructField(schema): 表中的字段,样例类,apply方法:StructField()
DataType: 字段的数据类型
是什么类型,就使用什么样的case object
例如: StringType、IntegerType…
*/

  1. // 通过Row类型创建DF
  2. @Test
  3. def testCreateDF2() : Unit ={
  4. //将row数据添加到数组中
  5. var datas:java.util.List[Row]=new util.ArrayList[Row]();
  6. datas.add(Row("jack",20))
  7. datas.add(Row("jack1",30))
  8. datas.add(Row("jack2",40))
  9. // 表结构
  10. var schema: StructType=StructType( StructField("name",StringType) :: StructField("age",IntegerType) :: Nil)
  11. val df: DataFrame = session.createDataFrame(datas, schema)
  12. df.show()
  13. df.printSchema()
  14. }

Row补充:

创建Row的两种方式

直接传值
* Row(value1, value2, value3, …)
传进集合:
* Row.fromSeq(Seq(value1, value2, …))

特性

Row(a,b,c,d)
可以通过索引获取:Row(2):Any=c
但是通过下标获取的的值类型是Any,是不确定类型的,所以Row是弱类型数据,需要强转

Row提供了转换方法(明确知道index是什么类型的时候使用!!)
_row.getXxx(index):_

  1. val firstValue = row.getInt(0)*
  2. // firstValue: Int = 1
  3. val isNull = row.isNullAt(3)*
  4. // isNull: Boolean = true

创建DataSet

/*
DS 和DS的区别:
DS[T]
DF : Dataset[Row]
泛型上下文:
def createDatasetT : Encoder: Dataset[T]
等价于
def createDatasetEncoder(implicit aa:Encoder[T])

  1. ** 重要**:** Encoder**: ①可以直接通过SparkSession中的一些隐式方法,隐式创建Encoder.<br /> 不用你创建,它偷偷给你创建,你看不见<br /> `import sparksession对象.implicits._`<br /> 隐式类import后面的类名必须是用val修饰,因为Scala只支持val修饰的对象的引入。
  2. ②可以显式调用Encoders的静态方法创建<br /> 1scala中的基本数据类型: `Encoders.scalaXxx`<br /> 2scalaString : `Encoders.STRING`<br /> 3)自己定义的类:<br /> 3.1)样例类: ` ExpressionEncoder[T]()` ` Encoders.product[T]`<br /> 因为所有样例类都实现了ProductSerilizable接口<br /> 3.2)非样例类: ` ExpressionEncoder[T]()`

Row类型RDD调用toDS,无法调用,因为没有继承producer,无法构建Encoder,需要使用以下方法:

  1. val df:DataFrame=session.createDataFrame(rdd,schema)

*/

  1. package com.tcode
  2. import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
  3. import org.apache.spark.sql.{DataFrame, Dataset, Encoders, Row, SparkSession}
  4. import org.junit.{After, Before, Test}
  5. import java.util
  6. // 实现了 Product ,Serilizable
  7. case class Emp(name:String,age:Int)
  8. class DFAndDSTest {
  9. var session:SparkSession = null
  10. @Test
  11. def testCreateDS() : Unit ={
  12. val emps = List(Emp("jack", 30), Emp("marray", 20))
  13. // 隐式类import后面的类名必须是用val修饰,因为Scala只支持val修饰的对象的引入。
  14. //import session.implicits._
  15. val ds: Dataset[Emp] = session.createDataset(emps)(Encoders.product[Emp])
  16. ds.show()
  17. ds.printSchema()
  18. }
  19. //在每个测试方法之前运行
  20. @Before
  21. def init(): Unit ={
  22. session = SparkSession.builder
  23. .master("local[4]")
  24. .appName("Word Count")
  25. //.config("spark.some.config.option", "some-value") //添加spark的参数
  26. .getOrCreate()
  27. }
  28. //在每个测试方法之后运行
  29. @After
  30. def close(): Unit ={
  31. session.close()
  32. }
  33. }

DF-DS-RDD转换

总结:

image.png

RDD—DF|DS

Row类型RDD调用toDS,无法调用,因为没有继承producer,无法构建Encoder,需要使用以下方法:

  1. val df:DataFrame=session.createDataFrame(rdd,schema)
  1. @Test
  2. def testRDDToDf() : Unit ={
  3. val sparkSession: SparkSession = SparkSession.builder.getOrCreate()
  4. val list = List(Person("jack", 20), Person("marry", 30))
  5. val rdd: RDD[Person] = session.sparkContext.makeRDD(list)
  6. // 不导入RDD是没有方法提示的
  7. import sparkSession.implicits._
  8. //implicit def rddToDatasetHolder[T : Encoder](rdd: RDD[T]): DatasetHolder[T]
  9. // rdd: RDD[Person] ----->rddToDatasetHolder(rdd) ----> DatasetHolder[Person] ----------->DatasetHolder[Person].toDF()
  10. val df: DataFrame = rdd.toDF()
  11. val ds: Dataset[Person] = rdd.toDS()
  12. }

DF—DS—RDD

  1. package com.tcode
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
  4. import org.junit.{After, Before, Test}
  5. case class Person(name:String,age:Int)
  6. /**
  7. * Created by VULCAN on 2021/6/5
  8. */
  9. class ConversionTest {
  10. var session:SparkSession = null
  11. /*
  12. DF/DS.rdd
  13. */
  14. @Test
  15. def testDFToRDD() : Unit ={
  16. val sparkSession: SparkSession = SparkSession.builder.getOrCreate()
  17. val dataFrame1: DataFrame = session.read.json("input/people.json")
  18. // DF转换为rdd
  19. val rdd: RDD[Row] = dataFrame1.rdd
  20. // DF转换DS
  21. import sparkSession.implicits._
  22. val ds: Dataset[Person] = dataFrame1.as[Person]
  23. // DS转换rdd
  24. val rdd1: RDD[Person] = ds.rdd
  25. }
  26. //在每个测试方法之前运行
  27. @Before
  28. def init(): Unit ={
  29. session = SparkSession.builder
  30. .master("local[4]")
  31. .appName("Word Count")
  32. //.config("spark.some.config.option", "some-value") //添加spark的参数
  33. .getOrCreate()
  34. }
  35. //在每个测试方法之后运行
  36. @After
  37. def close(): Unit ={
  38. session.close()
  39. }
  40. }

RDD[Row]—DF

val df: DataFrame = session1.implicits.rddToDatasetHolder(rdd2)(ExpressionEncoder[Row]()).toDF()

  1. @Test
  2. def testRowTypeRddToDF() : Unit ={
  3. val emps = List(Emp("jack", 30), Emp("marray", 20))
  4. val rdd1: RDD[Emp] = session.sparkContext.makeRDD(emps)
  5. val rdd2: RDD[Row] = rdd1.map(emp => Row(emp.name, emp.age))
  6. val session1: SparkSession = SparkSession.builder.getOrCreate()
  7. import session1.implicits._
  8. val df: DataFrame = session1.implicits.rddToDatasetHolder(rdd2)(ExpressionEncoder[Row]()).toDF()
  9. //rdd2.toDF()
  10. // df.show()
  11. rdd1.toDF()
  12. }

Encoder

重要 Encoder: ①可以直接通过SparkSession中的一些隐式方法,隐式创建Encoder.
不用你创建,它偷偷给你创建,你看不见
import sparksession对象.implicits._
隐式类import后面的类名必须是用val修饰,因为Scala只支持val修饰的对象的引入。

  1. ②可以显式调用Encoders的静态方法创建<br /> 1scala中的基本数据类型: `Encoders.scalaXxx`<br /> 2scalaString : `Encoders.STRING`<br /> 3)自己定义的类:<br /> 3.1)样例类: ` ExpressionEncoder[T]()` ` Encoders.product[T]`<br /> 因为所有样例类都实现了ProductSerilizable接口<br /> 3.2)非样例类: ` ExpressionEncoder[T]()`

Row类型RDD调用toDS,无法调用,因为没有继承producer,无法构建Encoder,需要使用一下方法: