Spark 计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于处理不同的应用场景。三大数据结构分别是:

  • RDD : 弹性分布式数据集
  • 累加器:分布式共享只写变量
  • 广播变量:分布式共享只读变量

接下来我们一起看看这三大数据结构是如何在数据处理中使用的。

一、RDD

1、介绍

1、概念

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。

  • 弹性

    • 存储的弹性:内存与磁盘的自动切换;
    • 容错的弹性:数据丢失可以自动恢复;
    • 计算的弹性:计算出错重试机制;
    • 分片的弹性:可根据需要重新分片。
  • 分布式:数据存储在大数据集群不同节点上

  • 数据集:RDD 封装了计算逻辑,并不保存数据
  • 数据抽象:RDD 是一个抽象类,需要子类具体实现
  • 不可变:RDD 封装了计算逻辑,是不可以改变的,想要改变,只能产生新的 RDD,在新的 RDD 里面封装计算逻辑
  • 可分区、并行计算

RDD的数据处理方式类似于IO流,也有装饰者设计模式
RDD的数据只有在调用collect方法时,才会真正执行业务逻辑操作。之前都不会
RDD是不保存数据的,但是IO可以临时保存一部分数据在缓冲区中。

2、核心属性

  1. * Internally, each RDD is characterized by five main properties:
  2. *
  3. * - A list of partitions
  4. * - A function for computing each split
  5. * - A list of dependencies on other RDDs
  6. * - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
  7. * - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
  8. * an HDFS file)
  • 分区列表

RDD 数据结构中存在分区列表,用于执行任务时并行计算,是实现分布式计算的重要属性。

  1. /**
  2. * Implemented by subclasses to return the set of partitions in this RDD. This method will only
  3. * be called once, so it is safe to implement a time-consuming computation in it.
  4. *
  5. * The partitions in this array must satisfy the following property:
  6. * `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`
  7. */
  8. protected def getPartitions: Array[Partition]
  • 分区计算函数

Spark 在计算时,是使用分区函数对每一个分区进行计算

  1. /**
  2. * :: DeveloperApi ::
  3. * Implemented by subclasses to compute a given partition.
  4. */
  5. @DeveloperApi
  6. def compute(split: Partition, context: TaskContext): Iterator[T]
  • RDD 之间的依赖关系

RDD 是计算模型的封装,当需求中需要将多个计算模型进行组合时,就需要将多个 RDD 建立依赖关系

  1. /**
  2. * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
  3. * be called once, so it is safe to implement a time-consuming computation in it.
  4. */
  5. protected def getDependencies: Seq[Dependency[_]] = deps
  • 分区器(可选)

当数据为 KV 类型数据时,可以通过设定分区器自定义数据的分区

  1. /** Optionally overridden by subclasses to specify how they are partitioned. */
  2. @transient val partitioner: Option[Partitioner] = None
  • 首选位置(可选)

计算数据时,可以根据计算节点的状态选择不同的节点位置进行计算

  1. /**
  2. * Optionally overridden by subclasses to specify placement preferences.
  3. */
  4. protected def getPreferredLocations(split: Partition): Seq[String] = Nil

3、执行原理

从计算的角度来讲,数据处理过程中需要计算资源(内存 & CPU)和计算模型(逻辑)。执行时,需要将计算资源和计算模型进行协调和整合。

Spark 框架在执行时,先申请资源,然后将应用程序的数据处理逻辑分解成一个一个的计算任务。然后将任务发到已经分配资源的计算节点上, 按照指定的计算模型进行数据计算。最后得到计算结果。

RDD 是 Spark 框架中用于数据处理的核心模型,接下来我们看看,在 Yarn 环境中,RDD的工作原理:

1、启动Yarn集群
image.png

2、Spark通过申请资源创建调度节点和计算节点
image.png
3、Spark框架根据需求计算逻辑根据分区划分成不同的任务
image.png
4、调度节点根据计算节点状态将任务发送到对应的计算节点进行计算
image.png

2、基础使用

1、创建RDD

在 Spark 中创建 RDD 的创建方式可以分为四种:

  • 从集合(内存)中创建 RDD

从集合中创建 RDD,Spark 主要提供了两个方法:parallelize 和 makeRDD

  1. object RDD_Memory {
  2. def main(args: Array[String]): Unit = {
  3. val conf = new SparkConf().setMaster("local[*]").setAppName("RDD_MEMORY")
  4. val context = new SparkContext(conf)
  5. // 创建rdd
  6. // seq: Seq[T],
  7. // numSlices: Int = defaultParallelism
  8. val seq: Seq[Int] = Seq(1, 2, 3)
  9. val rdd: RDD[Int] = context.parallelize(seq);
  10. // 其实makeRDD就是封装了parallelize方法
  11. val rdd2: RDD[Int] = context.makeRDD(seq)
  12. // rdd.collect.foreach(println)
  13. rdd2.collect.foreach(println)
  14. context.stop()
  15. }
  16. }

从底层代码实现来讲,makeRDD 方法其实就是 parallelize 方法

  1. def makeRDD[T: ClassTag](
  2. seq: Seq[T],
  3. numSlices: Int = defaultParallelism): RDD[T] = withScope {
  4. // 调用parallelize()方法
  5. parallelize(seq, numSlices)
  6. }
  • 从外部存储(文件)创建 RDD

    • textFile:以行为单位读取数据,不考虑文件来自哪里 ```scala object RDD_File {

    def main(args: Array[String]): Unit = {

    val context = new SparkContext(new SparkConf().setMaster(“local[*]”).setAppName(“RDD_MEMORY”))

    // 创建rdd // 1、path路径默认从当前环境的根路径为基准,可以绝对和相对路径 // val rdd1: RDD[String] = context.textFile(“datas/1.txt”) // 2、path路径可以是具体的文件,也可以是目录,读取该目录下所有文件 val rdd2: RDD[String] = context.textFile(“datas”) // 3、path路径可以使用通配符 val rdd3: RDD[String] = context.textFile(“datas/1.txt”) // 只读取1的文件 // 4、path路径也可以是分布式存储系统,如HDFS,Ceph、Nfs等 context.textFile(“hdfs://hadoop:9820/hello”)

    rdd1.collect.foreach(println)

  1. context.stop()

} }

  1. - whoTextFiles:以文件为单位读取数据,读取的结果表示为元组,第一个元素表示文件路径,第二个元素表示文件内容
  2. ```scala
  3. // 获取文件来源
  4. val rdd5: RDD[(String, String)] = context.wholeTextFiles("datas")
  5. rdd5.collect.foreach(x => println(x._2))
  • 从其他 RDD 创建
    • 主要是通过一个 RDD 运算完后,再产生新的 RDD。
  • 直接创建 RDD(new)
    • 使用 new 的方式直接构造 RDD,一般由 Spark 框架自身使用。

2、并行度和分区

1、集合数据的分配

默认情况下,Spark 可以将一个作业切分多个任务后,发送给 Executor 节点并行计算,而能够并行计算的任务数量我们称之为并行度。这个数量可以在构建 RDD 时指定。记住,这里的并行执行的任务数量,并不是指的切分任务的数量,不要混淆了。

我们在使用parallelize方法的时候,可以传入第二个参数:numSlices: Int = defaultParallelism,默认并行度。
来到TaskScheduler特质中

  1. // Get the default level of parallelism to use in the cluster, as a hint for sizing jobs.
  2. def defaultParallelism(): Int

找其实现类
image.png

  1. override def defaultParallelism(): Int = backend.defaultParallelism()

来到SchedulerBackend特质中

  1. def defaultParallelism(): Int

继续找其实现类
image.png
分模式来找对应的实现
本地实现:找spark.default.parallelism配置项的值,如果没有设置就用默认totalCores,本地所有cpu核数

  1. override def defaultParallelism(): Int =
  2. scheduler.conf.getInt("spark.default.parallelism", totalCores)

集群模式中,如果没有设置,就使用cpu总核数或者2中的最大值

  1. override def defaultParallelism(): Int = {
  2. conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
  3. }

可以使用saveAsObjectFile将rdd产出文件保存到目录中

  1. val rdd2: RDD[Int] = context.makeRDD(seq)
  2. rdd2.saveAsObjectFile("datas/output")

image.png
指定数量之后:

  1. val rdd2: RDD[Int] = context.makeRDD(seq,3)
  2. rdd2.saveAsObjectFile("datas/output")

image.png

那么这个切分,是按照什么切分呢?切分后的数据到底存放在哪里呢?
比如一个5长度的数组,切分3个分区,数据怎么分呢?

来深入源码分析:
此时5个元素,三个分片
1、先来到SparkContextparallelize()方法

  1. def parallelize[T: ClassTag](
  2. seq: Seq[T],
  3. numSlices: Int = defaultParallelism): RDD[T] = withScope {
  4. assertNotStopped()
  5. // 创建一个ParallelCollectionRDD对象,
  6. new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
  7. }

2、创建ParallelCollectionRDD对象

  1. private[spark] class ParallelCollectionRDD[T: ClassTag](
  2. sc: SparkContext,
  3. @transient private val data: Seq[T],
  4. numSlices: Int,
  5. locationPrefs: Map[Int, Seq[String]])
  6. extends RDD[T](sc, Nil) {
  7. // 在这里获取分区数量
  8. override def getPartitions: Array[Partition] = {
  9. // 获取分片
  10. val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
  11. slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
  12. }

3、进入ParallelCollectionRDD.slice(data, numSlices)方法,求出数据如何分片

  1. def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
  2. // 分片小于1抛出异常
  3. if (numSlices < 1) {
  4. throw new IllegalArgumentException("Positive number of partitions required")
  5. }
  6. // Sequences need to be sliced at the same set of index positions for operations
  7. // like RDD.zip() to behave as expected
  8. // 求出位置 5 3
  9. def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
  10. // 遍历所有分片,0, 1, 2
  11. (0 until numSlices).iterator.map { i =>
  12. // start = 0 * 5 / 3 = 0
  13. // 分片 * 数据长度 / 分片数量,决定轮询到哪个分片
  14. val start = ((i * length) / numSlices).toInt
  15. // 0 + 1 * 数据长度 / 分片数量
  16. val end = (((i + 1) * length) / numSlices).toInt
  17. // 就成了 (0,1) (1,3) (3,5)
  18. (start, end)
  19. }
  20. }
  21. // 模式匹配
  22. seq match {
  23. // 不是范围
  24. case r: Range =>
  25. positions(r.length, numSlices).zipWithIndex.map { case ((start, end), index) =>
  26. // If the range is inclusive, use inclusive range for the last slice
  27. if (r.isInclusive && index == numSlices - 1) {
  28. new Range.Inclusive(r.start + start * r.step, r.end, r.step)
  29. }
  30. else {
  31. new Range(r.start + start * r.step, r.start + end * r.step, r.step)
  32. }
  33. }.toSeq.asInstanceOf[Seq[Seq[T]]]
  34. case nr: NumericRange[_] =>
  35. // For ranges of Long, Double, BigInteger, etc
  36. val slices = new ArrayBuffer[Seq[T]](numSlices)
  37. var r = nr
  38. for ((start, end) <- positions(nr.length, numSlices)) {
  39. val sliceSize = end - start
  40. slices += r.take(sliceSize).asInstanceOf[Seq[T]]
  41. r = r.drop(sliceSize)
  42. }
  43. slices
  44. // 默认兜底处理
  45. case _ =>
  46. val array = seq.toArray // To prevent O(n^2) operations for List etc
  47. // 5,3
  48. positions(array.length, numSlices).map {
  49. // // 就成了 (0,1) (1,3) (3,5)
  50. case (start, end) =>
  51. array.slice(start, end).toSeq
  52. }.toSeq
  53. }
  54. }

4、slice最终分片

  1. override def slice(from: Int, until: Int): Array[T] = {
  2. // 获取数组
  3. val reprVal = repr
  4. //
  5. val lo = math.max(from, 0)
  6. val hi = math.min(math.max(until, 0), reprVal.length)
  7. // 最大值减去最小值,决定每个分片存放数据的数量
  8. val size = math.max(hi - lo, 0)
  9. val result = java.lang.reflect.Array.newInstance(elementClass, size)
  10. if (size > 0) {
  11. // 通过copy方法,将原数组中的每个分片的位置拷贝size个元素
  12. Array.copy(reprVal, lo, result, 0, size)
  13. }
  14. result.asInstanceOf[Array[T]]
  15. }

5、copy数组元素

  1. * @param src the source array.
  2. * @param srcPos starting position in the source array.
  3. * @param dest destination array.
  4. * @param destPos starting position in the destination array.
  5. * @param length the number of array elements to be copied.
  6. *
  7. * @see `java.lang.System#arraycopy`
  8. */
  9. def copy(src: AnyRef, srcPos: Int, dest: AnyRef, destPos: Int, length: Int) {
  10. val srcClass = src.getClass
  11. if (srcClass.isArray && dest.getClass.isAssignableFrom(srcClass))
  12. java.lang.System.arraycopy(src, srcPos, dest, destPos, length)
  13. else
  14. slowcopy(src, srcPos, dest, destPos, length)
  15. }

2、文件数据的分配

读取文件数据时,数据是按照 Hadoop 文件读取的规则进行切片分区,而切片规则和数据读取的规则有些差异。
在读取文件的时候同样可以指定最小分区数量。

  1. // minPartitions: Int = defaultMinPartitions 最小分区数量
  2. val rdd1: RDD[String] = context.textFile("datas/1.txt",2)
  3. // def defaultMinPartitions: Int = math.min(defaultParallelism, 2)

我们这个1.txt文件如下:
image.png
看着是三个字节,其实是包含了7个字节,回车和换行符。

1、textFile进行读取

  1. def textFile(
  2. path: String,
  3. minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  4. assertNotStopped()
  5. // 来到HadoopFile,其实是由Hadoop接管了
  6. hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
  7. minPartitions).map(pair => pair._2.toString).setName(path)
  8. }

2、HadoopFile创建一个新的HadoopRDD对象。

  1. def hadoopFile[K, V](
  2. path: String,
  3. inputFormatClass: Class[_ <: InputFormat[K, V]],
  4. keyClass: Class[K],
  5. valueClass: Class[V],
  6. minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
  7. assertNotStopped()
  8. // This is a hack to enforce loading hdfs-site.xml.
  9. // See SPARK-11227 for details.
  10. FileSystem.getLocal(hadoopConfiguration)
  11. // A Hadoop configuration can be about 10 KiB, which is pretty big, so broadcast it.
  12. val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
  13. // 我们看到了熟悉的FileInputFormat,这就解释了为什么是一行一行读取。
  14. val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
  15. new HadoopRDD(
  16. this,
  17. confBroadcast,
  18. Some(setInputPathsFunc),
  19. inputFormatClass,
  20. keyClass,
  21. valueClass,
  22. minPartitions).setName(path)
  23. }

3、等到collect收集的时候
来到HadoopRDD的getPartitions方法

  1. override def getPartitions: Array[Partition] = {
  2. val jobConf = getJobConf()
  3. // add the credentials here as this can be called before SparkContext initialized
  4. SparkHadoopUtil.get.addCredentials(jobConf)
  5. try {
  6. // 从这里获取输入流,并且获取分片
  7. val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
  8. val inputSplits = if (ignoreEmptySplits) {
  9. allInputSplits.filter(_.getLength > 0)
  10. } else {
  11. allInputSplits
  12. }
  13. if (inputSplits.length == 1 && inputSplits(0).isInstanceOf[FileSplit]) {
  14. val fileSplit = inputSplits(0).asInstanceOf[FileSplit]
  15. val path = fileSplit.getPath
  16. if (fileSplit.getLength > conf.get(IO_WARNING_LARGEFILETHRESHOLD)) {
  17. val codecFactory = new CompressionCodecFactory(jobConf)
  18. if (Utils.isFileSplittable(path, codecFactory)) {
  19. logWarning(s"Loading one large file ${path.toString} with only one partition, " +
  20. s"we can increase partition numbers for improving performance.")
  21. } else {
  22. logWarning(s"Loading one large unsplittable file ${path.toString} with only one " +
  23. s"partition, because the file is compressed by unsplittable compression codec.")
  24. }
  25. }
  26. }
  27. val array = new Array[Partition](inputSplits.size)
  28. for (i <- 0 until inputSplits.size) {
  29. array(i) = new HadoopPartition(id, i, inputSplits(i))
  30. }
  31. array
  32. } catch {
  33. case e: InvalidInputException if ignoreMissingFiles =>
  34. logWarning(s"${jobConf.get(FileInputFormat.INPUT_DIR)} doesn't exist and no" +
  35. s" partitions returned from this path.", e)
  36. Array.empty[Partition]
  37. }
  38. }

4、来到Hadoop读取分片的getSplits方法,至此获取分片数量,完成任务

  1. /** Splits files returned by {@link #listStatus(JobConf)} when
  2. * they're too big.*/
  3. public InputSplit[] getSplits(JobConf job, int numSplits)
  4. throws IOException {
  5. StopWatch sw = new StopWatch().start();
  6. FileStatus[] files = listStatus(job);
  7. // Save the number of input files for metrics/loadgen
  8. job.setLong(NUM_INPUT_FILES, files.length);
  9. // 获取文件数据的总字节大小
  10. long totalSize = 0; // compute total size
  11. for (FileStatus file: files) { // check we have valid files
  12. if (file.isDirectory()) {
  13. throw new IOException("Not a file: "+ file.getPath());
  14. }
  15. totalSize += file.getLen();
  16. }
  17. // goalSize就是总字节 / 有指定几个分区,就除几,获取每个分区应该存放多少字节
  18. long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
  19. long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
  20. FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
  21. // generate splits
  22. ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
  23. NetworkTopology clusterMap = new NetworkTopology();
  24. for (FileStatus file: files) {
  25. Path path = file.getPath();
  26. long length = file.getLen();
  27. if (length != 0) {
  28. FileSystem fs = path.getFileSystem(job);
  29. BlockLocation[] blkLocations;
  30. if (file instanceof LocatedFileStatus) {
  31. blkLocations = ((LocatedFileStatus) file).getBlockLocations();
  32. } else {
  33. blkLocations = fs.getFileBlockLocations(file, 0, length);
  34. }
  35. // 如果可以分片,进行分片操作
  36. if (isSplitable(fs, path)) {
  37. long blockSize = file.getBlockSize(); // 获取块大小,集群模式128m,local模式32m
  38. // 获取切片大小,取块大小和目标分区大小的最小值
  39. long splitSize = computeSplitSize(goalSize, minSize, blockSize);
  40. long bytesRemaining = length;
  41. while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
  42. String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
  43. length-bytesRemaining, splitSize, clusterMap);
  44. splits.add(makeSplit(path, length-bytesRemaining, splitSize,
  45. splitHosts[0], splitHosts[1]));
  46. bytesRemaining -= splitSize;
  47. }
  48. if (bytesRemaining != 0) {
  49. String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
  50. - bytesRemaining, bytesRemaining, clusterMap);
  51. splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
  52. splitHosts[0], splitHosts[1]));
  53. }
  54. } else {
  55. String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap);
  56. splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
  57. }
  58. } else {
  59. //Create empty hosts array for zero length files
  60. splits.add(makeSplit(path, 0, length, new String[0]));
  61. }
  62. }
  63. sw.stop();
  64. if (LOG.isDebugEnabled()) {
  65. LOG.debug("Total # of splits generated by getSplits: " + splits.size()
  66. + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
  67. }
  68. return splits.toArray(new FileSplit[splits.size()]);
  69. }

分区的数据是如何分配的呢?

1、spark读取文件,是以hadoop的FileInputFormat为默认实现的,所以是一行一行读取。和字节数没有关系

2、数据读取时以偏移量为单位,并且不会重复读取

我们的数据是 1@@ ==> 012 2@@ ==> 345 3 ==> 6

3、数据分区的偏移量范围的计算

0 ==> [0,3] => 12 1 ==> [3,6] => 3 2 ==> [6,7] =>

这个读取范围两边都可以取到
,我们读取0,1,2,3。由于是按照一行一行读取的,所以读取到了数据1和2。由于能够读取到2,所以说2所在的一行已经被读取了,不能被重复读取。所以继续往下走,读取第三行的3到第二个分区中,所以第三个分区什么也没有

3、RDD算子

RDD的方法总共分两种,大致分为:

  • 转换:功能的补充和封装,将旧的RDD包装成新的RDD,比如flatMap、map操作
  • 行动:触发任务的调度和执行

RDD的方法也称为算子。
认知心理学认为解决问题其实是将问题的状态进行改变:
问题(初始) => 操作(算子) => 问题(审核中) => 操作(算子) => 问题(完成)

这个 操作翻译过来就叫做Operator,所以RDD的方法就叫做算子。

1、RDD转换算子

RDD 根据数据处理方式的不同将算子整体上分为 Value 类型、双 Value 类型和 Key-Value类型

  • Value类型

1、map

  1. def map[U: ClassTag](f: T => U): RDD[U]

将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换。

  1. object Transform_map {
  2. def main(args: Array[String]): Unit = {
  3. val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
  4. val rdd: RDD[Int] = context.makeRDD(List(1, 2, 3, 4))
  5. // 定义一个转换函数,将数值都乘以2返回
  6. def double(num: Int): Int = {
  7. num * 2
  8. }
  9. val value: RDD[Int] = rdd.map(double)
  10. // 函数至简原则,能省则省,
  11. // rdd.map((num: Int) => {num * 2})
  12. // 最简单形式:
  13. rdd.map(_*2)
  14. value.collect.foreach(println)
  15. }
  16. }

小案例:收集Apache的log日志,并获取uri
apache.log
思路:日志的话,如果格式可能一直,ip地址可能不一致,所以偏移量切割不太可行。尽量按照空格等切割。apache的日志访问uri都在最后一个,其实也可以倒着切

  1. object Transform_map_test {
  2. def main(args: Array[String]): Unit = {
  3. val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
  4. val rdd: RDD[String] = context.textFile("datas/apache.log")
  5. // 方便理解
  6. rdd.map(
  7. line => {
  8. val strings: Array[String] = line.split(" ")
  9. strings(6)
  10. }
  11. )
  12. // 柯里化传值
  13. val value: RDD[String] = rdd.map(_.split(" ")(6))
  14. value.collect.foreach(println)
  15. }
  16. }

map的计算其实是并行的
演示:

  1. object Transform_map_parallel_test {
  2. def main(args: Array[String]): Unit = {
  3. val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
  4. val rdd: RDD[Int] = context.makeRDD(List(1, 2, 3, 4),1)
  5. val mappedRdd: RDD[Int] = rdd.map(num => {
  6. println(Thread.currentThread.getName + "=======>" + num)
  7. num
  8. })
  9. val mappedRdd1: RDD[Int] = mappedRdd.map(num => {
  10. println(Thread.currentThread.getName + "#######》" + num)
  11. num
  12. })
  13. mappedRdd1.collect.foreach(println)
  14. context.stop()
  15. }
  16. }

Rdd的计算,在一个分区内是一个一个执行逻辑,只有前一个数据的逻辑执行完毕之后才可以执行下一个数据,分区内数据是有序的。
当只有一个分区的结果:

Executor task launch worker for task 0=======>1 Executor task launch worker for task 0#######》1 Executor task launch worker for task 0=======>2 Executor task launch worker for task 0#######》2 Executor task launch worker for task 0=======>3 Executor task launch worker for task 0#######》3 Executor task launch worker for task 0=======>4 Executor task launch worker for task 0#######》4

分区数量改为2之后,同一个分区内是有序的,但是执行顺序就是并行的,谁先执行就不知道了

Executor task launch worker for task 1=======>3 Executor task launch worker for task 1#######》3 Executor task launch worker for task 1=======>4 Executor task launch worker for task 1#######》4 Executor task launch worker for task 0=======>1 Executor task launch worker for task 0#######》1 Executor task launch worker for task 0=======>2 Executor task launch worker for task 0#######》2

2、mapPartitions
按照分区进行计算
def mapPartitionsU: ClassTag: RDD[U]

  1. object MapPartition_Test {
  2. def main(args: Array[String]): Unit = {
  3. val context: SparkContext = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("mapPartition"))
  4. val rdd = context.makeRDD(List(1, 2, 3, 4), 1)
  5. // def mapPartitions[U: ClassTag](
  6. // f: Iterator[T] => Iterator[U],
  7. // preservesPartitioning: Boolean = false): RDD[U]
  8. //
  9. val value: RDD[Int] = rdd.mapPartitions(iter => {
  10. println("--------------")
  11. iter.map(_ * 2)
  12. })
  13. value.collect.foreach(println)
  14. }
  15. }

如果是两个分区的话就可以在每个分区执行一遍该操作。——-就会被输出两次。

小功能:获取每个分区的最大值

mapPartitions就可以轻松实现

  1. def main(args: Array[String]): Unit = {
  2. val context: SparkContext = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("mapPartition"))
  3. val rdd = context.makeRDD(List(1, 2, 3, 4), 2)
  4. // 但是这个mapPartitions方法,占用内存,会导致引用指向内存中的数据导致GC不会释放。所以慎用,或者考虑使用软引用
  5. // def mapPartitions[U: ClassTag](
  6. // f: Iterator[T] => Iterator[U],
  7. // preservesPartitioning: Boolean = false): RDD[U]
  8. //
  9. val value: RDD[Int] = rdd.mapPartitions(iter => {
  10. // 最外层是迭代器,所以要包装一下
  11. List(iter.max).iterator
  12. })
  13. value.collect.foreach(println)
  14. }
  15. }

思考一下:map和mapPartitions的区别

  • 数据处理角度

Map 算子是分区内一个数据一个数据的执行,类似于串行操作。而 mapPartitions 算子是以分区为单位进行批处理操作。

  • 功能的角度

Map 算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据。MapPartitions 算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,所以可以增加或减少数据

  • 性能的角度

Map 算子因为类似于串行操作,所以性能比较低,而是 mapPartitions 算子类似于批处理,所以性能较高。但是 mapPartitions 算子会长时间占用内存,那么这样会导致内存可能不够用,出现内存溢出的错误。所以在内存有限的情况下,不推荐使用。使用 map 操作。

3、mapPartitionsWithIndex
**
def mapPartitionsWithIndexU: ClassTag => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]

将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引。

获取第二个数据分区的数据

  1. object Transform_mapPartitionsWithIndex {
  2. def main(args: Array[String]): Unit = {
  3. val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
  4. val rdd: RDD[Int] = context.makeRDD(List(1,2,3,4),2)
  5. // mapPartitionsWithIndex可以根据索引选择
  6. // 比如现在只要第二个分区的数据。
  7. val mappedRdd: RDD[Int] = rdd.mapPartitionsWithIndex(
  8. (index, iter) => {
  9. if (index == 1) {
  10. iter
  11. } else {
  12. // 其他分区返回null
  13. Nil.iterator
  14. }
  15. }
  16. )
  17. mappedRdd.collect.foreach(println)
  18. }
  19. }

将数据根据索引放到不同分区

  1. object Transform_mapPartitionsWithIndex_2 {
  2. def main(args: Array[String]): Unit = {
  3. val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
  4. val rdd: RDD[Int] = context.makeRDD(List(1,2,3,4),2)
  5. // mapPartitionsWithIndex可以根据索引选择
  6. //
  7. val mappedRdd: RDD[(Int, Int)] = rdd.mapPartitionsWithIndex(
  8. // 将获取到的数据,根据分区索引,将数据放到不同的分区
  9. (index, iter) => {
  10. iter.map(num => (index, num))
  11. }
  12. )
  13. mappedRdd.collect.foreach(println)
  14. }
  15. }

4、flatMap
def flatMapU: ClassTag: RDD[U]
将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射

  1. object Transform_flatMap {
  2. def main(args: Array[String]): Unit = {
  3. val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
  4. val rdd: RDD[List[Int]] = context.makeRDD(List(List(1,2),List(3,4)))
  5. // 不能使用_
  6. val mappedRdd = rdd.flatMap(list => list)
  7. mappedRdd.collect.foreach(println)
  8. }
  9. }

我们进行wordCount的时候,拆分字符串就可以使用这个方法,因为String切割之后是一个个的Array[String],所以扁平映射将所有String拿出来。

小功能:将 List(List(1,2),3,List(4,5))进行扁平化操作

这个是两个List并且混杂着一个Int,所以外面不能确定元素的类型,可以考虑使用模式匹配。你不是,让你变成List类型!

  1. object Transform_flatMap_2 {
  2. def main(args: Array[String]): Unit = {
  3. val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
  4. val rdd = context.makeRDD(List(List(1, 2), 3, List(4, 5)))
  5. // 不能使用_
  6. val mappedRdd = rdd.flatMap{
  7. // 不是集合,变成集合
  8. case list: List[_] => list
  9. case data => List(data)
  10. }
  11. mappedRdd.collect.foreach(println)
  12. }
  13. }

5、glom
def glom(): RDD[Array[T]]
将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变

  1. object Transform_glom {
  2. def main(args: Array[String]): Unit = {
  3. val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
  4. val rdd = context.makeRDD(List(1, 2, 3, 4), 2)
  5. // glom:对同一个分区内的数据转换为相同类型数组处理,分区不变
  6. val value: RDD[Array[Int]] = rdd.glom()
  7. value.collect.foreach(array => println(array.mkString(", ")))
  8. }
  9. }

小功能:计算所有分区最大值求和(分区内取最大值,分区间最大值求和)

  1. object Transform_glom {
  2. def main(args: Array[String]): Unit = {
  3. val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
  4. val rdd = context.makeRDD(List(1, 2, 3, 4), 2)
  5. // glom:对同一个分区内的数据转换为相同类型数组处理,分区不变
  6. val value = rdd.glom() // 分区内转成Array
  7. .map( _.max) // 每个分区求出最大值
  8. println(value.collect.sum) // 求和
  9. }
  10. }

6、groupBy
def groupByK(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]

将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,我们将这样的操作称之为 shuffle。极限情况下,数据可能被分在同一个分区中

一个组的数据在一个分区中,但是并不是说一个分区中只有一个组
**

  1. object Transform_groupBy {
  2. def main(args: Array[String]): Unit = {
  3. val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
  4. val rdd: RDD[Int] = context.makeRDD(List(1, 2, 3, 4), 2)
  5. // groupBy分组,按照余数作为key分组
  6. val value: RDD[(Int, Iterable[Int])] = rdd.groupBy(_ % 2)
  7. value.collect.foreach(println)
  8. }
  9. }

小功能:按照首字母分组

  1. object Transform_groupBy1 {
  2. def main(args: Array[String]): Unit = {
  3. val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
  4. val rdd: RDD[String] = context.makeRDD(List("Hello", "Spark", "Hadoop", "Storm"), 2)
  5. // 根据首字母分组
  6. val value: RDD[(Char, Iterable[String])] = rdd.groupBy(_.charAt(0))
  7. value.collect.foreach(println)
  8. }
  9. }

小功能:分析apache.log日志中时间段的统计量

  1. object Transform_groupBy2 {
  2. def main(args: Array[String]): Unit = {
  3. val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
  4. val rdd: RDD[String] = context.textFile("datas/apache.log")
  5. // 获取访问数量
  6. val value: RDD[(String, Int)] = rdd.map(
  7. f = line => {
  8. // 方式一:如果日期格式固定,通过字符串截串
  9. // 方式二:通过SimpleDateFormat格式转换后获取
  10. // 17/05/2015:10:05:03
  11. var time = line.split(" ")(3)
  12. // 获取到之后再转换成只有小时的形式
  13. val date: Date = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss").parse(time)
  14. // 返回
  15. val hour: String = new SimpleDateFormat("HH").format(date)
  16. // 返回二元组形式
  17. (hour, 1)
  18. }
  19. ).groupBy(_._1) // 根据时间分组
  20. .map { case (hour, iter) => (hour, iter.size)} // 分组之后将结果聚合,
  21. value.collect.foreach(println)
  22. }
  23. }

7、filter
def filter(f: T => Boolean): RDD[T]

将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜
**

  1. object Transform_filter {
  2. def main(args: Array[String]): Unit = {
  3. val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
  4. val rdd: RDD[Int] = context.makeRDD(List(1, 2, 3, 4), 2)
  5. // 过滤数据
  6. val value: RDD[Int] = rdd.filter(_ % 2 == 0)
  7. value.collect.foreach(println)
  8. context.stop()
  9. }
  10. }

小功能:获取apache.log中访问时间在17/05/2015的

  1. object Transform_filter_Test {
  2. def main(args: Array[String]): Unit = {
  3. val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
  4. val rdd: RDD[String] = context.textFile("datas/apache.log")
  5. // 过滤数据,===当然也可以使用map,但是数据会被丢弃,不如直接过滤
  6. val value: RDD[String] = rdd.filter(
  7. // 可以直接匹配以这个日期开头的
  8. line => {
  9. val time: String = line.split(" ")(3)
  10. time.startsWith("17/05/2015")
  11. }
  12. )
  13. value.collect.foreach(println)
  14. context.stop()
  15. }
  16. }

8、sample

  1. /**
  2. * Return a sampled subset of this RDD.
  3. *
  4. * @param withReplacement: 一个数是否能被抽取多次,即放回其中。
  5. * @param fraction : 期待rdd中数据的概率
  6. * 如果withReplacement设置为false,即不放回,表示每个元素被选中的可能性; fraction must be [0, 1]
  7. * 如果withReplacement设置为true,放回数据: 表示每个元素被选中的次数; fraction must be greater than or equal to 0
  8. * @param seed :随机生成的种子,同样种子每次产生的数据是相同的。默认是系统当前时间戳
  9. *
  10. * @note This is NOT guaranteed to provide exactly the fraction of the count
  11. * of the given [[RDD]].
  12. */
  13. def sample(
  14. withReplacement: Boolean,
  15. fraction: Double,
  16. seed: Long = Utils.random.nextLong): RDD[T] = {
  17. require(fraction >= 0,
  18. s"Fraction must be nonnegative, but got ${fraction}")
  19. withScope {
  20. require(fraction >= 0.0, "Negative fraction value: " + fraction)
  21. if (withReplacement) {
  22. // PoissonSampler算法保证放回
  23. new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
  24. } else {
  25. // BernoulliSampler算法保证不放回
  26. new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
  27. }
  28. }
  29. }

根据指定的规则从数据集中抽取数据

  1. object Transform_sample {
  2. def main(args: Array[String]): Unit = {
  3. val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
  4. val rdd: RDD[Int] = context.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
  5. val value: RDD[Int] = rdd.sample(
  6. false // 是否放回
  7. , 0.3 // false,每个元素出现的概率,
  8. // , 1 // 种子,相同种子的时间戳生成数据一样的
  9. )
  10. println(value.collect.mkString(", "))
  11. context.stop()
  12. }
  13. }

**

思考一个问题:有啥用,抽奖吗?

其实不是这样的,主要是为了解决极端情况下的数据倾斜问题

9、distinct
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

将数据集中重复的数据去重.

  1. object Transform_distinct {
  2. def main(args: Array[String]): Unit = {
  3. val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
  4. val rdd = context.makeRDD(List(1, 2, 3, 4,1, 2, 3, 4))
  5. // distinct去重
  6. val value: RDD[Int] = rdd.distinct()
  7. value.collect.foreach(println)
  8. }
  9. }

distinct是如何将数据去重的呢?
我们知道,scala中也有distinct函数,但是是使用底层HashSet数据结构实现的去重操作:

  1. def distinct: Repr = {
  2. val isImmutable = this.isInstanceOf[immutable.Seq[_]]
  3. if (isImmutable && lengthCompare(1) <= 0) repr
  4. else {
  5. val b = newBuilder
  6. // hashset
  7. val seen = new mutable.HashSet[A]()
  8. var it = this.iterator
  9. var different = false
  10. while (it.hasNext) {
  11. val next = it.next
  12. if (seen.add(next)) b += next else different = true
  13. }
  14. if (different || !isImmutable) b.result() else repr
  15. }
  16. }

但是spark中的distinct却是这样做的:

  1. /**
  2. * Return a new RDD containing the distinct elements in this RDD.
  3. */
  4. def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  5. def removeDuplicatesInPartition(partition: Iterator[T]): Iterator[T] = {
  6. // Create an instance of external append only map which ignores values.
  7. val map = new ExternalAppendOnlyMap[T, Null, Null](
  8. createCombiner = _ => null,
  9. mergeValue = (a, b) => a,
  10. mergeCombiners = (a, b) => a)
  11. map.insertAll(partition.map(_ -> null))
  12. map.iterator.map(_._1)
  13. }
  14. // 在这里匹配可选的分区
  15. partitioner match {
  16. case Some(_) if numPartitions == partitions.length =>
  17. mapPartitions(removeDuplicatesInPartition, preservesPartitioning = true)
  18. // 来到这里兜底,将数据变成二元组形式,根据key聚合之后,只保留第一个,然后每个结果取元组第一个元素完成去重操作
  19. case _ => map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)
  20. }
  21. }

10、coalesce

def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T]
根据数据量
缩减分区**,用于大数据集过滤后,提高小数据集的执行效率
当 spark 程序中,存在过多的小任务的时候,可以通过 coalesce 方法,收缩合并分区,减少分区的个数,减小任务调度成本

  1. object Transform_coalesce {
  2. def main(args: Array[String]): Unit = {
  3. val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
  4. val rdd = context.makeRDD(List(1, 2, 3, 4, 5, 6),4)
  5. // coalesce缩减分区
  6. val value: RDD[Int] = rdd.coalesce(2)
  7. value.saveAsTextFile("datas/output")
  8. context.stop()
  9. }
  10. }

分区后两个分区分别是123,456

如果是三个分区呢?

  1. object Transform_coalesce {
  2. def main(args: Array[String]): Unit = {
  3. val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
  4. val rdd = context.makeRDD(List(1, 2, 3, 4, 5, 6),3)
  5. // coalesce缩减分区
  6. val value: RDD[Int] = rdd.coalesce(2)
  7. value.saveAsTextFile("datas/output")
  8. context.stop()
  9. }
  10. }

数据变成了12,3456???

因为coalesce默认是不会将数据打乱重新组合的。这样子缩减分区可能出现数据倾斜。可以使用shuffle处理。

  1. object Transform_coalesce {
  2. def main(args: Array[String]): Unit = {
  3. val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
  4. val rdd = context.makeRDD(List(1, 2, 3, 4, 5, 6),3)
  5. // coalesce缩减分区
  6. // 进行shuffle洗牌
  7. val value: RDD[Int] = rdd.coalesce(2,true)
  8. value.saveAsTextFile("datas/output")
  9. context.stop()
  10. }
  11. }

数据变成了145,236,顺序完全乱了,但是数据不倾斜了。

思考:我想扩大分区怎么办?

扩大分区需要将coalesce调大并且一定要进行shuffle操作才可以打乱分区。虽然数据打乱,但是扩大了分区。

11、repartition
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
是一种简化的扩大分区,底层调用就是coalesce

  1. object Transform_repartition {
  2. def main(args: Array[String]): Unit = {
  3. val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
  4. val rdd = context.makeRDD(List(1, 2, 3, 4, 5, 6),2)
  5. // coalesce缩减分区
  6. // 进行shuffle洗牌
  7. val value: RDD[Int] = rdd.repartition(3)
  8. value.saveAsTextFile("datas/output")
  9. context.stop()
  10. }
  11. }

12、sortBy
def sortByK => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.length)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

该操作用于排序数据。在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理的结果进行排序,默认为升序排列。排序后新产生的 RDD 的分区数与原 RDD 的分区数一致。中间存在 shuffle 的过程

  1. object Transform_sortBy {
  2. def main(args: Array[String]): Unit = {
  3. val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
  4. val rdd = context.makeRDD(List(1, 3, 6, 4, 5, 2), 2)
  5. // 按照自身升序排列,分区数量不会改变,但是会进行shuffle打乱操作
  6. // rdd.sortBy(k => k).saveAsTextFile("datas/output")
  7. // 第二个参数控制降序排列
  8. rdd.sortBy(k => k,false).saveAsTextFile("datas/output")
  9. context.stop()
  10. }
  11. }
  • 双Value类型

13、intersection
对源 RDD 和参数 RDD 求交集后返回一个新的 RDD
14、union
对源 RDD 和参数 RDD 求并集后返回一个新的 RDD
15、subtract
以一个 RDD 元素为主,去除两个 RDD 重复元素,将其他元素保留下来。求差集D

思考:如果两个RDD数据类型不一致怎么办

16、zip
将两个 RDD 中的元素,以键值对的形式进行合并。其中,键值对中的 Key 为第 1 个 RDD中的元素,Value 为第 2 个 RDD 中的相同位置的元素。

思考:如果两个RDD数据类型不一致怎么办 思考:如果两个RDD数据分区不一致怎么办 思考:如果两个RDD分区数据量不一致怎么办

  1. object Transform_doubleValue {
  2. def main(args: Array[String]): Unit = {
  3. val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
  4. val rdd1 = context.makeRDD(List(1, 2, 3, 4))
  5. val rdd2 = context.makeRDD(List(3, 4, 5, 6))
  6. // 交集
  7. val rdd3: RDD[Int] = rdd1.intersection(rdd2)
  8. println(rdd3.collect.mkString(","))
  9. // 并集
  10. val rdd4: RDD[Int] = rdd1.union(rdd2)
  11. println(rdd4.collect.mkString(","))
  12. // 差集
  13. val rdd5: RDD[Int] = rdd1.subtract(rdd2)
  14. println(rdd5.collect.mkString(","))
  15. // 拉链
  16. val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)
  17. println(rdd6.collect.mkString(","))
  18. context.stop()
  19. }
  20. }

数据类型问题
交集、并集、差集的数据源必须要求数据类型一致,方法签名规定了返回泛型T

  1. def intersection(other: RDD[T]): RDD[T]

但是zip拉链可以返回不同数据类型。

  1. def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]

分区数量问题:

  1. val rdd1 = context.makeRDD(List(1, 2, 3, 4),2)
  2. val rdd2 = context.makeRDD(List(3, 4, 5, 6),3)
  3. // Can't zip RDDs with unequal numbers of partitions: List(2, 3)
  4. val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)
  5. println(rdd6.collect.mkString(","))

运行时出错,不相等的数据分区,所以zip要求分区数量一致

分区数据量问题:

  1. val rdd1 = context.makeRDD(List(1, 2, 3, 4, 5, 6), 3)
  2. val rdd2 = context.makeRDD(List(3, 4, 5, 6), 3)
  3. // Can only zip RDDs with same number of elements in each partition
  4. val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)
  5. println(rdd6.collect.mkString(","))

只能够对分区数据量相同的分区进行拉链

  • key-value类型

17、partitionBy

  1. /**
  2. * Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
  3. */
  4. class PairRDDFunctions[K, V](self: RDD[(K, V)])
  5. (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null)
  6. extends Logging with Serializable {

这个方法时PairRDDFunctions中的,并不是RDD的方法,并且只对键值类型的RDD才可以使用。
我们没有看到它和RDD的继承关系,为什么它也是RDD呢?
image.png
在RDD类中,有伴生对象RDD,定义了隐式转换函数,将键值类型的RDD转换成PairRDDFunctions

  1. implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
  2. (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
  3. new PairRDDFunctions(rdd)
  4. }

这个partitionBy方法需要传入一个Partitioner分区器,我们以它的实现类HashPartitioner来传递

  1. object Transform_partitionBy {
  2. def main(args: Array[String]): Unit = {
  3. val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
  4. val rdd: RDD[Int] = context.makeRDD(List(1, 2, 3, 4, 5, 6))
  5. val mappedRdd: RDD[(Int, Int)] = rdd.map((_, 1))
  6. // RDD => PairRDDFunctions
  7. // 不是RDD的方法,但是PairRddFunctions
  8. val value: RDD[(Int, Int)] = mappedRdd.partitionBy(new HashPartitioner(2)) // 需要传入分区器,这里以Hash分区
  9. value.collect.foreach(println)
  10. context.stop()
  11. }
  12. }

来看一下这个HashPartitioner,对数据进行分区的方法

  1. class HashPartitioner(partitions: Int) extends Partitioner {
  2. require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
  3. def numPartitions: Int = partitions
  4. def getPartition(key: Any): Int = key match {
  5. case null => 0
  6. case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  7. }
  8. // 其实就是取模运算
  9. def nonNegativeMod(x: Int, mod: Int): Int = {
  10. val rawMod = x % mod
  11. rawMod + (if (rawMod < 0) mod else 0)
  12. }

思考:如果重分区的分区器和当前RDD的分区器一样怎么办?

首先第一次调用partitionBy的时候传入HashPartitioner(2),方法中会跟自身partitioner进行比较,如果相同返回self本身,不同则洗牌。

  1. def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
  2. if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
  3. throw new SparkException("HashPartitioner cannot partition array keys.")
  4. }
  5. // 这里的 == 比较在scala中就是调用equals方法进行比较
  6. if (self.partitioner == Some(partitioner)) {
  7. self
  8. } else {
  9. new ShuffledRDD[K, V, V](self, partitioner)
  10. }
  11. }

在Partitioner中的equals方法如下:

  1. override def equals(other: Any): Boolean = other match {
  2. // 如果是HashPartitioner,比较两次分区数量是否一致
  3. case h: HashPartitioner =>
  4. h.numPartitions == numPartitions
  5. // 其他返回false,可以重分区
  6. case _ =>
  7. false
  8. }

所以两次如果传入相同的HashPartitioner和相同的分区数量,是不会重新分区的,而是将原先数据直接返回。

Spark还有其他分区器吗?

image.png
Partitioner是一个抽象类,有三个子类
RangePartitioner一般在排序的时候使用。我们在使用sortBy的时候就使用了

  1. def sortBy[K](
  2. f: (T) => K,
  3. ascending: Boolean = true,
  4. numPartitions: Int = this.partitions.length)
  5. (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
  6. this.keyBy[K](f)
  7. // 这里
  8. .sortByKey(ascending, numPartitions)
  9. .values
  10. }
  1. def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
  2. : RDD[(K, V)] = self.withScope
  3. {
  4. // RangePartitioner
  5. val part = new RangePartitioner(numPartitions, self, ascending)
  6. new ShuffledRDD[K, V, V](self, part)
  7. .setKeyOrdering(if (ascending) ordering else ordering.reverse)
  8. }

PythonPartitioner是锁着的,只有在特定包权限下才可以使用

  1. private[spark] class PythonPartitioner(
  2. override val numPartitions: Int,
  3. val pyPartitionFunctionId: Long)
  4. extends Partitioner {

也可以自己继承Partitioner定制分区规则

18、reduceByKey
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
可以将数据按照相同的 Key 对 Value 进行聚合

  1. object Transform_reduceByKey {
  2. def main(args: Array[String]): Unit = {
  3. val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
  4. val rdd = context.makeRDD(List(("a",1),("a",2) ,("a",3),("b",4)))
  5. // 根据相同key对数据进行两两聚合。
  6. // scala中是两两聚合,spark是基于scala,所以也是两两聚合
  7. // 如果相同的key只有一个,则不会参与聚合操作
  8. val reducedRdd: RDD[(String, Int)] = rdd.reduceByKey((x: Int, y: Int) => {
  9. println(s"x: $x , y: $y")
  10. x + y
  11. })
  12. reducedRdd.collect.foreach(println)
  13. context.stop()
  14. }
  15. }

19、groupByKey
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
将数据源的数据根据 key 对 value 进行分组

  1. object Transform_groupByKey {
  2. def main(args: Array[String]): Unit = {
  3. val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
  4. val rdd = context.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("b", 4)))
  5. // 根据相同key对数据分组,形成一个对偶元组,value是一个确定类型的集合
  6. val value: RDD[(String, Iterable[Int])] = rdd.groupByKey()
  7. value.collect.foreach(println)
  8. // 对比group,分组的规则不明确,可以是key或自定义规则,分组后的value,也是一个k,v形式的
  9. val value1: RDD[(String, Iterable[(String, Int)])] = rdd.groupBy(_._1)
  10. value1.collect.foreach(println)
  11. context.stop()
  12. }
  13. }

groupByKey和reduceByKey的区别?

  • 从 shuffle 的角度:reduceByKey 和 groupByKey 都存在 shuffle 的操作,但是 reduceByKey可以在 shuffle 前对分区内相同 key 的数据进行预聚合(combine)功能,这样会减少落盘的数据量,而 groupByKey 只是进行分组,不存在数据量减少的问题,reduceByKey 性能比较高。
  • 从功能的角度:reduceByKey 其实包含分组和聚合的功能。groupByKey 只能分组,不能聚合,所以在分组聚合的场合下,推荐使用 reduceByKey,如果仅仅是分组而不需要聚合。那么还是只能使用 groupByKey

reduceByKey分区内和分区间的计算规则是相同的。

20、aggregateByKey

  1. /**
  2. * The former operation is used for merging values within a
  3. * partition, and the latter is used for merging values between partitions. To avoid memory
  4. * allocation, both of these functions are allowed to modify and return their first argument
  5. * instead of creating a new U.
  6. 第一个参数表示分区内的聚合规则,第二个参数表示分区间的计算规则
  7. */
  8. def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
  9. combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
  10. aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp)
  11. }

将数据根据不同的规则进行分区内计算和分区间计算

  1. object Transform_aggregationByKey {
  2. def main(args: Array[String]): Unit = {
  3. val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
  4. val rdd = context.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("b", 4)))
  5. // 柯里化传值,第一个参数是分区内聚合规则
  6. // 参数表示第一个值的初始值,比如第一个数,没有第二个数,如何比较它是大是小
  7. // 第二个参数是分区间聚合规则
  8. // 参数第一个表示序列的比较规则
  9. // 参数第二个表示聚合的操作
  10. val value: RDD[(String, Int)] = rdd.aggregateByKey(0)(
  11. // 分区内计算规则
  12. (x, y) => math.max(x, y),
  13. // 分区间计算规则
  14. (x, y) => x + y
  15. )
  16. value.collect.foreach(println)
  17. context.stop()
  18. }
  19. }

小练习:求相同key的平均值

**

  1. object Transform_aggregationByKey_Test {
  2. def main(args: Array[String]): Unit = {
  3. val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
  4. val rdd = context.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("b", 4)))
  5. // 获取函数平均值
  6. val value: RDD[(String, Int)] = rdd.aggregateByKey((0, 0))(
  7. // 转换成元组
  8. (t, v) => {
  9. // 第一个表示为总和,第二个为该key出现的次数
  10. (t._1 + v, t._2 + 1)
  11. },
  12. (t1, t2) => {
  13. // 聚合
  14. (t1._1 + t2._1, t1._2 + t2._2)
  15. }
  16. )
  17. // 聚合之后根据key将value进行映射
  18. .mapValues {
  19. // 获取平均值
  20. case (sum, count) => sum / count
  21. }
  22. value.collect.foreach(println)
  23. context.stop()
  24. }
  25. }


21、foldByKey**
def foldByKey(
zeroValue: V,
partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]

当分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为 foldByKey。

  1. object Transform_foldByKey {
  2. def main(args: Array[String]): Unit = {
  3. val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
  4. val rdd = context.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("b", 4)),2)
  5. // 聚合计算时,分区内和分区间计算规则相同,可以简化
  6. val value: RDD[(String, Int)] = rdd.foldByKey(0)(_ + _)
  7. value.collect.foreach(println)
  8. context.stop()
  9. }
  10. }

22、combineByKey
def combineByKeyC => C,
mergeCombiners: (C, C) => C,
numPartitions: Int): RDD[(K, C)]

最通用的对 key-value 型 rdd 进行聚集操作的聚集函数(aggregation function)。类似于aggregate(),combineByKey()允许用户返回值的类型与输入不一致。

aggregationByKey的数据还需要一个初始值,我们可不可以把第一个出现的值作为初始值呢?是不是就省去了一些操作

  1. object Transform_combineByKey {
  2. def main(args: Array[String]): Unit = {
  3. val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
  4. val rdd = context.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("b", 4)))
  5. // 获取函数平均值
  6. // 第一个参数表示第一个值的转换,
  7. // 第二个参数表示分区内的计算规则
  8. // 第三个参数表示分区间的计算规则
  9. val value: RDD[(String, Int)] = rdd.combineByKey(v => (v,1),
  10. // 需要加上类型声明
  11. (t: (Int,Int), v) => {
  12. (t._1 + v,t._2 + 1)
  13. },
  14. (t1: (Int,Int),t2: (Int,Int)) => {
  15. (t1._1 + t2._1,t1._2 + t2._2)
  16. }
  17. )
  18. // 聚合之后根据key将value进行映射
  19. .mapValues {
  20. // 获取平均值
  21. case (sum, count) => sum / count
  22. }
  23. value.collect.foreach(println)
  24. context.stop()
  25. }
  26. }

四个聚合算子的区别:
本质都是调用combineByKeyWithClassTag方法,区别在于计算规则和初始值的规则

  1. object Transform_aggregations_distinction {
  2. def main(args: Array[String]): Unit = {
  3. val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
  4. val rdd = context.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("b", 4)))
  5. // 四种聚合算子的区别
  6. /*
  7. aggregateByKey:
  8. combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v), // 初始值进行转换
  9. cleanedSeqOp // 分区内计算规则
  10. , combOp // 分区间计算规则
  11. , partitioner) // 分区器
  12. foldByKey:
  13. combineByKeyWithClassTag[V]((v: V) => cleanedFunc(createZero(), v), // 初始值进行转换
  14. cleanedFunc // 分区内计算规则
  15. , cleanedFunc // 分区间计算规则
  16. , partitioner) // 分区器
  17. reduceByKey:
  18. combineByKeyWithClassTag[V]((v: V) => v // 初始值进行转换
  19. , func // 分区内计算规则
  20. , func // 分区间计算规则
  21. , partitioner) // 分区器
  22. combineByKey:
  23. combineByKeyWithClassTag(createCombiner // 第一个值作为初始值
  24. , mergeValue // 分区内计算规则
  25. , mergeCombiners // 分区间计算规则
  26. , defaultPartitioner(self)) // 分区器
  27. */
  28. rdd.aggregateByKey(0)(_+_,_+_)
  29. rdd.foldByKey(0)(_+_)
  30. rdd.reduceByKey(_+_)
  31. rdd.combineByKey(v=>v,(a: Int,b) => a+b,(a: Int,b: Int) => a+b)
  32. context.stop()
  33. }
  34. }

24、join
**
def joinW]): RDD[(K, (V, W))]
在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素连接在一起的(K,(V,W))的 RDD

  1. object Transform_join {
  2. def main(args: Array[String]): Unit = {
  3. val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
  4. val rdd = context.makeRDD(List(("a",1),("a",2),("c",3)))
  5. val rdd2 = context.makeRDD(List(("a",4),("a",5),("f",6)))
  6. // join : 两个不同数据源的数据,相同的key的value会连接在一起,形成元组
  7. // 如果两个数据源key没有匹配上,那么数据不会出现在结果中
  8. // 如果两个数据源key有多个相同的,会依次进行匹配,可能出现笛卡尔乘积,数据量增加。
  9. val value: RDD[(String, (Int, Int))] = rdd.join(rdd2)
  10. value.collect.foreach(println)
  11. context.stop()
  12. }
  13. }

25、leftOuterJoin
def leftOuterJoinW],
partitioner: Partitioner): RDD[(K, (V, Option[W]))]
类似于 SQL 语句的左外连接

26、rightOuterJoin
def rightOuterJoinW], partitioner: Partitioner)
: RDD[(K, (Option[V], W))]
类似于SQL语句的右外连接

  1. object Transform_leftOuterJoin {
  2. def main(args: Array[String]): Unit = {
  3. val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
  4. val rdd = context.makeRDD(List(("a",1),("b",2),("c",3)))
  5. val rdd2 = context.makeRDD(List(("a",4),("b",5),("c",6)))
  6. // leftOuterJoin
  7. val value: RDD[(String, (Int, Option[Int]))] = rdd.leftOuterJoin(rdd2)
  8. // rightOuterJoin
  9. val value1: RDD[(String, (Option[Int], Int))] = rdd.rightOuterJoin(rdd2)
  10. value.collect.foreach(println)
  11. value1.collect.foreach(println)
  12. context.stop()
  13. }
  14. }

27、cogroup
def cogroupW1, W2, W3], other2: RDD[(K, W2)], other3: RDD[(K, W3)])
: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]

在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable,Iterable))类型的 RDD

  1. object Transform_cogroup {
  2. def main(args: Array[String]): Unit = {
  3. val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
  4. val rdd = context.makeRDD(List(("a",1),("b",2),("c",3),("c",6)))
  5. val rdd2 = context.makeRDD(List(("a",4),("b",5)))
  6. val rdd3 = context.makeRDD(List(("a",4),("b",5),("d",5)))
  7. val rdd4 = context.makeRDD(List(("a",4),("b",5),("c",2)))
  8. // cogroup: connect + group
  9. //把相同key连接在一起,value是一个个组,如果没有就是()空的
  10. // 如果一个数据源中多个相同key,连接放在一起
  11. // 最多可以同时连接三个group
  12. val value: RDD[(String, (Iterable[Int], Iterable[Int], Iterable[Int], Iterable[Int]))] = rdd.cogroup(rdd2,rdd3,rdd4)
  13. value.collect().foreach(println)
  14. context.stop()
  15. }
  16. }

2、案例实操

agent.log:时间戳,省份,城市,用户,广告,中间字段使用空格分隔。

需求描述:统计出每一个省份每个广告被点击数量排行的 Top3

agent.log

  1. object Transform_case {
  2. def main(args: Array[String]): Unit = {
  3. val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("operator-case"))
  4. // 1、获取原始数据: 时间戳,省份,城市,用户,广告
  5. val rawRdd: RDD[String] = context.textFile("datas/agent.log")
  6. // 2、将原始数据进行结构的转换。方便统计
  7. // 时间戳,省份,城市,用户,广告 => (( 省份,广告), 1)
  8. val transformedRdd: RDD[((String, String), Int)] = rawRdd.map(
  9. line => {
  10. val datas: Array[String] = line.split(" ")
  11. ((datas(1), datas(4)), 1)
  12. }
  13. )
  14. // 3、将转换后的数据,分组聚合
  15. // (( 省份,广告), 1) => (( 省份,广告), sum)
  16. val reducedRdd: RDD[((String, String), Int)] = transformedRdd.reduceByKey(_ + _)
  17. // 4、将聚合后的数据进行结构的转换
  18. // (( 省份,广告), sum) => (省份,(广告, sum))
  19. val convertedRdd: RDD[(String, (String, Int))] = reducedRdd.map {
  20. case ((prv, ad), sum) => (prv, (ad, sum))
  21. }
  22. // 5、将数据根据省份分组
  23. // (省份,【(广告A,sumA)】,【(广告B,sumB)】
  24. val groupedRdd: RDD[(String, Iterable[(String, Int)])] = convertedRdd.groupByKey()
  25. // 6、将分组后的数据组内降序排序,取前三
  26. val value: RDD[(String, List[(String, Int)])] = groupedRdd.mapValues(
  27. iter => iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(3)
  28. )
  29. // 7、采集数据打印在控制台
  30. value.collect.foreach(println)
  31. context.stop()
  32. }
  33. }

3、行动算子

所谓的行动算子,其实就是触发作业(Job)执行的方法,底层代码调用的是环境对象的runJob方法,底层代码中会创建ActiveJob,并提交执行。

  1. def collect(): Array[T] = withScope {
  2. val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  3. Array.concat(results: _*)
  4. }
  5. ......

// 一系列的封装之后,来到DAGScheduler中的runJob

  1. def runJob[T, U](
  2. rdd: RDD[T],
  3. func: (TaskContext, Iterator[T]) => U,
  4. partitions: Seq[Int],
  5. callSite: CallSite,
  6. resultHandler: (Int, U) => Unit,
  7. properties: Properties): Unit = {
  8. val start = System.nanoTime
  9. // 在这里提交Job
  10. val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
  11. ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
  12. waiter.completionFuture.value.get match {
  13. case scala.util.Success(_) =>
  14. logInfo("Job %d finished: %s, took %f s".format
  15. (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
  16. case scala.util.Failure(exception) =>
  17. logInfo("Job %d failed: %s, took %f s".format
  18. (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
  19. // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
  20. val callerStackTrace = Thread.currentThread().getStackTrace.tail
  21. exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
  22. throw exception
  23. }
  24. }
  1. def submitJob[T, U](
  2. rdd: RDD[T],
  3. func: (TaskContext, Iterator[T]) => U,
  4. partitions: Seq[Int],
  5. callSite: CallSite,
  6. resultHandler: (Int, U) => Unit,
  7. properties: Properties): JobWaiter[U] = {
  8. // Check to make sure we are not launching a task on a partition that does not exist.
  9. val maxPartitions = rdd.partitions.length
  10. partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
  11. throw new IllegalArgumentException(
  12. "Attempting to access a non-existent partition: " + p + ". " +
  13. "Total number of partitions: " + maxPartitions)
  14. }
  15. val jobId = nextJobId.getAndIncrement()
  16. if (partitions.isEmpty) {
  17. val clonedProperties = Utils.cloneProperties(properties)
  18. if (sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION) == null) {
  19. clonedProperties.setProperty(SparkContext.SPARK_JOB_DESCRIPTION, callSite.shortForm)
  20. }
  21. val time = clock.getTimeMillis()
  22. listenerBus.post(
  23. SparkListenerJobStart(jobId, time, Seq.empty, clonedProperties))
  24. listenerBus.post(
  25. SparkListenerJobEnd(jobId, time, JobSucceeded))
  26. // Return immediately if the job is running 0 tasks
  27. return new JobWaiter[U](this, jobId, 0, resultHandler)
  28. }
  29. assert(partitions.nonEmpty)
  30. val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  31. val waiter = new JobWaiter[U](this, jobId, partitions.size, resultHandler)
  32. // 在这里会触发一个Job被提交的事件,具体怎么处理后面再说
  33. eventProcessLoop.post(JobSubmitted(
  34. jobId, rdd, func2, partitions.toArray, callSite, waiter,
  35. Utils.cloneProperties(properties)))
  36. waiter
  37. }

在这个eventProcessLoop中,也就是DAGScheduler中,有handleXxxx,处理各种事件的方法,这里处理Job被提交

  1. private[scheduler] def handleJobSubmitted(jobId: Int,
  2. finalRDD: RDD[_],
  3. func: (TaskContext, Iterator[_]) => _,
  4. partitions: Array[Int],
  5. callSite: CallSite,
  6. listener: JobListener,
  7. properties: Properties): Unit = {
  8. var finalStage: ResultStage = null
  9. try {
  10. // New stage creation may throw an exception if, for example, jobs are run on a
  11. // HadoopRDD whose underlying HDFS files have been deleted.
  12. finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  13. } catch {
  14. case e: BarrierJobSlotsNumberCheckFailed =>
  15. // If jobId doesn't exist in the map, Scala coverts its value null to 0: Int automatically.
  16. val numCheckFailures = barrierJobIdToNumTasksCheckFailures.compute(jobId,
  17. (_: Int, value: Int) => value + 1)
  18. logWarning(s"Barrier stage in job $jobId requires ${e.requiredConcurrentTasks} slots, " +
  19. s"but only ${e.maxConcurrentTasks} are available. " +
  20. s"Will retry up to ${maxFailureNumTasksCheck - numCheckFailures + 1} more times")
  21. if (numCheckFailures <= maxFailureNumTasksCheck) {
  22. messageScheduler.schedule(
  23. new Runnable {
  24. override def run(): Unit = eventProcessLoop.post(JobSubmitted(jobId, finalRDD, func,
  25. partitions, callSite, listener, properties))
  26. },
  27. timeIntervalNumTasksCheck,
  28. TimeUnit.SECONDS
  29. )
  30. return
  31. } else {
  32. // Job failed, clear internal data.
  33. barrierJobIdToNumTasksCheckFailures.remove(jobId)
  34. listener.jobFailed(e)
  35. return
  36. }
  37. case e: Exception =>
  38. logWarning("Creating new stage failed due to exception - job: " + jobId, e)
  39. listener.jobFailed(e)
  40. return
  41. }
  42. // Job submitted, clear internal data.
  43. barrierJobIdToNumTasksCheckFailures.remove(jobId)
  44. // 这里创建一个ActiveJob
  45. val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  46. clearCacheLocs()
  47. logInfo("Got job %s (%s) with %d output partitions".format(
  48. job.jobId, callSite.shortForm, partitions.length))
  49. logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
  50. logInfo("Parents of final stage: " + finalStage.parents)
  51. logInfo("Missing parents: " + getMissingParentStages(finalStage))
  52. val jobSubmissionTime = clock.getTimeMillis()
  53. jobIdToActiveJob(jobId) = job
  54. activeJobs += job
  55. finalStage.setActiveJob(job)
  56. val stageIds = jobIdToStageIds(jobId).toArray
  57. val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  58. listenerBus.post(
  59. SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
  60. submitStage(finalStage)
  61. }

1、reduce
def reduce(f: (T, T) => T): T
聚集 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据

2、collect
def collect(): Array[T]
在驱动程序中,以数组 Array 的形式返回数据集的所有元素

3、count
def count(): Long
返回 RDD 中元素的个数

4、first
def first(): T
返回 RDD 中的第一个元素

5、take
def take(num: Int): Array[T]
返回一个由 RDD 的前 n 个元素组成的数组

6、takeOrdered
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
返回一个由RDD的前n个元素组成的数组

  1. object Action_Normal {
  2. def main(args: Array[String]): Unit = {
  3. val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("operator-action"))
  4. val rdd: RDD[Int] = context.makeRDD(List(1, 2, 3, 4))
  5. // reduce : 对数据进行归约计算,直接出结果
  6. val i: Int = rdd.reduce(_ + _)
  7. println(i)
  8. // collect : 收集数据,并且在Driver中转换成数组
  9. val ints: Array[Int] = rdd.collect()
  10. println(ints.mkString(", "))
  11. // count : 统计数据的个数
  12. val l: Long = rdd.count()
  13. println(l)
  14. // first : 取数据源的第一个数据
  15. val i1: Int = rdd.first()
  16. println(i1)
  17. // take : 取数据源的前n个数据
  18. val ints1: Array[Int] = rdd.take(3)
  19. println(ints1.mkString(", "))
  20. val rdd1 = context.makeRDD(List(4,2,1,3))
  21. // takeOrdered : 先对数据源进行排序操作,再取前n个数据
  22. val ints2: Array[Int] = rdd1.takeOrdered(3)(Ordering.Int.reverse)
  23. println(ints2.mkString(", "))
  24. context.stop()
  25. }
  26. }

7、aggregate
def aggregateU: ClassTag(seqOp: (U, T) => U, combOp: (U, U) => U): U
分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合

  1. object Action_aggregate {
  2. def main(args: Array[String]): Unit = {
  3. val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("operator-action"))
  4. val rdd: RDD[Int] = context.makeRDD(List(1, 2, 3, 4),2)
  5. // aggregate : 聚合
  6. // 分区内:0 + 1 + 2 ,0 + 3 + 4, 分区间:3 + 7 = 10
  7. // val res: Int = rdd.aggregate(0)(_ + _, _ + _)
  8. // 如果没有指定分区,8个分区, 4个空,4个有数据。
  9. // 每个分区内计算都是加10=80 + (1 + 2 + 3 + 4) + 10(分区间计算)
  10. val res: Int = rdd.aggregate(10)(_ + _, _ + _)
  11. /*
  12. aggregateByKey : 聚合操作的初始值只参与分区内计算
  13. aggregate : 聚合操作的初始值不仅参与分区内计算,而且参与分区间计算
  14. */
  15. println(res)
  16. context.stop()
  17. }
  18. }

8、fold
def fold(zeroValue: T)(op: (T, T) => T): T
折叠操作,aggregate 的简化版操作
如果分区内和分区间计算规则一致时,可以使用fold简化

9、countByKey
def countByKey(): Map[K, Long]
统计每种 key 的个数
10、countByValue
def countByValue()(implicit ord: Ordering[T]
统计每种value的个数

  1. object Action_fold_countByKey_Value {
  2. def main(args: Array[String]): Unit = {
  3. val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("operator-action"))
  4. val rdd: RDD[Int] = context.makeRDD(List(1, 2, 3, 4),2)
  5. // aggregate : 聚合
  6. // 分区内:0 + 1 + 2 ,0 + 3 + 4, 分区间:3 + 7 = 10
  7. // val res: Int = rdd.aggregate(0)(_ + _, _ + _)
  8. // 如果没有指定分区,8个分区, 4个空,4个有数据。
  9. // 每个分区内计算都是加10=80 + (1 + 2 + 3 + 4) + 10(分区间计算)
  10. val res: Int = rdd.fold(10)(_ + _)
  11. println(res)
  12. val rdd1: RDD[(String, Int)] = context.makeRDD(List(("a", 1), ("b", 1), ("b", 1)))
  13. // countByKey : 根据键值对类型,计算key的数量
  14. val stringToLong: collection.Map[String, Long] = rdd1.countByKey()
  15. println(stringToLong)
  16. rdd.countByValue()
  17. // countByValue : 任何类型,获取单值的数量
  18. val valRes: collection.Map[Int, Long] = rdd.countByValue()
  19. println(valRes)
  20. context.stop()
  21. }
  22. }

11、save的相关算法
def saveAsTextFile(path: String): Unit
def saveAsObjectFile(path: String): Unit
def saveAsSequenceFile(
path: String,
codec: Option[Class[_ <: CompressionCodec]] = None): Unit

  1. object Action_save {
  2. def main(args: Array[String]): Unit = {
  3. val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("operator-action"))
  4. val rdd: RDD[Int] = context.makeRDD(List(1, 2, 3, 4),2)
  5. rdd.saveAsTextFile("output")
  6. rdd.saveAsObjectFile("output1")
  7. rdd.map((_,1)).saveAsSequenceFile("output2")
  8. context.stop()
  9. }
  10. }

12、foreach

  1. /**
  2. * Applies a function f to all elements of this RDD.
  3. */
  4. def foreach(f: T => Unit): Unit = withScope {
  5. val cleanF = sc.clean(f)
  6. sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
  7. }

我们经常使用rdd.collect().foreach()遍历打印,谁又知道rdd.foreach()区别呢?

  1. object Action_foreach {
  2. def main(args: Array[String]): Unit = {
  3. val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("operator-action"))
  4. val rdd: RDD[Int] = context.makeRDD(List(1, 2, 3, 4),2)
  5. // 这个其实是在Driver端内存集合中的循环遍历方法
  6. rdd.collect().foreach(println)
  7. // foreach其实是Executor端内存集合数据打印
  8. rdd.foreach(println)
  9. // RDD的方法和Scala集合对象中的方法不一样
  10. // 集合对象的方法都是在同一个节点的内存中完成的
  11. // RDD的方法可以将计算逻辑发送到Executor(分布式节点)执行
  12. // RDD方法外部的操作都是在Driver端进行的,而方法内部的逻辑代码是在Executor端进行的。
  13. context.stop()
  14. }
  15. }

4、wordCount的11种死法

1、groupBy

通过对相同数据的分组,进行聚合操作,实现功能。

  1. /**
  2. * 1、groupBy : 传统
  3. */
  4. def wordCount1(sc: SparkContext) = {
  5. val rdd: RDD[String] = sc.makeRDD(List("Hello Spark", "Hello Scala"))
  6. // 分割
  7. val words: RDD[String] = rdd.flatMap(_.split(" "))
  8. // 分组
  9. val group: RDD[(String, Iterable[String])] = words.groupBy(word => word)
  10. // 聚合(求大小)
  11. val wordCount: RDD[(String, Int)] = group.mapValues(iter => iter.size)
  12. }

2、groupByKey

  1. /**
  2. * 2、groupByKey : 简化一丢丢,但是效率上,由于是走shuffle的,会打乱
  3. */
  4. def wordCount2(sc: SparkContext) = {
  5. val rdd: RDD[String] = sc.makeRDD(List("Hello Spark", "Hello Scala"))
  6. // 分割
  7. val words: RDD[String] = rdd.flatMap(_.split(" "))
  8. // 转换成元组形式
  9. val word: RDD[(String, Int)] = words.map((_, 1))
  10. // 分组
  11. val group: RDD[(String, Iterable[Int])] = word.groupByKey()
  12. // 聚合(求大小)
  13. val wordCount: RDD[(String, Int)] = group.mapValues(iter => iter.size)
  14. }

3、reduceByKey

  1. /**
  2. * 3、reduceByKey : 效率比groupByKey好点,不会走shuffle,直接聚合
  3. */
  4. def wordCount3(sc: SparkContext) = {
  5. val rdd: RDD[String] = sc.makeRDD(List("Hello Spark", "Hello Scala"))
  6. // 分割
  7. val words: RDD[String] = rdd.flatMap(_.split(" "))
  8. // 转换成元组形式
  9. val word: RDD[(String, Int)] = words.map((_, 1))
  10. // 聚合(求大小)
  11. val wordCount: RDD[(String, Int)] = word.reduceByKey(_ + _)
  12. }

4、aggregateByKey

  1. /**
  2. * 4、aggregateByKey :
  3. */
  4. def wordCount4(sc: SparkContext) = {
  5. val rdd: RDD[String] = sc.makeRDD(List("Hello Spark", "Hello Scala"))
  6. // 分割
  7. val words: RDD[String] = rdd.flatMap(_.split(" "))
  8. // 转换成元组形式
  9. val word: RDD[(String, Int)] = words.map((_, 1))
  10. // 聚合(求大小)
  11. val wordCount: RDD[(String, Int)] = word.aggregateByKey(0)(_ + _, _ + _)
  12. wordCount.collect().foreach(println)
  13. }

5、foldByKey

  1. /**
  2. * 5、foldByKey :分区内和分区间规则相同时可简化
  3. */
  4. def wordCount5(sc: SparkContext) = {
  5. val rdd: RDD[String] = sc.makeRDD(List("Hello Spark", "Hello Scala"))
  6. // 分割
  7. val words: RDD[String] = rdd.flatMap(_.split(" "))
  8. // 转换成元组形式
  9. val word: RDD[(String, Int)] = words.map((_, 1))
  10. // 聚合(求大小)
  11. val wordCount: RDD[(String, Int)] = word.foldByKey(0)(_ + _)
  12. wordCount.collect().foreach(println)
  13. }

6、combineByKey

  1. /**
  2. * 6、combineByKey :三个参数,初始值进行操作
  3. */
  4. def wordCount6(sc: SparkContext) = {
  5. val rdd: RDD[String] = sc.makeRDD(List("Hello Spark", "Hello Scala"))
  6. // 分割
  7. val words: RDD[String] = rdd.flatMap(_.split(" "))
  8. // 转换成元组形式
  9. val word: RDD[(String, Int)] = words.map((_, 1))
  10. // 聚合(求大小)
  11. val wordCount: RDD[(String, Int)] = word.combineByKey(v => v,
  12. (x: Int, y) => x + y,
  13. (x1: Int, x2: Int) => x1 + x2
  14. )
  15. wordCount.collect().foreach(println)
  16. }

7、countByKey

  1. /**
  2. * 7、countByKey :直接对key的数量聚合
  3. */
  4. def wordCount7(sc: SparkContext) = {
  5. val rdd: RDD[String] = sc.makeRDD(List("Hello Spark", "Hello Scala"))
  6. // 分割
  7. val words: RDD[String] = rdd.flatMap(_.split(" "))
  8. // 转换成元组形式
  9. val word: RDD[(String, Int)] = words.map((_, 1))
  10. val wordCount: collection.Map[String, Long] = word.countByKey()
  11. println(wordCount)
  12. }

8、countByValue

  1. /**
  2. * 8、countByValue :直接对value的数量聚合
  3. */
  4. def wordCount8(sc: SparkContext) = {
  5. val rdd: RDD[String] = sc.makeRDD(List("Hello Spark", "Hello Scala"))
  6. // 分割
  7. val words: RDD[String] = rdd.flatMap(_.split(" "))
  8. // 转换成元组形式
  9. val wordCount: collection.Map[String, Long] = words.countByValue()
  10. println(wordCount)
  11. }

9、reduce

  1. /**
  2. * 9、reduce :直接聚合
  3. */
  4. def wordCount9(sc: SparkContext) = {
  5. val rdd: RDD[String] = sc.makeRDD(List("Hello Spark", "Hello Scala"))
  6. // 分割
  7. val words: RDD[String] = rdd.flatMap(_.split(" "))
  8. // 为了能够直接reduce聚合,考虑将每个单词转换成Map形式,次数为1
  9. val maps: RDD[mutable.Map[String, Long]] = words.map(
  10. word => {
  11. // 1、创建一个单独的Map,将每个单词放入其中
  12. mutable.Map[String, Long]((word, 1))
  13. }
  14. )
  15. // 2、将所有Map计算
  16. // reduce只能对相同类型进行聚合
  17. val wordCount: mutable.Map[String, Long] = maps.reduce(
  18. // 遍历所有map,不断往第一个map中更新数据
  19. (map1, map2) => {
  20. map2.foreach {
  21. case (word, count) => {
  22. // 取出所有word的count
  23. val cnt: Long = map1.getOrElse(word, 0L)
  24. // 给map1中更新值
  25. map1.update(word, cnt + count)
  26. }
  27. }
  28. // 将map1返回
  29. map1
  30. }
  31. )
  32. println(wordCount)
  33. }

10、aggregate

  1. /**
  2. * 10、aggregate :跟aggregateByKey差不多
  3. */
  4. def wordCount10(sc: SparkContext) = {
  5. val rdd: RDD[String] = sc.makeRDD(List("Hello Spark", "Hello Scala"))
  6. // 拆分成单词
  7. val words: RDD[String] = rdd.flatMap(_.split(" "))
  8. // 根据key进行分组
  9. val wordCount: mutable.Map[String, Long] = words.aggregate(mutable.Map[String, Long]())(
  10. (map: mutable.Map[String, Long], word) => {
  11. // 分区内集合,一个分区中没有改数据,获取0放入,否则数量+1
  12. val count = map.getOrElse(word, 0L)
  13. map.put(word, count + 1L)
  14. map
  15. },
  16. // 分区间聚合
  17. (map1: mutable.Map[String, Long], map2: mutable.Map[String, Long]) => {
  18. map2.foreach {
  19. case (word, count) => {
  20. // 取出所有word的count
  21. val cnt: Long = map1.getOrElse(word, 0L)
  22. // 给map1中更新值
  23. map1.update(word, cnt + count)
  24. }
  25. }
  26. // 将map1返回
  27. map1
  28. }
  29. )
  30. println(wordCount)
  31. }

11、fold

待续

  1. object wordCount8fold {
  2. def main(args: Array[String]): Unit = {
  3. //partitionBy
  4. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("File - RDD")
  5. val sc = new SparkContext(sparkConf)
  6. val rdd = sc.makeRDD(
  7. List(("a", 1), ("b", 2), ("a", 1), ("b", 2), ("a", 1), ("b", 2)), 2
  8. )
  9. val strRDD: RDD[String] = rdd.map {
  10. case (str, sum) => {
  11. (str + " ") * sum
  12. }
  13. }
  14. val flatMapRDD = strRDD.flatMap(_.split(" "))
  15. flatMapRDD.map(s => mutable.Map(s -> 1)).fold(mutable.Map[String, Int]())(
  16. (map1: mutable.Map[String, Int], map2: mutable.Map[String, Int]) => {
  17. map1.foldLeft(map2)(
  18. (innerMap, kv) => {
  19. innerMap(kv._1) = innerMap.getOrElse(kv._1, 0) + kv._2
  20. innerMap
  21. }
  22. )
  23. }
  24. ).foreach(println)
  25. // println (result.collect().mkString(","))
  26. }
  27. }
  28. 原文链接:https://blog.csdn.net/qq_34158117/article/details/106611349