1. SprakCore
2. RDD之装饰者模式
我们的bufferreadReader就是典型的装饰者模式,通过原有类不断增强功能,并保持原有类的功能
装饰者模式:动态地将责任附加到对象上。若要扩展功能,装饰者提供了比继承更有弹性的替代方案。
并且拥有一个特殊的特征 只有最外层的装饰者调用时 才会调用内部原有的装饰对象 称为惰性加载
而我们RDD也是通过装饰者马上来包装不断装饰原有的功能
3. RDD 弹性分布式数据集
4. RDD 特性
5. RDD 编程
算子:从认知心理学角度来讲,解决问题其实是将问题的初始状态,通过一系列的转换操作(operator),变成解决状态。
5.1. RDD创建
在Spark中创建RDD的创建方式可以分为三种:从集合中创建RDD、从外部存储创建RDD、从其他RDD创建。
5.1.1. 从集合中创建RDD
- sc.parallelize(list)
- sc.makeRDD(list)
package com.atguigu.spark.day02
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
//通过读取内存集合中的数据 创建RDD
object Spark01_CreateRDD_mem {
def main(args: Array[String]): Unit = {
//Spark配置文件对象
val conf: SparkConf = new SparkConf().setAppName(" Spark01_CreateRDD_mem").setMaster("local[*]")
//创建SparkContext对象
val sc: SparkContext = new SparkContext(conf)
//创建一个集合
val list: List[Int] = List(1, 2, 3, 4)
//根据集合创建RDD 方式一
// val rdd: RDD[Int] = sc.parallelize(list)
//根据集合创建RDD 方式二 底层调用是parallelize方法
val rdd: RDD[Int] = sc.makeRDD(list)
rdd.collect().foreach(println)
sc.stop()
}
}
5.1.2. 从外部存储系统的数据集创建
- sc.textFile(path)
package com.atguigu.spark.day02
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
//通过读取外部文件 创建RDD
object Spark02_CreateRDD_file {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("Spark02_CreateRDD_file").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
//读取本地文件数据
val rdd: RDD[String] = sc.textFile("D:\\code\\spark\\input\\1.txt")
rdd.collect().foreach(println)
//从HDFS读取数据
val hdfsRdd: RDD[String] = sc.textFile("hdfs://hadoop102:8020/input")
hdfsRdd.collect().foreach(println)
sc.stop()
}
}
5.2. 分区规则
5.2.1. 默认分区规则
- rdd.partitions 查看分区
package com.atguigu.spark.day02
import org.apache.spark.rdd.RDD
import org.apache.spark.{Partition, SparkConf, SparkContext}
/**
* 默认分区
* - 从集合中创建RDD
* 取决于分配给应用的CPU的核数 如为* 则为cpu全部核数
* - 读取外部文件创建RDD
* math.min(取决于分配给应用的CPU的核数,2)
*/
object Spark03_Partition_default {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("Spark03_Partition_default").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
//通过集合创建RDD
// val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val rdd: RDD[String] = sc.textFile("D:\\code\\spark\\input\\1.txt")
//查看分区效果
val partitions: Array[Partition] = rdd.partitions
println(partitions.size) //分区数
rdd.saveAsTextFile("D:\\code\\spark\\output")
sc.stop()
}
}
5.2.2. 指定分区
- sc.makeRDD(data , partition)
package com.atguigu.spark.day02
import org.apache.spark.rdd.RDD
import org.apache.spark.{Partition, SparkConf, SparkContext}
/**
* 指定分区分区
* -根据下标与分区数进行运算 求出 [x,y) 开始到结束下标元素的具体分区分配
* - start (i * arr.lent) / partition
* - end ((i +1) * arr.lent ) / partition
*/
object Spark04_Partition_mem {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("Spark04_Partition_mem").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
//通过集合创建RDD
// val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4),4) //默认分区数为cpu核数
//1)4个数据,设置4个分区,输出:0分区->1,1分区->2,2分区->3,3分区->4
//val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4), 4)
//2)4个数据,设置3个分区,输出:0分区->1,1分区->2,2分区->3,4
//val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4), 3)
//3)5个数据,设置3个分区,输出:0分区->1,1分区->2、3,2分区->4、5
val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5), 3)
//查看分区效果
val partitions: Array[Partition] = rdd.partitions
println(partitions.size) //分区数
rdd.saveAsTextFile("D:\\code\\spark\\output")
sc.stop()
}
}
5.2.3. 读取文件指定分区
package com.atguigu.spark.day02
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* 读取外部文件 创建RDD
* - 默认分区规则
* math.min(分配的核数,2)
* - 指定分区 minPartitions 最小分区数 并不是实际分区个数
* - 在实际计算分区个数的时候 会根据文件的总大小和 最小分区数进行相除运算
* - 如果余数为0 最小分区数为实际分区数
* - 如果余数不为0 则实际分区数要看实际切片
*
*/
object Spark05_Partition_file {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("Spark05_Partition_file").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
// val rdd: RDD[String] = sc.textFile("D:\\code\\spark\\input\\1.txt") //默认分区为2个
//2)输入数据1-4,每行一个数字;输出:0=>{1、2} 1=>{3} 2=>{4} 3=>{空}
//val rdd: RDD[String] = sc.textFile("input/3.txt",3)
//3)输入数据1-4,一共一行;输出:0=>{1234} 1=>{空} 2=>{空} 3=>{空}
val rdd: RDD[String] = sc.textFile("input/4.txt", 3)
rdd.saveAsTextFile("D:\\code\\spark\\output")
sc.stop()
}
}
getSplits文件返回的是切片规划,真正读取是在compute方法中创建LineRecordReader读取的,有两个关键变量
start=split.getStart()
end = start + split.getLength
假设读取text.txt 指定最小分区为5
abc
ef
g
hj
klm
此文件总大小为19字节 19%5=3 每个分区每次追加个数要求为 19/6=3字节 19字节总大小/3每次字节=6个分区 余 1 所有实际分区数为 6+1 =7
如果 目前总大小/(总大小 / 最小分区数) < 1.1 则不切片 则以指定分区数 详情源码看上图的while循环
分区0的数据为 每个分区每次读取3字节 因为读取是读取一行如果这一行数据超过了每次读取则整行读取 并读取下个分区
0~3索引的数据
abc\r\n
分区1的数据为 3~6索引的数据
ef\r\n
分区2的数据为 6~9索引的数据
g\r\n
分区3数据为 9~12索引的数据
hj\r\n
分区4数据为 12~15索引的数据
分区5数据为 15~18索引的数据
klm\r
分区6数据为 18~1索引的数据