-t yarn-per-job|yarn-application # 设置job提交方式

案例

bin/flink run \ -t yarn-per-job \ -d \ -p 5 \ 指定并行度 -Dyarn.application.queue=test \ 指定 yarn 队列 -Djobmanager.memory.process.size=2048mb \ JM2~4G 足够 -Dtaskmanager.memory.process.size=4096mb \ 单个 TM2~8G 足够 -Dtaskmanager.numberOfTaskSlots=2 \ 与容器核数 1core:1slot 或 2core:1slot -c com.atguigu.flink.tuning.UvDemo \ /opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar

设置yarn支持cpu-core的分配: 一个slot可分配两个core

即使提交时设置了taskmanager.numberOfTaskSlots=2,因为yarn的资源调度粒度是DefaultResourceCalculator:只按照内存分配 需改成DominantResourceCalculator按照cpu内存分配,taskslots才生效

yarn.scheduler.capacity.resource-calculator

org.apache.hadoop.yarn.util.resource.DominantResourceCalculator

  1. <a name="YMZCr"></a>
  2. ### 操作
  3. ```sql
  4. # 创建保存点
  5. $ ./bin/flink savepoint \
  6. $JOB_ID \
  7. /tmp/flink-savepoints
  8. # 删除保存点
  9. $ ./bin/flink savepoint \
  10. --dispose \
  11. /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
  12. $JOB_ID
  13. # 优雅的stop-job 并记录保存点
  14. $ ./bin/flink stop \
  15. --savepointPath /tmp/flink-savepoints \
  16. $JOB_ID
  17. # 从检查点 启动job
  18. $ ./bin/flink run \
  19. --detached \
  20. --fromSavepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
  21. ./examples/streaming/StateMachineExample.jar

优化

  • 避免全窗口、非并行度 一个分区操作
  • keyBy尽量不使用String
  • 设置状态的保存时间
  • 所有算子设置uuid

    架构优化

    // HA  flink-conf.yaml
    high-availability: zookeeper
    high-availability.zookeeper.quorum: localhost:2181
    high-availability.zookeeper.path.root: /flink
    high-availability.cluster-id: /cluster_one # 重要: 每个flink集群自定义   yarn模式可以不配置
    high-availability.storageDir: hdfs:///flink/recovery
    

    JVM、内存的优化

    ```sql // jobmanager可以使用到的内存 包括元空间,默认1600M jobmanager.memory.process.size

// taskmanager可以使用到的内存 包括元空间,默认1600M taskmanager.memory.process.size

// taskmanager能分配到的slot数量 默认1,可根据cpu数调优 taskmanager.numberOfTaskSlots

设置老年代与新生代的比值2:1

env.java.opts:-XX:NewRatio=2

<a name="iinXX"></a>
#### 并行度的设置

- 总 QPS/单并行度的处理能力 = 并行度, 再*1.5设置一些冗余
- 四个层级设置
   - 集群层级:conf/flink-conf.yaml    parallelism.default:3
   - 任务层级:-p指定并行度 bin/flink run -p 10 ../examples/*WordCount-java*.jar
   - job层级:设置默认并行度env.setParallelism(3);
   - 算子层级:.setParallelism()

- source并行度
   - source如果是kafka,则source并行度设置为kafka-topic的分区数
- transform并行度
   - keyby之前:与source并行度保持一致
   - keyby之后:如果并发大 设置成2的整次幂:128、256
- sink并行度
   - sink如果是kafka,则sink并行度设置为kafka-topic的分区数。
   - 其他按照sink抗压能力来设置
<a name="SSu0t"></a>
#### 检查点、状态优化

- 检查点一般设置为分钟级(1-5分钟)
- 对于状态很大的任务则设置为5-10分钟,并调大两次checkpoint之间的暂停时间:5分钟左右
- 背压
   - 要避免出现。
   - 产生原因:生产的数据过多,下游来不及消费。通常产生于短时间的峰值,如垃圾回收、大促、秒杀。
   - 影响:如果不能得到正确的处理,可能会影响到 checkpoint 时长和 state 大小,甚至可能会导致资源耗尽甚至系统崩溃。
   - 排查:-Drest.flamegraph.enabled=true  #1.13版本开启jvm火焰图
- 状态设置超时时间TTL
```sql
#API 指定
tableEnv.getConfig().setIdleStateRetention(Duration.ofHours(1));
#参数指定
Configuration configuration = tableEnv.getConfig().getConfiguration();
configuration.setString("table.exec.state.ttl", "1 h");
# 开启增量检查点
state.backend.incremental: true #默认 false,或代码中指定:new EmbeddedRocksDBStateBackend(true)
# rocksdb 读缓存的大小
state.backend.rocksdb.block.cache-size: 64m #默认 8m
# rocksdb 写缓存的大小
state.backend.rocksdb.writebuffer.size: 128m
state.backend.rocksdb.compaction.level.max-size-level-base: 320m
# 增大线程数
state.backend.rocksdb.thread.num: 8 #默认1
state.backend.rocksdb.checkpoint.transfer.thread.num:8
# 增大 writebuffer最小合并数
state.backend.rocksdb.writebuffer.number-to-merge: 3 #默认1

数据倾斜

虽然没有GC,但是个别的task执行时间严重延长
1、重新设计key,以更小粒度的key使得task大小合理化。
2、修改并行度

  • keyby之前就有数据倾斜

可能是分区的数据量本身不均匀,方案:强制进行 shuffle。使用 shuffle、rebalance 或 rescale

  • keyby + windows 数据倾斜

窗口结束才会触发计算,有攒批的过程 用两阶段聚合方案解决
第一阶段聚合:key 拼接随机数前缀或后缀,进行 keyby、开窗、聚合
注意:要获取 WindowEnd 作为窗口标记作为第二阶段分组依据,避免不同窗口的结果聚合到一起
第二阶段聚合:按照原来的 key 及 windowEnd 作 keyby、聚合
sql写法参考:https://zhuanlan.zhihu.com/p/197299746

select 
winEnd,
split_index(plat1,'_',0) as plat2,
sum(pv) 
from (

select 
TUMBLE_END(proc_time, INTERVAL '1' MINUTE) as winEnd,
plat1,
count(*) as pv 
from (
    -- 最内层,将分组的key,也就是plat加上一个随机数打散
    select 
    plat || '_' || cast(cast(RAND()*100 as int) as string) as plat1 ,
    proc_time 
    from source_kafka_table 
) group by 
TUMBLE(proc_time, INTERVAL '1' MINUTE), 
plat1

) group by 
winEnd,
split_index(plat1,'_',0)

其他

# STREAMING\BATCH\AUTOMATIC
# 配置批、流模式
bin/flink run 
-Dexecution.runtime-mode=BATCH 
examples/streaming/WordCount.jar

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

# 1.13开始 支持minibatch,攒批缓存一定的数据在触发计算,来减少对state的访问
// 初始化
TableEnvironment tEnv = ...
// 获取 tableEnv 的配置对象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 开启 miniBatch
configuration.setString("table.exec.mini-batch.enabled", "true");
// 批量输出的间隔时间
configuration.setString("table.exec.mini-batch.allow-latency", "5s");
// 防止 OOM 设置每个批次最多缓存数据的条数,可以设为 2 万条
configuration.setString("table.exec.mini-batch.size", "20000");

//不同服务器之间传递的缓冲区超时时间
setBufferTimeout(-1) : 最大化延迟,等缓冲区满了才刷新
setBufferTimeout(0):最小化延迟,数据一旦接收就刷新
env.setBufferTimeout(timeoutMillis) :一般设置为10毫秒左右
env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);