概念
join算子相当于内连接,将两个RDD中的key相同的数据匹配,如果key匹配不上,那么数据不关联
注意:
1 如果某一个 RDD 有重复的 Key, 则会分别与另外一个 RDD 的相同的 Key进行组合.
2 也支持外连接: leftOuterJoin, rightOuterJoin, and fullOuterJoin.
案例
join
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object demo {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
//3.1 创建第一个RDD
val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))
//3.2 创建第二个pairRDD
val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (4, 6), (2, 8)))
//join算子相当于内连接,将两个RDD中的key相同的数据匹配,如果key匹配不上,那么数据不关联
val newRDD: RDD[(Int, (String, Int))] = rdd.join(rdd1)
println(newRDD.collect().mkString(",")) //输出: (1,(a,4)),(2,(b,5)),(2,(b,8))
val newRDD2: RDD[(Int, (Int, String))] = rdd1.join(rdd)
println(newRDD2.collect().mkString(",")) //输出: (1,(4,a)),(2,(5,b)),(2,(8,b))
sc.stop()
}
}
leftOuterJoin和rightOuterJoin和fullOuterJoin
fullOuterJoin 是全外连接
leftOuterJoin 左连接
rightOuterJoin 右连接
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object demo {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
//3.1 创建第一个RDD
val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))
//3.2 创建第二个pairRDD
val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (4, 6), (2, 8)))
//join算子相当于内连接,将两个RDD中的key相同的数据匹配,如果key匹配不上,那么数据不关联
val newRDD: RDD[(Int, (String, Option[Int]))] = rdd.leftOuterJoin(rdd1)
println(newRDD.collect().mkString(",")) //输出: (1,(a,Some(4))),(2,(b,Some(5))),(2,(b,Some(8))),(3,(c,None))
val newRDD2: RDD[(Int, (Option[String], Int))] = rdd.rightOuterJoin(rdd1)
println(newRDD2.collect().mkString(",")) //输出: (1,(Some(a),4)),(2,(Some(b),5)),(2,(Some(b),8)),(4,(None,6))
sc.stop()
}
}