Apache Flink 是一个在无界和有界数据流上进行状态计算的框架和分布式处理引擎。 Flink 已经可以在所有常见的集群境中运行,并以 in-memory 的速度和任意的规模进行计算。
1.1 Flink组件栈
各层详细介绍:
- 物理部署层:Flink 支持本地运行、能在独立集群或者在被 YARN 管理的集群上运行,
也能部署在云上,该层主要涉及Flink的部署模式,目前Flink支持多种部署模式:本地、集群(Standalone、YARN)、云(GCE/EC2)、Kubenetes。Flink能够通过该层能够支持不同平台的部署,用户可以根据需要选择使用对应的部署模式。 - Runtime核心层:Runtime层提供了支持Flink计算的全部核心实现,为上层API层提供基础服务,该层主要负责对上层不同接口提供基础服务,也是Flink分布式计算框架的核心实现层,支持分布式Stream作业的执行、JobGraph到ExecutionGraph的映射转换、任务调度等。将DataSteam和DataSet转成统一的可执行的Task Operator,达到在流式引擎下同时处理批量计算和流式计算的目的。
- API&Libraries层:Flink 首先支持了 Scala 和 Java 的 API,Python 也正在测试中。DataStream、DataSet、Table、SQL
API,作为分布式数据处理框架,Flink同时提供了支撑计算和批计算的接口,两者都提供给用户丰富的数据处理高级API,例如Map、FlatMap操作等,也提供比较低级的Process Function API,用户可以直接操作状态和时间等底层数据。 - 扩展库:Flink 还包括用于复杂事件处理的CEP,机器学习库FlinkML,图处理库Gelly等。Table
是一种接口化的 SQL 支持,也就是 API 支持(DSL),而不是文本化的SQL 解析和执行。1.2Flink基石
- Checkpoint
- State
- Time
- Window
1.3部署方式
1.简单模式
#!!!!!!1.单机部署
tar -zxvf flink-1.12.0-bin-scala_2.12.tgz
#启动集群
/export/server/flink-1.12.0/bin/start-cluster.sh
#测试
bin/flink run examples/batch/WordCount.jar --input /export/data/flink/input --output /export/data/flink/output/result.txt
#!!!!!终级版flink on yarn
2.Standalone-HA模式
--- 1.flink-conf.yaml配置修改
jobmanager.archive.fs.dir: hdfs://ns/flink/completed-jobs/
historyserver.web.address: node2
# The port under which the web-based HistoryServer listens.
historyserver.web.port: 8082
# Comma separated list of directories to monitor for completed jobs.
historyserver.archive.fs.dir: hdfs://ns/flink/completed-jobs/
# Interval in milliseconds for refreshing the monitored directories.
historyserver.archive.fs.refresh-interval: 10000
state.backend: filesystem
state.backend.fs.checkpointdir: hdfs://ns/flink-checkpoints
high-availability: zookeeper
high-availability.storageDir: hdfs://ns/flink/ha/
high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181
high-availability: zookeeper
high-availability.zookeeper.quorum: zetagmaster:2181,zetagworker1:2181,zetagworker2:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /default_ns
high-availability.storageDir: hdfs://mycluster/flink/recovery
---2 分发jar包到$flink/lib/
flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
3.Session模式
Session-Cluster模式需要先启动集群,然后再提交作业,接着会向yarn申请一块空间后,资源永远保持不变。如果资源满了,下一个作业就无法提交,只能等到yarn中的其中一个作业执行完成后,释放了资源,下个作业才会正常提交。所有作业共享Dispatcher和ResourceManager;共享资源;适合规模小执行时间短的作业。
在yarn中初始化一个flink集群,开辟指定的资源,以后提交任务都向这里提交。这个flink集群会常驻在yarn集群中,除非手工停止。
bin/yarn-session.sh -n 2 -tm 800 -s 1 -d
说明:
申请2个CPU、1600M内存
# -n 表示申请2个容器,这里指的就是多少个taskmanager
# -tm 表示每个TaskManager的内存大小
# -s 表示每个TaskManager的slots数量
# -d 表示以后台程序方式运行
查看nodemanage http://node2:8088/cluster
bin/flink run examples/batch/WordCount.jar
4.Per-Job-Cluster 模式
一个Job会对应一个集群,每提交一个作业会根据自身的情况,都会单独向yarn申请资源,直到作业执行完成,一个作业的失败与否并不会影响下一个作业的正常提交和运行。独享Dispatcher和ResourceManager,按需接受资源申请;适合规模大长时间运行的作业。
每次提交都会创建一个新的flink集群,任务之间互相独立,互不影响,方便管理。任务执行完成之后创建的集群也会消失。
bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 examples/batch/WordCount.jar
# -m jobmanager的地址
# -yjm 1024 指定jobmanager的内存信息
# -ytm 1024 指定taskmanager的内存信息
2.flink on yarn提交流程
1.Client上传jar包,配置文件
2.Client向Resourcemanager发送请求,运行job任务
3.Resourcemanager会从任务队列中接受到这个请求,根据各个nodemanger的资源使用情况分配一个nodemanage
4.Resourcemanage会创建一个AppMaster(ApplicationMaster),AppMaster下载jar包和配置文件
5.AppMaster根据配置文件向ResourceMange申请创建几个TaskManager
6.ResourceManger返回对应节点的信息,然后通过封装container创建TaskMange运行程序
7.AppMaster监控TaskManage的运行状况,
1.4启动停机命令
flink run $FLINK_HOME/examples/streaming/SocketWindowWordCount.jar --hostname node2 --port 9999
flink stop --savepointPath hdfs://node1:8020/data/flink/savepoint f9b830f2789f07ac299567ea394a805c
flink run $FLINK_HOME/examples/streaming/SocketWindowWordCount.jar \
--hostname node2 --port 9999 --fromSavepoint hdfs://node1:8020/data/flink/savepoint/savepoint-f9b830-e73c083cb33a
flink编译
mvn -T2C clean install -DskipTests -Pvendor-repos -Dhadoop.version=3.0.0-cdh6.3.2 -T 4 -Dscala-2.11 -DskipTests -Drat.skip=true -Dcheckstyle.skip=true