1、reduce算子

按照官网的解释,传入的参数是一个函数,一个双参数,返回值唯一的函数,建议,该函数是可交换的,是可联合的,如此,才能实现正确的并行计算。
20180305104803274.png
这里的函数,我平时用过加法操作,最大值操作,最小值操作;记得大数据里提过这个概念,诸如此类可以合并操作的函数,是最合适的,平均值类的,就不适用于此种操作了。
对于Int类型的,就类似于求和操作;如果对于String类型的,则会实现拼接操作。

  1. scala> val rdd1 = sc.makeRDD(1 to 10)
  2. rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at makeRDD at <console>:24
  3. scala> rdd1.reduce(_+_)
  4. res3: Int = 55

2、collect算子

在driver的程序中,以数组的形式,返回数据集的所有元素,这通常会在使用filter或者其它操作后,返回一个足够小的数据子集再使用。
将计算结果回收到Driver端。当数据量很大时就不要回收了,会造成oom。

  1. scala> var rdd1 = sc.makeRDD(1 to 10)
  2. rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at makeRDD at <console>:24
  3. scala> rdd1.collect
  4. res2: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

3、count算子

返回数据集的元素个数

  1. scala> val rdd1 = sc.makeRDD(1 to 10)
  2. rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at makeRDD at <console>:24
  3. scala> rdd1.count
  4. res4: Long = 10

4、first算子

返回数据集的第一个元素(类似于take(1))。

  1. scala> val rdd1 = sc.makeRDD(1 to 10)
  2. rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at <console>:24
  3. scala> rdd1.first
  4. res5: Int = 1

5、take算子

返回一个数组,由数据集的前n个元素组成。注意此操作目前并非并行执行的,而是driver程序所在机器。

  1. scala> val rdd1 = sc.makeRDD(1 to 10)
  2. rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at makeRDD at <console>:24
  3. scala> rdd1.take(3)
  4. res6: Array[Int] = Array(1, 2, 3)

6、takeSample算子

takeSample(withReplacement,num,seed)
withReplacement:结果中是否可重复
num:取多少个
seed:随机种子
返回一个数组,在数据集中随机采样num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定的随机数生成器种子
原理:takeSample()函数和sample函数是一个原理,但是不使用相对比例采样,而是按设定的采样个数进行采样,同时返回结果不再是RDD,而是相当于对采样后的数据进行collect(),返回结果的集合为单机的数组

7、takeOrdered算子

takeOrdered和top类似,只不过以和top相反的顺序返回元素。
top默认倒序,takeOrdered默认正序
top方法其实就是调用的taskOrdered,然后反转的结果

  1. def top(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
  2. takeOrdered(num)(ord.reverse)
  3. }
  4. scala> val rdd1 = sc.makeRDD(1 to 10)
  5. rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at makeRDD at <console>:24
  6. scala> rdd1.top(5)
  7. res22: Array[Int] = Array(10, 9, 8, 7, 6)
  8. scala> rdd1.takeOrdered(5)
  9. res23: Array[Int] = Array(1, 2, 3, 4, 5)

8、saveAsTextFile算子

saveAsTextFile用于将RDD以文本文件的格式存储到文件系统中

  1. val conf = new SparkConf()
  2. .setAppName("saveFile")
  3. .setMaster("local[*]")
  4. val sc = new SparkContext(conf)
  5. val rdd1: RDD[Int] = sc.parallelize(1 to 10)
  6. rdd1.repartition(1).saveAsTextFile("/tmp/fff")

9、saveAsSequenceFile算子

saveAsSequenceFile用于将RDD以SequenceFile的文件格式保存到HDFS上。使用方法和saveAsTextFile类似

10、saveAsObjectFile算子

saveAsObjectFile用于将RDD中的元素序列化成对象,存储到文件中。使用方法和saveAsTextFile类似

11、countByKey算子

对(K,V)类型的RDD有效,返回一个(K,Int)对的map,表示每一个可以对应的元素个数

  1. scala> val rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",3)))
  2. rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[3] at makeRDD at <console>:24
  3. scala> rdd1.countByKey
  4. res1: scala.collection.Map[String,Long] = Map(B -> 2, A -> 2, C -> 1)

12、countByValue算子

根据数据集每个元素相同的内容来计数。返回相同内容的元素对应的条数。

  1. object Operator_countByValue {
  2. def main(args: Array[String]): Unit = {
  3. val conf = new SparkConf()
  4. conf.setMaster("local").setAppName("countByValue")
  5. val sc = new SparkContext(conf)
  6. val rdd1 = sc.makeRDD(List("a","a","b"))
  7. val rdd2 = rdd1.countByValue()
  8. rdd2.foreach(println)
  9. sc.stop()
  10. }
  11. }
  12. //结果:
  13. (a,2)
  14. (b,1)

13、foreach算子

在数据集的每一个元素上,运行函数func,t通常用于更新一个累加器变量,或者和外部存储系统做交互

  1. scala> val rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",3)))
  2. rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[9] at makeRDD at <console>:24
  3. scala> rdd1.collect.foreach(println(_))
  4. (A,0)
  5. (A,2)
  6. (B,1)
  7. (B,2)
  8. (C,3)
  9. scala> rdd1.foreach(println(_))