- 1.Flink-conf.yaml
- jobManager 的IP地址
- JobManager 的端口号
- JobManager JVM heap 内存大小,不建议配的太大,1-2G足够。
- TaskManager JVM heap 内存大小,大小视任务量而定。需要存储任务的中间值,网络缓存,用户数据等
- 每个 TaskManager 提供的任务 slots 数量大小
- 在yarn模式使用的时候会受到yarn.scheduler.maximum-allocation-vcores值的影响。此处指定的slot数量如果超过yarn的maximum-allocation-vcores,flink启动会报错。
- 在yarn模式,flink启动的task manager个数可以参照如下计算公式:num_of_tm = ceil(parallelism / slot) 即并行度除以slot个数,结果向上取整
- 程序默认并行计算的个数,如果任务未指定并行度,将采用此设置
- 文件系统来源
- fs.default-scheme
- 用于存储和检查点状态,可选值为rocksdb,filesystem,hdfs。
- state.backend: filesystem
- 存储检查点的数据文件和元数据的默认目录
- state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
- savepoints 的默认目标目录(可选)
- state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints
- 用于启用/禁用增量 checkpoints 的标志
- state.backend.incremental: false
- io.tmp.dirs: /tmp
- 是否应在 TaskManager 启动时预先分配 TaskManager 管理的内存
- taskmanager.memory.preallocate: false
- 类加载解析顺序,是先检查用户代码 jar(“child-first”)还是应用程序类路径(“parent-first”)。 默认设置指示首先从用户代码 jar 加载类
- classloader.resolve-order: child-first
- 用于网络缓冲区的 JVM 内存的分数。 这决定了 TaskManager 可以同时拥有多少流数据交换通道以及通道缓冲的程度。 如果作业被拒绝或者您收到系统没有足够缓冲区的警告,请增加此值或下面的最小/最大值。 另请注意,“taskmanager.network.memory.min”和“taskmanager.network.memory.max”可能会覆盖此分数
- taskmanager.network.memory.fraction: 0.1
- taskmanager.network.memory.min: 67108864
- taskmanager.network.memory.max: 1073741824
- 覆盖以下配置以提供自定义 ZK 服务名称
- zookeeper.sasl.service-name: zookeeper
- 该配置必须匹配 “security.kerberos.login.contexts” 中的列表(含有一个)
- zookeeper.sasl.login-context-name: Client
前面已经介绍了 Flink 是什么以及如何安装使用flink,现在我们再来了解下 Flink 的配置文件。当然对于单节点的使用 Flink 已经具备了开箱即用。
我们到 Flink 的 conf 看下都有什么
可以看到 conf 下主要有 flink-conf.yaml 配置、日志的配置文件、zk 配置(zoo.cfg)、Flink SQL Client 配置(sql-client-defaults.yaml)
1.Flink-conf.yaml
JobManager 的端口号
jobmanager.rpc.port: 6123
JobManager JVM heap 内存大小,不建议配的太大,1-2G足够。
jobmanager.heap.size: 1024m
TaskManager JVM heap 内存大小,大小视任务量而定。需要存储任务的中间值,网络缓存,用户数据等
taskmanager.heap.size: 1024m
每个 TaskManager 提供的任务 slots 数量大小
在yarn模式使用的时候会受到yarn.scheduler.maximum-allocation-vcores值的影响。此处指定的slot数量如果超过yarn的maximum-allocation-vcores,flink启动会报错。
在yarn模式,flink启动的task manager个数可以参照如下计算公式:num_of_tm = ceil(parallelism / slot) 即并行度除以slot个数,结果向上取整
taskmanager.numberOfTaskSlots: 1
程序默认并行计算的个数,如果任务未指定并行度,将采用此设置
parallelism.default: 1
文件系统来源
fs.default-scheme
- **High Availability(高可用配置)**
```yaml
# 使用zookeeper负责HA实现,可以选择 'NONE' 或者 'zookeeper'.
# high-availability: zookeeper
# 文件系统路径,让 Flink 在高可用性设置中持久保存元数据
# high-availability.storageDir: hdfs:///flink/ha/
# zookeeper 集群的节点的 ip 和 port 端口号
# high-availability.zookeeper.quorum: localhost:2181
# 默认是 open,如果 zookeeper security 启用了该值会更改成 creator
# high-availability.zookeeper.client.acl: open
- Fault tolerance and checkpointing(容错和检查点配置)
```yaml
用于存储和检查点状态,可选值为rocksdb,filesystem,hdfs。
state.backend: filesystem
存储检查点的数据文件和元数据的默认目录
state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
savepoints 的默认目标目录(可选)
state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints
用于启用/禁用增量 checkpoints 的标志
state.backend.incremental: false
- **Rest & web frontend(web 前端配置)**
```yaml
# 基于 Web 的运行时监视器侦听的地址.
#jobmanager.web.address: 0.0.0.0
# Web 的运行时监视器端口
rest.port: 8081
# 是否从基于 Web 的 jobmanager 启用作业提交
# jobmanager.web.submit.enable: false
是否应在 TaskManager 启动时预先分配 TaskManager 管理的内存
taskmanager.memory.preallocate: false
类加载解析顺序,是先检查用户代码 jar(“child-first”)还是应用程序类路径(“parent-first”)。 默认设置指示首先从用户代码 jar 加载类
classloader.resolve-order: child-first
用于网络缓冲区的 JVM 内存的分数。 这决定了 TaskManager 可以同时拥有多少流数据交换通道以及通道缓冲的程度。 如果作业被拒绝或者您收到系统没有足够缓冲区的警告,请增加此值或下面的最小/最大值。 另请注意,“taskmanager.network.memory.min”和“taskmanager.network.memory.max”可能会覆盖此分数
taskmanager.network.memory.fraction: 0.1
taskmanager.network.memory.min: 67108864
taskmanager.network.memory.max: 1073741824
- **Flink Cluster Security Configuration(Flink 集群安全配置)**
```yaml
# 指示是否从 Kerberos ticket 缓存中读取
# security.kerberos.login.use-ticket-cache: true
# 包含用户凭据的 Kerberos 密钥表文件的绝对路径
# security.kerberos.login.keytab: /path/to/kerberos/keytab
# 与 keytab 关联的 Kerberos 主体名称
# security.kerberos.login.principal: flink-user
# 以逗号分隔的登录上下文列表,用于提供 Kerberos 凭据(例如,`Client,KafkaClient`使用凭证进行 ZooKeeper 身份验证和 Kafka 身份验证)
# security.kerberos.login.contexts: Client,KafkaClient
- ZK Security Configuration(Zookeeper 安全配置)
```yaml
覆盖以下配置以提供自定义 ZK 服务名称
zookeeper.sasl.service-name: zookeeper
该配置必须匹配 “security.kerberos.login.contexts” 中的列表(含有一个)
zookeeper.sasl.login-context-name: Client
- **HistoryServer**
可以通过 bin/historyserver.sh (start|stop) 命令启动和关闭 HistoryServer
```shell
# 开启
./bin/historyserver.sh start
# 关闭
bin/historyserver.sh stop
# 指定由JobManager归档的作业信息所存放的目录,这里使用的是HDFS
# jobmanager.archive.fs.dir: hdfs:///completed-jobs/
# 基于 Web 的 HistoryServer 的地址,0.0.0.0代表允许所有ip访问
# historyserver.web.address: 0.0.0.0
# 基于 Web 的 HistoryServer 的端口号
# historyserver.web.port: 8082
# 指定History Server扫描哪些归档目录,多个目录使用逗号分隔
# historyserver.archive.fs.dir: hdfs:///completed-jobs/
# History Server间隔多少毫秒扫描一次归档目录
# historyserver.archive.fs.refresh-interval: 10000
2. Master
localhost:8081
3.Workers
localhost
4.Zoo.cfg
# 每个 tick 的毫秒数
tickTime=2000
# 初始同步阶段可以采取的 tick
initLimit=10
# 在发送请求和获取确认之间可以传递的 tick 数
syncLimit=5
# 存储快照的目录.
# dataDir=/tmp/zookeeper
# 客户端连接的端口
clientPort=2181
# ZooKeeper quorum peers
server.1=localhost:2888:3888
# server.2=host:peer-port:leader-port
5. Flink 在不同环境下的日志配置(这里就不展开了)
log4j-cli.properties
log4j-console.properties
log4j-yarn-session.properties
log4j.properties
logback-console.xml
logback-yarn.xml
logback.xml
6.sql-client-defaults.yaml
execution:
# 执行 流处理(streaming)还是批处理(batch)
type: streaming
#在源中允许'event-time'或仅'processing-time'
time-characteristic: event-time
# 用于调用周期性水印的间隔(以毫秒为单位)
periodic-watermarks-interval: 200
# 'changelog' or 'table' presentation of results
result-mode: changelog
# 程序的并行度
parallelism: 1
# 最大并行度
max-parallelism: 128
# 以毫秒为单位的最小空闲状态保留时间
min-idle-state-retention: 0
# 以毫秒为单位的最大空闲状态保留时间
max-idle-state-retention: 0
deployment:
# 以毫秒为单位的一般集群通信超时
response-timeout: 5000
# (可选)从集群到网关的地址
gateway-address: ""
# (可选)从集群到网关的端口
gateway-port: 0
Flink-sql-client详细的介绍可以看官网 Flink-Sql-Client 官网介绍中文版 ,Flink-Sql-Client 官网介绍英文版
更多 Flink 配置文件相关的知识点可以看官网(目前还没有中文版) https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sqlclient/