一、RDD继续

1、RDD序列化

1、对象序列化case

我们现在做这样一个操作:在最常见的 foreach 算子里打印一个 User 对象的属性,看是否会出现问题。

  1. object Test01 {
  2. def main(args: Array[String]): Unit = {
  3. val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("serializable"))
  4. val rdd: RDD[Int] = context.makeRDD(List(1, 2, 3, 4))
  5. val user = new User
  6. // 遍历打印
  7. rdd.foreach(
  8. num => {
  9. println(user.age + num)
  10. }
  11. )
  12. context.stop()
  13. }
  14. }

会出现以下错误:任务没有序列化。
任务怎么会没有序列化呢?真正的原因其实是这个User对象没有序列化。

  1. Exception in thread "main" org.apache.spark.SparkException: Task not serializable
  2. at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
  3. at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:406)
  4. at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
  5. at org.apache.spark.SparkContext.clean(SparkContext.scala:2356)
  6. at org.apache.spark.rdd.RDD.$anonfun$foreach$1(RDD.scala:985)
  7. at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  8. at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  9. at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
  10. at org.apache.spark.rdd.RDD.foreach(RDD.scala:984)
  11. at org.yyds.spark.core.rdd.serialize.Test01$.main(Test01.scala:16)
  12. at org.yyds.spark.core.rdd.serialize.Test01.main(Test01.scala)
  13. Caused by: java.io.NotSerializableException: org.yyds.spark.core.rdd.serialize.User
  14. Serialization stack:
  15. - object not serializable (class: org.yyds.spark.core.rdd.serialize.User, value: org.yyds.spark.core.rdd.serialize.User@7fc645e4)
  16. - element of array (index: 0)
  17. - array (class [Ljava.lang.Object;, size 1)
  18. - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
  19. - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.yyds.spark.core.rdd.serialize.Test01$, functionalInterfaceMethod=scala/runtime/java8/JFunction1$mcVI$sp.apply$mcVI$sp:(I)V, implementation=invokeStatic org/yyds/spark/core/rdd/serialize/Test01$.$anonfun$main$1:(Lorg/yyds/spark/core/rdd/serialize/User;I)V, instantiatedMethodType=(I)V, numCaptured=1])
  20. - writeReplace data (class: java.lang.invoke.SerializedLambda)
  21. - object (class org.yyds.spark.core.rdd.serialize.Test01$$$Lambda$631/439720255, org.yyds.spark.core.rdd.serialize.Test01$$$Lambda$631/439720255@37c2eacb)
  22. at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
  23. at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
  24. at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
  25. at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:413)
  26. ... 10 more

现在让这个User进行序列化看看

  1. class User extends Serializable {
  2. var age: Int = _
  3. }

结果如下:因为 foreach 中的逻辑是分发到 Executor 端并行执行的,各个任务谁先执行、谁后执行是不确定的,所以打印顺序是乱的。

  1. 4
  2. 2
  3. 1
  4. 3

我们还可以使用样例类的方式,因为样例类在编译时会自动混入序列化特质(实现序列化接口)。

  1. //class User extends Serializable {
  2. // 样例类会在编译时混入序列化特质,实现序列化接口
  3. case class User(){
  4. var age: Int = _
  5. }

当然,我们是做User的属性和List中的数据相加。而foreach是遍历每一个数据执行操作。如果List中没有数据,不会进行相加操作,那没有实现序列化特质的User还会出现序列化问题吗?

  1. object Test01 {
  2. def main(args: Array[String]): Unit = {
  3. val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("serializable"))
  4. val rdd: RDD[Int] = context.makeRDD(List[Int]())
  5. val user = new User
  6. // 遍历打印
  7. rdd.foreach(
  8. num => {
  9. println(user.age + num)
  10. }
  11. )
  12. context.stop()
  13. }
  14. }
  15. //class User extends Serializable {
  16. // 样例类会在编译时混入序列化特质,实现序列化接口
  17. //case class User(){
  18. class User{
  19. var age: Int = _
  20. }

依然会出现序列化问题!
我们来看foreach的执行顺序:

  1. def foreach(f: T => Unit): Unit = withScope {
  2. val cleanF = sc.clean(f)
  3. sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
  4. }

经过一系列的 runJob 重载调用之后,来到 SparkContext 的 runJob 中,这里会执行清理工作,进行闭包检测。

  1. def runJob[T, U: ClassTag](
  2. rdd: RDD[T],
  3. func: (TaskContext, Iterator[T]) => U,
  4. partitions: Seq[Int],
  5. resultHandler: (Int, U) => Unit): Unit = {
  6. if (stopped.get()) {
  7. throw new IllegalStateException("SparkContext has been shutdown")
  8. }
  9. val callSite = getCallSite
  10. // 在这里
  11. val cleanedFunc = clean(func)
  12. logInfo("Starting job: " + callSite.shortForm)
  13. if (conf.getBoolean("spark.logLineage", false)) {
  14. logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  15. }
  16. dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  17. progressBar.foreach(_.finishAll())
  18. rdd.doCheckpoint()
  19. }
  1. private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
  2. ClosureCleaner.clean(f, checkSerializable)
  3. f
  4. }

来到ClosureCleaner的clean方法

  1. private def clean(
  2. func: AnyRef,
  3. checkSerializable: Boolean,
  4. cleanTransitively: Boolean,
  5. accessedFields: Map[Class[_], Set[String]]): Unit = {
  6. // indylambda check. Most likely to be the case with 2.12, 2.13
  7. // so we check first
  8. // non LMF-closures should be less frequent from now on
  9. val maybeIndylambdaProxy = IndylambdaScalaClosures.getSerializationProxy(func)
  10. // 在这里进行闭包检测
  11. if (!isClosure(func.getClass) && maybeIndylambdaProxy.isEmpty) {
  12. logDebug(s"Expected a closure; got ${func.getClass.getName}")
  13. return
  14. }
  15. // TODO: clean all inner closures first. This requires us to find the inner objects.
  16. // TODO: cache outerClasses / innerClasses / accessedFields
  17. if (func == null) {
  18. return
  19. }
  20. if (maybeIndylambdaProxy.isEmpty) {
  21. logDebug(s"+++ Cleaning closure $func (${func.getClass.getName}) +++")
  22. // A list of classes that represents closures enclosed in the given one
  23. // 获取闭包中的内部类们
  24. val innerClasses = getInnerClosureClasses(func)
  25. // A list of enclosing objects and their respective classes, from innermost to outermost
  26. // An outer object at a given index is of type outer class at the same index
  27. val (outerClasses, outerObjects) = getOuterClassesAndObjects(func)
  28. // For logging purposes only
  29. val declaredFields = func.getClass.getDeclaredFields
  30. val declaredMethods = func.getClass.getDeclaredMethods
  31. if (log.isDebugEnabled) {
  32. logDebug(s" + declared fields: ${declaredFields.size}")
  33. declaredFields.foreach { f => logDebug(s" $f") }
  34. logDebug(s" + declared methods: ${declaredMethods.size}")
  35. declaredMethods.foreach { m => logDebug(s" $m") }
  36. logDebug(s" + inner classes: ${innerClasses.size}")
  37. innerClasses.foreach { c => logDebug(s" ${c.getName}") }
  38. logDebug(s" + outer classes: ${outerClasses.size}" )
  39. outerClasses.foreach { c => logDebug(s" ${c.getName}") }
  40. }
  41. // Fail fast if we detect return statements in closures
  42. getClassReader(func.getClass).accept(new ReturnStatementFinder(), 0)
  43. // If accessed fields is not populated yet, we assume that
  44. // the closure we are trying to clean is the starting one
  45. if (accessedFields.isEmpty) {
  46. logDebug(" + populating accessed fields because this is the starting closure")
  47. // Initialize accessed fields with the outer classes first
  48. // This step is needed to associate the fields to the correct classes later
  49. initAccessedFields(accessedFields, outerClasses)
  50. // Populate accessed fields by visiting all fields and methods accessed by this and
  51. // all of its inner closures. If transitive cleaning is enabled, this may recursively
  52. // visits methods that belong to other classes in search of transitively referenced fields.
  53. for (cls <- func.getClass :: innerClasses) {
  54. getClassReader(cls).accept(new FieldAccessFinder(accessedFields, cleanTransitively), 0)
  55. }
  56. }
  57. logDebug(s" + fields accessed by starting closure: ${accessedFields.size} classes")
  58. accessedFields.foreach { f => logDebug(" " + f) }
  59. // List of outer (class, object) pairs, ordered from outermost to innermost
  60. // Note that all outer objects but the outermost one (first one in this list) must be closures
  61. var outerPairs: List[(Class[_], AnyRef)] = outerClasses.zip(outerObjects).reverse
  62. var parent: AnyRef = null
  63. if (outerPairs.nonEmpty) {
  64. val outermostClass = outerPairs.head._1
  65. val outermostObject = outerPairs.head._2
  66. if (isClosure(outermostClass)) {
  67. logDebug(s" + outermost object is a closure, so we clone it: ${outermostClass}")
  68. } else if (outermostClass.getName.startsWith("$line")) {
  69. // SPARK-14558: if the outermost object is a REPL line object, we should clone
  70. // and clean it as it may carray a lot of unnecessary information,
  71. // e.g. hadoop conf, spark conf, etc.
  72. logDebug(s" + outermost object is a REPL line object, so we clone it:" +
  73. s" ${outermostClass}")
  74. } else {
  75. // The closure is ultimately nested inside a class; keep the object of that
  76. // class without cloning it since we don't want to clone the user's objects.
  77. // Note that we still need to keep around the outermost object itself because
  78. // we need it to clone its child closure later (see below).
  79. logDebug(s" + outermost object is not a closure or REPL line object," +
  80. s" so do not clone it: ${outermostClass}")
  81. parent = outermostObject // e.g. SparkContext
  82. outerPairs = outerPairs.tail
  83. }
  84. } else {
  85. logDebug(" + there are no enclosing objects!")
  86. }
  87. // Clone the closure objects themselves, nulling out any fields that are not
  88. // used in the closure we're working on or any of its inner closures.
  89. for ((cls, obj) <- outerPairs) {
  90. logDebug(s" + cloning instance of class ${cls.getName}")
  91. // We null out these unused references by cloning each object and then filling in all
  92. // required fields from the original object. We need the parent here because the Java
  93. // language specification requires the first constructor parameter of any closure to be
  94. // its enclosing object.
  95. val clone = cloneAndSetFields(parent, obj, cls, accessedFields)
  96. // If transitive cleaning is enabled, we recursively clean any enclosing closure using
  97. // the already populated accessed fields map of the starting closure
  98. if (cleanTransitively && isClosure(clone.getClass)) {
  99. logDebug(s" + cleaning cloned closure recursively (${cls.getName})")
  100. // No need to check serializable here for the outer closures because we're
  101. // only interested in the serializability of the starting closure
  102. clean(clone, checkSerializable = false, cleanTransitively, accessedFields)
  103. }
  104. parent = clone
  105. }
  106. // Update the parent pointer ($outer) of this closure
  107. if (parent != null) {
  108. val field = func.getClass.getDeclaredField("$outer")
  109. field.setAccessible(true)
  110. // If the starting closure doesn't actually need our enclosing object, then just null it out
  111. if (accessedFields.contains(func.getClass) &&
  112. !accessedFields(func.getClass).contains("$outer")) {
  113. logDebug(s" + the starting closure doesn't actually need $parent, so we null it out")
  114. field.set(func, null)
  115. } else {
  116. // Update this closure's parent pointer to point to our enclosing object,
  117. // which could either be a cloned closure or the original user object
  118. field.set(func, parent)
  119. }
  120. }
  121. logDebug(s" +++ closure $func (${func.getClass.getName}) is now cleaned +++")
  122. } else {
  123. val lambdaProxy = maybeIndylambdaProxy.get
  124. val implMethodName = lambdaProxy.getImplMethodName
  125. logDebug(s"Cleaning indylambda closure: $implMethodName")
  126. // capturing class is the class that declared this lambda
  127. val capturingClassName = lambdaProxy.getCapturingClass.replace('/', '.')
  128. val classLoader = func.getClass.getClassLoader // this is the safest option
  129. // scalastyle:off classforname
  130. val capturingClass = Class.forName(capturingClassName, false, classLoader)
  131. // scalastyle:on classforname
  132. // Fail fast if we detect return statements in closures
  133. val capturingClassReader = getClassReader(capturingClass)
  134. capturingClassReader.accept(new ReturnStatementFinder(Option(implMethodName)), 0)
  135. val isClosureDeclaredInScalaRepl = capturingClassName.startsWith("$line") &&
  136. capturingClassName.endsWith("$iw")
  137. val outerThisOpt = if (lambdaProxy.getCapturedArgCount > 0) {
  138. Option(lambdaProxy.getCapturedArg(0))
  139. } else {
  140. None
  141. }
  142. // only need to clean when there is an enclosing "this" captured by the closure, and it
  143. // should be something cleanable, i.e. a Scala REPL line object
  144. val needsCleaning = isClosureDeclaredInScalaRepl &&
  145. outerThisOpt.isDefined && outerThisOpt.get.getClass.getName == capturingClassName
  146. if (needsCleaning) {
  147. // indylambda closures do not reference enclosing closures via an `$outer` chain, so no
  148. // transitive cleaning on the `$outer` chain is needed.
  149. // Thus clean() shouldn't be recursively called with a non-empty accessedFields.
  150. assert(accessedFields.isEmpty)
  151. initAccessedFields(accessedFields, Seq(capturingClass))
  152. IndylambdaScalaClosures.findAccessedFields(
  153. lambdaProxy, classLoader, accessedFields, cleanTransitively)
  154. logDebug(s" + fields accessed by starting closure: ${accessedFields.size} classes")
  155. accessedFields.foreach { f => logDebug(" " + f) }
  156. if (accessedFields(capturingClass).size < capturingClass.getDeclaredFields.length) {
  157. // clone and clean the enclosing `this` only when there are fields to null out
  158. val outerThis = outerThisOpt.get
  159. logDebug(s" + cloning instance of REPL class $capturingClassName")
  160. val clonedOuterThis = cloneAndSetFields(
  161. parent = null, outerThis, capturingClass, accessedFields)
  162. val outerField = func.getClass.getDeclaredField("arg$1")
  163. outerField.setAccessible(true)
  164. outerField.set(func, clonedOuterThis)
  165. }
  166. }
  167. logDebug(s" +++ indylambda closure ($implMethodName) is now cleaned +++")
  168. }
  169. // ====== 在这里检查是否进行了序列化
  170. if (checkSerializable) {
  171. ensureSerializable(func)
  172. }
  173. }
  1. private def ensureSerializable(func: AnyRef): Unit = {
  2. try {
  3. // 尝试对闭包函数进行序列化,如果序列化失败,则包装后抛出异常:Task not serializable
  4. if (SparkEnv.get != null) {
  5. SparkEnv.get.closureSerializer.newInstance().serialize(func)
  6. }
  7. } catch {
  8. case ex: Exception => throw new SparkException("Task not serializable", ex)
  9. }
  10. }

2、属性、方法序列化case

  1. object Test_02 {
  2. def main(args: Array[String]): Unit = {
  3. //1.创建 SparkConf并设置 App名称
  4. val conf: SparkConf= new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
  5. val sc: SparkContext = new SparkContext(conf)
  6. val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello spark",
  7. "hive", "lalala"))
  8. val search = new Search("hello")
  9. //3.2 函数传递,打印:ERROR Task not serializable
  10. search.getMatch1(rdd).collect().foreach(println)
  11. //3.3 属性传递,打印:ERROR Task not serializable
  12. search.getMatch2(rdd).collect().foreach(println)
  13. //4.关闭连接
  14. sc.stop()
  15. }
  16. }
  17. class Search(query: String) {
  18. def isMatch(s: String): Boolean = {
  19. s.contains(query)
  20. }
  21. // 函数序列化案例
  22. def getMatch1(rdd: RDD[String]): RDD[String] = {
  23. //rdd.filter(this.isMatch)
  24. rdd.filter(isMatch)
  25. }
  26. // 属性序列化案例
  27. def getMatch2(rdd: RDD[String]): RDD[String] = {
  28. //rdd.filter(x => x.contains(this.query))
  29. rdd.filter(x => x.contains(query))
  30. //val q = query
  31. //rdd.filter(x => x.contains(q))
  32. }
  33. }

运行后依然会报无法序列化(Task not serializable)的错误。
原因:构造参数 query 是作为类的属性使用的,isMatch 和 query 的调用实际上是 this.isMatch、this.query,闭包因此捕获了整个 Search 实例,而 Search 没有实现序列化,所以报错。

解决方案:
1、混入Serializable特质
2、使用case class
3、局部变量代替类的属性传参(见下面的示意代码)
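
下面是方案 3 的一个简单示意(基于上文的 Search 类,只保留 getMatch2 方法,属于演示性写法):把 query 赋给方法内的局部变量 q,闭包捕获的只是字符串 q 而不是 this,Search 本身就不再需要序列化了。

  1. // 方案3示意:用局部变量代替类属性,闭包只捕获字符串 q,不再捕获 this(Search 实例)
  2. class Search(query: String) {
  3.   def getMatch2(rdd: RDD[String]): RDD[String] = {
  4.     val q = query                      // 局部变量,String 本身可序列化
  5.     rdd.filter(x => x.contains(q))     // Search 不再需要实现序列化
  6.   }
  7. }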

3、总结

  • 闭包检测

从计算的角度看,算子以外的代码都是在 Driver 端执行,算子里面的代码都是在 Executor 端执行。在 Scala 的函数式编程中,算子内经常会用到算子外的数据,这样就形成了闭包的效果。如果闭包中使用的算子外的数据无法序列化,就意味着无法把它传给 Executor 端执行,进而发生错误。所以需要在执行任务计算前,检测闭包内的对象是否可以序列化,这个操作我们称之为闭包检测。Scala 2.12 版本后,闭包的编译方式发生了改变(对应上面源码中的 indylambda 分支)。

  • 序列化属性和方法

从计算的角度看,算子以外的代码都是在 Driver 端执行,算子里面的代码都是在 Executor 端执行。如果算子内使用了外部类的属性或方法(本质上是 this.属性、this.方法 的调用),闭包就会捕获整个类的实例,该类就必须可以序列化;否则就要像上面的案例一样,用局部变量代替属性传参,或者让类混入 Serializable 特质。

4、Kryo序列化框架

参考地址:https://github.com/EsotericSoftware/kryo
Java 的序列化能够序列化任何实现了 Serializable 的类,但是比较重(字节多),序列化后对象的体积也比较大。出于性能的考虑,Spark 2.0 开始支持另外一种 Kryo 序列化机制,Kryo 的速度大约是 Java 序列化(Serializable)的 10 倍。当 RDD 在 Shuffle 数据的时候,简单数据类型、数组和字符串类型已经在 Spark 内部使用 Kryo 来序列化。

注意:即使使用 Kryo 序列化,自定义类也要实现 Serializable 接口(混入 Serializable 特质)。

  1. object Test_Serializable_Kryo {
  2. def main(args: Array[String]): Unit = {
  3. val conf: SparkConf = new SparkConf()
  4. .setAppName("SerDemo")
  5. .setMaster("local[*]")
  6. // 替换默认的序列化机制
  7. .set("spark.serializer",
  8. "org.apache.spark.serializer.KryoSerializer")
  9. // 注册需要使用 kryo 序列化的自定义类
  10. .registerKryoClasses(Array(classOf[Searcher]))
  11. val sc = new SparkContext(conf)
  12. val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello atguigu",
  13. "atguigu", "hahah"), 2)
  14. val searcher = new Searcher("hello")
  15. val result: RDD[String] = searcher.getMatchedRDD1(rdd)
  16. result.collect.foreach(println)
  17. }
  18. }
  19. case class Searcher(val query: String) {
  20. def isMatch(s: String) = {
  21. s.contains(query)
  22. }
  23. def getMatchedRDD1(rdd: RDD[String]) = {
  24. rdd.filter(isMatch)
  25. }
  26. def getMatchedRDD2(rdd: RDD[String]) = {
  27. val q = query
  28. rdd.filter(_.contains(q))
  29. }
  30. }

2、RDD依赖关系


1、血缘关系

RDD 只支持粗粒度转换,即在大量记录上执行的单个操作。将创建 RDD 的一系列 Lineage(血统)记录下来,以便恢复丢失的分区。RDD 的 Lineage 会记录 RDD 的元数据信息和转换行为,当该 RDD 的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。

演示:
使用toDebugString打印血缘关系

  1. object Test_Dep {
  2. def main(args: Array[String]): Unit = {
  3. // 1、连接Spark
  4. val conf: SparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
  5. val sparkContext = new SparkContext(conf)
  6. val lines: RDD[String] = sparkContext.textFile("datas/2.txt")
  7. // 使用toDebugString打印血缘信息
  8. println(lines.toDebugString)
  9. println("=========================================")
  10. val split: RDD[String] = lines.flatMap(_.split(" "))
  11. println(split.toDebugString)
  12. println("=========================================")
  13. val map: RDD[(String, Int)] = split.map(word => (word, 1))
  14. println(map.toDebugString)
  15. println("=========================================")
  16. val value: RDD[(String, Int)] = map.reduceByKey(_ + _)
  17. println(value.toDebugString)
  18. println("=========================================")
  19. value.collect().foreach(println)
  20. sparkContext.stop()
  21. }
  22. }

打印如下:

  1. (1) datas/2.txt MapPartitionsRDD[1] at textFile at Test_Dep.scala:13 []
  2. | datas/2.txt HadoopRDD[0] at textFile at Test_Dep.scala:13 []
  3. =========================================
  4. (1) MapPartitionsRDD[2] at flatMap at Test_Dep.scala:18 []
  5. | datas/2.txt MapPartitionsRDD[1] at textFile at Test_Dep.scala:13 []
  6. | datas/2.txt HadoopRDD[0] at textFile at Test_Dep.scala:13 []
  7. =========================================
  8. (1) MapPartitionsRDD[3] at map at Test_Dep.scala:22 []
  9. | MapPartitionsRDD[2] at flatMap at Test_Dep.scala:18 []
  10. | datas/2.txt MapPartitionsRDD[1] at textFile at Test_Dep.scala:13 []
  11. | datas/2.txt HadoopRDD[0] at textFile at Test_Dep.scala:13 []
  12. =========================================
  13. (1) ShuffledRDD[4] at reduceByKey at Test_Dep.scala:26 []
  14. // 这里的 +- 表示此处发生了 shuffle,是阶段(Stage)划分的边界
  15. // 括号里的 (1) 表示该 RDD 有 1 个分区
  16. +-(1) MapPartitionsRDD[3] at map at Test_Dep.scala:22 []
  17. | MapPartitionsRDD[2] at flatMap at Test_Dep.scala:18 []
  18. | datas/2.txt MapPartitionsRDD[1] at textFile at Test_Dep.scala:13 []
  19. | datas/2.txt HadoopRDD[0] at textFile at Test_Dep.scala:13 []
  20. =========================================
  21. (Spark,1)
  22. (Hello,2)
  23. (Scala,1)

2、依赖关系

其实就是相邻两个RDD之间的关系。

使用dependencies打印依赖关系

  1. object Test_Dep {
  2. def main(args: Array[String]): Unit = {
  3. // 1、连接Spark
  4. val conf: SparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
  5. val sparkContext = new SparkContext(conf)
  6. val lines: RDD[String] = sparkContext.textFile("datas/2.txt")
  7. // 使用toDebugString打印血缘信息
  8. println(lines.dependencies)
  9. println("=========================================")
  10. val split: RDD[String] = lines.flatMap(_.split(" "))
  11. println(split.dependencies)
  12. println("=========================================")
  13. val map: RDD[(String, Int)] = split.map(word => (word, 1))
  14. println(map.dependencies)
  15. println("=========================================")
  16. val value: RDD[(String, Int)] = map.reduceByKey(_ + _)
  17. println(value.dependencies)
  18. println("=========================================")
  19. value.collect().foreach(println)
  20. sparkContext.stop()
  21. }
  22. }

结果:

  1. List(org.apache.spark.OneToOneDependency@68a4dcc6)
  2. =========================================
  3. List(org.apache.spark.OneToOneDependency@cb7fa71)
  4. =========================================
  5. List(org.apache.spark.OneToOneDependency@625d9132)
  6. =========================================
  7. List(org.apache.spark.ShuffleDependency@575e572f)
  8. =========================================
  9. (Spark,1)
  10. (Hello,2)
  11. (Scala,1)
  12. Process finished with exit code 0

1、窄依赖

窄依赖表示上游 RDD 的每一个 Partition 最多被下游 RDD 的一个分区使用,窄依赖我们形象地比喻为独生子女。

  1. /**
  2. * :: DeveloperApi ::
  3. * Represents a one-to-one dependency between partitions of the parent and child RDDs.
  4. */
  5. @DeveloperApi
  6. class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  7. override def getParents(partitionId: Int): List[Int] = List(partitionId)
  8. }

(图:OneToOneDependency 窄依赖示意图)

2、宽依赖

Spark 源码中并没有定义"宽依赖"这个类,不过我们习惯把与窄依赖相对应的 ShuffleDependency 叫做宽依赖:表示上游 RDD 的同一个分区的数据被下游 RDD 的多个分区使用,会引起 Shuffle。

  1. /**
  2. * :: DeveloperApi ::
  3. * Represents a dependency on the output of a shuffle stage. Note that in the case of shuffle,
  4. * the RDD is transient since we don't need it on the executor side.
  5. *
  6. * @param _rdd the parent RDD
  7. * @param partitioner partitioner used to partition the shuffle output
  8. * @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If not set
  9. * explicitly then the default serializer, as specified by `spark.serializer`
  10. * config option, will be used.
  11. * @param keyOrdering key ordering for RDD's shuffles
  12. * @param aggregator map/reduce-side aggregator for RDD's shuffle
  13. * @param mapSideCombine whether to perform partial aggregation (also known as map-side combine)
  14. * @param shuffleWriterProcessor the processor to control the write behavior in ShuffleMapTask
  15. */
  16. @DeveloperApi
  17. class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
  18. @transient private val _rdd: RDD[_ <: Product2[K, V]],
  19. val partitioner: Partitioner,
  20. val serializer: Serializer = SparkEnv.get.serializer,
  21. val keyOrdering: Option[Ordering[K]] = None,
  22. val aggregator: Option[Aggregator[K, V, C]] = None,
  23. val mapSideCombine: Boolean = false,
  24. val shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor)
  25. extends Dependency[Product2[K, V]] {

(图:ShuffleDependency 宽依赖示意图)

3、RDD阶段划分

1、简单解释

DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向,不会闭环。在 Spark 中,DAG 记录了 RDD 的转换过程和任务的阶段划分。

比如团队旅游分组游玩:不同分组之间可以各自看完一个景点就直接去下一个景点;但如果有集体活动,就需要等所有分组都到齐之后再一起进行。阶段划分与之类似:没有 Shuffle 的转换可以在分区内独立进行,遇到 Shuffle 就必须等上一个阶段的所有任务完成。

2、源码解读


源码分析还是从 collect 这个行动算子开始。

  1. def collect(): Array[T] = withScope {
  2. val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  3. Array.concat(results: _*)
  4. }

来到 SparkContext 的 runJob 方法,调用方把当前的 rdd(this)传了进来:

  1. def runJob[T, U: ClassTag](
  2. rdd: RDD[T],
  3. func: (TaskContext, Iterator[T]) => U,
  4. partitions: Seq[Int],
  5. resultHandler: (Int, U) => Unit): Unit = {
  6. if (stopped.get()) {
  7. throw new IllegalStateException("SparkContext has been shutdown")
  8. }
  9. val callSite = getCallSite
  10. val cleanedFunc = clean(func)
  11. logInfo("Starting job: " + callSite.shortForm)
  12. if (conf.getBoolean("spark.logLineage", false)) {
  13. logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  14. }
  15. // 进入有向无环图的runJob
  16. dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  17. progressBar.foreach(_.finishAll())
  18. rdd.doCheckpoint()
  19. }

DAGScheduler的runJob

  1. def runJob[T, U](
  2. rdd: RDD[T],
  3. func: (TaskContext, Iterator[T]) => U,
  4. partitions: Seq[Int],
  5. callSite: CallSite,
  6. resultHandler: (Int, U) => Unit,
  7. properties: Properties): Unit = {
  8. val start = System.nanoTime
  9. // 提交任务
  10. val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
  11. ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
  12. waiter.completionFuture.value.get match {
  13. case scala.util.Success(_) =>
  14. logInfo("Job %d finished: %s, took %f s".format
  15. (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
  16. case scala.util.Failure(exception) =>
  17. logInfo("Job %d failed: %s, took %f s".format
  18. (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
  19. // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
  20. val callerStackTrace = Thread.currentThread().getStackTrace.tail
  21. exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
  22. throw exception
  23. }
  24. }

提交任务的最后会发布一个Job被提交的事件

  1. eventProcessLoop.post(JobSubmitted(
  2. jobId, rdd, func2, partitions.toArray, callSite, waiter,
  3. Utils.cloneProperties(properties)))

该事件会由 handleJobSubmitted 方法处理。
1、其中第一步就是创建一个 ResultStage,一个 Job 只有一个 ResultStage

  1. // New stage creation may throw an exception if, for example, jobs are run on a
  2. // HadoopRDD whose underlying HDFS files have been deleted.
  3. finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)

如何创建ResultStage的呢?

  1. private def createResultStage(
  2. rdd: RDD[_],
  3. func: (TaskContext, Iterator[_]) => _,
  4. partitions: Array[Int],
  5. jobId: Int,
  6. callSite: CallSite): ResultStage = {
  7. checkBarrierStageWithDynamicAllocation(rdd)
  8. checkBarrierStageWithNumSlots(rdd)
  9. checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
  10. // 在这里还会获取之前创建的阶段作为上一步阶段
  11. val parents = getOrCreateParentStages(rdd, jobId)
  12. val id = nextStageId.getAndIncrement()
  13. // 在这里根据当前的rdd创建一个ResultStage,作为最终返回的阶段处理
  14. val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
  15. stageIdToStage(id) = stage
  16. updateJobIdStageIdMaps(jobId, stage)
  17. stage
  18. }

2、在创建 ResultStage 之前,会先通过 getOrCreateParentStages 获取上游阶段:先找出所有 Shuffle 依赖,再对每个依赖获取或者创建对应的 ShuffleMapStage

  1. private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  2. // 根据当前rdd获取
  3. getShuffleDependencies(rdd).map { shuffleDep =>
  4. getOrCreateShuffleMapStage(shuffleDep, firstJobId)
  5. }.toList
  6. }

3、如何获取 Shuffle 依赖呢?先创建一个集合,然后从当前 rdd 出发向上遍历它的所有依赖关系:

  1. private[scheduler] def getShuffleDependencies(
  2. rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
  3. val parents = new HashSet[ShuffleDependency[_, _, _]]
  4. val visited = new HashSet[RDD[_]]
  5. // waitingForVisit 中存放的是等待访问的 RDD
  6. val waitingForVisit = new ListBuffer[RDD[_]]
  7. // 将当前rdd添加进去
  8. waitingForVisit += rdd
  9. while (waitingForVisit.nonEmpty) {
  10. // 从待访问列表中取出一个 rdd
  11. val toVisit = waitingForVisit.remove(0)
  12. // 如果没有拜访过,
  13. if (!visited(toVisit)) {
  14. visited += toVisit
  15. // 遍历所有依赖关系,如果当前是shuffle依赖,创建一个shuffle,如果是普通依赖,准备去拜访
  16. toVisit.dependencies.foreach {
  17. case shuffleDep: ShuffleDependency[_, _, _] =>
  18. parents += shuffleDep
  19. case dependency =>
  20. waitingForVisit.prepend(dependency.rdd)
  21. }
  22. }
  23. }
  24. parents
  25. }

获取或者创建shuffle阶段:

  1. private def getOrCreateShuffleMapStage(
  2. shuffleDep: ShuffleDependency[_, _, _],
  3. firstJobId: Int): ShuffleMapStage = {
  4. // 获取该shuffle阶段
  5. shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
  6. case Some(stage) =>
  7. stage
  8. case None =>
  9. // 如果没有shuffle,创建一个shuffleMapStage
  10. // Create stages for all missing ancestor shuffle dependencies.
  11. getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
  12. // Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies
  13. // that were not already in shuffleIdToMapStage, it's possible that by the time we
  14. // get to a particular dependency in the foreach loop, it's been added to
  15. // shuffleIdToMapStage by the stage creation process for an earlier dependency. See
  16. // SPARK-13902 for more information.
  17. if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
  18. createShuffleMapStage(dep, firstJobId)
  19. }
  20. }
  21. // Finally, create a stage for the given shuffle dependency.
  22. createShuffleMapStage(shuffleDep, firstJobId)
  23. }
  24. }

createShuffleMapStage

  1. def createShuffleMapStage[K, V, C](
  2. shuffleDep: ShuffleDependency[K, V, C], jobId: Int): ShuffleMapStage = {
  3. val rdd = shuffleDep.rdd
  4. checkBarrierStageWithDynamicAllocation(rdd)
  5. checkBarrierStageWithNumSlots(rdd)
  6. checkBarrierStageWithRDDChainPattern(rdd, rdd.getNumPartitions)
  7. val numTasks = rdd.partitions.length
  8. val parents = getOrCreateParentStages(rdd, jobId)
  9. val id = nextStageId.getAndIncrement()
  10. // new一个ShuffleMapStage
  11. val stage = new ShuffleMapStage(
  12. id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)
  13. stageIdToStage(id) = stage
  14. shuffleIdToMapStage(shuffleDep.shuffleId) = stage
  15. updateJobIdStageIdMaps(jobId, stage)
  16. if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
  17. // Kind of ugly: need to register RDDs with the cache and map output tracker here
  18. // since we can't do it in the RDD constructor because # of partitions is unknown
  19. logInfo(s"Registering RDD ${rdd.id} (${rdd.getCreationSite}) as input to " +
  20. s"shuffle ${shuffleDep.shuffleId}")
  21. mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
  22. }
  23. stage
  24. }

4、RDD任务划分

RDD 任务切分为:Application、Job、Stage 和 Task

  • Application:初始化一个 SparkContext 即生成一个 Application;
  • Job:一个 Action 算子就会生成一个 Job;
  • Stage:Stage 等于宽依赖(ShuffleDependency)的个数加 1;
  • Task:一个 Stage 阶段中,最后一个 RDD 的分区个数就是 Task 的个数。

注意:Application->Job->Stage->Task 每一层都是 1 对 n 的关系。
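
以前面血缘关系演示中的 WordCount(textFile -> flatMap -> map -> reduceByKey -> collect)为例,可以按上面的规则简单推算一下(分区数按那段 toDebugString 输出里的 (1) 来算):

  1. // 1 个 SparkContext                        => 1 个 Application
  2. // collect 是行动算子                        => 1 个 Job
  3. // reduceByKey 产生 1 个 ShuffleDependency   => Stage 数 = 1 + 1 = 2
  4. // 两个 Stage 的最后一个 RDD 都只有 1 个分区   => Task 数 = 1 + 1 = 2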


源码解读

阶段划分完成之后,在handleJobSubmitted中,最后会提交任务

  1. submitStage(finalStage)

获取未提交的阶段,提交任务

  1. private def submitStage(stage: Stage): Unit = {
  2. val jobId = activeJobForStage(stage)
  3. if (jobId.isDefined) {
  4. logDebug(s"submitStage($stage (name=${stage.name};" +
  5. s"jobs=${stage.jobIds.toSeq.sorted.mkString(",")}))")
  6. if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
  7. // 获取未提交的阶段,根据id排个序
  8. val missing = getMissingParentStages(stage).sortBy(_.id)
  9. logDebug("missing: " + missing)
  10. // 如果获取到了所有阶段则提交任务
  11. if (missing.isEmpty) {
  12. logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
  13. // 提交任务
  14. submitMissingTasks(stage, jobId.get)
  15. } else {
  16. // 否则遍历所有未提交的阶段,先提交这些阶段
  17. for (parent <- missing) {
  18. submitStage(parent)
  19. }
  20. waitingStages += stage
  21. }
  22. }
  23. } else {
  24. abortStage(stage, "No active job for stage " + stage.id, None)
  25. }
  26. }

来到 submitMissingTasks 提交任务的方法中:根据阶段类型的不同会创建不同种类的任务,要计算的分区汇总在 partitionsToCompute 中,它的数量就是要执行的任务数量。

  1. private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
  2. logDebug("submitMissingTasks(" + stage + ")")
  3. // Before find missing partition, do the intermediate state clean work first.
  4. // The operation here can make sure for the partially completed intermediate stage,
  5. // `findMissingPartitions()` returns all partitions every time.
  6. stage match {
  7. // 如果是shuffle阶段
  8. case sms: ShuffleMapStage if stage.isIndeterminate && !sms.isAvailable =>
  9. mapOutputTracker.unregisterAllMapOutput(sms.shuffleDep.shuffleId)
  10. case _ =>
  11. }
  12. // Figure out the indexes of partition ids to compute.
  13. val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
  14. // Use the scheduling pool, job group, description, etc. from an ActiveJob associated
  15. // with this Stage
  16. val properties = jobIdToActiveJob(jobId).properties
  17. runningStages += stage
  18. // SparkListenerStageSubmitted should be posted before testing whether tasks are
  19. // serializable. If tasks are not serializable, a SparkListenerStageCompleted event
  20. // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
  21. // event.
  22. // ===判断阶段是哪个阶段
  23. stage match {
  24. // 是shuffle,就提交shuffle阶段
  25. case s: ShuffleMapStage =>
  26. outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
  27. // 是ResultStage,就提交阶段
  28. case s: ResultStage =>
  29. outputCommitCoordinator.stageStart(
  30. stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
  31. }
  32. val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
  33. stage match {
  34. case s: ShuffleMapStage =>
  35. partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
  36. case s: ResultStage =>
  37. partitionsToCompute.map { id =>
  38. val p = s.partitions(id)
  39. (id, getPreferredLocs(stage.rdd, p))
  40. }.toMap
  41. }
  42. } catch {
  43. case NonFatal(e) =>
  44. stage.makeNewStageAttempt(partitionsToCompute.size)
  45. listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
  46. abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
  47. runningStages -= stage
  48. return
  49. }
  50. stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
  51. // If there are tasks to execute, record the submission time of the stage. Otherwise,
  52. // post the even without the submission time, which indicates that this stage was
  53. // skipped.
  54. if (partitionsToCompute.nonEmpty) {
  55. stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
  56. }
  57. listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
  58. // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
  59. // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
  60. // the serialized copy of the RDD and for each task we will deserialize it, which means each
  61. // task gets a different copy of the RDD. This provides stronger isolation between tasks that
  62. // might modify state of objects referenced in their closures. This is necessary in Hadoop
  63. // where the JobConf/Configuration object is not thread-safe.
  64. var taskBinary: Broadcast[Array[Byte]] = null
  65. var partitions: Array[Partition] = null
  66. try {
  67. // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
  68. // For ResultTask, serialize and broadcast (rdd, func).
  69. var taskBinaryBytes: Array[Byte] = null
  70. // taskBinaryBytes and partitions are both effected by the checkpoint status. We need
  71. // this synchronization in case another concurrent job is checkpointing this RDD, so we get a
  72. // consistent view of both variables.
  73. RDDCheckpointData.synchronized {
  74. taskBinaryBytes = stage match {
  75. case stage: ShuffleMapStage =>
  76. JavaUtils.bufferToArray(
  77. closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
  78. case stage: ResultStage =>
  79. JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
  80. }
  81. partitions = stage.rdd.partitions
  82. }
  83. if (taskBinaryBytes.length > TaskSetManager.TASK_SIZE_TO_WARN_KIB * 1024) {
  84. logWarning(s"Broadcasting large task binary with size " +
  85. s"${Utils.bytesToString(taskBinaryBytes.length)}")
  86. }
  87. taskBinary = sc.broadcast(taskBinaryBytes)
  88. } catch {
  89. // In the case of a failure during serialization, abort the stage.
  90. case e: NotSerializableException =>
  91. abortStage(stage, "Task not serializable: " + e.toString, Some(e))
  92. runningStages -= stage
  93. // Abort execution
  94. return
  95. case e: Throwable =>
  96. abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))
  97. runningStages -= stage
  98. // Abort execution
  99. return
  100. }
  101. // =====在这里其实有了任务的概念,根据任务类型的不同去执行不同任务
  102. val tasks: Seq[Task[_]] = try {
  103. val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
  104. stage match {
  105. // 如果是shuffle任务,就去new一个ShuffleMapTask
  106. case stage: ShuffleMapStage =>
  107. stage.pendingPartitions.clear()
  108. // ============= 就看这个partitionsToComput中的数量了,有多少就起多少任务
  109. partitionsToCompute.map { id =>
  110. val locs = taskIdToLocations(id)
  111. val part = partitions(id)
  112. stage.pendingPartitions += id
  113. new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
  114. taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
  115. Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
  116. }
  117. // 如果是ResultStage任务,就去new一个ResultTask
  118. case stage: ResultStage =>
  119. partitionsToCompute.map { id =>
  120. val p: Int = stage.partitions(id)
  121. val part = partitions(p)
  122. val locs = taskIdToLocations(id)
  123. new ResultTask(stage.id, stage.latestInfo.attemptNumber,
  124. taskBinary, part, locs, id, properties, serializedTaskMetrics,
  125. Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
  126. stage.rdd.isBarrier())
  127. }
  128. }
  129. } catch {
  130. case NonFatal(e) =>
  131. abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
  132. runningStages -= stage
  133. return
  134. }
  135. if (tasks.nonEmpty) {
  136. logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
  137. s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
  138. taskScheduler.submitTasks(new TaskSet(
  139. tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
  140. } else {
  141. // Because we posted SparkListenerStageSubmitted earlier, we should mark
  142. // the stage as completed here in case there are no tasks to run
  143. markStageAsFinished(stage, None)
  144. stage match {
  145. case stage: ShuffleMapStage =>
  146. logDebug(s"Stage ${stage} is actually done; " +
  147. s"(available: ${stage.isAvailable}," +
  148. s"available outputs: ${stage.numAvailableOutputs}," +
  149. s"partitions: ${stage.numPartitions})")
  150. markMapStageJobsAsFinished(stage)
  151. case stage : ResultStage =>
  152. logDebug(s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})")
  153. }
  154. submitWaitingChildStages(stage)
  155. }
  156. }

计算任务数量

  1. // Figure out the indexes of partition ids to compute.
  2. val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

来到抽象类Stage中

  1. def findMissingPartitions(): Seq[Int]

这个抽象方法在 ResultStage 和 ShuffleMapStage 中各有具体实现。
ResultStage中

  1. override def findMissingPartitions(): Seq[Int] = {
  2. val job = activeJob.get
  3. // 过滤出所有还没有完成的分区
  4. (0 until job.numPartitions).filter(id => !job.finished(id))
  5. }

ShuffleMapStage中也是一样的流程,获取分区数量

  1. override def findMissingPartitions(): Seq[Int] = {
  2. mapOutputTrackerMaster
  3. .findMissingPartitions(shuffleDep.shuffleId)
  4. .getOrElse(0 until numPartitions)
  5. }

总结:有多少分区就有多少任务执行。

5、RDD持久化

如果我们想要对同一个 rdd 的计算结果重复使用,比如既要做 reduceByKey,又要做 groupByKey,就涉及到 RDD 复用的问题了。

1、cache&persist

  1. object Persist01 {
  2. def main(args: Array[String]): Unit = {
  3. val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("persist"))
  4. // wc
  5. val rdd: RDD[String] = sc.makeRDD(List("Hello Spark", "Hello Scala"))
  6. val words: RDD[String] = rdd.flatMap(_.split(" "))
  7. val map: RDD[(String, Int)] = words.map(
  8. word => {
  9. println("************")
  10. (word, 1)
  11. }
  12. )
  13. val wordCount: RDD[(String, Int)] = map.reduceByKey(_ + _)
  14. // 第一次收集数据,做reduceByKey操作
  15. wordCount.collect().foreach(println)
  16. println("=======================================================")
  17. // 如果想要进行第二个操作的时候,就需要再次重复创建一个rdd,执行相关操作。但是这样的效率非常低下。
  18. val rdd1: RDD[String] = sc.makeRDD(List("Hello Spark", "Hello Scala"))
  19. val words1: RDD[String] = rdd1.flatMap(_.split(" "))
  20. val map1: RDD[(String, Int)] = words1.map(
  21. word => {
  22. println("************")
  23. (word, 1)
  24. }
  25. )
  26. val wc: RDD[(String, Iterable[Int])] = map1.groupByKey()
  27. wc.collect.foreach(println)
  28. sc.stop()
  29. }
  30. }

但是这样做的效率非常低,不如直接复用同一个 rdd:

  1. // 重用同一个map的结果
  2. val wc: RDD[(String, Iterable[Int])] = map.groupByKey()
  3. wc.collect.foreach(println)
  4. sc.stop()

但是我们说过,RDD 本身是不保存数据的,能重用的只是 rdd 对象。
既然 rdd 不保存数据,上一次行动算子执行完,中间数据就没有了;第二次还能得到结果,是因为复用 rdd 时会把之前获取数据的流程重新走一遍。

所以我们的结果中打印了两遍*号。

但是多次重复这些获取数据的流程,执行效率就会降低。那有没有办法在第一次计算出数据之后,把结果保存下来给第二次使用呢?

当然可以,可以通过cache()方法在内存中缓存,也可以持久化到磁盘中。

  1. object Persist01 {
  2. def main(args: Array[String]): Unit = {
  3. val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("persist"))
  4. // wc
  5. val rdd: RDD[String] = sc.makeRDD(List("Hello Spark", "Hello Scala"))
  6. val words: RDD[String] = rdd.flatMap(_.split(" "))
  7. val map: RDD[(String, Int)] = words.map(
  8. word => {
  9. println("************")
  10. (word, 1)
  11. }
  12. )
  13. val wordCount: RDD[(String, Int)] = map.reduceByKey(_ + _)
  14. // 第一次收集数据,做reduceByKey操作
  15. // 如果想要进行第二个操作的时候,就需要再次重复创建一个rdd,执行相关操作。但是这样的效率非常低下。
  16. // 调用cache或者persist方法在内存中缓存,或者持久化到磁盘
  17. // map.cache()
  18. map.persist()
  19. wordCount.collect().foreach(println)
  20. println("=======================================================")
  21. // 重用同一个map的结果
  22. val wc: RDD[(String, Iterable[Int])] = map.groupByKey()
  23. wc.collect.foreach(println)
  24. sc.stop()
  25. }
  26. }

cache 和 persist 其实是同一个方法:cache 底层调用的就是无参的 persist(默认存储级别 MEMORY_ONLY)。
但是这些方法并不会立即缓存,而是在后面触发行动算子的时候才会真正执行缓存。

  1. /**
  2. * Persist this RDD with the default storage level (`MEMORY_ONLY`).
  3. */
  4. def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
  5. /**
  6. * Persist this RDD with the default storage level (`MEMORY_ONLY`).
  7. */
  8. def cache(): this.type = persist()

设置为持久化到磁盘中

  1. map.persist(StorageLevel.DISK_ONLY)

持久化到磁盘的数据是临时文件,在作业执行完毕之后会被删除

2、checkpoint

设立检查点,是为血缘关系比较长、中间过程可能出错的 RDD 留出容错的余地:把中间结果落盘,出错时不必从头重算。

这里请注意:检查点数据会被持久化到磁盘中,并且在作业执行完成之后也不会被删除!

  1. // 设置检查点保存的路径,可以是 HDFS、Ceph、NFS 等分布式存储系统
  2. sc.setCheckpointDir("cp/xxx")
  3. map.checkpoint()
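
一个常见用法的简单示意(沿用上文 persist 例子里的 map 和 wordCount,属于演示性写法):checkpoint 同样要等行动算子触发才会执行,并且会另起一个作业把该 RDD 重新计算一遍,所以一般建议先 cache 再 checkpoint,避免重复计算。

  1. sc.setCheckpointDir("cp")              // 检查点目录,生产环境一般用 HDFS 等分布式存储
  2. map.cache()                            // 先缓存,checkpoint 另起的作业可直接读缓存,避免重算
  3. map.checkpoint()
  4. wordCount.collect().foreach(println)   // 行动算子触发时才真正写检查点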

3、区别

cache

  • 将数据临时存储在内存中进行数据重用
  • 在血缘关系中添加新的依赖。一旦出现问题,可以从头读取数据。

persist

  • 将数据临时存储在磁盘文件中进行数据重用
  • 涉及到磁盘IO,性能较低,但是数据安全
  • 如果作业执行完毕,临时保存数据文件就会被删除

checkpoint

  • 将数据长久地保存在磁盘文件中进行数据重用
  • 涉及到磁盘IO,性能较低,但是数据安全
  • 为了保证数据安全,所以一般情况下,会独立执行作业
  • 执行过程中,会切断血缘关系,重新建立新的血缘关系
  • checkpoint等于改变数据源
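
"切断血缘、等同于改变数据源"这一点可以用 toDebugString 简单验证(示意,接着上面 checkpoint 的例子,在行动算子执行之后再打印):

  1. // 行动算子执行、检查点落盘之后再打印血缘
  2. println(map.toDebugString)
  3. // 依赖链的起点会变成 ReliableCheckpointRDD,之前的血缘已被切断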

6、RDD分区器

Spark 目前支持 Hash 分区、Range 分区和用户自定义分区,Hash 分区为当前的默认分区。分区器直接决定了 RDD 中分区的个数、RDD 中每条数据经过 Shuffle 后进入哪个分区,进而决定了 Reduce 的个数。

  • 只有 Key-Value 类型的 RDD 才有分区器,非 Key-Value 类型的 RDD 分区的值是 None
  • 每个 RDD 的分区 ID 范围:0 ~ (numPartitions - 1),getPartition 返回的这个值决定了每条数据属于哪个分区。
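
这两点可以用 rdd.partitioner 简单验证(示意代码,省略 SparkContext 的创建,沿用前文的 sc):

  1. val rdd = sc.makeRDD(List(1, 2, 3, 4))
  2. println(rdd.partitioner)     // None,非 Key-Value 类型的 RDD 没有分区器
  3. val kv = rdd.map((_, 1)).reduceByKey(_ + _)
  4. println(kv.partitioner)      // Some(HashPartitioner...),默认使用 Hash 分区器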

我们前面已经见过自带的 HashPartitioner 和 RangePartitioner 等,如果想要自己指定分区规则该怎么办呢?可以继承 Partitioner 类,重写 numPartitions 和 getPartition 方法实现分区逻辑。

  1. abstract class Partitioner extends Serializable {
  2. def numPartitions: Int
  3. def getPartition(key: Any): Int
  4. }
  1. class CustomPartitioner extends Partitioner {
  2. // 分区数量
  3. override def numPartitions: Int = 3
  4. // 根据数据的key返回数据所在的分区索引(从0开始)
  5. override def getPartition(key: Any): Int = {
  6. key match {
  7. case "it666" => 0
  8. case "itdachang" => 1
  9. case "guigu" => 2
  10. case _ => 0
  11. }
  12. }
  13. }
  14. object CustomPartitioner {
  15. def main(args: Array[String]): Unit = {
  16. val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("customPartitioer"))
  17. val rdd: RDD[(String, String)] = sc.makeRDD(List(
  18. ("it666", "6666"),
  19. ("itdachang", "5555"),
  20. ("guigu", "22222"),
  21. ("25354", "22222"),
  22. ),3)
  23. val value: RDD[(String, String)] = rdd.partitionBy(new CustomPartitioner)
  24. value.saveAsTextFile("datas/output")
  25. sc.stop()
  26. }
  27. }

7、RDD文件读取与保存

Spark 的数据读取及数据保存可以从两个维度来作区分:文件格式以及文件系统。
文件格式分为:text 文件、csv 文件、sequence 文件以及 Object 文件;
文件系统分为:本地文件系统、HDFS、HBASE 以及数据库。

  • text

    1. // 读取输入文件
    2. val inputRDD: RDD[String] = sc.textFile("input/1.txt")
    3. // 保存数据
    4. inputRDD.saveAsTextFile("output")
  • sequence

SequenceFile 文件是 Hadoop 用来存储二进制形式的 key-value 对而设计的一种平面文件(Flat File)。在 SparkContext 中,可以调用 sequenceFile[keyClass, valueClass](path) 来读取。

  1. // 保存数据为 SequenceFile
  2. dataRDD.saveAsSequenceFile("output")
  3. // 读取 SequenceFile文件
  4. sc.sequenceFile[Int,Int]("output").collect().foreach(println)
  • object 对象文件
  1. // 保存数据
  2. dataRDD.saveAsObjectFile("output")
  3. // 读取数据
  4. sc.objectFile[Int]("output").collect().foreach(println)
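
上面提到的 csv 文件,Spark Core 的 RDD API 并没有专门的读取方法,常见做法是用 textFile 读入后手动切分。下面是一个简化示意(假设字段用逗号分隔,不考虑字段里含逗号、引号转义的情况,路径仅为示例):

  1. // 读取 csv:按逗号切分每一行
  2. val csvRDD = sc.textFile("input/1.csv").map(_.split(","))
  3. // 保存为 csv:把字段重新拼接成一行
  4. csvRDD.map(_.mkString(",")).saveAsTextFile("csvOutput")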

二、累加器

分布式共享只写变量

累加器用来把 Executor 端变量信息聚合到 Driver 端。在 Driver 程序中定义的变量,在Executor 端的每个 Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后,传回 Driver 端进行 merge

我们有时候想完成数据的累加操作,可能会想到用 reduce 这类分布式计算的方法,或者直接在 foreach 里对 Driver 端定义的变量做累加。但算子里的逻辑会被 Driver 传递到 Executor 端执行,累加只发生在 Executor 端的变量副本上,Driver 端的这个变量依旧是 0。
如果我们想要实现该功能,可以使用累加器这种数据结构。

1、系统累加器

系统提供了三种累加器:

  • longAccumulator
  • doubleAccumulator
  • collectionAccumulator

    1. object AccTest01 {
    2. def main(args: Array[String]): Unit = {
    3. val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("acc"))
    4. val rdd = sc.makeRDD(List(1, 2, 3, 4))
    5. var sum = 0
    6. // 不可取!
    7. // rdd.foreach(sum += _)
    8. // 使用累加器
    9. // longAccumulator
    10. // sc.doubleAccumulator
    11. // sc.collectionAccumulator : java.util.List
    12. val sumAccumulator: LongAccumulator = sc.longAccumulator("sum")
    13. rdd.foreach(sumAccumulator.add(_))
    14. println(sumAccumulator.value)
    15. sc.stop()
    16. }
    17. }

2、问题演示

    1. object AccTest02 {
    2. def main(args: Array[String]): Unit = {
    3. val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("acc"))
    4. val rdd = sc.makeRDD(List(1, 2, 3, 4))
    5. val sumAccumulator: LongAccumulator = sc.longAccumulator("sum")
    6. val mappedRdd: RDD[Unit] = rdd.map(
    7. num => {
    8. sumAccumulator.add(num)
    9. }
    10. )
    11. // 少加:在转换算子中调用累加器,如果没有行动算子触发,累加不会执行
    12. // 多加:在转换算子中调用累加器,如果行动算子出现多次,累加会被多次执行。
    13. mappedRdd.collect
    14. mappedRdd.collect
    15. println(sumAccumulator.value)
    16. sc.stop()
    17. }
    18. }

所以一般累加器用在行动算子中。
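
一个简单的对照示意(沿用上面例子里的 rdd 和 sumAccumulator,并假设去掉前面对 mappedRdd 的两次 collect):把累加直接放到 foreach 这样的行动算子里,累加逻辑就只随作业真正执行一次。

  1. // 直接在行动算子中使用累加器
  2. rdd.foreach(num => sumAccumulator.add(num))
  3. println(sumAccumulator.value)   // 10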

3、自定义累加器

1、继承AccumulatorV2,定义输入输出泛型
2、实现方法
3、在sparkContext中注册

  1. object AccTest03 {
  2. def main(args: Array[String]): Unit = {
  3. val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("acc"))
  4. val rdd: RDD[String] = sc.makeRDD(List("hello", "spark", "hello"))
  5. // 自定义累加器
  6. val wcAcc = new WordCountAccumulator()
  7. // 向spark注册
  8. sc.register(wcAcc,"wordCountAccumulator")
  9. rdd.foreach(
  10. wcAcc.add
  11. )
  12. println(wcAcc.value)
  13. // 使用
  14. sc.stop()
  15. }
  16. /**
  17. * wordCount的累加器,传入一个单词,计算数量
  18. */
  19. class WordCountAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {
  20. private var map = mutable.Map[String, Long]()
  21. // 判断累加器是否为0
  22. override def isZero: Boolean = map.isEmpty
  23. // 如何复制
  24. override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = new WordCountAccumulator
  25. // 重置累加器
  26. override def reset(): Unit = map.clear
  27. // 添加数据
  28. override def add(word: String): Unit = {
  29. map.update(word, map.getOrElse(word, 0L) + 1)
  30. }
  31. // 合并数据,Driver端合并两个累加器的数据
  32. override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
  33. val map1 = this.map
  34. val map2 = other.value
  35. map2.foreach {
  36. case (word, count) => {
  37. val newCount: Long = map1.getOrElse(word, 0L) + count
  38. map1.update(word, newCount)
  39. }
  40. }
  41. }
  42. // 获取累加器的值
  43. override def value: mutable.Map[String, Long] = this.map
  44. }
  45. }

三、广播变量

分布式共享只读变量。

如果我们想要把 rdd 中的数据和另一份 Map 数据按 key 关联起来,可以使用 join 完成。但是 join 存在一个非常大的缺点:会走 shuffle,导致性能下降。

我们能不能不走 shuffle 完成这个操作呢?可以改用 map 算子,对每一条数据做映射,在映射函数里直接查这份 Map 数据。

使用 sc.broadcast(data) 创建广播变量,使用 broadcast.value 获取广播变量的值。

  1. object BroadCastTest {
  2. def main(args: Array[String]): Unit = {
  3. val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("broadcast"))
  4. //
  5. val rdd: RDD[(String, Int)] = sc.makeRDD(List(
  6. ("a", 1),
  7. ("b", 2),
  8. ("c", 3)
  9. ))
  10. val map: mutable.Map[String, Long] = mutable.Map[String, Long](("a", 4), ("b", 5), ("c", 6))
  11. // 不走join,没有shuffle操作,但是
  12. // 闭包数据都是以Task为单位发送的,每个任务中包含闭包数据
  13. // Executor就是一个JVM,在Executor端进行多个Task,每个Task的变量都将这个map存了一份
  14. // 可以使用广播变量代替,将数据放在Executor的内存中,每个Task共享
  15. // val value: RDD[(String, (Int, Long))] = rdd.map {
  16. // case (w, c) => {
  17. // val l: Long = map.getOrElse(w, 0L)
  18. // (w, (c, l))
  19. // }
  20. // }
  21. // 创建广播变量
  22. val broadcast: Broadcast[mutable.Map[String, Long]] = sc.broadcast(map)
  23. val value: RDD[(String, (Int, Long))] = rdd.map {
  24. case (w, c) => {
  25. // 使用广播变量
  26. val l: Long = broadcast.value.getOrElse(w, 0L)
  27. (w, (c, l))
  28. }
  29. }
  30. value.collect.foreach(println)
  31. sc.stop()
  32. }
  33. }

四、案例实操

(图:user_visit_action.txt 中电商用户行为数据的部分内容)

上面的数据图是从数据文件中截取的一部分内容,表示电商网站的用户行为数据,主要包含用户的 4 种行为:搜索、点击、下单、支付。数据规则如下:

  • 数据文件中每行数据采用下划线分隔数据
  • 每一行数据表示用户的一次行为,这个行为只能是 4 种行为的一种
  • 如果搜索关键字为 null,表示数据不是搜索数据
  • 如果点击的品类 ID 和产品 ID 为-1,表示数据不是点击数据
  • 针对于下单行为,一次可以下单多个商品,所以品类 ID 和产品 ID 可以是多个,id 之间采用逗号分隔,如果本次不是下单行为,则数据采用 null 表示
  • 支付行为和下单行为类似
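
后面几种实现都是按这些规则用下划线切分字段,并用下标 6(点击品类 ID)、8(下单品类 ID 列表)、10(支付品类 ID 列表)来判断行为类型。下面先给出一个判断行为类型的小示意(仅作演示,字段下标与后文代码一致):

  1. // 判断一行数据属于哪种行为(示意)
  2. def actionType(line: String): String = {
  3.   val datas = line.split("_")
  4.   if (datas(6) != "-1") "click"            // 点击行为
  5.   else if (datas(8) != "null") "order"     // 下单行为
  6.   else if (datas(10) != "null") "pay"      // 支付行为
  7.   else "other"                             // 搜索等其他行为
  8. }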


user_visit_action.txt

1、需求1:Top10热门品类

1、方式1:reduceByKey+cogroup+tuple’s sort rule

1、分割出商品种类、点击数量、下单数量、支付数量
2、将数据按照商品种类进行排列,并且使用元组的比较顺序组合三个数量
3、排序且top n

  1. object HotCategoryTopAnalysis1 {
  2. def main(args: Array[String]): Unit = {
  3. val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("HotCategoryTopAnalysis"))
  4. // 1、读取文件获取原始数据
  5. val originRdd: RDD[String] = sc.textFile("datas/user_visit_action.txt")
  6. // 缓存下
  7. originRdd.cache()
  8. // 2、统计品类的点击数量,过滤没有点击的商品分类
  9. val clickedActionRdd: RDD[String] = originRdd.filter(
  10. data => {
  11. val datas: Array[String] = data.split("_")
  12. datas(6) != "-1"
  13. }
  14. )
  15. // 将每个商品分类切割后,返回word count的形式
  16. val clickedCountRdd: RDD[(String, Int)] = clickedActionRdd.map(action => {
  17. (action.split("_")(6), 1)
  18. }).reduceByKey(_+_)
  19. clickedCountRdd.collect.foreach(println)
  20. println("-------------------------------")
  21. // 3、获取下单的数量
  22. val orderRdd: RDD[String] = originRdd.filter(data => {
  23. val datas: Array[String] = data.split("_")
  24. datas(8) != "null"
  25. })
  26. // 一次可能下单多个品类,支付同理,所以在此按逗号再次切分;切出来的是数组,所以用 flatMap 扁平化到最外层
  27. val orderCountRdd: RDD[(String, Int)] = orderRdd.flatMap(data => {
  28. val datas: Array[String] = data.split("_")
  29. val cids: Array[String] = datas(8).split(",")
  30. cids.map((_, 1))
  31. }).reduceByKey(_+_)
  32. orderCountRdd.collect.foreach(println)
  33. println("-------------------------------")
  34. // 4、获取支付的数量
  35. val payedRdd: RDD[String] = originRdd.filter(data => {
  36. val datas: Array[String] = data.split("_")
  37. datas(10) != "null"
  38. })
  39. // 一次可能支付多个品类,所以在此按逗号再次切分;切出来的是数组,所以用 flatMap 扁平化到最外层
  40. val payCountRdd: RDD[(String, Int)] = payedRdd.flatMap(data => {
  41. val datas: Array[String] = data.split("_")
  42. val cids: Array[String] = datas(10).split(",")
  43. cids.map((_, 1))
  44. }).reduceByKey(_+_)
  45. payCountRdd.collect.foreach(println)
  46. println("-------------------------------")
  47. // 5、根据商品分类合并数量,
  48. // 使用cogroup : connect + group
  49. val cogroupRdd: RDD[(String, (Iterable[Int], Iterable[Int], Iterable[Int]))]
  50. = clickedCountRdd.cogroup(orderCountRdd, payCountRdd)
  51. val resultRdd: RDD[(String, (Int, Int, Int))] = cogroupRdd.mapValues {
  52. // 模式匹配出三组迭代器,把计数组合成元组
  53. case (clickIter, orderIter, payIter) => {
  54. // 分别统计每个的数量
  55. var clickCount = 0
  56. var orderCount = 0
  57. var payCount = 0
  58. // 点击不一定下单,下单不一定支付,所以有个顺序判断
  59. val clkIter: Iterator[Int] = clickIter.iterator
  60. if (clkIter.hasNext) {
  61. clickCount = clkIter.next()
  62. }
  63. val odIter: Iterator[Int] = orderIter.iterator
  64. if (odIter.hasNext) {
  65. orderCount = odIter.next()
  66. }
  67. val pyIter: Iterator[Int] = payIter.iterator
  68. if (pyIter.hasNext) {
  69. payCount = pyIter.next()
  70. }
  71. (clickCount,orderCount,payCount)
  72. }
  73. }
  74. resultRdd.collect.foreach(println)
  75. // 6、收集
  76. val res = resultRdd.sortBy(_._2, false).take(10)
  77. res.foreach(println)
  78. sc.stop()
  79. }
  80. }

2、方式2:方式1+cache+map+union

1、最初的数据多次使用,应该进行缓存
2、cogroup有可能出现shuffle,性能较低,考虑替换
将数据结构进行转换,(品类,(0,0,0))
两两聚合

  1. // cogroupRDD的 getDependencies在数据源分区不一致的情况下,会进行shuffle操作
  2. override def getDependencies: Seq[Dependency[_]] = {
  3. rdds.map { rdd: RDD[_] =>
  4. if (rdd.partitioner == Some(part)) {
  5. logDebug("Adding one-to-one dependency with " + rdd)
  6. new OneToOneDependency(rdd)
  7. } else {
  8. logDebug("Adding shuffle dependency with " + rdd)
  9. new ShuffleDependency[K, Any, CoGroupCombiner](
  10. rdd.asInstanceOf[RDD[_ <: Product2[K, _]]], part, serializer)
  11. }
  12. }
  13. }

解决方案:

  1. object HotCategoryTopAnalysis2 {
  2. def main(args: Array[String]): Unit = {
  3. val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("HotCategoryTopAnalysis"))
  4. // 1、读取文件获取原始数据
  5. val originRdd: RDD[String] = sc.textFile("datas/user_visit_action.txt")
  6. // 缓存下
  7. originRdd.cache()
  8. // 2、统计品类的点击数量,过滤没有点击的商品分类
  9. val clickedActionRdd: RDD[String] = originRdd.filter(
  10. data => {
  11. val datas: Array[String] = data.split("_")
  12. datas(6) != "-1"
  13. }
  14. )
  15. // 将每个商品分类切割后,返回word count的形式
  16. val clickedCountRdd: RDD[(String, Int)] = clickedActionRdd.map(action => {
  17. (action.split("_")(6), 1)
  18. }).reduceByKey(_+_)
  19. clickedCountRdd.collect.foreach(println)
  20. println("-------------------------------")
  21. // 3、获取下单的数量
  22. val orderRdd: RDD[String] = originRdd.filter(data => {
  23. val datas: Array[String] = data.split("_")
  24. datas(8) != "null"
  25. })
  26. // 一次可能下单多个品类,支付同理,所以在此按逗号再次切分;切出来的是数组,所以用 flatMap 扁平化到最外层
  27. val orderCountRdd: RDD[(String, Int)] = orderRdd.flatMap(data => {
  28. val datas: Array[String] = data.split("_")
  29. val cids: Array[String] = datas(8).split(",")
  30. cids.map((_, 1))
  31. }).reduceByKey(_+_)
  32. orderCountRdd.collect.foreach(println)
  33. println("-------------------------------")
  34. // 4、获取支付的数量
  35. val payedRdd: RDD[String] = originRdd.filter(data => {
  36. val datas: Array[String] = data.split("_")
  37. datas(10) != "null"
  38. })
  39. // 一次可能支付多个品类,所以在此按逗号再次切分;切出来的是数组,所以用 flatMap 扁平化到最外层
  40. val payCountRdd: RDD[(String, Int)] = payedRdd.flatMap(data => {
  41. val datas: Array[String] = data.split("_")
  42. val cids: Array[String] = datas(10).split(",")
  43. cids.map((_, 1))
  44. }).reduceByKey(_+_)
  45. payCountRdd.collect.foreach(println)
  46. println("-------------------------------")
  47. // 5、根据商品分类合并数量,
  48. // 使用转换数据结构的方式替换cogroup可能出现shuffle的问题
  49. val rdd1: RDD[(String, (Int, Int, Int))] = clickedCountRdd.map{
  50. case (w,c) => {
  51. (w,(c,0,0))
  52. }
  53. }
  54. val rdd2: RDD[(String, (Int, Int, Int))] = orderCountRdd.map{
  55. case (w,c) => {
  56. (w,(0, c, 0))
  57. }
  58. }
  59. val rdd3: RDD[(String, (Int, Int, Int))] = payCountRdd.map{
  60. case (w,c) => {
  61. (w,(0, 0, c))
  62. }
  63. }
  64. // 使用union操作连接三个rdd
  65. val resultRdd: RDD[(String, (Int, Int, Int))] = rdd1.union(rdd2).union(rdd3)
  66. .reduceByKey((w1,w2) => {
  67. (w1._1 + w2._1,w1._2+w2._2,w1._3+w2._3)
  68. })
  69. // 6、收集
  70. val res = resultRdd.sortBy(_._2, false).take(10)
  71. res.foreach(println)
  72. sc.stop()
  73. }
  74. }

3、方式3:降低shuffle次数

1、既然最终要把数据变成 (品类, (点击数, 下单数, 支付数)) 这种结构,为什么不在一开始读数据的时候就直接转换成这个样子呢?
2、前面的实现存在多次 shuffle 操作(多个 reduceByKey)。reduceByKey 是聚合算子,Spark 会对其做优化,底层做了预聚合、缓存等,所以把统计合并成一次 reduceByKey 可以减少 shuffle 的开销。

  1. object HotCategoryTopAnalysis3 {
  2. def main(args: Array[String]): Unit = {
  3. val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("HotCategoryTopAnalysis"))
  4. // 1、读取文件获取原始数据
  5. val originRdd: RDD[String] = sc.textFile("datas/user_visit_action.txt")
  6. // 缓存下
  7. originRdd.cache()
  8. // 2、将数据转换结构
  9. val convertedRdd: RDD[(String, (Int, Int, Int))] = originRdd.flatMap(
  10. data => {
  11. val datas: Array[String] = data.split("_")
  12. // 统计点击数据
  13. if (datas(6) != "-1") {
  14. List((datas(6), (1, 0, 0)))
  15. }
  16. // 统计下单数据
  17. else if (datas(8) != "null") {
  18. // 切割id
  19. val ids: Array[String] = datas(8).split(",")
  20. ids.map(id => (id, (0, 1, 0)))
  21. }
  22. // 统计支付数据
  23. else if (datas(10) != "null") {
  24. // 切割id
  25. val ids: Array[String] = datas(10).split(",")
  26. ids.map(id => (id, (0, 0, 1)))
  27. }
  28. // 没有数据
  29. else {
  30. Nil
  31. }
  32. }
  33. )
  34. // 3、将数据聚合
  35. val resultRdd: RDD[(String, (Int, Int, Int))] = convertedRdd.reduceByKey(
  36. (w1, w2) => {
  37. (w1._1 + w2._1, w1._2 + w2._2, w1._3 + w2._3)
  38. })
  39. // 4、收集
  40. val res = resultRdd.sortBy(_._2, false).take(10)
  41. res.foreach(println)
  42. sc.stop()
  43. }
  44. }

4、Method 4: custom accumulator + wrapper object + custom sort rule = no shuffle operations

To avoid triggering any shuffle at all, use an accumulator: every executor counts locally inside an action, and Spark merges the partial results on the driver. The small sketch after this paragraph shows the accumulator lifecycle with a built-in accumulator before the full custom one below.
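
A minimal sketch of that lifecycle using Spark's built-in LongAccumulator (the accumulator name and sample data are illustrative; sc is assumed from the surrounding code). Accumulators are written on the executors inside an action and read back on the driver, so no shuffle stage is created:

  1. // built-in accumulator: created and read on the driver, add() called on the executors
  2. val clickCount = sc.longAccumulator("clickCount")
  3. sc.makeRDD(List("click", "order", "click"))
  4.   .foreach(action => if (action == "click") clickCount.add(1L)) // runs on the executors
  5. println(clickCount.value) // 3, merged on the driver; no shuffle in this job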

  1. object HotCategoryTopAnalysis4 {
  2. def main(args: Array[String]): Unit = {
  3. val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("HotCategoryTopAnalysis"))
  4. // 1、读取文件获取原始数据
  5. val originRdd: RDD[String] = sc.textFile("datas/user_visit_action.txt")
  6. // 缓存下
  7. originRdd.cache()
  8. // 定义累加器并注册
  9. val categoryAccumulator: HotCategoryAccumulator = new HotCategoryAccumulator
  10. sc.register(categoryAccumulator,"categoryAccumulator")
  11. // 2、将数据转换结构
  12. originRdd.foreach(
  13. data => {
  14. val datas: Array[String] = data.split("_")
  15. // 统计点击数据
  16. if (datas(6) != "-1") {
  17. categoryAccumulator.add((datas(6),"click"))
  18. }
  19. // 统计下单数据
  20. else if (datas(8) != "null") {
  21. // 切割id
  22. val ids: Array[String] = datas(8).split(",")
  23. ids.foreach(id => {
  24. categoryAccumulator.add((id, "order"))
  25. })
  26. }
  27. // 统计支付数据
  28. else if (datas(10) != "null") {
  29. // 切割id
  30. val ids: Array[String] = datas(10).split(",")
  31. ids.foreach(id => categoryAccumulator.add((id, "pay")))
  32. }
  33. }
  34. )
  35. // 3、获取到累加器结果
  36. val accVal: mutable.Map[String, HotCategory] = categoryAccumulator.value
  37. // 只获取分类
  38. val categories: mutable.Iterable[HotCategory] = accVal.map(_._2)
  39. // 自定义排序
  40. val res: List[HotCategory] = categories.toList.sortWith(
  41. (s1,s2) => {
  42. if(s1.clickCnt > s2.clickCnt){
  43. true
  44. }else if (s1.clickCnt == s2.clickCnt){
  45. if(s1.orderCnt > s2.orderCnt){
  46. true
  47. }else if (s1.orderCnt == s2.orderCnt){
  48. s1.payCnt > s2.payCnt
  49. }else {
  50. false
  51. }
  52. }else {
  53. false
  54. }
  55. }
  56. )
  57. res.take(10).foreach(println)
  58. sc.stop()
  59. }
  60. // 自定义累加器
  61. // IN : 输出商品分类和行为(点击、下单、支付)
  62. // OUT: 输出结果
  63. class HotCategoryAccumulator extends AccumulatorV2[(String, String), mutable.Map[String, HotCategory]] {
  64. private val map = mutable.Map[String, HotCategory]()
  65. override def isZero: Boolean = map.isEmpty
  66. override def copy(): AccumulatorV2[(String, String), mutable.Map[String, HotCategory]] = new HotCategoryAccumulator
  67. override def reset(): Unit = map.clear
  68. override def add(v: (String, String)): Unit = {
  69. // 从map中拿。
  70. val category: HotCategory = map.getOrElse(v._1, HotCategory(v._1, 0, 0, 0))
  71. val actionType: String = v._2
  72. // 根据类型判断哪种行为
  73. if(actionType == "click"){
  74. category.clickCnt += 1
  75. }else if (actionType == "order"){
  76. category.orderCnt += 1
  77. }else if (actionType == "pay"){
  78. category.payCnt += 1
  79. }
  80. // 放回去
  81. map.update(v._1,category)
  82. }
  83. // 合并两个map
  84. override def merge(other: AccumulatorV2[(String, String), mutable.Map[String, HotCategory]]): Unit = {
  85. val map1 = this.map
  86. val map2 = other.value
  87. // 遍历map2,往map1中放
  88. map2.foreach{
  89. case (cid,category) => {
  90. val ct: HotCategory = map1.getOrElse(cid, HotCategory(cid, 0, 0, 0))
  91. ct.payCnt += category.payCnt
  92. ct.orderCnt += category.orderCnt
  93. ct.clickCnt += category.clickCnt
  94. map1.update(cid,ct)
  95. }
  96. }
  97. }
  98. override def value: mutable.Map[String, HotCategory] = this.map
  99. }
  100. // 自定义对象类,封装三个数量
  101. case class HotCategory(cid: String, var clickCnt: Int, var orderCnt: Int, var payCnt: Int)
  102. }

2、Requirement 2: Top 10 active sessions within each of the Top 10 hot categories

Building on Requirement 1, additionally count each user session's clicks within every Top 10 category.

  1. object HotCategoryTopSessionAnalysis {
  2. def main(args: Array[String]): Unit = {
  3. val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("HotCategoryTopAnalysis"))
  4. val originRdd: RDD[String] = sc.textFile("datas/user_visit_action.txt")
  5. // 缓存下
  6. originRdd.cache()
  7. val top10Ids: Array[String] = top10(originRdd)
  8. // 1. 过滤原始数据,保留点击和前10品类ID
  9. val filterActionRdd = originRdd.filter(
  10. action => {
  11. val datas = action.split("_")
  12. if (datas(6) != "-1") {
  13. top10Ids.contains(datas(6))
  14. } else {
  15. false
  16. }
  17. }
  18. )
  19. // 2. 根据品类ID和sessionid进行点击量的统计
  20. val reduceRdd: RDD[((String, String), Int)] = filterActionRdd.map(
  21. action => {
  22. val datas = action.split("_")
  23. ((datas(6), datas(2)), 1)
  24. }
  25. ).reduceByKey(_ + _)
  26. // 3、结构转换
  27. // (( 品类ID,sessionId ),sum) => ( 品类ID,(sessionId, sum) )
  28. val mappedRdd: RDD[(String, (String, Int))] = reduceRdd.map {
  29. case ((cid, sid), sum) => {
  30. (cid, (sid, sum))
  31. }
  32. }
  33. // 4、对相同分类的rdd进行分组
  34. val res = mappedRdd.groupByKey().mapValues(
  35. iter => {
  36. iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(10)
  37. })
  38. res.collect.foreach(println)
  39. //
  40. sc.stop()
  41. }
  42. // 封装top10
  43. private def top10(originRdd: RDD[String]) = {
  44. val convertedRdd: RDD[(String, (Int, Int, Int))] = originRdd.flatMap(
  45. data => {
  46. val datas: Array[String] = data.split("_")
  47. // 统计点击数据
  48. if (datas(6) != "-1") {
  49. List((datas(6), (1, 0, 0)))
  50. }
  51. // 统计下单数据
  52. else if (datas(8) != "null") {
  53. // 切割id
  54. val ids: Array[String] = datas(8).split(",")
  55. ids.map(id => (id, (0, 1, 0)))
  56. }
  57. // 统计支付数据
  58. else if (datas(10) != "null") {
  59. // 切割id
  60. val ids: Array[String] = datas(10).split(",")
  61. ids.map(id => (id, (0, 0, 1)))
  62. }
  63. // 没有数据
  64. else {
  65. Nil
  66. }
  67. }
  68. )
  69. val resultRdd: RDD[(String, (Int, Int, Int))] = convertedRdd.reduceByKey(
  70. (w1, w2) => {
  71. (w1._1 + w2._1, w1._2 + w2._2, w1._3 + w2._3)
  72. })
  73. // 不需要数量,过滤String
  74. resultRdd.sortBy(_._2, false).take(10).map(_._1)
  75. }
  76. }
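
A note on step 4 above: groupByKey materializes all (sessionId, count) pairs of a category before toList.sortBy runs, which can be memory-heavy for very large groups. Below is a sketch of an alternative that keeps at most 10 entries per category with a bounded heap, assuming the mappedRdd defined above is in scope (this is not part of the original solution):

  1. // alternative to toList.sortBy(...).take(10): a bounded min-heap per category
  2. import scala.collection.mutable
  3. val top10PerCategory = mappedRdd.groupByKey().mapValues { iter =>
  4.   // ordered by negated count, so the smallest of the current top 10 sits at the head
  5.   val heap = new mutable.PriorityQueue[(String, Int)]()(Ordering.by[(String, Int), Int](p => -p._2))
  6.   iter.foreach { pair =>
  7.     heap.enqueue(pair)
  8.     if (heap.size > 10) heap.dequeue() // drop the smallest; never hold more than 10
  9.   }
  10.   heap.toList.sortBy(p => -p._2) // at most 10 elements remain; largest count first
  11. }
  12. top10PerCategory.collect().foreach(println)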

3、Requirement 3: page single-jump conversion rate

  • Page single-jump conversion rate

What is the page single-jump conversion rate? If a user visits the page path 3, 5, 7, 9, 10, 21 within one session, then going from page 3 straight to page 5 is one single jump, and 7 to 9 is another. The single-jump conversion rate measures how likely a visit to one page is to be followed directly by a visit to a specific next page.

For example, to compute the 3-5 single-jump conversion rate: let A be the number of visits (PV) to page 3 across the qualifying sessions, and B the number of times page 3 was immediately followed by page 5; then B/A is the 3-5 single-jump conversion rate, as in the small sketch below.
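
A minimal sketch of the two building blocks used in the code further down, on a made-up visit sequence (plain Scala, no Spark needed): zipping a page list with its own tail yields the adjacent jumps, and the rate is just the jump count divided by the source page's visit count.

  1. // illustrative only: one session's page sequence
  2. val pages = List(3L, 5L, 7L, 9L, 10L, 21L)
  3. val jumps = pages.zip(pages.tail) // List((3,5), (5,7), (7,9), (9,10), (10,21))
  4. println(jumps)
  5. // if page 3 was visited 100 times overall and the jump 3 -> 5 occurred 25 times:
  6. val rate = 25.0 / 100 // 0.25, i.e. the 3 -> 5 single-jump conversion rate
  7. println(rate)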

  • Why the single-jump conversion rate matters

Product managers and operations leads can use this metric to judge how the site, the product and each individual page are performing, whether the page layout needs to be optimized, and how to guide users all the way to the final payment page.
Data analysts can build further calculations and analysis on top of it, and company management can see how transitions between pages perform across the whole site and adjust business strategy or tactics accordingly.

1、Compute the denominator (visit count per page).
2、Compute the numerator (count of each adjacent page jump).
3、Divide the numerator by the denominator to get the single-jump conversion rate.

  1. object PageForwardRate {
  2. def main(args: Array[String]): Unit = {
  3. val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("PageForwardRate"))
  4. // 1、读取文件获取原始数据
  5. val originRdd: RDD[String] = sc.textFile("datas/user_visit_action.txt")
  6. // 缓存放在下面转换后的 mappedRdd 上
  7. // 2、计算分母,直接计算每个页面即可
  8. val mappedRdd: RDD[UserVisitAction] = originRdd.map(
  9. action => {
  10. val datas: Array[String] = action.split("_")
  11. // 返回封装好的对象
  12. UserVisitAction(
  13. datas(0),
  14. datas(1).toLong,
  15. datas(2),
  16. datas(3).toLong,
  17. datas(4),
  18. datas(5),
  19. datas(6).toLong,
  20. datas(7).toLong,
  21. datas(8),
  22. datas(9),
  23. datas(10),
  24. datas(11),
  25. datas(12).toLong
  26. )
  27. }
  28. )
  29. mappedRdd.cache()
  30. // 分母 转换为map
  31. val denominator: Map[Long, Long] = mappedRdd.map(action => (action.page_id, 1L)).reduceByKey(_ + _).collect().toMap
  32. // 3、计算分子
  33. // 根据session分组
  34. val sessionRdd: RDD[(String, Iterable[UserVisitAction])] = mappedRdd.groupBy(_.session_id)
  35. // 根据访问时间升序
  36. val mvRdd: RDD[(String, List[((Long, Long), Int)])] = sessionRdd.mapValues(
  37. iter => {
  38. // 默认升序,排完序后舍弃时间,只要页面
  39. val flowIds: List[Long] = iter.toList.sortBy(_.action_time).map(_.page_id)
  40. // 把相邻页面两两配对:可以用 sliding 滑窗,也可以用 zip 拉链
  41. val pages: List[(Long, Long)] = flowIds.zip(flowIds.tail)
  42. pages.map(t => (t,1))
  43. }
  44. )
  45. // 统计跳转的比率
  46. val molecule: RDD[((Long, Long), Int)] = mvRdd.map(_._2).flatMap(list => list).reduceByKey(_ + _)
  47. molecule.foreach {
  48. case ((page1, page2), sum) => {
  49. // 从分母中取
  50. val l: Long = denominator.getOrElse(page1, 0L)
  51. println(s"从${page1} 跳转到${page2} 的单挑转换率为:${sum.toDouble / l}")
  52. }
  53. }
  54. // 分子除以分母
  55. sc.stop()
  56. }
  57. //用户访问动作表
  58. case class UserVisitAction(
  59. date: String, //用户点击行为的日期
  60. user_id: Long, //用户的ID
  61. session_id: String, //Session的ID
  62. page_id: Long, //某个页面的ID
  63. action_time: String, //动作的时间点
  64. search_keyword: String, //用户搜索的关键词
  65. click_category_id: Long, //某一个商品品类的ID
  66. click_product_id: Long, //某一个商品的ID
  67. order_category_ids: String, //一次订单中所有品类的ID集合
  68. order_product_ids: String, //一次订单中所有商品的ID集合
  69. pay_category_ids: String, //一次支付中所有品类的ID集合
  70. pay_product_ids: String, //一次支付中所有商品的ID集合
  71. city_id: Long
  72. ) //城市 id
  73. }

Optimizations:
1、Use filter to keep only the pages we actually care about.
2、When computing the denominator, the last page in the list can never be the source of a jump, so exclude it with init.
3、When counting jumps, keep only the legal adjacent jumps (see the small sketch after this list).
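
A minimal illustration of points 2 and 3, using the same page list 1..7 as the code below (plain Scala, no Spark needed):

  1. val ids = List[Long](1, 2, 3, 4, 5, 6, 7)
  2. // init drops the last element: page 7 can never be the source of a jump
  3. println(ids.init) // List(1, 2, 3, 4, 5, 6)
  4. // the only jumps we want to measure
  5. val okflowIds = ids.zip(ids.tail) // List((1,2), (2,3), (3,4), (4,5), (5,6), (6,7))
  6. // any other adjacent pair seen in a session, e.g. (2,5), is filtered out
  7. println(okflowIds.contains((2L, 5L))) // false
  8. println(okflowIds.contains((3L, 4L))) // true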

  1. object Spark06_Req3_PageflowAnalysis {
  2. def main(args: Array[String]): Unit = {
  3. // TODO : 页面单跳转换率统计
  4. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("PageflowAnalysis")
  5. val sc = new SparkContext(sparkConf)
  6. val actionRDD = sc.textFile("datas/user_visit_action.txt")
  7. val actionDataRDD = actionRDD.map(
  8. action => {
  9. val datas = action.split("_")
  10. UserVisitAction(
  11. datas(0),
  12. datas(1).toLong,
  13. datas(2),
  14. datas(3).toLong,
  15. datas(4),
  16. datas(5),
  17. datas(6).toLong,
  18. datas(7).toLong,
  19. datas(8),
  20. datas(9),
  21. datas(10),
  22. datas(11),
  23. datas(12).toLong
  24. )
  25. }
  26. )
  27. actionDataRDD.cache()
  28. // TODO 对指定的页面连续跳转进行统计
  29. // 1-2,2-3,3-4,4-5,5-6,6-7
  30. val ids = List[Long](1,2,3,4,5,6,7)
  31. val okflowIds: List[(Long, Long)] = ids.zip(ids.tail)
  32. // TODO 计算分母
  33. val pageidToCountMap: Map[Long, Long] = actionDataRDD.filter(
  34. action => {
  35. ids.init.contains(action.page_id)
  36. }
  37. ).map(
  38. action => {
  39. (action.page_id, 1L)
  40. }
  41. ).reduceByKey(_ + _).collect().toMap
  42. // TODO 计算分子
  43. // 根据session进行分组
  44. val sessionRDD: RDD[(String, Iterable[UserVisitAction])] = actionDataRDD.groupBy(_.session_id)
  45. // 分组后,根据访问时间进行排序(升序)
  46. val mvRDD: RDD[(String, List[((Long, Long), Int)])] = sessionRDD.mapValues(
  47. iter => {
  48. val sortList: List[UserVisitAction] = iter.toList.sortBy(_.action_time)
  49. // 【1,2,3,4】
  50. // 【1,2】,【2,3】,【3,4】
  51. // 【1-2,2-3,3-4】
  52. // Sliding : 滑窗
  53. // 【1,2,3,4】
  54. // 【2,3,4】
  55. // zip : 拉链
  56. val flowIds: List[Long] = sortList.map(_.page_id)
  57. val pageflowIds: List[(Long, Long)] = flowIds.zip(flowIds.tail)
  58. // 将不合法的页面跳转进行过滤
  59. pageflowIds.filter(
  60. t => {
  61. okflowIds.contains(t)
  62. }
  63. ).map(
  64. t => {
  65. (t, 1)
  66. }
  67. )
  68. }
  69. )
  70. // ((1,2),1)
  71. val flatRDD: RDD[((Long, Long), Int)] = mvRDD.map(_._2).flatMap(list=>list)
  72. // ((1,2),1) => ((1,2),sum)
  73. val dataRDD = flatRDD.reduceByKey(_+_)
  74. // TODO 计算单跳转换率
  75. // 分子除以分母
  76. dataRDD.foreach{
  77. case ( (pageid1, pageid2), sum ) => {
  78. val lon: Long = pageidToCountMap.getOrElse(pageid1, 0L)
  79. println(s"页面${pageid1}跳转到页面${pageid2}单跳转换率为:" + ( sum.toDouble/lon ))
  80. }
  81. }
  82. sc.stop()
  83. }
  84. //用户访问动作表
  85. case class UserVisitAction(
  86. date: String,//用户点击行为的日期
  87. user_id: Long,//用户的ID
  88. session_id: String,//Session的ID
  89. page_id: Long,//某个页面的ID
  90. action_time: String,//动作的时间点
  91. search_keyword: String,//用户搜索的关键词
  92. click_category_id: Long,//某一个商品品类的ID
  93. click_product_id: Long,//某一个商品的ID
  94. order_category_ids: String,//一次订单中所有品类的ID集合
  95. order_product_ids: String,//一次订单中所有商品的ID集合
  96. pay_category_ids: String,//一次支付中所有品类的ID集合
  97. pay_product_ids: String,//一次支付中所有商品的ID集合
  98. city_id: Long
  99. )//城市 id
  100. }

五、Structuring the project code

All the code so far has been written inside a single main method, which quickly becomes messy; it should follow some coding conventions and layering.

Big-data jobs have no concept of a view, so MVC is not considered.

Instead, the classic controller-service-dao three-layer architecture is used.


1、The application itself only starts the job and delegates to the controller for dispatching. Every application needs the same environment setup (creating and stopping the SparkContext), so that part can be abstracted out.

  1. trait TApplication {
  2. // 控制抽象,替换执行逻辑
  3. def start(master: String = "local[*]",appName: String = "Application")(op: => Unit): Unit = {
  4. val sc = new SparkContext(new SparkConf().setMaster(master).setAppName(appName))
  5. EnvUtils.put(sc)
  6. try {
  7. op
  8. }catch {
  9. case e: Exception => e.printStackTrace()
  10. }
  11. sc.stop()
  12. EnvUtils.clear()
  13. }
  14. }
  1. object Application extends App with TApplication{
  2. // 跑起来
  3. start(appName = "Test"){
  4. val controller = new ApplicationController
  5. controller.dispatch()
  6. }
  7. }
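
The start method above is a control abstraction: op is a by-name parameter, so the block passed in curly braces is not evaluated until start calls it between the setup and teardown steps. A minimal standalone sketch of the same pattern (the names here are illustrative, not taken from the project):

  1. object ControlAbstractionDemo extends App {
  2.   // hypothetical mini version of the pattern TApplication.start uses
  3.   def withResource(name: String)(op: => Unit): Unit = {
  4.     println(s"open $name") // setup, e.g. building the SparkContext
  5.     try op // the caller's block is evaluated only here (by-name parameter)
  6.     catch { case e: Exception => e.printStackTrace() }
  7.     finally println(s"close $name") // teardown always runs, e.g. sc.stop()
  8.   }
  9.   withResource("SparkContext") {
  10.     println("business logic goes here") // stands in for controller.dispatch()
  11.   }
  12. }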

2、The three layers themselves can be abstracted in the same way:
1、The controller only dispatches to a service, which runs the business logic.
2、The service needs a data source and delegates reading it to a dao.
3、The dao may read data from many kinds of sources, but it needs the SparkContext; instead of passing it down through every layer, it is fetched from a ThreadLocal-backed utility.
The abstraction layer:

  1. trait TApplication {
  2. // 控制抽象,替换执行逻辑
  3. def start(master: String = "local[*]",appName: String = "Application")(op: => Unit): Unit = {
  4. val sc = new SparkContext(new SparkConf().setMaster(master).setAppName(appName))
  5. EnvUtils.put(sc)
  6. try {
  7. op
  8. }catch {
  9. case e: Exception => e.printStackTrace()
  10. }
  11. sc.stop()
  12. EnvUtils.clear()
  13. }
  14. }
  15. // 通用controller
  16. trait TApplicationController {
  17. def dispatch(): Any
  18. }
  19. trait TApplicationService extends Serializable {
  20. def dataAnalysis(): Any
  21. }
  22. trait TApplicationDao extends Serializable {
  23. def readFile(path: String): Any
  24. }
  1. class ApplicationController extends TApplicationController {
  2. private val svc: ApplicationService = new ApplicationService
  3. override def dispatch(): Unit = {
  4. val res: RDD[Double] = svc.dataAnalysis()
  5. res.collect.foreach(println)
  6. }
  7. }

service

  1. class ApplicationService extends TApplicationService {
  2. private val dao = new ApplicationDao
  3. override def dataAnalysis() = {
  4. // 1、读取文件获取原始数据
  5. val originRdd: RDD[String] = dao.readFile("datas/user_visit_action.txt")
  6. // 缓存放在下面转换后的 mappedRdd 上
  7. // 2、计算分母,直接计算每个页面即可
  8. val mappedRdd: RDD[UserVisitAction] = originRdd.map(
  9. action => {
  10. val datas: Array[String] = action.split("_")
  11. // 返回封装好的对象
  12. UserVisitAction(
  13. datas(0),
  14. datas(1).toLong,
  15. datas(2),
  16. datas(3).toLong,
  17. datas(4),
  18. datas(5),
  19. datas(6).toLong,
  20. datas(7).toLong,
  21. datas(8),
  22. datas(9),
  23. datas(10),
  24. datas(11),
  25. datas(12).toLong
  26. )
  27. }
  28. )
  29. mappedRdd.cache()
  30. // 分母 转换为map
  31. val denominator: Map[Long, Long] = mappedRdd.map(action => (action.page_id, 1L)).reduceByKey(_ + _).collect().toMap
  32. // 3、计算分子
  33. // 根据session分组
  34. val sessionRdd: RDD[(String, Iterable[UserVisitAction])] = mappedRdd.groupBy(_.session_id)
  35. // 根据访问时间升序
  36. val mvRdd: RDD[(String, List[((Long, Long), Int)])] = sessionRdd.mapValues(
  37. iter => {
  38. // 默认升序,排完序后舍弃时间,只要页面
  39. val flowIds: List[Long] = iter.toList.sortBy(_.action_time).map(_.page_id)
  40. // 把相邻页面两两配对:可以用 sliding 滑窗,也可以用 zip 拉链
  41. val pages: List[(Long, Long)] = flowIds.zip(flowIds.tail)
  42. pages.map(t => (t,1))
  43. }
  44. )
  45. // 统计跳转的比率
  46. val molecule: RDD[((Long, Long), Int)] = mvRdd.map(_._2).flatMap(list => list).reduceByKey(_ + _)
  47. // 采集
  48. val res: RDD[Double] = molecule.collect {
  49. case ((page1, page2), sum) => {
  50. // 从分母中取
  51. val l: Long = denominator.getOrElse(page1, 0L)
  52. println(s"从${page1} 跳转到${page2} 的单挑转换率为:${sum.toDouble / l}")
  53. sum.toDouble / l
  54. }
  55. }
  56. // 返回
  57. res
  58. }
  59. //用户访问动作表
  60. case class UserVisitAction(
  61. date: String, //用户点击行为的日期
  62. user_id: Long, //用户的ID
  63. session_id: String, //Session的ID
  64. page_id: Long, //某个页面的ID
  65. action_time: String, //动作的时间点
  66. search_keyword: String, //用户搜索的关键词
  67. click_category_id: Long, //某一个商品品类的ID
  68. click_product_id: Long, //某一个商品的ID
  69. order_category_ids: String, //一次订单中所有品类的ID集合
  70. order_product_ids: String, //一次订单中所有商品的ID集合
  71. pay_category_ids: String, //一次支付中所有品类的ID集合
  72. pay_product_ids: String, //一次支付中所有商品的ID集合
  73. city_id: Long
  74. ) //城市 id
  75. }

dao

  1. class ApplicationDao extends TApplicationDao {
  2. override def readFile(path: String): RDD[String] = {
  3. val lines: RDD[String] = EnvUtils.take().textFile(path)
  4. lines
  5. }
  6. }

3、The ThreadLocal utility: to be usable from anywhere, it is declared as an object and called statically.

  1. // ThreadLocal线程本地化技术
  2. object EnvUtils {
  3. private val env: ThreadLocal[SparkContext] = new ThreadLocal[SparkContext]
  4. def take(): SparkContext = {
  5. env.get()
  6. }
  7. def put(sc: SparkContext): Unit = {
  8. env.set(sc)
  9. }
  10. def clear(): Unit = {
  11. env.remove()
  12. }
  13. }
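
Worth noting: this works because start, the controller, the service and the dao all run on the same driver thread, so EnvUtils.take() returns the SparkContext that start stored earlier; a different thread would get null back. A minimal plain-Scala sketch of that behaviour (the names and values are illustrative only):

  1. // illustrative: a ThreadLocal value is visible only to the thread that set it
  2. object ThreadLocalDemo extends App {
  3.   private val holder = new ThreadLocal[String]
  4.   holder.set("driver-side value")
  5.   println(holder.get()) // "driver-side value" -- the same thread sees it
  6.   val t = new Thread(() => println(holder.get())) // prints null -- another thread does not
  7.   t.start()
  8.   t.join()
  9.   holder.remove() // always clean up, mirroring EnvUtils.clear()
  10. }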