1.1 为什么需要RDD
分布式计算需要:
• 分区控制
• Shuffle控制
• 数据存储\序列化\发送
• 数据计算API
• 等一系列功能
这些功能, 不能简单的通过Python内置的本地集合对象(如 List\ 字典等)去完成. 我们在分布式框架中, 需要有一个统一的数据抽象对象, 来实现上述分布式计算所需功能. 这个抽象对象, 就是RDD。
1.2 什么是RDD
Spark起源 在Spark开山之作Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing这篇paper中(以下简称 RDD Paper),Matei等人提出了RDD这种数据结构,文中开头对RDD的定义 是 
RDD定义 RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,代表一个不可变、可 分区、里面的元素可并行计算的集合。
Dataset:一个数据集合,用于存放数据的。
Distributed:RDD中的数据是分布式存储的,可用于分布式计算。
Resilient:RDD中的数据可以存储在内存中或者磁盘中。
RDD有5个特点:
1、RDD是Spark提供的核心抽象,全称为Resillient Distributed Dataset,即弹性分布式数据集。
2、RDD在抽象上来说是一种元素集合,包含了数据。它是被分区的,分为多个分区,每个分区分布在集群中的不同节点上,从而让RDD中的数据可以被并行操作。(分布式数据集)
3、RDD通常通过Hadoop上的文件,即HDFS文件或者Hive表,来进行创建;有时也可以通过应用程序中的集合来创建。
4、RDD最重要的特性就是,提供了容错性,可以自动从节点失败中恢复过来。即如果某个节点上的RDD partition,因为节点故障,导致数据丢了,那么RDD会自动通过自己的数据来源重新计算该partition。这一切对使用者是透明的。
5、RDD的数据默认情况下存放在内存中的,但是在内存资源不足时,Spark会自动将RDD数据写入磁盘。(弹性
1.3 RDD特性
1.a list of partiotioner
一组分片(partition),即数据集的基本组成单位,对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。每个分配的存储是由BlockManager实现的,每个分区都会被逻辑映射成BlockManager的一个Block,而这个Block会被一个Task负责计算。
2.a function for partiotioner
Spark中的RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。比如说一个分区有1,2,3 在rdd1.map(10),把RDD里面的每一个元素取出来乘以10,每个分片都应用这个map的函数。
*3.A list of dependencies on other RDDs
RDD之间有一系列的依赖rdd1.map(10).flatMap(..).map(..).reduceByKey(…),构建成为DAG,这个DAG会构造成很多个阶段,这些阶段叫做stage,RDDstage之间会有依赖关系,后面根据前面的依赖关系来构建,如果前面的数据丢了,它会记住前面的依赖,从前面进行重新恢复。每一个算子都会产生新的RDD。textFile 与flatMap会产生两个RDD.
4.Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
可选:key-value型的RDD是根据哈希来分区的,类似于mapreduce当中的paritioner接口,控制Key分到哪个reduce。
一个partitioner,即RDD的分片函数。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个基于范围的RangePartitioner。只有对于key-value的RDD,才会有Partitioner,非key-value的RDD的Partitioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。
*5.Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
一个列表,存储存取每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。
# coding:utf8# 导入Spark的相关包from pyspark import SparkConf, SparkContextif __name__ == '__main__':# 0. 初始化执行环境 构建SparkContext对象conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)# 演示通过并行化集合的方式去创建RDD, 本地集合 -> 分布式对象(RDD)rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9])# parallelize方法, 没有给定 分区数, 默认分区数是多少? 根据CPU核心来定print("默认分区数: ", rdd.getNumPartitions())rdd = sc.parallelize([1, 2, 3], 3)print("分区数: ", rdd.getNumPartitions())# collect方法, 是将RDD(分布式对象)中每个分区的数据, 都发送到Driver中, 形成一个Python List对象# collect: 分布式 转 -> 本地集合print("rdd的内容是: ", rdd.collect())
默认分区数: 12分区数: 3rdd的内容是: [1, 2, 3]
