广播变量

先来简单介绍下spark中的广播变量:
广播变量允许开发者缓存一个只读的变量在每台机器上面,而不是每个任务保存一份拷贝。例如,利用广播变量,我们能够以一种更有效率的方式将一个大数据量输入集合的副本分配给每个节点。Spark也尝试着利用有效的广播算法去分配广播变量,以减少通信的成本。

一个广播变量可以通过调用SparkContext.broadcast(v)方法从一个初始变量v中创建。广播变量是v的一个包装变量,它的值可以通过value方法访问,下面的代码说明了这个过程:

  1. scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
  2. broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
  3. scala> broadcastVar.value
  4. res0: Array[Int] = Array(1, 2, 3)

从上文我们可以看出广播变量的声明很简单,调用broadcast就能搞定,并且scala中一切可序列化的对象都是可以进行广播的,这就给了我们很大的想象空间,可以利用广播变量将一些经常访问的大变量进行广播,而不是每个任务保存一份,这样可以减少资源上的浪费。

更新广播变量

广播变量可以用来更新一些大的配置变量,比如数据库中的一张表格,那么有这样一个问题,如果数据库当中的配置表格进行了更新,我们需要重新广播变量该怎么做呢。上文对广播变量的说明中,我们知道广播变量是只读的,也就是说广播出去的变量没法再修改,那么我们应该怎么解决这个问题呢?

答案是利用spark中的unpersist函数。

Spark automatically monitors cache usage on each node and drops out old data partitions in a least-recently-used (LRU) fashion. If you would like to manually remove an RDD instead of waiting for it to fall out of the cache, use the RDD.unpersist() method.

上文是从spark官方文档摘抄出来的,我们可以看出,正常来说每个节点的数据是不需要我们操心的,spark会自动按照LRU规则将老数据删除,如果需要手动删除可以调用 unpersist 函数。

那么更新广播变量(rebroadcast)的基本思路:将老的广播变量删除(unpersist),然后重新广播一遍新的广播变量。

  1. public class BroadcastStringPeriodicUpdater {
  2. private static final int PERIOD = 60 * 1000;
  3. private static volatile BroadcastStringPeriodicUpdater instance;
  4. private Broadcast<String> broadcast;
  5. private long lastUpdate = 0L;
  6. private BroadcastStringPeriodicUpdater() {}
  7. public static BroadcastStringPeriodicUpdater getInstance() {
  8. if (instance == null) {
  9. synchronized (BroadcastStringPeriodicUpdater.class) {
  10. if (instance == null) {
  11. instance = new BroadcastStringPeriodicUpdater();
  12. }
  13. }
  14. }
  15. return instance;
  16. }
  17. public String updateAndGet(SparkContext sc) {
  18. long now = System.currentTimeMillis();
  19. long offset = now - lastUpdate;
  20. if (offset > PERIOD || broadcast == null) {
  21. if (broadcast != null) {
  22. broadcast.unpersist();
  23. }
  24. lastUpdate = now;
  25. String value = fetchBroadcastValue();
  26. broadcast = JavaSparkContext.fromSparkContext(sc).broadcast(value);
  27. }
  28. return broadcast.getValue();
  29. }
  30. private String fetchBroadcastValue() {
  31. }
  32. }

用的时候就可以这样用:

  1. String broadcastValue = BroadcastStringPeriodicUpdater.getInstance().updateAndGet(rdd.context());

总结

spark中的共享变量是我们能够在全局做出一些操作,比如 record 总数的统计更新,一些大变量配置项的广播等等。而对于广播变量,我们也可以监控数据库中的变化,做到定时的重新广播新的数据表配置情况。