Flink 支持广播变量,就是将数据广播到具体的 taskmanager 上,数据存储在内存中,这样可以减缓大量的 shuffle 操作;
    比如在数据 join 阶段,不可避免的就是大量的 shuffle 操作,我们可以把其中一个 dataSet 广播出去,一直加载到 taskManager 的内存中,可以直接在内存中拿数据,避免了大量的 shuffle,导致集群性能下降;
    广播变量创建后,它可以运行在集群中的任何 function 上,而不需要多次传递给集群节点。另外需要记住,不应该修改广播变量,这样才能确保每个节
    点获取到的值都是一致的。
    一句话解释,可以理解为是一个公共的共享变量,我们可以把一个 dataset数据集广播出去,然后不同的 task 在节点上都能够获取到,这个数据在每个节
    点上只会存在一份。如果不使用 broadcast,则在每个节点中的每个 task 中都需要拷贝一份 dataset 数据集,比较浪费内存(也就是一个节点中可能会存在多份dataset 数据)。
    注意:因为广播变量是要把 dataset 广播到内存中,所以广播的数据量不能太大,否则会出现 OOM 这样的问题

    • Broadcast:Broadcast 是通过 withBroadcastSet(dataset,string)来注册的
    • Access:通过 getRuntimeContext().getBroadcastVariable(String)访问广播变量

    操作步骤

    1. 1:初始化数据
    2. DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3)
    3. 2:广播数据
    4. .withBroadcastSet(toBroadcast, "broadcastSetName");
    5. 3:获取数据
    6. Collection<Integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");

    例子:

    1. package dh.flink.com.dataset.transform;
    2. import java.util.ArrayList;
    3. import java.util.Collection;
    4. import java.util.List;
    5. import org.apache.flink.api.common.functions.RichMapFunction;
    6. import org.apache.flink.api.java.DataSet;
    7. import org.apache.flink.api.java.ExecutionEnvironment;
    8. import org.apache.flink.configuration.Configuration;
    9. public class BroadCastTest {
    10. public static void main(final String[] args) throws Exception{
    11. ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    12. //1.封装一个DataSet
    13. DataSet<Integer> broadcast = env.fromElements(1, 2, 3);
    14. DataSet<String> data = env.fromElements("a", "b");
    15. data.map(new RichMapFunction<String, String>() {
    16. private List list = new ArrayList();
    17. @Override
    18. public void open(final Configuration parameters) throws Exception {
    19. // 3. 获取广播的DataSet数据 作为一个Collection
    20. //通过getRuntimeContext().getBroadcastVariable(String)访问广播变量
    21. Collection<Integer> broadcastSet = getRuntimeContext().getBroadcastVariable("number");
    22. list.addAll(broadcastSet);
    23. }
    24. @Override
    25. public String map(final String value) throws Exception {
    26. return value + ": "+ list;
    27. }
    28. }).withBroadcastSet(broadcast, "number") //Broadcast是通过withBroadcastSet(dataset,string)来注册的
    29. // 2. 广播的broadcast
    30. .printToErr();//打印到err方便查看
    31. }
    32. }
    1. package dh.flink.com.dataset.transform.guangbo;
    2. import java.util.ArrayList;
    3. import java.util.HashMap;
    4. import java.util.List;
    5. import org.apache.flink.api.common.functions.MapFunction;
    6. import org.apache.flink.api.common.functions.RichMapFunction;
    7. import org.apache.flink.api.java.DataSet;
    8. import org.apache.flink.api.java.ExecutionEnvironment;
    9. import org.apache.flink.api.java.operators.DataSource;
    10. import org.apache.flink.api.java.tuple.Tuple2;
    11. import org.apache.flink.configuration.Configuration;
    12. /**
    13. * 广播
    14. *
    15. * @author duhai
    16. * @date 2020年1月10日
    17. */
    18. public class BatchDemoBroadcast {
    19. public static void main(final String[] args) throws Exception {
    20. // 获取运行环境
    21. ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    22. // 1:准备需要广播的数据
    23. ArrayList<Tuple2<String, Integer>> broadData = new ArrayList<Tuple2<String, Integer>>();
    24. broadData.add(new Tuple2<String, Integer>("zs", 18));
    25. broadData.add(new Tuple2<String, Integer>("ls", 20));
    26. broadData.add(new Tuple2<String, Integer>("ww", 17));
    27. DataSet<Tuple2<String, Integer>> tupleData = env.fromCollection(broadData);
    28. // 1.1:处理需要广播的数据,把数据集转换成map类型,map中的key就是用户姓名,value就是用户年龄
    29. DataSet<HashMap<String, Integer>> toBroadcast = tupleData
    30. .map(new MapFunction<Tuple2<String, Integer>, HashMap<String, Integer>>() {
    31. public HashMap<String, Integer> map(final Tuple2<String, Integer> value) throws Exception {
    32. HashMap<String, Integer> res = new HashMap<String, Integer>();
    33. res.put(value.f0, value.f1);
    34. return res;
    35. }
    36. });
    37. // 源数据
    38. DataSource<String> data = env.fromElements("zs", "ls", "ww");
    39. // 注意:在这里需要使用到RichMapFunction获取广播变量
    40. DataSet<String> result = data.map(new RichMapFunction<String, String>() {
    41. List<HashMap<String, Integer>> broadCastMap = new ArrayList<HashMap<String, Integer>>();
    42. HashMap<String, Integer> allMap = new HashMap<String, Integer>();
    43. /**
    44. * 这个方法只会执行一次 可以在这里实现一些初始化的功能。所以,就可以在open方法中获取广播变量数据
    45. *
    46. */
    47. @Override
    48. public void open(final Configuration parameters) throws Exception {
    49. super.open(parameters);
    50. // 3:获取广播数据
    51. this.broadCastMap = getRuntimeContext().getBroadcastVariable("broadCastMapName");
    52. for (HashMap map : broadCastMap) {
    53. allMap.putAll(map);
    54. }
    55. }
    56. @Override
    57. public String map(final String value) throws Exception {
    58. Integer age = allMap.get(value);
    59. return value + "," + age;
    60. }
    61. }).withBroadcastSet(toBroadcast, "broadCastMapName");// 2:执行广播数据的操作
    62. result.print();
    63. }
    64. }