2.1 DataFrame

2.2.1 创建

  • 从Spark数据源进行创建
  • 从RDD进行转换
  • 从Hive Table进行查询返回

    2.2.2 SQL风格语法

    ```scala val sparkSession = SparkSession.builder().appName(“creatDataFrameDemo”).master(“local[*]”).getOrCreate() val df = sparkSession.read.json(“Spark/src/main/resources/json.json”)

df.createTempView(“data”) val sqlDf = sparkSession.sql(“SELECT * FROM data”) sqlDf.show()

  1. <a name="i39WN"></a>
  2. ### 2.2.3 RDD转换为DataFrame
  3. - 手动确定转换(按顺序匹配)
  4. - 反射转换(样例类)
  5. <a name="A5rFi"></a>
  6. ### 2.2.4 DataFrame转换为RDD
  7. 直接调用rdd。
  8. <a name="ASuTZ"></a>
  9. ## 2.3 DataSet
  10. <a name="pDQbB"></a>
  11. ### 2.3.1 创建
  12. 使用样例类创建。
  13. ```scala
  14. val sparkSession = SparkSession.builder().appName("createDataFrameDemo").master("local[*]").getOrCreate()
  15. import sparkSession.implicits._
  16. val ds = Seq(People("zhangsan", 14),People("lisi", 15)).toDS()
  17. ds.show()

2.3.2 RDD转换为DataSet

SparkSQL支持直接将包含有样例类的RDD转换为DataFrame。

2.3.3 DataSet转换为RDD

直接调用rdd。

2.4 DataFrame和DataSet的转换

2.4.1 DataFrame转DataSet

使用样例类转换。

// 给出每一列的类型后,使用as方法转换
import ss.implicits._
val df = ss.read.json("Spark/src/main/resources/json.json")
val ds = df.as[People]
ds.show()

2.4.2 DataSet转DataFrame

调用toDF直接转换。

2.5 RDD、DataFrame、DataSet

2.5.1 共性

  • 分布式弹性数据集
  • 懒计算
  • 会根据spark的内存情况自动缓存

    2.5.2 差别

  • RDD

    • 不支持sparksql操作
    • 一般与spark mlib同时使用
  • DataFrame
    • DataFrame每一行的类型固定为Row,即DataSet[Row],必须解析才能取到各个字段的值
    • 支持sparksql
    • 不与spark mlib联用
  • DataSet
    • 每一行每一列都有固定的类型
    • 支持sparksql
    • 不与spark mlib联用

      2.6 用户自定义函数

      2.6.1 UDF

      spark.udf.register(“funcName”, function defination)

      2.6.2 UDAF

  1. 继承Aggregator实现聚合函数
    1. zero:初始化中间值buffer
    2. reduce:分区内计算,结果存入中间值buffer
    3. merge:合并中间值buffer
    4. finish:使用最终的中间值buffer计算结果
    5. bufferEncoder:设定中间值类型的编码器,使用Encoders.product转换为样例类
    6. outputEncoder:设定输出结果类型的编码器,使用Encoders中定义好的编码器