概述

全称为Resilient Distributed Datasets,弹性分布式数据集,是Spark中最基本的数据(计算)抽象,它代表一个不可变,可分区,里面的元素可并行计算的集合。RDD在逻辑上是一个数据集,在物理上则可以分块分布在不同的机器上并发运行。RDD允许用户在执行多个查询时显示的将工作缓存在内存中,后续的查询能够重用工作集,这极大的提升了查询速度。
在Spark 中,对数据的所有操作不外乎创建RDD,转换已有RDD以及调用RDD操作进行求值,每个RDD都被分为多个分区,这些分区运行在集群的不同节点上,RDD可以包含Python,Java,Scala中任意类型的对象,甚至可以是用户自定义对象。
RDD是Spark的核心,也是整个Spark的架构基础。它的特性可以总结如下:

  • 它是不变的数据结构存储
  • 它是支持跨集群的分布式数据结构
  • 可以根据数据记录的key对结构进行分区
  • 提供了粗粒度的操作,且这些操作都支持分区
  • 它将数据存储在内存中,从而提供了低延迟性

    弹性

    弹性即是RDD的每个分区的大小都是不固定的,不像hdfs那样,每个数据块就是128MB。因此,RDD是有弹性的。
    分布式即是RDD的每个分区分布在集群的各个节点上,而非集中存在于一个节点。

    创建RDD

    并行集合(数组)

    方法:parallelize
  1. val conf = new SparkConf().setAppName("CreateRDDApp").setMaster("local")
  2. val sc = new SparkContext(conf)
  3. val rdd = sc.makeRDD(Array(1,2,3,4,5))
  4. val r = sc.parallelize(Array(1,2,3,4,5))

Spark——RDD概述 - 图1
执行spark-shell

  1. # spark-shell
  2. Spark context available as 'sc' (master = local[*], app id = local-1565534714937).
  3. Spark session available as 'spark'.
  4. Welcome to
  5. ____ __
  6. / __/__ ___ _____/ /__
  7. _\ \/ _ \/ _ `/ __/ '_/
  8. /___/ .__/\_,_/_/ /_/\_\ version 2.3.3
  9. /_/
  10. Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_222)
  11. Type in expressions to have them evaluated.
  12. Type :help for more information.
  13. scala> val array = Array(1,2,3,4,5)
  14. array: Array[Int] = Array(1, 2, 3, 4, 5)
  15. scala> val rdd = sc.parallelize(array)
  16. rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26

文件系统中加载

SparkContext通过textFile()读取数据生成内存中的RDD
支持的数据类型:

  • 本地文件系统(file://)
  • 分布式文件系统HDFS加载数据(hdfs://)
  • 云端Amazon S3
  1. // 通过外部数据
  2. sc.textFile("") // 本地路径 或者hdfs

Spark——RDD概述 - 图2
文件系统读读取
设置文件word.txt

  1. Hadoop is good
  2. Spark is fast
  3. Spark is better
  1. val conf = new SparkConf().setAppName("SparkWordCount").
  2. setMaster("local[*]")
  3. val sc = new SparkContext(conf)
  4. val lines= sc.textFile("file:///Users/baxiang/BigData/SparkNote/src/word.txt")
  5. lines.collect().foreach(println)

操作

支持两种操作转化操作和行动操作。RDD的转化操作是返回一个新的RDD的操作,map和filter
行动操作则是驱动器程序返回结果或把结果写入外部系统的操作 count,first.
Spark——RDD概述 - 图3
Spark采用惰性计算模式,RDD只有第一次在一个行动操作中得到时,才会真正计算,spark可以优化整个计算过程,默认情况下,spark的RDD会在每次他们进行行动操作是重新计算。
如果需要多个行动中重用一个RDD,可以使用RDD.persist()让Spark把这个RDD缓存下来。
Spark——RDD概述 - 图4
Spark——RDD概述 - 图5

filter过滤

筛选出满足函数func的元素,并返回一个新的数据集
Spark——RDD概述 - 图6
Spark——RDD概述 - 图7

map

将每个元素传递到函数func中,并将结果返回为一个新的数据集
Spark——RDD概述 - 图8
Spark——RDD概述 - 图9
Spark——RDD概述 - 图10
Spark——RDD概述 - 图11

flatMap(func)

与map()相似,但每个输入元素都可以映射到0或多个输出结果
Spark——RDD概述 - 图12
Spark——RDD概述 - 图13

groupByKey()

应用于(K,V)键值对的数据集时,返回一个新的(K,Iterable)形式的数据集
Spark——RDD概述 - 图14

reduceByKey(func)

应用于(K,V)键值对的数据集时,返回一个新的(K,V)形式的数据集,其中每个值是将每个key传递到函数func中进行聚合后的结果。

  • 进行分组得到(key,value-list)
  • 根据传入的函数再将value-list做操作
    Spark——RDD概述 - 图15
    打印元素
    在实际编程中,我们经常需要把RDD中的元素打印输出到屏幕上(标准输出stdout),一般会采用语句rdd.foreach(println)或者rdd.map(println)。当采用本地模式(local)在单机上执行时,这些语句会打印出一个RDD中的所有元素。但是,当采用集群模式执行时,在worker节点上执行打印语句是输出到worker节点的stdout中,而不是输出到任务控制节点Driver Program中,因此,任务控制节点Driver Program中的stdout是不会显示打印语句的这些输出内容的。为了能够把所有worker节点上的打印输出信息也显示到Driver Program中,可以使用collect()方法,比如,rdd.collect().foreach(println),但是,由于collect()方法会把各个worker节点上的所有RDD元素都抓取到Driver Program中,因此,这可能会导致内存溢出。因此,当你只需要打印RDD的部分元素时,可以采用语句rdd.take(100).foreach(println)。