举例子

假如Driver说有个BigArr ,这个变量数据很大,在运行的时候需要在Executor进程里面运行,那么就需要将这个BigArr分发过去.
假如说每个Executor有两个线程,每个线程有一个BigArr,如果BigArr占用1G内存的话,那么一个Executor进程就会有2个G的BigArr
如果程序只是对BigArr进行读操作而不进行修改操作,这个时候完全可以搞一个BigArr就可以了.
image.png

解决办法

我们可以给每个Executor里面的每个线程的BigArr都放到Executor进程里面,让多个线程共享进程里面的BigArr,这样节省了系统内存资源. 这就是广播变量.

image.png

广播变量概念

广播变量在每个节点上保存一个只读的变量的缓存, 而不用给每个 task 来传送一个 copy.这样降低了通讯的成本,
广播变量通过调用SparkContext.broadcast(v)来创建. 广播变量是对v的包装, 通过调用广播变量的 value方法可以访问.
广播变量只会被发到各个节点一次,应作为只读值处理(修改广播变量的值不会影响到别的节点).

代码演示

  1. import org.apache.spark.rdd.RDD
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. object demo {
  4. def main(args: Array[String]): Unit = {
  5. val bigArr = 1 to 1000 toArray
  6. val conf: SparkConf = new SparkConf().setAppName("BroadCastDemo1").setMaster("local[2]")
  7. val sc: SparkContext = new SparkContext(conf)
  8. // 广播出去.
  9. val bd = sc.broadcast(bigArr)
  10. val list1 = List(30, 50000000, 70, 600000, 10, 20)
  11. val rdd1: RDD[Int] = sc.parallelize(list1, 4)
  12. //获取 广播变量的引用 , 使用广播变量的value值就可以获取到广播变量内部封装的值
  13. // 过滤rdd1和bigArr有交集的变量
  14. val rdd2 = rdd1.filter(x => bd.value.contains(x))
  15. rdd2.collect.foreach(println)
  16. Thread.sleep(1000000)
  17. sc.stop()
  18. }
  19. }

输出

  1. 30
  2. 70
  3. 10
  4. 20