分布式缓存

Flink提供分布式缓存的使用,可以将本地或者HDFS中文件进行缓存,缓存在每个TaskManager上,当前的task的就可以像读取本地文件一样,拉取文件中内容。
在生产环境中有这么一种情况:在Join表的过程中,如果一个表很大,一个表相对很小,就可以将小的表进行分布式缓存在每个TaskManager上,然后再进行Join。
进行分布式缓存的示例代码如下:

  1. public static void main(String[] args) throws Exception {
  2. final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  3. env.registerCachedFile("E:\\idea_bigData_Project\\FlinkStudy\\src\\main\\resources\\distributedcache.txt", "distributedCache");
  4. DataSource<String> data = env.fromElements("lineA", "lineB", "lineC", "lineD");
  5. MapOperator<String, String> result = data.map(new RichMapFunction<String, String>() {
  6. private ArrayList<String> arrayList = new ArrayList<>();
  7. @Override
  8. public void open(Configuration parameters) throws Exception {
  9. super.open(parameters);
  10. // 使用指定的缓存文件
  11. File distributedCache = getRuntimeContext().getDistributedCache().getFile("distributedCache");
  12. List<String> lines = FileUtils.readLines(distributedCache);
  13. for (String line : lines) {
  14. arrayList.add(line);
  15. System.out.println("分布式缓存:" + line);
  16. }
  17. }
  18. @Override
  19. public String map(String value) throws Exception {
  20. // 在此处使用分布式缓存
  21. System.out.println("使用分布式缓存: " + arrayList + " --------- " + value);
  22. return value;
  23. }
  24. });
  25. result.printToErr();
  26. }

在使用分布式缓存的情况下,需要注意分布式容易造成的问题,尽量只读缓存文件,否则容易造成数据的一致性问题。
此外,缓存的文件不应该过大,否则容易OOM。

故障恢复和重启策略

在实际运行环境中,经常会遇到非法数据,以及网络抖动等问题导致应用挂掉,
Flink提供了强大的可配置故障恢复和重启策略来进行自动恢复。
**

故障恢复

在Flink的配置文件flink-conf.yaml中有一个参数

  1. jobmanager.execution.failover-strategy: region

该配置有两种配置,分别是 fullregion
这是Flink支持的两种故障恢复策略,从英文意思可以知道,前者是任务在判定为失败后,将任务中全部的task都进行重启,后者是基于region范围的task重启。
在Region的策略下,Flink为将任务分成不同的region,当某一个task发生故障时,Flink会计算需要故障恢复的最小范围内的region,计算策略如下:

  • 发生错误的task所在的region进行重启。
  • 如果当前region的依赖数据出现损坏或者部分丢失,那么生产数据的region也要重启。
  • 为了保证数据的一致性,当前region的下游region也要进行重启。

重启策略

Flink提供了多种类型和级别的重启策略:

  • 固定延迟重启策略模式
  • 失败率重启策略模式
  • 无重启策略模式

Flink进行了默认的约定,来判断使用哪种重启策略。
如果用户配置了checkpoint,但是没有配置重启策略,那么Flink会默认采用固定延迟重启策略模式。
如果用户没有配置checkpoint,那么采用无重启策略模式。

无重启策略模式

在这种情况下,任务发生错误,会直接退出。
flink-conf.yaml中配置

  1. restart-strategy: none

也可以在程序中使用代码进行配置

  1. final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  2. env.setRestartStrategy(RestartStrategies.noRestart());

固定延迟重启策略模式

flink-conf.yaml中配置

  1. restart-strategy: fixed-delay
  2. # 需要重启的次数
  3. restart-strategy.fixed-delay.attempts: 3
  4. # 每次重试的时间间隔
  5. restart-strategy.fixed-delay.delay: 5 s

代码配置

  1. env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  2. 3, // 重启次数
  3. Time.of(5, TimeUnit.SECONDS) // 时间间隔
  4. ));

失败率重启策略模式

flink-conf.yaml中配置

  1. restart-strategy: failure-rate
  2. # 在指定的时间间隔内最大的失败次数
  3. restart-strategy.failure-rate.max-failures-per-interval: 3
  4. # 计算失败率的时间间隔
  5. restart-strategy.failure-rate.failure-rate-interval: 5 min
  6. # 每次失败重试的时间间隔
  7. restart-strategy.failure-rate.delay: 5 s

该配置的理解举个例子:假如在5分钟内若失败了3次,则认为任务失败,每次失败后间隔5s。

代码配置

  1. env.setRestartStrategy(RestartStrategies.failureRateRestart(
  2. 3, // 每个时间间隔的最大故障次数
  3. Time.of(5, TimeUnit.MINUTES), // 测量故障率的时间间隔
  4. Time.of(5, TimeUnit.SECONDS) // 每次任务失败时间间隔
  5. ));

需要注意的是,在实际生产环境中由于每个任务的负载和资源消耗不一样,我们推荐在代码中指定每个任务的重试机制和重启策略。
**

并行度

并行度被定义为一个任务能被分成几个子任务并行执行,提高并行度(parallelism)很大程度上可以提高任务的执行效率。
一般情况下我们通过四种级别来设置任务的并行度:

  • 算子级别

在代码中可以调用setParallelism 来设置每个算子的并行度。

  1. DataSet<Tuple2<String, Integer>> counts =
  2. text.flatMap(new LineSplitter())
  3. .groupBy(0)
  4. .sum(1).setParallelism(1);

推荐这种方式,可以针对每个算子进行调优。
算子的并行度设置优先级最高。

  • 执行环境级别

在创建Flink上下文时设置并行度,env.setParallelism
这种方式设置的并行度对当前任务的算子,source和sink进行生效。当然,可以在算子级别设置并行度覆盖该配置。

  1. final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  2. env.setParallelism(5);
  • 提交任务级别

用户在提交任务时,可以显示的指定 -p 参数来设置任务的并行度,例如:

  1. ./bin/flink run -p 10 WordCount.jar
  • 系统配置级别

在flink-conf.yaml中配置

  1. parallelism.default: 1

该配置是在系统层面上配置所有执行环境的并行度。
整体上讲,这四种级别的配置生效优先级如下:算子级别 > 执行环境级别 > 提交任务级别 > 系统配置级别

需要注意的是:
在Flink中有Slot的概念,在配置文件中有 taskmanager.numberOfTaskSlots: 1 ,有多少个slot表示一个TaskManager能有多少并发执行能力。
如果配置为3,则集群中所拥有的所有执行任务的资源为TaskManager*3。