1. SprakCore
2. RDD之装饰者模式
我们的bufferreadReader就是典型的装饰者模式,通过原有类不断增强功能,并保持原有类的功能
装饰者模式:动态地将责任附加到对象上。若要扩展功能,装饰者提供了比继承更有弹性的替代方案。

并且拥有一个特殊的特征 只有最外层的装饰者调用时 才会调用内部原有的装饰对象 称为惰性加载

而我们RDD也是通过装饰者马上来包装不断装饰原有的功能

3. RDD 弹性分布式数据集

4. RDD 特性

5. RDD 编程

算子:从认知心理学角度来讲,解决问题其实是将问题的初始状态,通过一系列的转换操作(operator),变成解决状态。
5.1. RDD创建
在Spark中创建RDD的创建方式可以分为三种:从集合中创建RDD、从外部存储创建RDD、从其他RDD创建。
5.1.1. 从集合中创建RDD
- sc.parallelize(list)
- sc.makeRDD(list)
package com.atguigu.spark.day02import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}//通过读取内存集合中的数据 创建RDDobject Spark01_CreateRDD_mem {def main(args: Array[String]): Unit = {//Spark配置文件对象val conf: SparkConf = new SparkConf().setAppName(" Spark01_CreateRDD_mem").setMaster("local[*]")//创建SparkContext对象val sc: SparkContext = new SparkContext(conf)//创建一个集合val list: List[Int] = List(1, 2, 3, 4)//根据集合创建RDD 方式一// val rdd: RDD[Int] = sc.parallelize(list)//根据集合创建RDD 方式二 底层调用是parallelize方法val rdd: RDD[Int] = sc.makeRDD(list)rdd.collect().foreach(println)sc.stop()}}
5.1.2. 从外部存储系统的数据集创建
- sc.textFile(path)
package com.atguigu.spark.day02import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}//通过读取外部文件 创建RDDobject Spark02_CreateRDD_file {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("Spark02_CreateRDD_file").setMaster("local[*]")val sc: SparkContext = new SparkContext(conf)//读取本地文件数据val rdd: RDD[String] = sc.textFile("D:\\code\\spark\\input\\1.txt")rdd.collect().foreach(println)//从HDFS读取数据val hdfsRdd: RDD[String] = sc.textFile("hdfs://hadoop102:8020/input")hdfsRdd.collect().foreach(println)sc.stop()}}
5.2. 分区规则
5.2.1. 默认分区规则
- rdd.partitions 查看分区
package com.atguigu.spark.day02import org.apache.spark.rdd.RDDimport org.apache.spark.{Partition, SparkConf, SparkContext}/*** 默认分区* - 从集合中创建RDD* 取决于分配给应用的CPU的核数 如为* 则为cpu全部核数* - 读取外部文件创建RDD* math.min(取决于分配给应用的CPU的核数,2)*/object Spark03_Partition_default {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("Spark03_Partition_default").setMaster("local[*]")val sc: SparkContext = new SparkContext(conf)//通过集合创建RDD// val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))val rdd: RDD[String] = sc.textFile("D:\\code\\spark\\input\\1.txt")//查看分区效果val partitions: Array[Partition] = rdd.partitionsprintln(partitions.size) //分区数rdd.saveAsTextFile("D:\\code\\spark\\output")sc.stop()}}

5.2.2. 指定分区
- sc.makeRDD(data , partition)
package com.atguigu.spark.day02import org.apache.spark.rdd.RDDimport org.apache.spark.{Partition, SparkConf, SparkContext}/*** 指定分区分区* -根据下标与分区数进行运算 求出 [x,y) 开始到结束下标元素的具体分区分配* - start (i * arr.lent) / partition* - end ((i +1) * arr.lent ) / partition*/object Spark04_Partition_mem {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("Spark04_Partition_mem").setMaster("local[*]")val sc: SparkContext = new SparkContext(conf)//通过集合创建RDD// val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4),4) //默认分区数为cpu核数//1)4个数据,设置4个分区,输出:0分区->1,1分区->2,2分区->3,3分区->4//val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4), 4)//2)4个数据,设置3个分区,输出:0分区->1,1分区->2,2分区->3,4//val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4), 3)//3)5个数据,设置3个分区,输出:0分区->1,1分区->2、3,2分区->4、5val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5), 3)//查看分区效果val partitions: Array[Partition] = rdd.partitionsprintln(partitions.size) //分区数rdd.saveAsTextFile("D:\\code\\spark\\output")sc.stop()}}

5.2.3. 读取文件指定分区
package com.atguigu.spark.day02import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}/*** 读取外部文件 创建RDD* - 默认分区规则* math.min(分配的核数,2)* - 指定分区 minPartitions 最小分区数 并不是实际分区个数* - 在实际计算分区个数的时候 会根据文件的总大小和 最小分区数进行相除运算* - 如果余数为0 最小分区数为实际分区数* - 如果余数不为0 则实际分区数要看实际切片**/object Spark05_Partition_file {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("Spark05_Partition_file").setMaster("local[*]")val sc: SparkContext = new SparkContext(conf)// val rdd: RDD[String] = sc.textFile("D:\\code\\spark\\input\\1.txt") //默认分区为2个//2)输入数据1-4,每行一个数字;输出:0=>{1、2} 1=>{3} 2=>{4} 3=>{空}//val rdd: RDD[String] = sc.textFile("input/3.txt",3)//3)输入数据1-4,一共一行;输出:0=>{1234} 1=>{空} 2=>{空} 3=>{空}val rdd: RDD[String] = sc.textFile("input/4.txt", 3)rdd.saveAsTextFile("D:\\code\\spark\\output")sc.stop()}}

getSplits文件返回的是切片规划,真正读取是在compute方法中创建LineRecordReader读取的,有两个关键变量
start=split.getStart()
end = start + split.getLength
假设读取text.txt 指定最小分区为5
abcefghjklm
此文件总大小为19字节 19%5=3 每个分区每次追加个数要求为 19/6=3字节 19字节总大小/3每次字节=6个分区 余 1 所有实际分区数为 6+1 =7
如果 目前总大小/(总大小 / 最小分区数) < 1.1 则不切片 则以指定分区数 详情源码看上图的while循环
分区0的数据为 每个分区每次读取3字节 因为读取是读取一行如果这一行数据超过了每次读取则整行读取 并读取下个分区
0~3索引的数据
abc\r\n
分区1的数据为 3~6索引的数据
ef\r\n
分区2的数据为 6~9索引的数据
g\r\n
分区3数据为 9~12索引的数据
hj\r\n
分区4数据为 12~15索引的数据
分区5数据为 15~18索引的数据
klm\r
分区6数据为 18~1索引的数据
