分布式缓存
Flink提供分布式缓存的使用,可以将本地或者HDFS中文件进行缓存,缓存在每个TaskManager上,当前的task的就可以像读取本地文件一样,拉取文件中内容。
在生产环境中有这么一种情况:在Join表的过程中,如果一个表很大,一个表相对很小,就可以将小的表进行分布式缓存在每个TaskManager上,然后再进行Join。
进行分布式缓存的示例代码如下:
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.registerCachedFile("E:\\idea_bigData_Project\\FlinkStudy\\src\\main\\resources\\distributedcache.txt", "distributedCache");
DataSource<String> data = env.fromElements("lineA", "lineB", "lineC", "lineD");
MapOperator<String, String> result = data.map(new RichMapFunction<String, String>() {
private ArrayList<String> arrayList = new ArrayList<>();
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 使用指定的缓存文件
File distributedCache = getRuntimeContext().getDistributedCache().getFile("distributedCache");
List<String> lines = FileUtils.readLines(distributedCache);
for (String line : lines) {
arrayList.add(line);
System.out.println("分布式缓存:" + line);
}
}
@Override
public String map(String value) throws Exception {
// 在此处使用分布式缓存
System.out.println("使用分布式缓存: " + arrayList + " --------- " + value);
return value;
}
});
result.printToErr();
}
在使用分布式缓存的情况下,需要注意分布式容易造成的问题,尽量只读缓存文件,否则容易造成数据的一致性问题。
此外,缓存的文件不应该过大,否则容易OOM。
故障恢复和重启策略
在实际运行环境中,经常会遇到非法数据,以及网络抖动等问题导致应用挂掉,
Flink提供了强大的可配置故障恢复和重启策略来进行自动恢复。
**
故障恢复
在Flink的配置文件flink-conf.yaml
中有一个参数
jobmanager.execution.failover-strategy: region
该配置有两种配置,分别是 full
和 region
这是Flink支持的两种故障恢复策略,从英文意思可以知道,前者是任务在判定为失败后,将任务中全部的task都进行重启,后者是基于region范围的task重启。
在Region的策略下,Flink为将任务分成不同的region,当某一个task发生故障时,Flink会计算需要故障恢复的最小范围内的region,计算策略如下:
- 发生错误的task所在的region进行重启。
- 如果当前region的依赖数据出现损坏或者部分丢失,那么生产数据的region也要重启。
- 为了保证数据的一致性,当前region的下游region也要进行重启。
重启策略
Flink提供了多种类型和级别的重启策略:
- 固定延迟重启策略模式
- 失败率重启策略模式
- 无重启策略模式
Flink进行了默认的约定,来判断使用哪种重启策略。
如果用户配置了checkpoint,但是没有配置重启策略,那么Flink会默认采用固定延迟重启策略模式。
如果用户没有配置checkpoint,那么采用无重启策略模式。
无重启策略模式
在这种情况下,任务发生错误,会直接退出。
在flink-conf.yaml
中配置
restart-strategy: none
也可以在程序中使用代码进行配置
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.noRestart());
固定延迟重启策略模式
在flink-conf.yaml
中配置
restart-strategy: fixed-delay
# 需要重启的次数
restart-strategy.fixed-delay.attempts: 3
# 每次重试的时间间隔
restart-strategy.fixed-delay.delay: 5 s
代码配置
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // 重启次数
Time.of(5, TimeUnit.SECONDS) // 时间间隔
));
失败率重启策略模式
在flink-conf.yaml
中配置
restart-strategy: failure-rate
# 在指定的时间间隔内最大的失败次数
restart-strategy.failure-rate.max-failures-per-interval: 3
# 计算失败率的时间间隔
restart-strategy.failure-rate.failure-rate-interval: 5 min
# 每次失败重试的时间间隔
restart-strategy.failure-rate.delay: 5 s
该配置的理解举个例子:假如在5分钟内若失败了3次,则认为任务失败,每次失败后间隔5s。
代码配置
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // 每个时间间隔的最大故障次数
Time.of(5, TimeUnit.MINUTES), // 测量故障率的时间间隔
Time.of(5, TimeUnit.SECONDS) // 每次任务失败时间间隔
));
需要注意的是,在实际生产环境中由于每个任务的负载和资源消耗不一样,我们推荐在代码中指定每个任务的重试机制和重启策略。
**
并行度
并行度被定义为一个任务能被分成几个子任务并行执行,提高并行度(parallelism)很大程度上可以提高任务的执行效率。
一般情况下我们通过四种级别来设置任务的并行度:
- 算子级别
在代码中可以调用setParallelism 来设置每个算子的并行度。
DataSet<Tuple2<String, Integer>> counts =
text.flatMap(new LineSplitter())
.groupBy(0)
.sum(1).setParallelism(1);
推荐这种方式,可以针对每个算子进行调优。
算子的并行度设置优先级最高。
- 执行环境级别
在创建Flink上下文时设置并行度,env.setParallelism
这种方式设置的并行度对当前任务的算子,source和sink进行生效。当然,可以在算子级别设置并行度覆盖该配置。
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(5);
- 提交任务级别
用户在提交任务时,可以显示的指定 -p 参数来设置任务的并行度,例如:
./bin/flink run -p 10 WordCount.jar
- 系统配置级别
在flink-conf.yaml中配置
parallelism.default: 1
该配置是在系统层面上配置所有执行环境的并行度。
整体上讲,这四种级别的配置生效优先级如下:算子级别 > 执行环境级别 > 提交任务级别 > 系统配置级别。
需要注意的是:
在Flink中有Slot的概念,在配置文件中有 taskmanager.numberOfTaskSlots: 1
,有多少个slot表示一个TaskManager能有多少并发执行能力。
如果配置为3,则集群中所拥有的所有执行任务的资源为TaskManager*3。