如何创建RDD?
创建RDD有三种方式
基于集合创建RDD:使用sparkContext的parallelize()方法,第一个参数传入集合,第二个参数传入partition数量。Spark会为每个partition执行一个task
val spark = SparkSession.builder().getOrCreate()val arr = Array(1,2,3,4)val rdd = spark.sparkContext.parallelize(arr,4) //基于Array创建一个4分区的rdd
基于本地或HDFS文件创建RDD:使用sparkContext的textFile()方法,第一个参数传入文件路径,第二个参数传入partition数量
val spark = SparkSession.builder().appName("WordCount").getOrCreate() val text = spark.sparkContext.textFile("/path/words.txt",3)
Spark中对RDD的操作有哪些?
- 在Spark中,对RDD的操作只有两种,Transformation 和 Action
Transformation
- 是对已有的RDD转化为新的RDD,如flatMap、Map等操作
- lazy特性,在没有执行Action之前,所有的操作都只是得到一个逻辑上的RDD,内存中没有任何数据
Action
- 是对RDD最后的操作,如foreach,reduce,返回结果给Driver进程等操作
- 只有当执行到Action代码,才会触发之前所有的Transformation算子的执行
Transformation算子实战

val sc = SparkSession.builder().getOrCreate().sparkContext
//map算子:集合每个元素乘2
sc.parallelize(Array(1,2,3,4,5)).map(_ * 2)
//filter算子:过滤集合中的偶数
sc.parallelize(Array(1,2,3,4,5)).filter(_ % 2 == 0)
//flatMap算子:把每行字符串拆分成单词
sc.parallelize(Array("ns tql","jk tcl")).flatMap(_.split(" "))
//groupByKey算子:对<<出生地,姓名>>集合根据出生地分组
sc.parallelize(Array(("wuxi","ns"),("shandong","jk1"),("wuxi","jk2"))).groupByKey()
//reduceByKey算子:对<<word,1>>集合计算每个word出现的次数
sc.parallelize(Array(("ns",1),("jk",1),("ns",1))).reduceByKey(_+_)
//sortByKey算子:对<<收入,姓名>>集合根据收入降序排序
sc.parallelize(Array((10000,"ns"),(100,"jk"))).sortByKey(false)
//join算子:对<<姓名,收入>>和<<姓名,出生地>>两个集合基于姓名进行合并
sc.parallelize(Array("ns",10000),("jk",100)).join(sc.parallelize(Array(("ns","wuxi"))))
合并结果是:Array(("ns",(10000,"wuxi")),("jk",100))
//distinct算子:去除集合中重复元素
sc.parallelize(Array(1,3,1,2,3))
注意,map和flatMap的区别
- 对于一维数组是没区别的
- 对于二维集合的情况
- map是对外层集合中的每个集合的每个元素进行操作,然后返回二维集合
- flatMap首先要把二维集合先flat成一维集合,然后对每个元素进行操作,然后返回一维集合
Action算子实战

val sc = SparkSession.builder().getOrCreate().sparkContext
//reduce算子:求数组元素的和
sc.parallelize(Array(1,2,3)).reduce(_+_)
//collect算子:返回RDD中的元素集合
val res = sc.parallelize(Array(1,2,3)).collect()
返回的是:Array(1,2,3)
//take(n)算子:获取RDD中前2个元素
val res = sc.parallelize(Array(1,2,3)).take(2)
返回的是:Array(1,2)
//count算子:获取RDD元素个数
val res = sc.parallelize(Array(1,2,3)).count()
返回的是:3
//saveAsTextFile算子:保存RDD中元素到HDFS上去
sc.parallelize(Array(1,2,3)).saveAsTextFile("hdfs://hdfs路径")
//countByKey算子:对计算元祖的每个Key出现的次数
sc.parallelize(Array(("ns",100),("ns",12),("jk",14))).countByKey()
返回的是:("ns",2),("jk",1)
//foreach算子:遍历输出RDD元素
sc.parallelize(Array(1,2,3)).foreach(println(_))
