Configuration

Detailed reference: https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/config.html
conf/flink-conf.yaml

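    resourcemanager.taskmanager-timeout: 30000  # default 30000 ms; in yarn-session mode, TaskManagers are shut down 30 seconds after the job finishes
    yarn.application-attempts: 10               # restarts caused by YARN preemption, node failures, or NodeManager resyncs do not count against this limit
    state.checkpoints.num-retained: 20          # retain multiple checkpoints; by default only one is kept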
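
To make the effect of state.checkpoints.num-retained concrete, here is a minimal sketch in plain Java of a "keep only the newest N" policy. It is an illustration only, not Flink's checkpoint store: the class RetainedCheckpointsSketch and its methods are hypothetical names, and checkpoints are reduced to bare ids.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical illustration of a "retain the newest N" policy; not Flink code. */
class RetainedCheckpointsSketch {
    private final int numRetained;
    // Oldest completed checkpoint id at the head, newest at the tail.
    private final Deque<Long> completed = new ArrayDeque<>();

    RetainedCheckpointsSketch(int numRetained) {
        if (numRetained < 1) {
            throw new IllegalArgumentException("numRetained must be >= 1");
        }
        this.numRetained = numRetained;
    }

    /** Registers a completed checkpoint and returns the ids dropped to stay within the limit. */
    List<Long> add(long checkpointId) {
        completed.addLast(checkpointId);
        List<Long> discarded = new ArrayList<>();
        while (completed.size() > numRetained) {
            discarded.add(completed.pollFirst());
        }
        return discarded;
    }

    /** Returns the currently retained ids, oldest first. */
    List<Long> retained() {
        return new ArrayList<>(completed);
    }
}
```

With a limit of 20, the 21st completed checkpoint would push out the oldest one, so up to 20 restore points remain available.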

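Enabling log4j debug output

When the client environment is complex and it is hard to tell which log location and configuration are in use, set the following environment variable to turn on log4j's DEBUG output and trace its initialization and loading process in detail:

    export JVM_ARGS="-Dlog4j.debug=true"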

Logging configuration

If both log4j and logback configuration files are present, log4j takes effect.

HA

In HA mode, jobmanager.rpc.address and jobmanager.rpc.port have no effect.

    high-availability: zookeeper
    high-availability.zookeeper.quorum: zk-host1:2181,zk-host2:2181,zk-host3:2181
    high-availability.storageDir: hdfs:///flink/recovery
    high-availability.zookeeper.path.root: /flink
    yarn.application-attempts: 10

Yarn

It is best not to set high-availability.cluster-id. YARN uses the application ID as the cluster-id.

Metrics

prometheus

    metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter

promgateway

    metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
    metrics.reporter.promgateway.host: venn
    metrics.reporter.promgateway.port: 9091
    metrics.reporter.promgateway.jobName: myJob
    metrics.reporter.promgateway.randomJobNameSuffix: true
    metrics.reporter.promgateway.deleteOnShutdown: false

Restart strategies

Fixed Delay Restart Strategy

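In flink-conf.yaml:

    restart-strategy.fixed-delay.attempts: 3
    restart-strategy.fixed-delay.delay: 10 s

Or in code:

    import java.util.concurrent.TimeUnit;
    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.api.java.ExecutionEnvironment;

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
        3,                               // number of restart attempts
        Time.of(10, TimeUnit.SECONDS)    // delay between attempts
    ));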
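
The fixed-delay behaviour can be sketched in plain Java, without any Flink dependency. This is only an illustration of the idea (retry a failing task up to a fixed number of times, sleeping a constant delay in between), not how Flink schedules restarts internally. FixedDelaySketch and runWithFixedDelay are hypothetical names.

```java
import java.util.concurrent.Callable;

/** Hypothetical illustration of fixed-delay restarts; not Flink code. */
class FixedDelaySketch {

    /**
     * Runs the task, restarting it after a failure at most maxRestarts times
     * and sleeping delayMillis before each restart. Once the restarts are
     * used up, the last failure is rethrown.
     */
    static <T> T runWithFixedDelay(Callable<T> task, int maxRestarts, long delayMillis)
            throws Exception {
        int restarts = 0;
        while (true) {
            try {
                return task.call();
            } catch (Exception e) {
                if (restarts >= maxRestarts) {
                    throw e; // restart budget exhausted: fail permanently
                }
                restarts++;
                Thread.sleep(delayMillis); // constant delay before the next attempt
            }
        }
    }
}
```

With 3 attempts and a 10 s delay as configured above, a task that keeps failing would run four times in total (the first run plus three restarts) before the failure becomes final.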

Failure Rate Restart Strategy

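In flink-conf.yaml:

    restart-strategy.failure-rate.max-failures-per-interval: 3
    restart-strategy.failure-rate.failure-rate-interval: 5 min
    restart-strategy.failure-rate.delay: 10 s

Or in code:

    import java.util.concurrent.TimeUnit;
    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.api.java.ExecutionEnvironment;

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setRestartStrategy(RestartStrategies.failureRateRestart(
        3,                               // max failures per interval
        Time.of(5, TimeUnit.MINUTES),    // time interval for measuring the failure rate
        Time.of(10, TimeUnit.SECONDS)    // delay between restart attempts
    ));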
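
How the three failure-rate settings interact can be sketched with a sliding window in plain Java. This is an illustration under assumptions, not Flink's implementation: FailureRateSketch and recordFailureAndCheck are hypothetical names, and the sketch assumes the job is restarted as long as the number of failures inside the interval does not exceed the maximum. The restart delay is left out because it only spaces out the attempts.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical sliding-window illustration of a failure-rate check; not Flink code. */
class FailureRateSketch {
    private final int maxFailuresPerInterval;
    private final long intervalMillis;
    // Timestamps of recent failures, oldest first.
    private final Deque<Long> failureTimestamps = new ArrayDeque<>();

    FailureRateSketch(int maxFailuresPerInterval, long intervalMillis) {
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.intervalMillis = intervalMillis;
    }

    /**
     * Records a failure at nowMillis and reports whether a restart is still
     * allowed, i.e. whether the failures inside the window stay within the limit.
     */
    boolean recordFailureAndCheck(long nowMillis) {
        // Forget failures that are older than the measuring interval.
        while (!failureTimestamps.isEmpty()
                && nowMillis - failureTimestamps.peekFirst() > intervalMillis) {
            failureTimestamps.pollFirst();
        }
        failureTimestamps.addLast(nowMillis);
        return failureTimestamps.size() <= maxFailuresPerInterval;
    }
}
```

With a maximum of 3 failures per 5 minutes, three failures one minute apart would each be followed by a restart, while a fourth failure inside the same 5 minutes would not. If the fourth failure arrives after the earlier ones have aged out of the window, the restart is allowed again.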