概念

join算子相当于内连接,将两个RDD中的key相同的数据匹配,如果key匹配不上,那么数据不关联

注意:
1 如果某一个 RDD 有重复的 Key, 则会分别与另外一个 RDD 的相同的 Key进行组合.
2 也支持外连接: leftOuterJoin, rightOuterJoin, and fullOuterJoin.

案例

join

  1. import org.apache.spark.rdd.RDD
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. object demo {
  4. def main(args: Array[String]): Unit = {
  5. val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
  6. val sc: SparkContext = new SparkContext(conf)
  7. //3.1 创建第一个RDD
  8. val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))
  9. //3.2 创建第二个pairRDD
  10. val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (4, 6), (2, 8)))
  11. //join算子相当于内连接,将两个RDD中的key相同的数据匹配,如果key匹配不上,那么数据不关联
  12. val newRDD: RDD[(Int, (String, Int))] = rdd.join(rdd1)
  13. println(newRDD.collect().mkString(",")) //输出: (1,(a,4)),(2,(b,5)),(2,(b,8))
  14. val newRDD2: RDD[(Int, (Int, String))] = rdd1.join(rdd)
  15. println(newRDD2.collect().mkString(",")) //输出: (1,(4,a)),(2,(5,b)),(2,(8,b))
  16. sc.stop()
  17. }
  18. }

leftOuterJoin和rightOuterJoin和fullOuterJoin

fullOuterJoin 是全外连接
leftOuterJoin 左连接
rightOuterJoin 右连接



import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object demo {
  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)

    //3.1 创建第一个RDD
    val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))
    //3.2 创建第二个pairRDD
    val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (4, 6), (2, 8)))

    //join算子相当于内连接,将两个RDD中的key相同的数据匹配,如果key匹配不上,那么数据不关联
    val newRDD: RDD[(Int, (String, Option[Int]))] = rdd.leftOuterJoin(rdd1)
    println(newRDD.collect().mkString(",")) //输出:   (1,(a,Some(4))),(2,(b,Some(5))),(2,(b,Some(8))),(3,(c,None))
    val newRDD2: RDD[(Int, (Option[String], Int))] = rdd.rightOuterJoin(rdd1)
    println(newRDD2.collect().mkString(",")) //输出: (1,(Some(a),4)),(2,(Some(b),5)),(2,(Some(b),8)),(4,(None,6))
    sc.stop()
  }
}