一 读取数据源

基于内存

  1. object Spark01_RDD_Memory {
  2. def main(args: Array[String]): Unit = {
  3. val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
  4. val sc: SparkContext = new SparkContext(sparkConf)
  5. /**
  6. * 基于内存 当作数据源
  7. */
  8. val seq: Seq[Int] = Seq(1, 2, 3)
  9. val rdd: RDD[Int] = sc.makeRDD(seq)
  10. rdd.collect().foreach(print)
  11. sc.stop()
  12. }
  13. }


基于文件

textFile: 以行 为单位来读取数据,读取的数据都是字符串

  1. object Spark02_RDD_File {
  2. def main(args: Array[String]): Unit = {
  3. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD1")
  4. val sc: SparkContext = new SparkContext(sparkConf)
  5. // 1 绝对路径/相对路径
  6. // 2 指定文件
  7. // 3 指定目录(选中所有目录下所有文件)
  8. val rdd: RDD[String] = sc.textFile("datas")
  9. rdd.collect().foreach(println)
  10. // 4 通配符 * 模糊匹配 如 "datas/1.*.txt"
  11. // 5 可以是分布式存储系统路径:HDFS
  12. sc.stop()
  13. }
  14. }

wholeTextFiles:以文件为单位读取数据

  1. object Spark02_RDD_File1 {
  2. def main(args: Array[String]): Unit = {
  3. val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD3")
  4. val sc: SparkContext = new SparkContext(sparkConf)
  5. // 读取的结果表示为元组,第一个元素表示文件路径,第二个元素表示文件内容
  6. val rdd: RDD[(String, String)] = sc.wholeTextFiles("datas")
  7. rdd.collect().foreach(println)
  8. sc.stop()
  9. }
  10. }

image.png

并行度和分区

  • Driver能够将一个作业切分成多个任务后,发送给Executor节点并行计算
  • 能并行计算的任务数叫并行度,这个数量能在构建RDD时指定
  • RDD的计算一个分区内的数据是一个一个执行逻辑,只有一个执行完才会执行下一个

1.基于内存

1.1 分区设定

makeRDD方法可以传递第二个参数,这个参数表示分区的数量

  1. //TODO 准备环境
  2. val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD4")
  3. val sc: SparkContext = new SparkContext(sparkConf)
  4. //TODO 创建RDD
  5. // 第二个参数不传递会采用默认值 :defaultParallelism
  6. // scheduler.conf.getInt("spark.default.parallelism", totalCores)
  7. // 如果获取不到,那么就使用totalCores属性:为当前环境的最大可用核数
  8. // 设置分区 1. spark.default.parallelism 2. makeRDD(list,n)
  9. val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3),2)
  10. //将处理的数据保存成分区文件
  11. rdd.saveAsTextFile("output")
  12. //TODO 关闭
  13. sc.stop()

源码
image.png
image.png

image.png

1.2 分区数据的分配

集合长度 分区数量

image.png

  1. 如果数据源是 List12345 分区数量 3
  2. 那么计算得出 01)(13)(35
  3. 前闭后开
  4. 分区1 1
  5. 分区2 2 3
  6. 分区3: 4 5

2. 基于文件

2.1 分区设定

  • textFile第二个参数指定分区数
  • minPartitions:真正的分区数量可能比 设定/默认 的数量大
  1. //TODO 准备环境
  2. val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD4")
  3. val sc: SparkContext = new SparkContext(sparkConf)
  4. //TODO 创建RDD
  5. // 1.textFile可以将文件作为数据处理的数据源,第二个参数指定分区数
  6. // minPartitions:最小分区数
  7. // math.min(defaultParallelism,2) // 8,2
  8. // 2.如果不实用默认,也可以直接指定
  9. // 分区数的计算方式:
  10. // 文件总字节大小 = size
  11. // 平均每个分区存放 = size / minPartitions
  12. // (size % 平均大小 )> 0.1 ==> 分区 + 1
  13. val rdd: RDD[String] = sc.textFile("datas/3.txt")
  14. rdd.saveAsTextFile("output")
  15. //TODO 关闭
  16. sc.stop()
  1. 总字节 / 分区数量 = 分区平均字节量
  2. 总字节 /分区平均字节量 = 分区数量

2.2 分区数据的 分配

  • 读取文件和hadoop一样,是一行一行的读取
  • 读取之后进行分区时用到了字节
  • 如果数据源是多个文件,那么计算分区时是以文件为单位进行分区

例子:

  1. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("app")
  2. val sc = new SparkContext(sparkConf)
  3. val rdd = sc.textFile("datas/1.txt",2)

image.png

最终:会有两个文件(两个分区)