10.1 Accumulators
An accumulator is a variable defined in the driver program that every task in the cluster can access by adding to it.
10.1.1 Built-in Accumulators
- SparkContext.accumulator(initialValue) creates a built-in accumulator (deprecated since Spark 2.0 in favor of typed helpers such as sc.longAccumulator)
- Only the driver program can read an accumulator's value; tasks on the executors can only add to it
- Calling an action more than once re-runs the lineage, so the accumulator gets added to multiple times; see the sketch below
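A minimal sketch of these three points, assuming the Spark 2.x API (sc.longAccumulator); the object name and sample data are illustrative:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object builtinAccumulatorDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[*]").setAppName("builtinAccumulatorDemo"))

    // Typed replacement for the deprecated SparkContext.accumulator(initialValue).
    val badRecords = sc.longAccumulator("badRecords")

    val parsed = sc.parallelize(Seq("1", "2a", "3")).filter { word =>
      val ok = word.matches("\\d+")
      if (!ok) badRecords.add(1) // tasks may only add; they cannot read the value
      ok
    }

    parsed.count()              // first action runs the filter once
    println(badRecords.value)   // driver reads the value: 1
    parsed.count()              // second action re-runs the lineage (RDD not cached)...
    println(badRecords.value)   // ...so the count doubles: 2

    sc.stop()
  }
}
```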
10.1.2 Custom Accumulators
To build a custom accumulator, extend AccumulatorV2[IN, OUT], implement its methods, and register the instance with the SparkContext. The demo below sums the purely numeric strings while collecting the malformed ones in the accumulator:
```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.AccumulatorV2

import scala.collection.mutable

object customerAccumulatorDemo {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("customerAccumulatorDemo")
    val sparkContext = new SparkContext(sparkConf)

    val accumulator = new LogAccumulator()
    // A custom accumulator must be registered with the SparkContext before use.
    sparkContext.register(accumulator, "CustomerAccumulator")

    // Sum the purely numeric strings; malformed entries go into the accumulator.
    val sum = sparkContext.parallelize(Array("1", "2a", "3", "4b", "5", "6", "7cd", "8", "9"), 2)
      .filter(word => {
        val pattern = "\\d+"
        val flag = word.matches(pattern)
        if (!flag) accumulator.add(word)
        flag
      })
      .map(_.toInt)
      .reduce(_ + _)

    println(sum)
    for (v <- accumulator.value) print(v + " ")

    sparkContext.stop()
  }
}

// IN = String (one malformed record), OUT = Set[String] (all malformed records).
class LogAccumulator extends AccumulatorV2[String, Set[String]] {
  private var logArray = new mutable.HashSet[String]()

  // The zero value of this accumulator is the empty set.
  override def isZero: Boolean = logArray.isEmpty

  // Spark copies the accumulator once per task.
  override def copy(): AccumulatorV2[String, Set[String]] = {
    val accumulator = new LogAccumulator
    accumulator.logArray = this.logArray.clone()
    accumulator
  }

  override def reset(): Unit = logArray.clear()

  override def add(v: String): Unit = logArray.add(v)

  // Merge per-task partial results on the driver.
  override def merge(other: AccumulatorV2[String, Set[String]]): Unit = other match {
    case o: LogAccumulator => o.logArray.foreach(this.logArray.add)
  }

  override def value: Set[String] = logArray.toSet
}
```
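Run locally, this prints 32 (the sum of the numeric strings) followed by the rejected inputs 2a, 4b and 7cd collected by the accumulator, in no particular order.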
10.1.3 Broadcast Variables
- sc.broadcast(value: T) creates a broadcast variable of type Broadcast[T]
- The value is shipped to every node and is read-only (modifying it on one node does not affect the other nodes); see the sketch below
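A minimal sketch of a broadcast lookup table (the object name and the map contents are illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast

object broadcastDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[*]").setAppName("broadcastDemo"))

    // The lookup table is shipped to each node once, not serialized with every task.
    val countryNames: Broadcast[Map[String, String]] =
      sc.broadcast(Map("CN" -> "China", "US" -> "United States"))

    // Tasks read the broadcast value through .value and must treat it as read-only.
    sc.parallelize(Seq("CN", "US", "CN"))
      .map(code => countryNames.value.getOrElse(code, "unknown"))
      .collect()
      .foreach(println)

    sc.stop()
  }
}
```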