2.1 DataFrame
2.2.1 创建
- 从Spark数据源进行创建
- 从RDD进行转换
- 从Hive Table进行查询返回
2.2.2 SQL风格语法
```scala val sparkSession = SparkSession.builder().appName(“creatDataFrameDemo”).master(“local[*]”).getOrCreate() val df = sparkSession.read.json(“Spark/src/main/resources/json.json”)
df.createTempView(“data”) val sqlDf = sparkSession.sql(“SELECT * FROM data”) sqlDf.show()
<a name="i39WN"></a>
### 2.2.3 RDD转换为DataFrame
- 手动确定转换(按顺序匹配)
- 反射转换(样例类)
<a name="A5rFi"></a>
### 2.2.4 DataFrame转换为RDD
直接调用rdd。
<a name="ASuTZ"></a>
## 2.3 DataSet
<a name="pDQbB"></a>
### 2.3.1 创建
使用样例类创建。
```scala
val sparkSession = SparkSession.builder().appName("createDataFrameDemo").master("local[*]").getOrCreate()
import sparkSession.implicits._
val ds = Seq(People("zhangsan", 14),People("lisi", 15)).toDS()
ds.show()
2.3.2 RDD转换为DataSet
SparkSQL支持直接将包含有样例类的RDD转换为DataFrame。
2.3.3 DataSet转换为RDD
2.4 DataFrame和DataSet的转换
2.4.1 DataFrame转DataSet
使用样例类转换。
// 给出每一列的类型后,使用as方法转换
import ss.implicits._
val df = ss.read.json("Spark/src/main/resources/json.json")
val ds = df.as[People]
ds.show()
2.4.2 DataSet转DataFrame
2.5 RDD、DataFrame、DataSet
2.5.1 共性
- 分布式弹性数据集
- 懒计算
-
2.5.2 差别
RDD
- 不支持sparksql操作
- 一般与spark mlib同时使用
- DataFrame
- DataFrame每一行的类型固定为Row,即DataSet[Row],必须解析才能取到各个字段的值
- 支持sparksql
- 不与spark mlib联用
- DataSet
- 继承Aggregator实现聚合函数
- zero:初始化中间值buffer
- reduce:分区内计算,结果存入中间值buffer
- merge:合并中间值buffer
- finish:使用最终的中间值buffer计算结果
- bufferEncoder:设定中间值类型的编码器,使用Encoders.product转换为样例类
- outputEncoder:设定输出结果类型的编码器,使用Encoders中定义好的编码器