- 1 下载
- 2 解压
- 3 修改配置文件 flink-conf.yaml
- jobManager 的IP地址
jobmanager.rpc.address: localhost - JobManager 的端口号
jobmanager.rpc.port: 6123 - JobManager JVM heap 内存大小
jobmanager.heap.size: 1024m - TaskManager JVM heap 内存大小
taskmanager.heap.size: 1024m - 每个 TaskManager 提供的任务 slots 数量大小
- 程序默认并行计算的个数
parallelism.default: 1 - 文件系统来源
# fs.default-scheme - 可以选择 ‘NONE’ 或者 ‘zookeeper’.
# high-availability: zookeeper - 文件系统路径,让 Flink 在高可用性设置中持久保存元数据
# high-availability.storageDir: hdfs:///flink/ha/ - zookeeper 集群中仲裁者的机器 ip 和 port 端口号
# high-availability.zookeeper.quorum: localhost:2181 - 默认是 open,如果 zookeeper security 启用了该值会更改成 creator
# high-availability.zookeeper.client.acl: open - 用于存储和检查点状态
- savepoints 的默认目标目录(可选)
- 基于 Web 的运行时监视器侦听的地址.
#jobmanager.web.address: 0.0.0.0 - Web 的运行时监视器端口
rest.port: 8081 - 是否从基于 Web 的 jobmanager 启用作业提交
# jobmanager.web.submit.enable: false - 类加载解析顺序,是先检查用户代码 jar(“child-first”)还是应用程序类路径(“parent-first”)。 默认设置指示首先从用户代码 jar 加载类
# classloader.resolve-order: child-first - taskmanager.network.memory.fraction: 0.1
# taskmanager.network.memory.min: 67108864
# taskmanager.network.memory.max: 1073741824 - 指示是否从 Kerberos ticket 缓存中读取
# security.kerberos.login.use-ticket-cache: true - 包含用户凭据的 Kerberos 密钥表文件的绝对路径
# security.kerberos.login.keytab: /path/to/kerberos/keytab - 与 keytab 关联的 Kerberos 主体名称
# security.kerberos.login.principal: flink-user - 以逗号分隔的登录上下文列表,用于提供 Kerberos 凭据(例如,
Client,KafkaClient
使用凭证进行 ZooKeeper 身份验证和 Kafka 身份验证)
# security.kerberos.login.contexts: Client,KafkaClient - 覆盖以下配置以提供自定义 ZK 服务名称
# zookeeper.sasl.service-name: zookeeper - 该配置必须匹配 “security.kerberos.login.contexts” 中的列表(含有一个)
# zookeeper.sasl.login-context-name: Client - 将已完成的作业上传到的目录
jobmanager.archive.fs.dir: hdfs://bigdata1:8020/flink/completed-jobs - 基于 Web 的 HistoryServer 的地址
historyserver.web.address: bigdata1 - 基于 Web 的 HistoryServer 的端口号 8020之前已经配置了
historyserver.web.port: 8082 - 以逗号分隔的目录列表,用于监视已完成的作业
historyserver.archive.fs.dir:hdfs://bigdata1:8020/flink/completed-jobs - 刷新受监控目录的时间间隔(以毫秒为单位)
historyserver.archive.fs.refresh-interval: 10000 - 是否清理不存在的作业(即已经过期的)
- 每一个归档目录下可以保留的最大job数,设置为-1即不限制
- hive
- flink
Flink 程序启动模式主要是三种:本地模式、集群模式、yarn模式。
我们通常使用 yarn 模式。因此集群模式需要的 master/slave 参数都不需要配置了。
所以后台的服务,只需要启动 history server ,可用通过 Flink UI 查看历史执行日志。
1 下载
https://www.apache.org/dyn/closer.lua/flink/flink-1.13.2/flink-1.13.2-bin-scala_2.12.tgz
2 解压
[hadoop@bigdata1 package]$ tar -xzvf flink-1.13.2-bin-scala_2.12.tgz flink-1.13.2/ flink-1.13.2/LICENSE flink-1.13.2/bin/ flink-1.13.2/licenses/ 。。。 [hadoop@bigdata1 package]$ mv flink-1.13.2 ../flink1132 [hadoop@bigdata1 package]$ pwd /home/hadoop/package [hadoop@bigdata1 package]$ cd ../flink1132 [hadoop@bigdata1 flink1132]$ ls -lrt total 496 -rw-r—r— 1 hadoop hadoop 11357 May 31 10:46 LICENSE -rw-r—r— 1 hadoop hadoop 1309 May 31 10:46 README.txt drwxr-xr-x 2 hadoop hadoop 4096 Jul 2 15:31 log drwxr-xr-x 2 hadoop hadoop 4096 Jul 23 19:06 conf -rw-r—r— 1 hadoop hadoop 455120 Jul 23 19:06 NOTICE drwxr-xr-x 3 hadoop hadoop 4096 Aug 29 10:18 opt drwxr-xr-x 2 hadoop hadoop 4096 Aug 29 10:18 lib drwxr-xr-x 7 hadoop hadoop 4096 Aug 29 10:18 examples drwxr-xr-x 10 hadoop hadoop 4096 Aug 29 10:18 plugins drwxr-xr-x 2 hadoop hadoop 4096 Aug 29 10:18 licenses drwxr-xr-x 2 hadoop hadoop 4096 Aug 29 10:18 bin [hadoop@bigdata1 flink1132]$
3 修改配置文件 flink-conf.yaml
基础配置
jobManager 的IP地址
jobmanager.rpc.address: localhost
JobManager 的端口号
jobmanager.rpc.port: 6123
JobManager JVM heap 内存大小
jobmanager.heap.size: 1024m
TaskManager JVM heap 内存大小
taskmanager.heap.size: 1024m
每个 TaskManager 提供的任务 slots 数量大小
taskmanager.numberOfTaskSlots: 1
程序默认并行计算的个数
parallelism.default: 1
文件系统来源
# fs.default-scheme
高可用性配置
可以选择 ‘NONE’ 或者 ‘zookeeper’.
# high-availability: zookeeper
文件系统路径,让 Flink 在高可用性设置中持久保存元数据
# high-availability.storageDir: hdfs:///flink/ha/
zookeeper 集群中仲裁者的机器 ip 和 port 端口号
# high-availability.zookeeper.quorum: localhost:2181
默认是 open,如果 zookeeper security 启用了该值会更改成 creator
# high-availability.zookeeper.client.acl: open
容错和检查点 配置
用于存储和检查点状态
state.backend: filesystem
# 存储检查点的数据文件和元数据的默认目录state.checkpoints.dir: hdfs://bigdata1:8020/flink/checkpoints
savepoints 的默认目标目录(可选)
state.savepoints.dir: hdfs://bigdata1:8020/flink/savepoints
用于启用/禁用增量 checkpoints 的标志
state.backend.incremental: true state.backend: rocksdb
jobmanager.execution.failover-strategy: region
web 前端配置
基于 Web 的运行时监视器侦听的地址.
#jobmanager.web.address: 0.0.0.0
Web 的运行时监视器端口
rest.port: 8081
是否从基于 Web 的 jobmanager 启用作业提交
# jobmanager.web.submit.enable: false
高级配置
# io.tmp.dirs: /tmp
# 是否应在 TaskManager 启动时预先分配 TaskManager 管理的内存
# taskmanager.memory.preallocate: false
类加载解析顺序,是先检查用户代码 jar(“child-first”)还是应用程序类路径(“parent-first”)。 默认设置指示首先从用户代码 jar 加载类
# classloader.resolve-order: child-first
# 用于网络缓冲区的 JVM 内存的分数。 这决定了 TaskManager 可以同时拥有多少流数据交换通道以及通道缓冲的程度。 如果作业被拒绝或者您收到系统没有足够缓冲区的警告,请增加此值或下面的最小/最大值。 另请注意,“taskmanager.network.memory.min”和“taskmanager.network.memory.max”可能会覆盖此分数
taskmanager.network.memory.fraction: 0.1
# taskmanager.network.memory.min: 67108864
# taskmanager.network.memory.max: 1073741824
Flink 集群安全配置
指示是否从 Kerberos ticket 缓存中读取
# security.kerberos.login.use-ticket-cache: true
包含用户凭据的 Kerberos 密钥表文件的绝对路径
# security.kerberos.login.keytab: /path/to/kerberos/keytab
与 keytab 关联的 Kerberos 主体名称
# security.kerberos.login.principal: flink-user
以逗号分隔的登录上下文列表,用于提供 Kerberos 凭据(例如,Client,KafkaClient
使用凭证进行 ZooKeeper 身份验证和 Kafka 身份验证)
# security.kerberos.login.contexts: Client,KafkaClient
Zookeeper 安全配置
覆盖以下配置以提供自定义 ZK 服务名称
# zookeeper.sasl.service-name: zookeeper
该配置必须匹配 “security.kerberos.login.contexts” 中的列表(含有一个)
# zookeeper.sasl.login-context-name: Client
HistoryServer
启动和关闭 HistoryServer:
/home/hadoop/flink1132/bin/historyserver.sh start
stop
将已完成的作业上传到的目录
jobmanager.archive.fs.dir: hdfs://bigdata1:8020/flink/completed-jobs
基于 Web 的 HistoryServer 的地址
historyserver.web.address: bigdata1
基于 Web 的 HistoryServer 的端口号 8020之前已经配置了
historyserver.web.port: 8082
以逗号分隔的目录列表,用于监视已完成的作业
historyserver.archive.fs.dir:hdfs://bigdata1:8020/flink/completed-jobs
刷新受监控目录的时间间隔(以毫秒为单位)
historyserver.archive.fs.refresh-interval: 10000
是否清理不存在的作业(即已经过期的)
historyserver.archive.clean-expired-jobs:false
每一个归档目录下可以保留的最大job数,设置为-1即不限制
historyserver.archive.retained-jobs:-1
4 添加 linux 环境变量(所有节点都需要)
export HADOOP_CLASSPATH=hadoop classpath
vi /etc/profile
JAVA_HOME=/usr/java/jdk1.8.0_191-amd64 CLASSPATH=$JAVA_HOME/lib/ PATH=$PATH:$JAVA_HOME/bin export PATH JAVA_HOME CLASSPATH
export HADOOP_HOME=/home/hadoop/hadoop260 export PATH=$HADOOP_HOME/bin:$PATH export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop export HADOOP_CLASSPATH=
hadoop classpath
hive
export HIVE_HOME=/home/hadoop/hive122 export PATH=$HIVE_HOME/bin:$PATH export HIVE_CONF_DIR=$HIVE_HOME/conf
flink
export FLINK_HOME=/home/hadoop/flink1132
source /etc/profile
5. 新建需要 HDFS 目录、分发到所有节点
[hadoop@bigdata1 log]$ hadoop fs -mkdir hdfs://bigdata1:8020/flink/checkpoints [hadoop@bigdata1 log]$ hadoop fs -mkdir hdfs://bigdata1:8020/flink/savepoints [hadoop@bigdata1 log]$ hadoop fs -mkdir hdfs://bigdata1:8020/flink/completed-jobs [hadoop@bigdata1 ~]$ scp -r flink1132 hadoop@bigdata2:/home/hadoop/
6 启动 historyserver
[hadoop@bigdata1 bin]$ ${FLINK_HOME}/bin/historyserver.sh start Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set. Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set. Starting historyserver daemon on host bigdata1. [hadoop@bigdata1 bin]$
./flink-daemon.sh start historyserver —configDir ../conf
UI 链接
http://bigdata1:8082/#/overview
请参考上图,由于上文已经配置了 FLINK_CONF_DIR Linux 环境变量,historyserver 可以直接调用。
但是,如果使用 flink-daemon.sh,就必须手动传入参数(../conf)去指定 flink 配置文件,否则启动会报错。
../conf指向的是 flink-conf.yaml
flink-daemon.sh 拿到第三个参数(configDir ../conf)直接传入org.apache.flink.runtime.webmonitor.history.HistoryServer Java 类,里边实际会拼接成(./../conf/flink-conf.yaml)。
[hadoop@bigdata1 bin]$ ./flink-daemon.sh start historyserver —configDir ../conf Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set. Starting historyserver daemon on host bigdata1. [hadoop@bigdata1 bin]$ jps 31648 Jps 16544 DataNode 13169 RunJar 26881 RunJar 31618 HistoryServer 16115 NameNode 17108 ResourceManager 17621 NodeManager 16933 SecondaryNameNode 5197 HistoryServer 16878 HistoryServer [hadoop@bigdata1 bin]$ ps aux| grep HistoryServer
7 启动 flink 服务
Local模式
这个直接执行以下命令即可
bin/start-cluster.sh
bin/stop-cluster.sh
Standalone模式
这个需要修改配置文件 flink-conf.yaml、master、slave
jobmanager.rpc.address: hadoop100
然后把flink程序文件复制到所有节点,再启动 bin/start-cluster.sh
Flink On Yarn模式
我们建议使用 Yarn 来统一管理集群资源,这样 flink 就变成一个存粹的计算程序,不需要提前启动服务,但必须将flink程序文件复制到所有节点,同时设置好程序执行需要的本地路径。
执行示例如下
—class x.y.z \ 此段示例不需要 class 参数
/home/hadoop/flink1132/bin/flink run \ —detached \ —jobmanager yarn-cluster \ —yarnname “x.y.z” \ —yarnjobManagerMemory 1024 \ —yarntaskManagerMemory 2048 \ —yarnslots 2 \ —parallelism 2 \ /home/hadoop/flink1132/examples/batch/WordCount.jar —input /home/hadoop/flink1132/README.txt —output /home/hadoop/flink1132/log/
内存集中管理模式:yarn session
第一步,启动 yarn session:
/home/hadoop/flink1132/bin/yarn-session.sh -Djm 1024 -Dtm 1024 -d
参数解释: //-n 2 表示指定两个容器 // -jm 1024 表示jobmanager 1024M内存 // -tm 2048 表示taskmanager 2048M内存 //-d 任务后台运行 。 //-nm,—name YARN上为一个自定义的应用设置一个名字 //-q,—query 显示yarn中可用的资源 (内存, cpu核数) //-z,—zookeeperNamespace
针对HA模式在zookeeper上创建NameSpace //-id,—applicationId YARN集群上的任务id,附着到一个后台运行的yarn session中
这时候会申请一个 yarn application,然后启动 JobManager,但不会分配 TaskManager
第二步,在 yarn session 内启动任务
/home/hadoop/flink1132/bin/flink run \
—detached \
—jobmanager bigdata1:33559 \
—yarnname “yyds” \
—yarnjobManagerMemory 1024 \
—yarntaskManagerMemory 2048 \
—yarnslots 4 \
—parallelism 12 \
-c com.pixel.flink.ReadingFromKafka \
/home/hadoop/flink1132/flink-1.0.jar
bigdata1:37199 是第一步启动日志里的 JobManager Web Interface: http://bigdata1:37199
独立内存模式:per-job
会启动单独的 yarn application —yarnslots 4 \ # 每个taskManager内的slots个数。默认taskManager只给分配1个cpu,但是不是一个slots设置一个cpu更好呢? —parallelism 12 \ #并行度:taskManager个数=12/4
/home/hadoop/flink1132/bin/flink run \ —detached \ —jobmanager yarn-cluster \ —yarnname “yyds” \ —yarnjobManagerMemory 1024 \ —yarntaskManagerMemory 2048 \ —yarnslots 4 \ —parallelism 12 \ -c com.pixel.flink.ReadingFromKafka \ /home/hadoop/flink1132/flink-1.0.jar
http://bigdata1:49966/cluster/apps
http://bigdata1:8082/#/overview
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
原因 CheckpointTimeout 设置的Checkpoin超时时间内未完成任务 导致的超时异常。
解决 1,增加超时时间 2,增加机器性能 3,减少数据处理量 :source并行度 和窗口数据量减少,sink 并行度增加