本文将介绍常见的3种flink安装部署模式:

以flink-1.12.0版本为例

  • 本地local模式
  • standalone集群模式(包含HA)
  • On Yarn模式

    本地local模式

    local模式下,Flink将以多线程的形式模拟集群运行,基本架构如下:
    flink-local.png
    执行流程如下:
  1. Flink程序由JobClient进行提交给JobManager
  2. JobManager负责协调资源分配给作业执行。资源分配后,将任务提交给相应的TaskManager
  3. TaskManager启动一个线程开始执行,并会向JobManager报告状态如:开始执行、正在执行或已完成
  4. 作业执行完成后,结果将发送会客户短JobClient

    部署步骤

vim .bash_profile

FLINK_HOME=/Users/sgr/software/bigdata/flink-1.12.0 PATH=$PATH:$FLINK_HOME/bin export FLINK_HOME

source .bash_profile

  1. <a name="o1P4U"></a>
  2. ## 测试例子
  3. <a name="RkK8h"></a>
  4. ### 通过flink脚本提交作业
  5. - 启动local集群:./bin/start-cluster.sh
  6. > 启动完会启动两个进程:
  7. > ![image.png](https://cdn.nlark.com/yuque/0/2021/png/2922466/1618218706693-29221d99-8512-4bd3-80d4-d2586d15942a.png#align=left&display=inline&height=61&margin=%5Bobject%20Object%5D&name=image.png&originHeight=61&originWidth=410&size=21318&status=done&style=none&width=410)
  8. > 并会启动web UI,端口为8081
  9. > ![image.png](https://cdn.nlark.com/yuque/0/2021/png/2922466/1618218897276-5b517c97-5d53-40cc-b657-ac2c6d9eb209.png#align=left&display=inline&height=265&margin=%5Bobject%20Object%5D&name=image.png&originHeight=530&originWidth=1919&size=99370&status=done&style=none&width=959.5)
  10. > slot是flink的资源组,flink会将任务拆分成task分配给slot进行执行
  11. - 提交作业
  12. ```shell
  13. ./bin/flink run ./examples/batch/WordCount.jar --input ./README.txt --output ./out

执行输出: cat ./out image.png

  • 停止集群:./bin/stop-cluster.sh

    启动shell交互

    该方式类似与spark-shell,可以交互式的执行scala代码

  • 启动client:./bin/start-scala-shell.sh local

    启动完会启动进程 image.png

  • 提交作业

    benv.readTextFile("./README.txt").flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1).print()
    

    执行输出:
    image.png

    standalone独立集群模式

    standalone独立集群的架构如下:
    flink-standalone.png
    执行流程如下:

  1. client客户端提交作业给JobManager
  2. JobManager负责申请运行作业需要的资源,并管理资源和作业
  3. JobManager分发任务给TaskManager执行,TaskManager会定期向JobManager汇报状态

    部署步骤

  • 集群规划如下 | 机器列表 | JobManager | TaskManager | | :—-: | :—-: | :—-: | | node1 | ✅ | ✅ | | node2 | | ✅ | | node3 | | ✅ |

  • 修改配置文件

    解压flink包: tar zxvf flink-1.12.0-bin-scala_2.11.tgz -C /opt/

    • 修改$FLINK_HOME/conf/flink-conf.yaml ```yaml

      设置jobmanager为node1

      jobmanager.rpc.address: node1

      设置每个taskmanager 2个slot

      taskmanager.numberOfTaskSlots: 2

      运行web提交

      web.submit.enable: true

历史服务器

jobmanager.archive.fs.dir: hdfs://node1:9000/flink/completed-jobs/ historyserver.web.address: node1 historyserver.web.port: 8082 historyserver.archive.fs.dir: hdfs://node1:9000/flink/completed-jobs/


   - 修改$FLINK_HOME/conf/masters

node1:8081


   - 修改$FLINK_HOME/conf/workers

node1 node2 node3


- 分发配置文件
```shell
cd /opt
for i in {2..3};do scp -r flink-1.12.0 node$i:$PWD;done
  • 启动集群 ```shell

    在node1上执行如下命令:

    cd /opt/flink-1.12.0 ./bin/start-cluster.sh

或者可以单独启动jobmanager/taskmanager

在node1上执行

./bin/jobmanager.sh start && ./bin/taskmanager.sh start

剩余的node2、node3上执行

./bin/taskmanager.sh start

可以通过访问flink web UI:[http://node1:8081/](http://node1:8081/#/overview)
> ![image.png](https://cdn.nlark.com/yuque/0/2021/png/2922466/1618371246712-c276309d-5fe6-4c76-a226-2cad2d783b53.png#align=left&display=inline&height=636&margin=%5Bobject%20Object%5D&name=image.png&originHeight=848&originWidth=2520&size=163602&status=done&style=none&width=1890)

- 启动历史服务器
```shell
# 在node1上执行
./bin/historyserver.sh start

启动会报如下错误:
image.png

原因是flink采用Pluggable File Systems模式,如果要操作hdfs的话,必须要在flink安装目录的 lib 下加上额外的jar包。 Note: historyserver目录 hdfs://node1:9000/flink/completed-jobs 需要事先创建,不然会报错。

我这边hadoop集群版本为2.7.4jar,对应包的下载地址:https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.7.5-7.0/flink-shaded-hadoop-2-uber-2.7.5-7.0.jar
也可以通过flink官网下载:https://flink.apache.org/downloads.html

image.png

可以通过访问flink historyserver web UI:http://node1:8082/

image.png

测试例子

  • 提交作业

    cd /opt/flink-1.12.0
    ./bin/flink run ./examples/batch/WordCount.jar --input ./README.txt --output ./out
    
  • 查看历史

我们可以在hdfs看到作业信息:
image.png
web UI上也能查看到成功的job:

image.png

  • 停止集群:./bin/stop-cluster.sh

    standalone HA搭建

    之前的架构中,有一个很明显的问题就是JobManager存在单点问题,JobManager负责任务调度及资源分配,一旦JobManager出现问题,整个集群将不可用。
    为了解决这个问题,我们引入Zookeeper,同时部署多个JobManager。在整个集群中只有一个active的JobManager,其余的将处于standby状态。当主JobManager出现问题,Zookeeper将会从剩余的standby状态中选取一个新的JobManager来管理Flink集群。基本架构如下:
    flink-ha.png

    部署流程

  • 集群规划 | 机器列表 | JobManager | TaskManager | | :—-: | :—-: | :—-: | | node1 | ✅ | ✅ | | node2 | ✅ | ✅ | | node3 | | ✅ |

  • 启动Zookeeper集群:这边不详细展开

  • 修改配置文件

    • 修改$FLINK_HOME/conf/flink-conf.yaml,新增如下配置:

      # 开启HA后,使用文件系统作为快照存储
      state.backend: filesystem
      # 设置检查点,可以将快照信息保持到hdfs
      state.checkpoints.dir: hdfs://node1:9000/flink/flink-checkpoints
      # HA 设置
      high-availability: zookeeper
      # 存储JobManager的元数据到hdfs
      high-availability.storageDir: hdfs://node1:9000/flink/ha/
      high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181
      
    • 修改$FLINK_HOME/conf/masters

      node1:8081
      node2:8081
      
  • 分配配置文件

    cd /opt/flink-1.12.0
    for i in {2..3};do scp -r conf node$i:$PWD;done
    

    注意:需要修改node2上的$FLINK_HOME/conf/flink-conf.yaml配置中jobmanager.rpc.address为node2

  • 启动集群

    # 在node1上执行如下命令:
    cd /opt/flink-1.12.0
    ./bin/start-cluster.sh
    

    image.png

可以分别访问两台flink web UI:

这样当某个JobManager出现问题时,另一个会进行接管,用户还是可以正常提交作业执行。

Flink on Yarn模式

我们使用yarn集群来管理Flink应用有如下好处:

  • Yarn对资源可以按需使用,配合多种调度模式,能提高集群资源利用率
  • Yarn对任务调度能设置优先级,保障重要任务
  • 基于Yarn调度,能够自动处理各个角色的Failover(容错机制)

    JobManager和TaskManager进程都由Yarn NodeManager进行监控,如果JobManager异常退出,Yarn的ResourceManager会重新调度JobManager到其他机器;如果TaskManager异常退出,JobManager会收到消息并重新向ResourceManager申请资源进行启动。

Flink与Yarn的交互基本如下所示:
flink-yarn1.png
作业提交流程如下:
flink-yarn2.png

  1. 首先用户上次程序jar包和配置文件到hdfs集群上
  2. Client向Yarn ResourceManager提交作业并申请资源
  3. Yarn ResourceManager分配Container资源并启动ApplicationMaster,然后AM加载Flink相关的jar包和配置文件构建好坏境,启动JobManager

    JobManager和ApplicationMaster运行在同一个Container中。 他俩成功启动后,AM就获得了JobManager的地址,他会为TaskManager生成一个新的Flink配置(如何连接到JobManager的信息,这个配置会被上传到hdfs上)。 同时,AM容器会为Flink提供一个临时的web服务端口,用户允许并行执行多个Flink应用

  4. ApplicationMaster向ResourceManager申请工作资源,NodeManager会加载Flink的jar包和配置构建坏境启动TaskManager

  5. TaskManager启动后,会向JobManager发送心跳,等待JobManager分配任务

Flink on Yarn有两种运行模式可以选择:

Session模式

该模式会在Yarn集群中启动一个Flink集群,并进行重复使用,结构如下:
image.png
它有如下特点:
flink-session.png 该模式事先申请好资源,并长期运行JobManager和TaskManager。这样的好处就是每次提交作业不需要重新申请资源,缺点也是资源将不会被释放,会一直占用系统资源。
适用场景:作业频繁提交,小作业较多的场景。

PerJob模式

该模式会对每一个Flink任务单独启动一个Flink集群,运行完将会释放,不能被重复使用。结构如下:
image.png
有如下特点:
flink-s2.png
该模式每次提交作业都要重新申请资源,并作业运行完,资源会被释放,这会影响到执行效率,因为资源申请需要消耗时间,但带来的好处就是能够进行资源隔离,并且可以按需申请TM slots数量。
适用场景:作业数量小,大作业较多的场景。实际场景中该方式使用的最多。

部署测试

对于Yarn集群,我们关闭内存检查:
修改yarn-site.xml配置文件

<!-- 关闭yarn内存检查 -->
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
    <value>false</value>
</property>
<property>
     <name>yarn.nodemanager.vmem-check-enabled</name>
     <value>false</value>
</property>

上面参数确定是否启动线程来检查每个任务正在使用的虚拟内存量,如果任务超出分配值,会被直接杀掉,默认为true。我们需要关闭,因为flink任务本身会大量使用内存,在yarn模式下很容易超标。

Session会话模式

  • 启动Flink session
    cd /opt/flink-1.12.0
    ./bin/yarn-session.sh -n 2 -tm 800 -s 1 -d
    

    上面命令会启动2个cpu,1600m内存的flink集群 -n 2表示申请2个容器启动taskmanager -tm 800 表示每个taskmanager内存为800m -s 1 表示每个taskmanager的slots数量为1 -d 表示后台程序运行

提交运行成功,可以在yarn web UI上看到一个常驻的application:

image.png

  • 提交任务测试

    ./bin/flink run examples/batch/WordCount.jar
    

    image.png

  • 停止flink集群

    yarn application -kill application_1618344613555_0002(appid)
    

    Job分离模式

  • 提交Flink Job

    ./bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 examples/batch/WordCount.jar
    

    -m 设置JobManager地址为yarn-cluster,使用YARN执行模式 -yjm 设置jobmanager内存 -ytm 设置taskmanager内存

提交完在yarn UI上可以看到job信息:

image.png