Flink 程序启动模式主要是三种:本地模式、集群模式、yarn模式。

我们通常使用 yarn 模式。因此集群模式需要的 master/slave 参数都不需要配置了。

所以后台的服务,只需要启动 history server ,可用通过 Flink UI 查看历史执行日志。

1 下载

https://www.apache.org/dyn/closer.lua/flink/flink-1.13.2/flink-1.13.2-bin-scala_2.12.tgz

2 解压

[hadoop@bigdata1 package]$ tar -xzvf flink-1.13.2-bin-scala_2.12.tgz flink-1.13.2/ flink-1.13.2/LICENSE flink-1.13.2/bin/ flink-1.13.2/licenses/ 。。。 [hadoop@bigdata1 package]$ mv flink-1.13.2 ../flink1132 [hadoop@bigdata1 package]$ pwd /home/hadoop/package [hadoop@bigdata1 package]$ cd ../flink1132 [hadoop@bigdata1 flink1132]$ ls -lrt total 496 -rw-r—r— 1 hadoop hadoop 11357 May 31 10:46 LICENSE -rw-r—r— 1 hadoop hadoop 1309 May 31 10:46 README.txt drwxr-xr-x 2 hadoop hadoop 4096 Jul 2 15:31 log drwxr-xr-x 2 hadoop hadoop 4096 Jul 23 19:06 conf -rw-r—r— 1 hadoop hadoop 455120 Jul 23 19:06 NOTICE drwxr-xr-x 3 hadoop hadoop 4096 Aug 29 10:18 opt drwxr-xr-x 2 hadoop hadoop 4096 Aug 29 10:18 lib drwxr-xr-x 7 hadoop hadoop 4096 Aug 29 10:18 examples drwxr-xr-x 10 hadoop hadoop 4096 Aug 29 10:18 plugins drwxr-xr-x 2 hadoop hadoop 4096 Aug 29 10:18 licenses drwxr-xr-x 2 hadoop hadoop 4096 Aug 29 10:18 bin [hadoop@bigdata1 flink1132]$

3 修改配置文件 flink-conf.yaml

基础配置

jobManager 的IP地址
jobmanager.rpc.address: localhost

JobManager 的端口号
jobmanager.rpc.port: 6123

JobManager JVM heap 内存大小
jobmanager.heap.size: 1024m

TaskManager JVM heap 内存大小
taskmanager.heap.size: 1024m

每个 TaskManager 提供的任务 slots 数量大小

taskmanager.numberOfTaskSlots: 1

程序默认并行计算的个数
parallelism.default: 1

文件系统来源
# fs.default-scheme

高可用性配置

可以选择 ‘NONE’ 或者 ‘zookeeper’.
# high-availability: zookeeper

文件系统路径,让 Flink 在高可用性设置中持久保存元数据
# high-availability.storageDir: hdfs:///flink/ha/

zookeeper 集群中仲裁者的机器 ip 和 port 端口号
# high-availability.zookeeper.quorum: localhost:2181

默认是 open,如果 zookeeper security 启用了该值会更改成 creator
# high-availability.zookeeper.client.acl: open

容错和检查点 配置

用于存储和检查点状态

state.backend: filesystem


# 存储检查点的数据文件和元数据的默认目录

state.checkpoints.dir: hdfs://bigdata1:8020/flink/checkpoints

savepoints 的默认目标目录(可选)

state.savepoints.dir: hdfs://bigdata1:8020/flink/savepoints

用于启用/禁用增量 checkpoints 的标志

state.backend.incremental: true state.backend: rocksdb

jobmanager.execution.failover-strategy: region

web 前端配置

基于 Web 的运行时监视器侦听的地址.
#jobmanager.web.address: 0.0.0.0

Web 的运行时监视器端口
rest.port: 8081

是否从基于 Web 的 jobmanager 启用作业提交
# jobmanager.web.submit.enable: false

高级配置


# io.tmp.dirs: /tmp
# 是否应在 TaskManager 启动时预先分配 TaskManager 管理的内存
# taskmanager.memory.preallocate: false

类加载解析顺序,是先检查用户代码 jar(“child-first”)还是应用程序类路径(“parent-first”)。 默认设置指示首先从用户代码 jar 加载类
# classloader.resolve-order: child-first


# 用于网络缓冲区的 JVM 内存的分数。 这决定了 TaskManager 可以同时拥有多少流数据交换通道以及通道缓冲的程度。 如果作业被拒绝或者您收到系统没有足够缓冲区的警告,请增加此值或下面的最小/最大值。 另请注意,“taskmanager.network.memory.min”和“taskmanager.network.memory.max”可能会覆盖此分数

taskmanager.network.memory.fraction: 0.1
# taskmanager.network.memory.min: 67108864
# taskmanager.network.memory.max: 1073741824

Flink 集群安全配置

指示是否从 Kerberos ticket 缓存中读取
# security.kerberos.login.use-ticket-cache: true

包含用户凭据的 Kerberos 密钥表文件的绝对路径
# security.kerberos.login.keytab: /path/to/kerberos/keytab

与 keytab 关联的 Kerberos 主体名称
# security.kerberos.login.principal: flink-user

以逗号分隔的登录上下文列表,用于提供 Kerberos 凭据(例如,Client,KafkaClient使用凭证进行 ZooKeeper 身份验证和 Kafka 身份验证)
# security.kerberos.login.contexts: Client,KafkaClient

Zookeeper 安全配置

覆盖以下配置以提供自定义 ZK 服务名称
# zookeeper.sasl.service-name: zookeeper

该配置必须匹配 “security.kerberos.login.contexts” 中的列表(含有一个)
# zookeeper.sasl.login-context-name: Client

HistoryServer

启动和关闭 HistoryServer:
/home/hadoop/flink1132/bin/historyserver.sh start
stop

将已完成的作业上传到的目录
jobmanager.archive.fs.dir: hdfs://bigdata1:8020/flink/completed-jobs

基于 Web 的 HistoryServer 的地址
historyserver.web.address: bigdata1

基于 Web 的 HistoryServer 的端口号 8020之前已经配置了
historyserver.web.port: 8082

以逗号分隔的目录列表,用于监视已完成的作业
historyserver.archive.fs.dir:hdfs://bigdata1:8020/flink/completed-jobs

刷新受监控目录的时间间隔(以毫秒为单位)
historyserver.archive.fs.refresh-interval: 10000

是否清理不存在的作业(即已经过期的)

historyserver.archive.clean-expired-jobs:false

每一个归档目录下可以保留的最大job数,设置为-1即不限制

historyserver.archive.retained-jobs:-1

4 添加 linux 环境变量(所有节点都需要)

export HADOOP_CLASSPATH=hadoop classpath

vi /etc/profile

JAVA_HOME=/usr/java/jdk1.8.0_191-amd64 CLASSPATH=$JAVA_HOME/lib/ PATH=$PATH:$JAVA_HOME/bin export PATH JAVA_HOME CLASSPATH

export HADOOP_HOME=/home/hadoop/hadoop260 export PATH=$HADOOP_HOME/bin:$PATH export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop export HADOOP_CLASSPATH=hadoop classpath

hive

export HIVE_HOME=/home/hadoop/hive122 export PATH=$HIVE_HOME/bin:$PATH export HIVE_CONF_DIR=$HIVE_HOME/conf

flink

export FLINK_HOME=/home/hadoop/flink1132

source /etc/profile

5. 新建需要 HDFS 目录、分发到所有节点

[hadoop@bigdata1 log]$ hadoop fs -mkdir hdfs://bigdata1:8020/flink/checkpoints [hadoop@bigdata1 log]$ hadoop fs -mkdir hdfs://bigdata1:8020/flink/savepoints [hadoop@bigdata1 log]$ hadoop fs -mkdir hdfs://bigdata1:8020/flink/completed-jobs [hadoop@bigdata1 ~]$ scp -r flink1132 hadoop@bigdata2:/home/hadoop/

6 启动 historyserver

[hadoop@bigdata1 bin]$ ${FLINK_HOME}/bin/historyserver.sh start Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set. Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set. Starting historyserver daemon on host bigdata1. [hadoop@bigdata1 bin]$

./flink-daemon.sh start historyserver —configDir ../conf

UI 链接
http://bigdata1:8082/#/overview

image.png

请参考上图,由于上文已经配置了 FLINK_CONF_DIR Linux 环境变量,historyserver 可以直接调用。
但是,如果使用 flink-daemon.sh,就必须手动传入参数(../conf)去指定 flink 配置文件,否则启动会报错。

../conf指向的是 flink-conf.yaml
flink-daemon.sh 拿到第三个参数(configDir ../conf)直接传入org.apache.flink.runtime.webmonitor.history.HistoryServer Java 类,里边实际会拼接成(./../conf/flink-conf.yaml)。

image.png
image.png

[hadoop@bigdata1 bin]$ ./flink-daemon.sh start historyserver —configDir ../conf Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set. Starting historyserver daemon on host bigdata1. [hadoop@bigdata1 bin]$ jps 31648 Jps 16544 DataNode 13169 RunJar 26881 RunJar 31618 HistoryServer 16115 NameNode 17108 ResourceManager 17621 NodeManager 16933 SecondaryNameNode 5197 HistoryServer 16878 HistoryServer [hadoop@bigdata1 bin]$ ps aux| grep HistoryServer

7 启动 flink 服务

Local模式

这个直接执行以下命令即可

bin/start-cluster.sh
bin/stop-cluster.sh

Standalone模式

这个需要修改配置文件 flink-conf.yaml、master、slave

jobmanager.rpc.address: hadoop100

然后把flink程序文件复制到所有节点,再启动 bin/start-cluster.sh

Flink On Yarn模式

我们建议使用 Yarn 来统一管理集群资源,这样 flink 就变成一个存粹的计算程序,不需要提前启动服务,但必须将flink程序文件复制到所有节点,同时设置好程序执行需要的本地路径。

执行示例如下

—class x.y.z \ 此段示例不需要 class 参数

/home/hadoop/flink1132/bin/flink run \ —detached \ —jobmanager yarn-cluster \ —yarnname “x.y.z” \ —yarnjobManagerMemory 1024 \ —yarntaskManagerMemory 2048 \ —yarnslots 2 \ —parallelism 2 \ /home/hadoop/flink1132/examples/batch/WordCount.jar —input /home/hadoop/flink1132/README.txt —output /home/hadoop/flink1132/log/

内存集中管理模式:yarn session

第一步,启动 yarn session:
/home/hadoop/flink1132/bin/yarn-session.sh -Djm 1024 -Dtm 1024 -d

参数解释: //-n 2 表示指定两个容器 // -jm 1024 表示jobmanager 1024M内存 // -tm 2048 表示taskmanager 2048M内存 //-d 任务后台运行 。 //-nm,—name YARN上为一个自定义的应用设置一个名字 //-q,—query 显示yarn中可用的资源 (内存, cpu核数) //-z,—zookeeperNamespace 针对HA模式在zookeeper上创建NameSpace //-id,—applicationId YARN集群上的任务id,附着到一个后台运行的yarn session中

这时候会申请一个 yarn application,然后启动 JobManager,但不会分配 TaskManager
image.png

第二步,在 yarn session 内启动任务

/home/hadoop/flink1132/bin/flink run \
—detached \
—jobmanager bigdata1:33559 \
—yarnname “yyds” \
—yarnjobManagerMemory 1024 \
—yarntaskManagerMemory 2048 \
—yarnslots 4 \
—parallelism 12 \
-c com.pixel.flink.ReadingFromKafka \
/home/hadoop/flink1132/flink-1.0.jar

bigdata1:37199 是第一步启动日志里的 JobManager Web Interface: http://bigdata1:37199

独立内存模式:per-job

会启动单独的 yarn application —yarnslots 4 \ # 每个taskManager内的slots个数。默认taskManager只给分配1个cpu,但是不是一个slots设置一个cpu更好呢? —parallelism 12 \ #并行度:taskManager个数=12/4

/home/hadoop/flink1132/bin/flink run \ —detached \ —jobmanager yarn-cluster \ —yarnname “yyds” \ —yarnjobManagerMemory 1024 \ —yarntaskManagerMemory 2048 \ —yarnslots 4 \ —parallelism 12 \ -c com.pixel.flink.ReadingFromKafka \ /home/hadoop/flink1132/flink-1.0.jar

http://bigdata1:49966/cluster/apps
image.png

http://bigdata1:8082/#/overview
image.png

org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.

原因 CheckpointTimeout 设置的Checkpoin超时时间内未完成任务 导致的超时异常。

解决 1,增加超时时间 2,增加机器性能 3,减少数据处理量 :source并行度 和窗口数据量减少,sink 并行度增加