10.1 Accumulators

An accumulator is a variable defined in the driver program that every task in the cluster can update.

10.1.1 System Accumulators

  • Created with `SparkContext.accumulator(initialValue)` (deprecated since Spark 2.0 in favor of `sc.longAccumulator` / `sc.doubleAccumulator`)
  • Only the driver program can read the accumulator's value; tasks on the executors can only add to it
  • Calling an action operator repeatedly re-executes the lineage and accumulates the value multiple times
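The points above can be sketched as follows (a minimal example using the newer `sc.longAccumulator` API; the object and variable names are illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object BuiltInAccumulatorDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[*]").setAppName("BuiltInAccumulatorDemo"))

    val counter = sc.longAccumulator("counter")
    val mapped = sc.parallelize(1 to 10).map { x =>
      counter.add(1) // executors can only add
      x
    }

    mapped.count()           // first action: the map runs, counter becomes 10
    println(counter.value)   // only the driver can read the value

    mapped.collect()         // second action re-runs the map: counter becomes 20
    println(counter.value)   // double counted

    sc.stop()
  }
}
```

Caching `mapped` (e.g. `mapped.cache()`) before the first action avoids the double counting, since the second action then reads the cached partitions instead of re-running the map.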

10.1.2 Custom Accumulators

A custom accumulator extends `AccumulatorV2[IN, OUT]` and is registered with `SparkContext.register`. The example below sums the numeric strings in an RDD while collecting the malformed ones in the accumulator:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.AccumulatorV2

import scala.collection.mutable

object customerAccumulatorDemo {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("customerAccumulatorDemo")
    val sparkContext = new SparkContext(sparkConf)

    val accumulator = new LogAccumulator()
    sparkContext.register(accumulator, "CustomerAccumulator")

    // Sum the numeric strings; non-numeric ones are recorded in the accumulator
    val sum = sparkContext
      .parallelize(Array("1", "2a", "3", "4b", "5", "6", "7cd", "8", "9"), 2)
      .filter(word => {
        val pattern = "\\d+"
        val flag = word.matches(pattern)
        if (!flag) accumulator.add(word)
        flag
      })
      .map(_.toInt)
      .reduce(_ + _)

    println(sum)
    for (v <- accumulator.value) print(v + " ")
  }
}

class LogAccumulator extends AccumulatorV2[String, Set[String]] {
  private var logArray = new mutable.HashSet[String]

  // Whether the accumulator is in its zero (empty) state
  override def isZero: Boolean = logArray.isEmpty

  // Create a task-side copy of this accumulator
  override def copy(): AccumulatorV2[String, Set[String]] = {
    val accumulator = new LogAccumulator
    accumulator.logArray = this.logArray.clone()
    accumulator
  }

  override def reset(): Unit = logArray.clear()

  override def add(v: String): Unit = logArray.add(v)

  // Merge partial results from the task-side copies back into this one
  override def merge(other: AccumulatorV2[String, Set[String]]): Unit = {
    other match {
      case o: LogAccumulator =>
        val iterator = o.logArray.iterator
        while (iterator.hasNext) this.logArray.add(iterator.next())
      case _ =>
    }
  }

  override def value: Set[String] = logArray.toSet
}
```

10.2 Broadcast Variables

  • `sc.broadcast(value: T)` creates a broadcast variable of type `Broadcast[T]`
  • The variable is shipped to every node and is read-only (modifying the local copy on one node does not affect the other nodes)
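As a sketch of the API above (the lookup table and object name are illustrative), a broadcast variable ships a read-only value to every executor once, instead of serializing it into each task's closure:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object broadcastDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[*]").setAppName("broadcastDemo"))

    // Ship the lookup table to every node once
    val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2, "c" -> 3))

    val result = sc.parallelize(Seq("a", "b", "c", "a"))
      .map(k => lookup.value.getOrElse(k, 0)) // read-only access via .value
      .reduce(_ + _)

    println(result)

    lookup.destroy() // release broadcast resources when no longer needed
    sc.stop()
  }
}
```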