1. SprakCore

2. RDD之装饰者模式

我们的bufferreadReader就是典型的装饰者模式,通过原有类不断增强功能,并保持原有类的功能

装饰者模式:动态地将责任附加到对象上。若要扩展功能,装饰者提供了比继承更有弹性的替代方案。

03. SprakCore - 图1

并且拥有一个特殊的特征 只有最外层的装饰者调用时 才会调用内部原有的装饰对象 称为惰性加载

03. SprakCore - 图2

而我们RDD也是通过装饰者马上来包装不断装饰原有的功能

03. SprakCore - 图3

3. RDD 弹性分布式数据集

03. SprakCore - 图4

4. RDD 特性

03. SprakCore - 图5

5. RDD 编程

03. SprakCore - 图6

算子:从认知心理学角度来讲,解决问题其实是将问题的初始状态,通过一系列的转换操作(operator),变成解决状态。

5.1. RDD创建

在Spark中创建RDD的创建方式可以分为三种:从集合中创建RDD、从外部存储创建RDD、从其他RDD创建。

5.1.1. 从集合中创建RDD

  • sc.parallelize(list)
  • sc.makeRDD(list)
  1. package com.atguigu.spark.day02
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.{SparkConf, SparkContext}
  4. //通过读取内存集合中的数据 创建RDD
  5. object Spark01_CreateRDD_mem {
  6. def main(args: Array[String]): Unit = {
  7. //Spark配置文件对象
  8. val conf: SparkConf = new SparkConf().setAppName(" Spark01_CreateRDD_mem").setMaster("local[*]")
  9. //创建SparkContext对象
  10. val sc: SparkContext = new SparkContext(conf)
  11. //创建一个集合
  12. val list: List[Int] = List(1, 2, 3, 4)
  13. //根据集合创建RDD 方式一
  14. // val rdd: RDD[Int] = sc.parallelize(list)
  15. //根据集合创建RDD 方式二 底层调用是parallelize方法
  16. val rdd: RDD[Int] = sc.makeRDD(list)
  17. rdd.collect().foreach(println)
  18. sc.stop()
  19. }
  20. }

5.1.2. 从外部存储系统的数据集创建

  • sc.textFile(path)
  1. package com.atguigu.spark.day02
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.{SparkConf, SparkContext}
  4. //通过读取外部文件 创建RDD
  5. object Spark02_CreateRDD_file {
  6. def main(args: Array[String]): Unit = {
  7. val conf: SparkConf = new SparkConf().setAppName("Spark02_CreateRDD_file").setMaster("local[*]")
  8. val sc: SparkContext = new SparkContext(conf)
  9. //读取本地文件数据
  10. val rdd: RDD[String] = sc.textFile("D:\\code\\spark\\input\\1.txt")
  11. rdd.collect().foreach(println)
  12. //从HDFS读取数据
  13. val hdfsRdd: RDD[String] = sc.textFile("hdfs://hadoop102:8020/input")
  14. hdfsRdd.collect().foreach(println)
  15. sc.stop()
  16. }
  17. }

5.2. 分区规则

5.2.1. 默认分区规则

  • rdd.partitions 查看分区
  1. package com.atguigu.spark.day02
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.{Partition, SparkConf, SparkContext}
  4. /**
  5. * 默认分区
  6. * - 从集合中创建RDD
  7. * 取决于分配给应用的CPU的核数 如为* 则为cpu全部核数
  8. * - 读取外部文件创建RDD
  9. * math.min(取决于分配给应用的CPU的核数,2)
  10. */
  11. object Spark03_Partition_default {
  12. def main(args: Array[String]): Unit = {
  13. val conf: SparkConf = new SparkConf().setAppName("Spark03_Partition_default").setMaster("local[*]")
  14. val sc: SparkContext = new SparkContext(conf)
  15. //通过集合创建RDD
  16. // val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
  17. val rdd: RDD[String] = sc.textFile("D:\\code\\spark\\input\\1.txt")
  18. //查看分区效果
  19. val partitions: Array[Partition] = rdd.partitions
  20. println(partitions.size) //分区数
  21. rdd.saveAsTextFile("D:\\code\\spark\\output")
  22. sc.stop()
  23. }
  24. }

03. SprakCore - 图7

5.2.2. 指定分区

  • sc.makeRDD(data , partition)
  1. package com.atguigu.spark.day02
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.{Partition, SparkConf, SparkContext}
  4. /**
  5. * 指定分区分区
  6. * -根据下标与分区数进行运算 求出 [x,y) 开始到结束下标元素的具体分区分配
  7. * - start (i * arr.lent) / partition
  8. * - end ((i +1) * arr.lent ) / partition
  9. */
  10. object Spark04_Partition_mem {
  11. def main(args: Array[String]): Unit = {
  12. val conf: SparkConf = new SparkConf().setAppName("Spark04_Partition_mem").setMaster("local[*]")
  13. val sc: SparkContext = new SparkContext(conf)
  14. //通过集合创建RDD
  15. // val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4),4) //默认分区数为cpu核数
  16. //1)4个数据,设置4个分区,输出:0分区->1,1分区->2,2分区->3,3分区->4
  17. //val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4), 4)
  18. //2)4个数据,设置3个分区,输出:0分区->1,1分区->2,2分区->3,4
  19. //val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4), 3)
  20. //3)5个数据,设置3个分区,输出:0分区->1,1分区->2、3,2分区->4、5
  21. val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5), 3)
  22. //查看分区效果
  23. val partitions: Array[Partition] = rdd.partitions
  24. println(partitions.size) //分区数
  25. rdd.saveAsTextFile("D:\\code\\spark\\output")
  26. sc.stop()
  27. }
  28. }

03. SprakCore - 图8

5.2.3. 读取文件指定分区

  1. package com.atguigu.spark.day02
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.{SparkConf, SparkContext}
  4. /**
  5. * 读取外部文件 创建RDD
  6. * - 默认分区规则
  7. * math.min(分配的核数,2)
  8. * - 指定分区 minPartitions 最小分区数 并不是实际分区个数
  9. * - 在实际计算分区个数的时候 会根据文件的总大小和 最小分区数进行相除运算
  10. * - 如果余数为0 最小分区数为实际分区数
  11. * - 如果余数不为0 则实际分区数要看实际切片
  12. *
  13. */
  14. object Spark05_Partition_file {
  15. def main(args: Array[String]): Unit = {
  16. val conf: SparkConf = new SparkConf().setAppName("Spark05_Partition_file").setMaster("local[*]")
  17. val sc: SparkContext = new SparkContext(conf)
  18. // val rdd: RDD[String] = sc.textFile("D:\\code\\spark\\input\\1.txt") //默认分区为2个
  19. //2)输入数据1-4,每行一个数字;输出:0=>{1、2} 1=>{3} 2=>{4} 3=>{空}
  20. //val rdd: RDD[String] = sc.textFile("input/3.txt",3)
  21. //3)输入数据1-4,一共一行;输出:0=>{1234} 1=>{空} 2=>{空} 3=>{空}
  22. val rdd: RDD[String] = sc.textFile("input/4.txt", 3)
  23. rdd.saveAsTextFile("D:\\code\\spark\\output")
  24. sc.stop()
  25. }
  26. }

03. SprakCore - 图9

getSplits文件返回的是切片规划,真正读取是在compute方法中创建LineRecordReader读取的,有两个关键变量

start=split.getStart()

end = start + split.getLength

假设读取text.txt 指定最小分区为5

  1. abc
  2. ef
  3. g
  4. hj
  5. klm

此文件总大小为19字节 19%5=3 每个分区每次追加个数要求为 19/6=3字节 19字节总大小/3每次字节=6个分区 余 1 所有实际分区数为 6+1 =7

如果 目前总大小/(总大小 / 最小分区数) < 1.1 则不切片 则以指定分区数 详情源码看上图的while循环

分区0的数据为 每个分区每次读取3字节 因为读取是读取一行如果这一行数据超过了每次读取则整行读取 并读取下个分区

0~3索引的数据

  1. abc\r\n

分区1的数据为 3~6索引的数据

  1. ef\r\n

分区2的数据为 6~9索引的数据

  1. g\r\n

分区3数据为 9~12索引的数据

  1. hj\r\n

分区4数据为 12~15索引的数据

分区5数据为 15~18索引的数据

  1. klm\r

分区6数据为 18~1索引的数据