Flink部署
Flink部署模式介绍
Flink有三种部署模式,分别为:

Local模式
Local Cluster模式是开箱即用的,直接解压安装包,然后启动即可。
Standalone模式
Stanalone CLuster是一种独立的集群模式,集群运行不需要依赖外部系统,完全自己独立进行管理。(如果配置HA高可用,则需要ZK进行协调管理)
Yarn(或其他资源管理框架)模式
YARN模式是使用YARN做为Flink运行平台,JobManager、TaskManager、用户提交的应用程序都运行在YARN上。
Flume-Yarn模式部署(Centos7+flink-1.4.0-bin-hadoop26-scala_2.11.tgz)

1、yarn cluster 模式部署介绍

mr和spark都可以基于yarn模式部署,flink也不例外,生产中很多也基于yarn模式部署。
flink的yarn模式部署也分为两种方式,一种是yarn-session,一种是yarn-per-job。大致如下图:
image.png

2、flink session HA模式

需要先启动集群,然后在提交作业,接着会向yarn申请一块资源空间后,资源永远保持不变。如果资源满了,下一个作业就无法提交,只能等到yarn中的其中一个作业执行完成后,释放了资源,那下一个作业才会正常提交。
适合场景:
当作业很少并且都较小,能快速执行完成时,可以使用。否则一般不会使用该模式。
这种模式,不需要做任何配置,直接将任务提价到yarn集群上面去,我们需要提前启动hdfs以及yarn集群即可。

两个进程:
运行yarn-session的主机上会运行FlinkYarnSessionCli和YarnSessionClusterEntrypoint两个进程。
在yarn-session提交的主机上必然运行FlinkYarnSessionCli,这个进场代表本节点可以命令方式提交job,而且可以不用指定-m参数。
YarnSessionClusterEntrypoint进场代表yarn-session集群入口,实际就是jobmanager节点,也是yarn的ApplicationMaster节点。
这两个进程可能会出现在同一节点上,也可能在不同的节点上。

1、配置

  1. [root@hadoop01 flink-1.9.1]# vi ./conf/flink-conf.yaml
  2. 追加如下内容:
  3. # flink yarn HA settings
  4. high-availability: zookeeper
  5. high-availability.zookeeper.quorum: hadoop01:2181,hadoop02:2181,hadoop03:2181
  6. high-availability.zookeeper.path.root: /flink_yarn
  7. high-availability.cluster-id: /cluster_flink_yarn
  8. high-availability.storageDir: hdfs://hadoop01:9000/flink_yarn/recovery

hadoop02 和 hadoop03分别做如上的配置。

2、启动flink session
先确保zookeeper、hdfs、yarn是启动okay。

  1. [root@hadoop01 flink-1.9.1]# yarn-session.sh -n 3 -jm 1024 -tm 1024
  2. ...................................
  3. 2020-04-14 11:52:59,248 INFO org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
  4. 2020-04-14 11:52:59,753 INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
  5. Flink JobManager is now running on hadoop02:41674 with leader id 04caacdd-23c6-4e79-acd5-6db3b1014be0.
  6. JobManager Web Interface: http://hadoop02:8081 ##代表jobmanager启动到hadoop02

报错:

  1. Diagnostics: Container [pid=9528,containerID=container_1586835850522_0001_03_000001] is running beyond virtual memory limits. Current usage: 316.1 MB of 1 GB physical memory used; 2.3 GB of 2.1 GB virtual memory used. Killing container.
  2. 解决方法:
  3. hadoop01hadoop02hadoop03中的yarn-site.xml中配置如下:
  4. <!--关闭nm的虚拟内存检测-->
  5. <property>
  6. <name>yarn.nodemanager.vmem-check-enabled</name>
  7. <value>false</value>
  8. </property>

3、环境检测
根据启动的信息可知,flink启动到咯hadoop02,则使用jps测试一下:
jps检测进程:

  1. [root@hadoop02 flink-1.9.1]# jps
  2. 8992 DataNode
  3. 9985 YarnSessionClusterEntrypoint

web页面查看:http://hadoop02:8081
image.png

查看yarn的web控制台:http://hadoop01:8088
image.png

4、提交作业测试:
提交作业和standalone一样正常提交即可。

  1. [root@hadoop02 flink-1.9.1]# flink run /usr/local/flink-1.9.1/examples/batch/WordCount.jar --input /home/words --output /home/out/fl01
  2. Starting execution of program
  3. Program execution finished
  4. Job with JobID c3fd22587744bc54a6d69af6573a3183 has finished.
  5. Job Runtime: 20642 ms

5、HA切换检测

  1. [root@hadoop02 flink-1.9.1]# jps
  2. 8992 DataNode
  3. 9985 YarnSessionClusterEntrypoint
  4. 8901 QuorumPeerMain
  5. 9096 SecondaryNameNode
  6. 9720 NodeManager
  7. 11210 Jps
  8. #杀死进程造成异常退出
  9. [root@hadoop02 flink-1.9.1]# kill -9 9985

当是HA时,,则一个挂掉后,则JM将会失败转移到另外的服务器上。如下是转移到hadoop01上。

  1. [root@hadoop01 flink-1.9.1]# jps
  2. 9408 NameNode
  3. 12017 YarnSessionClusterEntrypoint
  4. 再次测试job
  5. [root@hadoop02 flink-1.9.1]# flink run /usr/local/flink-1.9.1/examples/batch/WordCount.jar --input /home/words --output /home/out/fl03
  6. Starting execution of program
  7. Program execution finished
  8. Job with JobID 779689f706af7a9cb05e771a80e89128 has finished.
  9. Job Runtime: 11462 ms
  10. yarn session提交的作业,,在yarnweb平台中看不到。可以通过flink --list来查看。

6、如何停止运行的程序 通过cancel命令进行停止:

  1. flink cancel -s hdfs:///flink/savepoints /savepoints-* -yid application_1586836326559_0002

或者通过 flink list 获得 jobId

  1. flink list
  2. flink cancel -s hdfs:///flink/savepoints/savepoint-* jobId

其中-s为可选操作

7、关闭jobmanager
直接将yarn-session停止掉:

  1. yarn application -kill applicationId

3、flink-per-job模式

yarn session需要先启动一个集群,然后在提交作业。 但是Flink-per-job直接提交作业即可,不需要额外的去启动一个flink-session集群。直接提交作业,即可完成Flink作业。
适合场景:
作业多、且每个作业运行时长不定。生产推荐使用该模式运行作业。
1、直接使用flink run运行即可
[root@hadoop01 flink-1.9.1]# flink run -m yarn-cluster /usr/local/flink-1.9.1/examples/batch/WordCount.jar —input /home/words —output /home/out/fl05
image.png

查看yarn的web平台:
image.png