/*
广播变量应用场景:
1、task需要使用Driver数据的时候【好处: 能够减少数据占用内存空间】
image.png
task任务个数设定:
工作中一般设置task个数为cpu核数的2-3倍(官网建议)
因为如果设置超过这个数,需要运行多批次才能完成任务
如果设置少了,又有cpu空闲,浪费资源
如果刚好设置为与cpu核数相同,那么就会每个cpu性能不一,在一个时段内完成的内容不同,也会造成浪费
广播变量:少了executorcores官方建议倍数,这里是5core3倍
广播变量.png
2、大表join小表 【好处: 能够减少shuffle操作】
1)使用join的时候涉及到两个rdd的数据聚合操作,产生两个shuffler
2)使用广播减少shuffler:
将driver的广播数据收集好
之后将数据传递到一个rdd中进行自定义操作,不涉及两个rdd,不会落盘
如何使用:
1、广播数据: val bc = sc.broadcast(数据)
2、task取出数据使用: bc.value

*/

task使用driver的数据

  1. package tcode.day06
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. object $02_BroadCast {
  4. /**
  5. * 广播变量应用场景:
  6. * 1、task需要使用Driver数据的时候【好处: 能够减少数据占用内存空间】
  7. * 2、大表join小表 【好处: 能够减少shuffle操作】
  8. * 如何使用:
  9. * 1、广播数据: val bc = sc.broadcast(数据)
  10. * 2、task取出数据使用: bc.value
  11. *
  12. */
  13. //task需要使用Driver数据的时候【好处: 能够减少数据占用内存空间】
  14. def main1(args: Array[String]): Unit = {
  15. val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
  16. val rdd = sc.parallelize(List("jd","taobao","tm","pdd"))
  17. val map = Map("jd"->"http://www.jd.com","taobao"->"http://www.taobao.com","tm"->"http://www.tm.com","pdd"->"http://www.pdd.com")
  18. //使用广播变量之后, map数据占用的内存大小 = executor的个数 * map数据大小
  19. // 广播的内容不是rdd而是数据,所以需要先collect收集再广播,这里的map就是数据,不需要collect
  20. val bc = sc.broadcast(map)
  21. val rdd2 = rdd.map(x=> {
  22. //取出广播变量的值
  23. val value = bc.value
  24. value.getOrElse(x,"")
  25. })
  26. //此时map数据占用的内存大小 = task的个数 * map数据大小
  27. //val rdd2 = rdd.map(x=> map.getOrElse(x,""))
  28. println(rdd2.collect().toList)
  29. }
  30. ------------------------------------------------------------------------------
  31. Thread.sleep(10000000)
  32. }
  33. }

大表join小表

  1. //大表join小表 【好处: 能够减少shuffle操作】
  2. def main(args: Array[String]): Unit = {
  3. val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
  4. val studentRdd = sc.parallelize(List(
  5. ("1001","zhangsan",20,"A"),
  6. ("1002","lisi",20,"B"),
  7. ("1003","wangwu",20,"A"),
  8. ("1004","zhaoliu",30,"C"),
  9. ("1005","qianqi",40,"B")
  10. ))
  11. val classRdd = sc.parallelize(List(
  12. ("A","大数据班"),
  13. ("B","java班"),
  14. ("C","python班")
  15. ))
  16. //需求: 获取每个学生的信息以及所在班级名称
  17. // 没使用广播:需要join,会产生两个shuffler
  18. /* val stuRdd = studentRdd.map{
  19. case (stuId,name,age,classId) => (classId,(stuId,name,age))
  20. }
  21. val rdd: RDD[(String, ((String, String, Int), Option[String]))] = stuRdd.leftOuterJoin(classRdd)
  22. rdd.map{
  23. case (classId,((stuid,name,age),className)) => (stuid,name,age,className.getOrElse(""))
  24. }.foreach(println(_))*/
  25. //广播小表数据后,不需要join
  26. // 广播的内容不是rdd而是数据,所以需要先collect收集再广播
  27. val classMap = classRdd.collect().toMap
  28. val bc = sc.broadcast(classMap)
  29. // 普通jion与广播的区别,
  30. // 广播是将数据放进广播里,执行的时候可以直接将数据传递进一个rdd中,进行自定义操作,不涉及两个rdd所以不会落盘
  31. // join是两个rdd的操作,需要写好落盘过程
  32. studentRdd.map{
  33. case (stuid,name,age,classid) =>
  34. val className = bc.value.getOrElse(classid,"")
  35. (stuid,name,age,className)
  36. }.foreach(println(_))