- 所有代码详见git:https://gitee.com/mazhibin456/flink-dev-test
submit
```sql Dpipeline.max-parallelism=120 # 设置最大并行度
-t yarn-per-job|yarn-application # 设置job提交方式
案例
bin/flink run \ -t yarn-per-job \ -d \ -p 5 \ 指定并行度 -Dyarn.application.queue=test \ 指定 yarn 队列 -Djobmanager.memory.process.size=2048mb \ JM2~4G 足够 -Dtaskmanager.memory.process.size=4096mb \ 单个 TM2~8G 足够 -Dtaskmanager.numberOfTaskSlots=2 \ 与容器核数 1core:1slot 或 2core:1slot -c com.atguigu.flink.tuning.UvDemo \ /opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar
设置yarn支持cpu-core的分配: 一个slot可分配两个core
即使提交时设置了taskmanager.numberOfTaskSlots=2,因为yarn的资源调度粒度是DefaultResourceCalculator:只按照内存分配 需改成DominantResourceCalculator按照cpu内存分配,taskslots才生效
<a name="YMZCr"></a>### 操作```sql# 创建保存点$ ./bin/flink savepoint \$JOB_ID \/tmp/flink-savepoints# 删除保存点$ ./bin/flink savepoint \--dispose \/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \$JOB_ID# 优雅的stop-job 并记录保存点$ ./bin/flink stop \--savepointPath /tmp/flink-savepoints \$JOB_ID# 从检查点 启动job$ ./bin/flink run \--detached \--fromSavepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \./examples/streaming/StateMachineExample.jar
优化
- 避免全窗口、非并行度 一个分区操作
- keyBy尽量不使用String
- 设置状态的保存时间
- 所有算子设置uuid
架构优化
// HA flink-conf.yaml high-availability: zookeeper high-availability.zookeeper.quorum: localhost:2181 high-availability.zookeeper.path.root: /flink high-availability.cluster-id: /cluster_one # 重要: 每个flink集群自定义 yarn模式可以不配置 high-availability.storageDir: hdfs:///flink/recoveryJVM、内存的优化
```sql // jobmanager可以使用到的内存 包括元空间,默认1600M jobmanager.memory.process.size
// taskmanager可以使用到的内存 包括元空间,默认1600M taskmanager.memory.process.size
// taskmanager能分配到的slot数量 默认1,可根据cpu数调优 taskmanager.numberOfTaskSlots
设置老年代与新生代的比值2:1
env.java.opts:-XX:NewRatio=2
<a name="iinXX"></a>
#### 并行度的设置
- 总 QPS/单并行度的处理能力 = 并行度, 再*1.5设置一些冗余
- 四个层级设置
- 集群层级:conf/flink-conf.yaml parallelism.default:3
- 任务层级:-p指定并行度 bin/flink run -p 10 ../examples/*WordCount-java*.jar
- job层级:设置默认并行度env.setParallelism(3);
- 算子层级:.setParallelism()
- source并行度
- source如果是kafka,则source并行度设置为kafka-topic的分区数
- transform并行度
- keyby之前:与source并行度保持一致
- keyby之后:如果并发大 设置成2的整次幂:128、256
- sink并行度
- sink如果是kafka,则sink并行度设置为kafka-topic的分区数。
- 其他按照sink抗压能力来设置
<a name="SSu0t"></a>
#### 检查点、状态优化
- 检查点一般设置为分钟级(1-5分钟)
- 对于状态很大的任务则设置为5-10分钟,并调大两次checkpoint之间的暂停时间:5分钟左右
- 背压
- 要避免出现。
- 产生原因:生产的数据过多,下游来不及消费。通常产生于短时间的峰值,如垃圾回收、大促、秒杀。
- 影响:如果不能得到正确的处理,可能会影响到 checkpoint 时长和 state 大小,甚至可能会导致资源耗尽甚至系统崩溃。
- 排查:-Drest.flamegraph.enabled=true #1.13版本开启jvm火焰图
- 状态设置超时时间TTL
```sql
#API 指定
tableEnv.getConfig().setIdleStateRetention(Duration.ofHours(1));
#参数指定
Configuration configuration = tableEnv.getConfig().getConfiguration();
configuration.setString("table.exec.state.ttl", "1 h");
# 开启增量检查点
state.backend.incremental: true #默认 false,或代码中指定:new EmbeddedRocksDBStateBackend(true)
# rocksdb 读缓存的大小
state.backend.rocksdb.block.cache-size: 64m #默认 8m
# rocksdb 写缓存的大小
state.backend.rocksdb.writebuffer.size: 128m
state.backend.rocksdb.compaction.level.max-size-level-base: 320m
# 增大线程数
state.backend.rocksdb.thread.num: 8 #默认1
state.backend.rocksdb.checkpoint.transfer.thread.num:8
# 增大 writebuffer最小合并数
state.backend.rocksdb.writebuffer.number-to-merge: 3 #默认1
数据倾斜
虽然没有GC,但是个别的task执行时间严重延长
1、重新设计key,以更小粒度的key使得task大小合理化。
2、修改并行度
- keyby之前就有数据倾斜
可能是分区的数据量本身不均匀,方案:强制进行 shuffle。使用 shuffle、rebalance 或 rescale
- keyby + windows 数据倾斜
窗口结束才会触发计算,有攒批的过程 用两阶段聚合方案解决
第一阶段聚合:key 拼接随机数前缀或后缀,进行 keyby、开窗、聚合
注意:要获取 WindowEnd 作为窗口标记作为第二阶段分组依据,避免不同窗口的结果聚合到一起
第二阶段聚合:按照原来的 key 及 windowEnd 作 keyby、聚合
sql写法参考:https://zhuanlan.zhihu.com/p/197299746
select
winEnd,
split_index(plat1,'_',0) as plat2,
sum(pv)
from (
select
TUMBLE_END(proc_time, INTERVAL '1' MINUTE) as winEnd,
plat1,
count(*) as pv
from (
-- 最内层,将分组的key,也就是plat加上一个随机数打散
select
plat || '_' || cast(cast(RAND()*100 as int) as string) as plat1 ,
proc_time
from source_kafka_table
) group by
TUMBLE(proc_time, INTERVAL '1' MINUTE),
plat1
) group by
winEnd,
split_index(plat1,'_',0)
其他
# STREAMING\BATCH\AUTOMATIC
# 配置批、流模式
bin/flink run
-Dexecution.runtime-mode=BATCH
examples/streaming/WordCount.jar
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
# 1.13开始 支持minibatch,攒批缓存一定的数据在触发计算,来减少对state的访问
// 初始化
TableEnvironment tEnv = ...
// 获取 tableEnv 的配置对象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 开启 miniBatch
configuration.setString("table.exec.mini-batch.enabled", "true");
// 批量输出的间隔时间
configuration.setString("table.exec.mini-batch.allow-latency", "5s");
// 防止 OOM 设置每个批次最多缓存数据的条数,可以设为 2 万条
configuration.setString("table.exec.mini-batch.size", "20000");
//不同服务器之间传递的缓冲区超时时间
setBufferTimeout(-1) : 最大化延迟,等缓冲区满了才刷新
setBufferTimeout(0):最小化延迟,数据一旦接收就刷新
env.setBufferTimeout(timeoutMillis) :一般设置为10毫秒左右
env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
