First, bring the implicit conversions needed by toDF/toDS into scope: import spark.implicits._
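
In spark-shell this import works immediately because spark is the prebuilt SparkSession. In a standalone application the session must be created first, since the implicits hang off the instance rather than the class. A minimal sketch (the app name and master URL are placeholders):

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder()
    .appName("ConversionDemo")   // placeholder app name
    .master("local[*]")          // placeholder: run locally on all cores
    .getOrCreate()

  // note: the import is on the SparkSession instance, not the class
  import spark.implicits._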


1 RDD and DataFrame

1.1 Converting an RDD to a DataFrame

In real-world development, an RDD is usually converted to a DataFrame by mapping it to a case class:

  scala> case class User(name:String, age:Int)
  defined class User

  scala> sc.makeRDD(List(("zhangsan",30), ("lisi",40))).map(t=>User(t._1,t._2)).toDF.show
  +--------+---+
  |    name|age|
  +--------+---+
  |zhangsan| 30|
  |    lisi| 40|
  +--------+---+
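
When no case class is available, for example when the schema is only known at runtime, an equivalent DataFrame can be built from an RDD[Row] plus an explicit schema. A minimal sketch, assuming the same spark and sc as in the shell session above:

  import org.apache.spark.sql.Row
  import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

  val rowRDD = sc.makeRDD(List(("zhangsan", 30), ("lisi", 40)))
    .map { case (name, age) => Row(name, age) }

  val schema = StructType(Seq(
    StructField("name", StringType, nullable = true),
    StructField("age", IntegerType, nullable = true)
  ))

  // createDataFrame(RDD[Row], StructType) is the schema-driven variant of toDF
  val df = spark.createDataFrame(rowRDD, schema)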

1.2 Converting a DataFrame to an RDD

  scala> val df = sc.makeRDD(List(("zhangsan",30), ("lisi",40))).map(t=>User(t._1, t._2)).toDF
  df: org.apache.spark.sql.DataFrame = [name: string, age: int]

  scala> val rdd = df.rdd
  rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[46] at rdd at <console>:25

Note: the RDD obtained this way has Row as its element type.
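
Because the elements are Row objects rather than User instances, fields have to be read back by position or by column name. A small sketch of both access styles, continuing from the rdd above:

  val names = rdd.map(row => row.getString(0))       // access by position
  val ages  = rdd.map(row => row.getAs[Int]("age"))  // access by column name

  names.collect()  // Array(zhangsan, lisi)
  ages.collect()   // Array(30, 40)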

2 RDD and DataSet

2.1 Converting an RDD to a DataSet

  scala> case class User(name:String, age:Int)
  defined class User

  scala> sc.makeRDD(List(("zhangsan",30), ("lisi",49))).map(t=>User(t._1, t._2)).toDS
  res11: org.apache.spark.sql.Dataset[User] = [name: string, age: int]
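
For quick experiments, toDS also works directly on a local Seq, skipping the RDD entirely. A minimal sketch, assuming the same User case class and the spark.implicits._ import:

  // a Dataset can be built straight from a local collection
  val ds = Seq(User("zhangsan", 30), User("lisi", 49)).toDS()
  ds.show()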

2.2 Converting a DataSet to an RDD

  scala> case class User(name:String, age:Int)
  defined class User

  scala> sc.makeRDD(List(("zhangsan",30), ("lisi",49))).map(t=>User(t._1, t._2)).toDS
  res11: org.apache.spark.sql.Dataset[User] = [name: string, age: int]

  scala> val rdd = res11.rdd
  rdd: org.apache.spark.rdd.RDD[User] = MapPartitionsRDD[51] at rdd at <console>:25

  scala> rdd.collect
  res12: Array[User] = Array(User(zhangsan,30), User(lisi,49))

Note: a DataSet is itself a wrapper around an RDD, so the underlying RDD can be retrieved directly. Unlike the DataFrame case, the element type here stays User rather than Row.
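
Since the recovered RDD keeps its element type, ordinary RDD operations can use the User fields directly, with no Row casting. A small sketch continuing from the rdd above:

  val totalAge = rdd.map(_.age).sum()     // 79.0
  val adults   = rdd.filter(_.age >= 40)  // keeps User(lisi,49)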

3 DataFrame and DataSet

3.1 Converting a DataFrame to a DataSet

  scala> case class User(name:String, age:Int)
  defined class User

  scala> val df = sc.makeRDD(List(("zhangsan",30),("lisi",49))).toDF("name","age")
  df: org.apache.spark.sql.DataFrame = [name: string, age: int]

  scala> val ds = df.as[User]
  ds: org.apache.spark.sql.Dataset[User] = [name: string, age: int]
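
as[User] succeeds here because the DataFrame's column names and types match the case class fields exactly. If they do not, rename the columns first. A minimal sketch, in which the incoming column names userName and userAge are hypothetical:

  // suppose the DataFrame arrived with differently named columns
  val raw = sc.makeRDD(List(("zhangsan", 30), ("lisi", 49))).toDF("userName", "userAge")

  // align the names with the User fields before converting
  val ds2 = raw
    .withColumnRenamed("userName", "name")
    .withColumnRenamed("userAge", "age")
    .as[User]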

3.2 Converting a DataSet to a DataFrame

  scala> val ds = df.as[User]
  ds: org.apache.spark.sql.Dataset[User] = [name: string, age: int]

  scala> val df = ds.toDF
  df: org.apache.spark.sql.DataFrame = [name: string, age: int]
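
Putting the pieces together, a compact round trip through all three abstractions might look like the following sketch, assuming the User case class and the spark.implicits._ import from above:

  val rdd0 = sc.makeRDD(List(("zhangsan", 30), ("lisi", 40)))

  val df   = rdd0.toDF("name", "age")               // RDD       -> DataFrame
  val ds   = df.as[User]                            // DataFrame -> DataSet
  val ds2  = rdd0.map(t => User(t._1, t._2)).toDS() // RDD       -> DataSet
  val df2  = ds.toDF()                              // DataSet   -> DataFrame
  val rdd1 = ds.rdd                                 // DataSet   -> RDD[User]
  val rdd2 = df2.rdd                                // DataFrame -> RDD[Row]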