分布式缓存

Flink提供了一个分布式缓存,类似于hadoop,可以使用户在并行函数中很方便的读取本地文件,并把它放在taskmanager节点中,防止task重复拉取。
此缓存的工作机制如下:程序注册一个文件或者目录(本地或者远程文件系统,例如hdfs或者s3),通过ExecutionEnvironment注册缓存文件并为它起一个名称。
当程序执行,Flink自动将文件或者目录复制到所有taskmanager节点的本地文件系统,仅会执行一次。用户可以通过这个指定的名称查找文件或者目录,然后从taskmanager节点的本地文件系统访问它。

示例

在ExecutionEnvironment中注册一个文件:

  1. //获取运行环境
  2. ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  3. //1:注册一个文件,可以使用hdfs上的文件 也可以是本地文件进行测试
  4. env.registerCachedFile("/Users/wangzhiwu/WorkSpace/quickstart/text","a.txt");

在用户函数中访问缓存文件或者目录(这里是一个map函数)。这个函数必须继承RichFunction,因为它需要使用RuntimeContext读取数据:

  1. DataSet<String> result = data.map(new RichMapFunction<String, String>() {
  2. private ArrayList<String> dataList = new ArrayList<String>();
  3. @Override
  4. public void open(Configuration parameters) throws Exception {
  5. super.open(parameters);
  6. //2:使用文件
  7. File myFile = getRuntimeContext().getDistributedCache().getFile("a.txt");
  8. List<String> lines = FileUtils.readLines(myFile);
  9. for (String line : lines) {
  10. this.dataList.add(line);
  11. System.err.println("分布式缓存为:" + line);
  12. }
  13. }
  14. @Override
  15. public String map(String value) throws Exception {
  16. //在这里就可以使用dataList
  17. System.err.println("使用datalist:" + dataList + "------------" +value);
  18. //业务逻辑
  19. return dataList +":" + value;
  20. }
  21. });
  22. result.printToErr();
  23. }

完整代码如下,仔细看注释:

  1. public class DisCacheTest {
  2. public static void main(String[] args) throws Exception{
  3. //获取运行环境
  4. ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  5. //1:注册一个文件,可以使用hdfs上的文件 也可以是本地文件进行测试
  6. //text 中有4个单词:hello flink hello FLINK env.registerCachedFile("/Users/wangzhiwu/WorkSpace/quickstart/text","a.txt");
  7. DataSource<String> data = env.fromElements("a", "b", "c", "d");
  8. DataSet<String> result = data.map(new RichMapFunction<String, String>() {
  9. private ArrayList<String> dataList = new ArrayList<String>();
  10. @Override
  11. public void open(Configuration parameters) throws Exception {
  12. super.open(parameters);
  13. //2:使用文件
  14. File myFile = getRuntimeContext().getDistributedCache().getFile("a.txt");
  15. List<String> lines = FileUtils.readLines(myFile);
  16. for (String line : lines) {
  17. this.dataList.add(line);
  18. System.err.println("分布式缓存为:" + line);
  19. }
  20. }
  21. @Override
  22. public String map(String value) throws Exception {
  23. //在这里就可以使用dataList
  24. System.err.println("使用datalist:" + dataList + "------------" +value);
  25. //业务逻辑
  26. return dataList +":" + value;
  27. }
  28. });
  29. result.printToErr();
  30. }
  31. }//

输出结果如下:

  1. [hello, flink, hello, FLINK]:a
  2. [hello, flink, hello, FLINK]:b
  3. [hello, flink, hello, FLINK]:c
  4. [hello, flink, hello, FLINK]:d