SparkSQL作为Spark生态的一员继续发展,而不再受限于Hive,只是兼容Hive;
Hive on Spark是一个Hive的发展计划,该计划将Spark作为Hive的底层引擎之一,也就是说,Hive将不再受限于一个引擎,可以采用Map-Reduce、Tez、Spark等引擎。
对于开发人员来讲,SparkSQL可以简化RDD的开发,提高开发效率,且执行效率非常快,所以实际工作中,基本上采用的就是Spark SQL。
Spark SQL为了简化RDD的开发,提高开发效率,提供了2个编程抽象,类似Spark Core中的RDD
- DataFrame
- DataSet
读取文件
注意:
如果从内存中获取数据,spark可以知道数据类型具体是什么。如果是数字,默认作为Int处理;
但是从文件中读取的数字,不能确定是什么类型,所以用bigint接收,可以和Long类型转换,但是和Int不能进行转换
scala> val df = spark.read.json("data/user.json")
df: org.apache.spark.sql.DataFrame = [age: bigint,username: string]
创建会话
Core: SparkContext
SparkContext.xxx 创建RDD
Sql: SparkSession.xxx 创建DF或DS
如何创建SparkSessoin:
* ①如果已经创建过了SparkSession,可以通过SparkSession.builder().getOrCreate()获取已经存在的SparkSession
* ② SparkSession.builder.xxxx.xxx.xxxx.getOrCreate() 创建一个新的SparkSession
// sparkSession 和 session 共用同一个SparkContext
_session.read.格式,及_session.sql(“xxxx”)
//在每个测试方法之前运行
@Before
def init(): Unit ={
session = SparkSession.builder
.master("local[4]")
.appName("Word Count")
//.config("spark.some.config.option", "some-value") //添加spark的参数
.getOrCreate()
}
//在每个测试方法之后运行
@After
def close(): Unit ={
session.close()
}
@Test
def testCreateDF1() : Unit ={
//如果从内存中获取数据,spark可以知道数据类型具体是什么。如果是数字,默认作为Int处理;
//但是从文件中读取的数字,不能确定是什么类型,所以用bigint接收,可以和Long类型转换,但是和Int不能进行转换
val dataFrame: DataFrame = session.read.json("input/people.json")
//查看
dataFrame.show()
//只查看schame
dataFrame.printSchema()
// 使用sql分析
//为当前的表格起名
dataFrame.createTempView("person")
session.sql("select avg(age) from person where age > 20").show()
}
创建视图
将数据映射为一张表
@Test
def testGlobalView() : Unit ={
val dataFrame1: DataFrame = session.read.json("input/people.json")
//创建为全局视图
dataFrame1.createOrReplaceGlobalTempView("person")
val sparkSession: SparkSession = session.newSession()
//访问全局视图,添加global_temp名称空间,类似于库名.表名
sparkSession.sql("select avg(age) from global_temp.person where age > 20").show()
}
/*
一个表格可以有多个视图名
Temporary view 'person' already exists;
在一个session中,同一个视图名只能指向唯一表格!
默认创建的视图,不能跨越Session的,如果希望跨session操作,必须是全局视图!
*/
@Test
def testCreateTempViewDifference() : Unit ={
val dataFrame1: DataFrame = session.read.json("input/people.json")
val dataFrame2: DataFrame = session.read.json("input/people1.json")
dataFrame1.createTempView("person")
//dataFrame2.createTempView("person") 报错
//存在就替换,不存在就创建
dataFrame2.createOrReplaceTempView("person")
println("----------------")
// sparkSession 和 session 共用同一个SparkContext
// session还能分裂出多一个sparkSession
val sparkSession: SparkSession = session.newSession()
println(sparkSession.sparkContext == session.sparkContext)
val dataFrame3: DataFrame = sparkSession.read.json("input/people1.json")
dataFrame3.createTempView("person")
sparkSession.sql("select avg(age) from person where age > 20").show()
}
创建DataFrame
/
* 基于集合创建DF
DataFrame=RDD(数据)+schema(元数据)
RDD:
def createDataFrame(rows: java.util.List[Row], schema: StructType)
Row: 代表关系型运算中的一行输出。
构造: Row(x,x,x,x)
类比为数组,通过索引定位元素
获取元素: Row(index) 返回Any(没有类型)
Row.getXxx(index) 返回带类型的数据
弱类型!
schema:
StructType: 表的结构 样例类,apply方法:StructType() 参数是一个StructField的list或array
StructField(schema): 表中的字段,样例类,apply方法:StructField()
DataType: 字段的数据类型
是什么类型,就使用什么样的case object
例如: StringType、IntegerType…
*/
// 通过Row类型创建DF
@Test
def testCreateDF2() : Unit ={
//将row数据添加到数组中
var datas:java.util.List[Row]=new util.ArrayList[Row]();
datas.add(Row("jack",20))
datas.add(Row("jack1",30))
datas.add(Row("jack2",40))
// 表结构
var schema: StructType=StructType( StructField("name",StringType) :: StructField("age",IntegerType) :: Nil)
val df: DataFrame = session.createDataFrame(datas, schema)
df.show()
df.printSchema()
}
Row补充:
创建Row的两种方式
直接传值
* Row(value1, value2, value3, …)
传进集合:
* Row.fromSeq(Seq(value1, value2, …))
特性
Row(a,b,c,d)
可以通过索引获取:Row(2):Any=c
但是通过下标获取的的值类型是Any,是不确定类型的,所以Row是弱类型数据,需要强转
Row提供了转换方法(明确知道index是什么类型的时候使用!!)_row.getXxx(index):_
val firstValue = row.getInt(0)*
// firstValue: Int = 1
val isNull = row.isNullAt(3)*
// isNull: Boolean = true
创建DataSet
/*
DS 和DS的区别:
DS[T]
DF : Dataset[Row]
泛型上下文:
def createDatasetT : Encoder: Dataset[T]
等价于
def createDatasetEncoder(implicit aa:Encoder[T])
** 重要**:** Encoder**: ①可以直接通过SparkSession中的一些隐式方法,隐式创建Encoder.<br /> 不用你创建,它偷偷给你创建,你看不见<br /> `import sparksession对象.implicits._`<br /> 隐式类import后面的类名必须是用val修饰,因为Scala只支持val修饰的对象的引入。
②可以显式调用Encoders的静态方法创建<br /> 1)scala中的基本数据类型: `Encoders.scalaXxx`<br /> 2)scala的String : `Encoders.STRING`<br /> 3)自己定义的类:<br /> 3.1)样例类: ` ExpressionEncoder[T]()` 或 ` Encoders.product[T]`<br /> 因为所有样例类都实现了Product,Serilizable接口<br /> 3.2)非样例类: ` ExpressionEncoder[T]()`
Row类型RDD调用toDS,无法调用,因为没有继承producer,无法构建Encoder,需要使用以下方法:
val df:DataFrame=session.createDataFrame(rdd,schema)
*/
package com.tcode
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Encoders, Row, SparkSession}
import org.junit.{After, Before, Test}
import java.util
// 实现了 Product ,Serilizable
case class Emp(name:String,age:Int)
class DFAndDSTest {
var session:SparkSession = null
@Test
def testCreateDS() : Unit ={
val emps = List(Emp("jack", 30), Emp("marray", 20))
// 隐式类import后面的类名必须是用val修饰,因为Scala只支持val修饰的对象的引入。
//import session.implicits._
val ds: Dataset[Emp] = session.createDataset(emps)(Encoders.product[Emp])
ds.show()
ds.printSchema()
}
//在每个测试方法之前运行
@Before
def init(): Unit ={
session = SparkSession.builder
.master("local[4]")
.appName("Word Count")
//.config("spark.some.config.option", "some-value") //添加spark的参数
.getOrCreate()
}
//在每个测试方法之后运行
@After
def close(): Unit ={
session.close()
}
}
DF-DS-RDD转换
总结:
RDD—DF|DS
Row类型RDD调用toDS,无法调用,因为没有继承producer,无法构建Encoder,需要使用以下方法:
val df:DataFrame=session.createDataFrame(rdd,schema)
@Test
def testRDDToDf() : Unit ={
val sparkSession: SparkSession = SparkSession.builder.getOrCreate()
val list = List(Person("jack", 20), Person("marry", 30))
val rdd: RDD[Person] = session.sparkContext.makeRDD(list)
// 不导入RDD是没有方法提示的
import sparkSession.implicits._
//implicit def rddToDatasetHolder[T : Encoder](rdd: RDD[T]): DatasetHolder[T]
// rdd: RDD[Person] ----->rddToDatasetHolder(rdd) ----> DatasetHolder[Person] ----------->DatasetHolder[Person].toDF()
val df: DataFrame = rdd.toDF()
val ds: Dataset[Person] = rdd.toDS()
}
DF—DS—RDD
package com.tcode
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.junit.{After, Before, Test}
case class Person(name:String,age:Int)
/**
* Created by VULCAN on 2021/6/5
*/
class ConversionTest {
var session:SparkSession = null
/*
DF/DS.rdd
*/
@Test
def testDFToRDD() : Unit ={
val sparkSession: SparkSession = SparkSession.builder.getOrCreate()
val dataFrame1: DataFrame = session.read.json("input/people.json")
// DF转换为rdd
val rdd: RDD[Row] = dataFrame1.rdd
// DF转换DS
import sparkSession.implicits._
val ds: Dataset[Person] = dataFrame1.as[Person]
// DS转换rdd
val rdd1: RDD[Person] = ds.rdd
}
//在每个测试方法之前运行
@Before
def init(): Unit ={
session = SparkSession.builder
.master("local[4]")
.appName("Word Count")
//.config("spark.some.config.option", "some-value") //添加spark的参数
.getOrCreate()
}
//在每个测试方法之后运行
@After
def close(): Unit ={
session.close()
}
}
RDD[Row]—DF
val df: DataFrame = session1.implicits.rddToDatasetHolder(rdd2)(ExpressionEncoder[Row]()).toDF()
@Test
def testRowTypeRddToDF() : Unit ={
val emps = List(Emp("jack", 30), Emp("marray", 20))
val rdd1: RDD[Emp] = session.sparkContext.makeRDD(emps)
val rdd2: RDD[Row] = rdd1.map(emp => Row(emp.name, emp.age))
val session1: SparkSession = SparkSession.builder.getOrCreate()
import session1.implicits._
val df: DataFrame = session1.implicits.rddToDatasetHolder(rdd2)(ExpressionEncoder[Row]()).toDF()
//rdd2.toDF()
// df.show()
rdd1.toDF()
}
Encoder
重要: Encoder: ①可以直接通过SparkSession中的一些隐式方法,隐式创建Encoder.
不用你创建,它偷偷给你创建,你看不见
import sparksession对象.implicits._
隐式类import后面的类名必须是用val修饰,因为Scala只支持val修饰的对象的引入。
②可以显式调用Encoders的静态方法创建<br /> 1)scala中的基本数据类型: `Encoders.scalaXxx`<br /> 2)scala的String : `Encoders.STRING`<br /> 3)自己定义的类:<br /> 3.1)样例类: ` ExpressionEncoder[T]()` 或 ` Encoders.product[T]`<br /> 因为所有样例类都实现了Product,Serilizable接口<br /> 3.2)非样例类: ` ExpressionEncoder[T]()`
Row类型RDD调用toDS,无法调用,因为没有继承producer,无法构建Encoder,需要使用一下方法: