我们是在本地模拟了一个Flink集群,而真正的Flink集群是什么样的,需要我们去了解下:首先Flink有几个关键的组件:
客户端(Client)、作业管理器(JobManager)和 任务管理器( TaskManager)。
代码的提交过程:
实际上是由客户端获取并做转换,之后提交给JobManager的。所以JobManager就是Flink集群里的“管事人”,对作业进行中央调度管理;而它获取到要执行的作业后,会进一步处处理转换,然后分发任务给众多的TaskManager,这里的TaskManager就是负责具体操作的,数据的处理操作都是他们来做。
image.png
在实际的项目中,我们肯定是使用生产环境的集群,将作业提交到集群上运行,本文介绍Flink的部署及作业提交的流程。Flink是一个非常灵活的处理框架,它支持处理不同的部署场景,还可以和不同的资源管理平台方便集成。

集群部署:

  1. hadoop启动与关闭:
  2. /export/server/hadoop-3.3.0/sbin/start-all.sh
  3. /export/server/hadoop-3.3.0/sbin/start-stop.sh
  1. 下载并解压安装包
    具体操作与上节相同。
    2. 修改集群配置
    1. cd conf/
    2. vim flink-conf.yaml
    3. # JobManager 节点地址.
    4. jobmanager.rpc.address: hadoop102
    1. $ vim workers
    2. hadoop103
    3. hadoop104
    这样就指定了 hadoop103 和 hadoop104 为 TaskManager 节点 。 ```bash jobmanager.memory.process.size:对JobManager进程可使用到的全部内存进行配置,包括 JVM 元空间和其他开销,默认为 1600M,可以根据集群规模进行适当调整。 taskmanager.memory.process.size:对 TaskManager 进程可使用到的全部内存进行配置,包括 JVM 元空间和其他开销,默认为 1600M,可以根据集群规模进行适当调整。 taskmanager.numberOfTaskSlots:对每个 TaskManager 能够分配的 Slot 数量进行配置,默认为 1,可根据 TaskManager 所在的机器能够提供给 Flink 的 CPU 数量决定。所谓
    1. Slot 就是 TaskManager 中具体运行一个任务所分配的计算资源。

parallelism.default: Flink 任务执行的默认并行度,优先级低于代码中进行的并行度配置和任务提交时使用参数指定的并行度数量。 关于 Slot 和并行度的概念,我们会在下一章做详细讲解。

3. 分发安装目录 <br />配置修改完毕后,将 Flink 安装目录发给另外两个节点服务器 。
```bash
scp -r ./flink-1.13.0 atguigu@hadoop103:/opt/module
scp -r ./flink-1.13.0 atguigu@hadoop104:/opt/module
  1. 启动集群
    bin/start-cluster.sh
    Starting cluster.
    Starting standalonesession daemon on host hadoop102.
    Starting taskexecutor daemon on host hadoop103.
    Starting taskexecutor daemon on host hadoop104.
    
    [atguigu@hadoop102 flink-1.13.0]$ jps
    13859 Jps
    13782 StandaloneSessionClusterEntrypoint
    [atguigu@hadoop103 flink-1.13.0]$ jps
    12215 Jps
    12124 TaskManagerRunner
    [atguigu@hadoop104 flink-1.13.0]$ jps
    11602 TaskManagerRunner
    11694 Jps
    
    访问我自己的地址:
    http://node1:8081

    集群提交作业

    1.WebUI提交作业
    (1)首先是将程序打包,然后上传到Flink的WebUI上;
    image.png
    image.png
    然后点击任务查看该任务的运行情况
    image.png

2. 命令行提交作业
了通过 WEB UI 界面提交任务之外,也可以直接通过命令行来提交任务。这里为方便起见,我们可以先把 jar 包直接上传到目录 flink-1.13.0 下
(1)首先需要启动集群 bin/start-cluster.sh
(2)在 hadoop102 中执行以下命令启动 netcat nc -lk 7777
(3)进入到 Flink 的安装路径下,在命令行使用 flink run 命令提交作业 。
bin/flink run -m hadoop102:8081 -c com.atguigu.wc.StreamWordCount ./FlinkTutorial-1.0-SNAPSHOT.jar
这里的参数 –m 指定了提交到的 JobManager, -c 指定了入口类
(4)在浏览器打开WebUI,http://hadoop102:8081 查看应用执行情况,
image.png
用 netcat 输入数据,可以在 TaskManager 的标准输出(Stdout)看到对应的统计结果 。
(5)在 log 日志中,也可以查看执行结果,需要找到执行该数据任务的 TaskManager 节点查看日志。

部署模式

在一些应用场景中,对于集群资源分配和占用的方式,可能会有特定的需求。 Flink 为各种场景提供了不同的部署模式,主要有以下三种:

  • 会话模式
  • 单作业模式
  • 应用模式

会话模式(Session Mode) :集群启动时所有资源就都已经确定,所以所有提交的作业会竞争集群中的资源。
单作业模式 (Per-Job Mode ):
会话模式因为资源共享会导致很多问题,所以为了更好地隔离资源,我们可以考虑为每个提交的作业启动一个集群,这就是所谓的单作业(Per-Job)模式,单作业模式也很好理解,就是严格的一对一,集群只为这个作业而生。同样由客户端运行应用程序,然后启动集群,作业被提交给 JobManager,进而分发给 TaskManager 执行。作业作业完成后,集群就会关闭,所有资源也会释放。这样一来,每个作业都有它自己的 JobManager管理,占用独享的资源,即使发生故障,它的 TaskManager 宕机也不会影响其他作业。
这些特性使得单作业模式在生产环境运行更加稳定,所以是实际应用的首选模式。需要注意的是, Flink 本身无法直接这样运行,所以单作业模式一般需要借助一些资源管理框架来启动集群,比如 YARN、 Kubernetes
应用模式(Application Mode) :
前面提到的两种模式下,应用代码都是在客户端上执行,然后由客户端提交给 JobManager的。但是这种方式客户端需要占用大量网络带宽,去下载依赖和把二进制数据发送给JobManager;加上很多情况下我们提交作业用的是同一个客户端,就会加重客户端所在节点的资源消耗。
所以解决办法就是,我们不要客户端了,直接把应用提交到 JobManger 上运行。而这也就代表着,我们需要为每一个提交的应用单独启动一个 JobManager,也就是创建一个集群。这个 JobManager 只为执行这一个应用而存在,执行结束之后 JobManager 也就关闭了,这就是所谓的应用模式,如图 3-12 所示。
image.pngimage.pngimage.png
应用模式与单作业模式,都是提交作业之后才创建集群;单作业模式是通过客户端来提交的,客户端解析出的每一个作业对应一个集群;而应用模式下,是直接由 JobManager 执行应用程序的,并且即使应用包含了多个作业,也只创建一个集群。
总结一下,在会话模式下,集群的生命周期独立于集群上运行的任何作业的生命周期,并且提交的所有作业共享资源。而单作业模式为每个提交的作业创建一个集群,带来了更好的资源隔离,这时集群的生命周期与作业的生命周期绑定。最后,应用模式为每个应用程序创建一
个会话集群,在 JobManager 上直接调用应用程序的 main()方法 。

独立模式

独立模式(Standalone)是部署 Flink 最基本也是最简单的方式:所需要的所有 Flink 组件,都只是操作系统上运行的一个 JVM 进程 ,我们也可以将独立模式的集群放在容器中运行。 Flink 提供了独立模式的容器化部署方式,可以在 Docker 或者 Kubernetes 上进行部署。
独立模式一般只用在开发测试或作业非常少的场景下。

3.3.1 会话模式部署
  可以发现,独立模式的特点是不依赖外部资源管理平台,而会话模式的特点是先启动集群、后提交作业。所以,我们在第 3.1 节用的就是独立模式(Standalone)的会话模式部署。
3.3.2 单作业模式部署
  在 3.2.2 节中我们提到, Flink 本身无法直接以单作业方式启动集群,一般需要借助一些资源管理平台。所以 Flink 的独立(Standalone)集群并不支持单作业模式部署。
3.3.3 应用模式部署
  应用模式下不会提前创建集群,所以不能调用 start-cluster.sh 脚本。我们可以使用同样在bin 目录下的 standalone-job.sh 来创建一个 JobManager。
具体步骤如下:
(1)进入到 Flink 的安装路径下,将应用程序的 jar 包放到 lib/目录下。
$ cp ./FlinkTutorial-1.0-SNAPSHOT.jar lib/
(2)执行以下命令,启动 JobManager。
$ ./bin/standalone-job.sh start --job-classname com.atguigu.wc.StreamWordCount
这里我们直接指定作业入口类,脚本会到 lib 目录扫描所有的 jar 包。
(3)同样是使用 bin 目录下的脚本,启动 TaskManager。
$ ./bin/taskmanager.sh start
(4)如果希望停掉集群,同样可以使用脚本,命令如下。
$ ./bin/standalone-job.sh stop
$ ./bin/taskmanager.sh stop

高可用(High Availability ) :

  分布式除了提供高吞吐,另一大好处就是有更好的容错性。对于 Flink 而言,因为一般会有多个 TaskManager,即使运行时出现故障,也不需要将全部节点重启,只要尝试重启故障节点就可以了。但是我们发现,针对一个作业而言,管理它的 JobManager 却只有一个,这同样有可能出现单点故障。为了实现更好的可用性,我们需要 JobManager 做一些主备冗余,这就是所谓的高可用(High Availability,简称 HA)。
  我们可以通过配置,让集群在任何时候都有一个主 JobManager 和多个备用 JobManagers,如图 3-13 所示,这样主节点故障时就由备用节点来接管集群,接管后作业就可以继续正常运行。主备 JobManager 实例之间没有明显的区别,每个 JobManager 都可以充当主节点或者备节点。

 具体配置如下:
 (1)进入 Flink 的安装路径下的 conf 目录下,修改配置文件: flink-conf.yaml,增加如下配置。
      high-availability: zookeeper
      high-availability.storageDir: hdfs://hadoop102:9820/flink/standalone/ha
      high-availability.zookeeper.quorum: hadoop102:2181,hadoop103:2181,hadoop104:2181
      high-availability.zookeeper.path.root: /flink-standalone
      high-availability.cluster-id: /cluster_atguigu
(2)修改配置文件: masters,配置备用 JobManager 列表
      hadoop102:8081
      hadoop103:8081
(3)分发修改后的配置文件到其他节点服务器。
(4)在/etc/profile.d/my_env.sh 中配置环境变量
      export HADOOP_CLASSPATH=`hadoop classpath
注意:
⚫ 需要提前保证 HAOOP_HOME 环境变量配置成功
⚫ 分发到其他节点

具体部署方法如下:
(1)首先启动 HDFS 集群和 Zookeeper 集群
(2)执行以下命令,启动 standalone HA 集群
      bin/start-cluster.sh
(3)可以分别访问两个备用 JobManager 的 Web UI 页面。
      http://hadoop102:8081
      http://hadoop103:8081
( 4)在 zkCli.sh 中查看谁是 leader
[zk: localhost:2181(CONNECTED) 1] get    /flink-standalone/cluster_atguigu/leader/rest_server_lock
    杀死 hadoop102 上的 Jobmanager, 再看 leader。
[zk: localhost:2181(CONNECTED) 7] get /flink-standalone/cluster_atguigu/leader/rest_server_lock

注意: 不管是不是 leader,从 WEB UI 上是看不到区别的, 都可以提交应用。


部署模式:分为 会话,单作业,应用
Flink的部署模式独立模式,独立模式部署会话,单作业,应用
Flink也可以借助YARN,K8S实现会话,单作业,应用模式的部署。
Flink的部署模式通用,可以部署在不同的平台上。
接下来我们了解下YARN模式:

YARN 模式 :安装部署

独立(Standalone)模式由 Flink 自身提供资源,无需其他框架,这种方式降低了和其他第三方资源框架的耦合性,独立性非常强。但我们知道, Flink 是大数据计算框架,不是资源调度框架,这并不是它的强项;所以还是应该让专业的框架做专业的事,和其他资源调度框架
集成更靠谱。而在目前大数据生态中,国内应用最为广泛的资源管理平台就是 YARN 了。所以接下来我们就将学习,在强大的 YARN 平台上 Flink 是如何集成部署的。
整体来说, YARN 上部署的过程是:客户端把 Flink 应用提交给 Yarn 的 ResourceManager,Yarn 的 ResourceManager 会向 Yarn 的 NodeManager 申请容器。在这些容器上, Flink 会部署JobManager 和 TaskManager 的实例,从而启动集群。 Flink 会根据运行在JobManger 上的作业所需要的 Slot 数量动态分配 TaskManager 资源。
相关准备和配置 :
在 Flink1.8.0 之前的版本,想要以 YARN 模式部署 Flink 任务时,需要 Flink 是有 Hadoop支持的。从 Flink 1.8 版本开始,不再提供基于 Hadoop 编译的安装包,若需要 Hadoop 的环境支持,需要自行在官网下载 Hadoop 相关版本的组件 flink-shaded-hadoop-2-uber-2.7.5-10.0.jar,并将该组件上传至 Flink 的 lib 目录下。在 Flink 1.11.0 版本之后,增加了很多重要新特性,其中就包括增加了对Hadoop3.0.0以及更高版本Hadoop的支持,不再提供“flink-shaded-hadoop-*”jar 包,而是通过配置环境变量完成与 YARN 集群的对接 。
在将 Flink 任务部署至 YARN 集群之前,需要确认集群是否安装有 Hadoop,保证 Hadoop版本至少在 2.2 以上,并且集群中安装有 HDFS 服务。
具体配置步骤如下 :

( 1)按照 3.1 节所述,下载并解压安装包,并将解压后的安装包重命名为 flink-1.13.0-yarn,本节的相关操作都将默认在此安装路径下执行。
( 2)配置环境变量,增加环境变量配置如下:
    $ sudo vim /etc/profile.d/my_env.sh
    HADOOP_HOME=/opt/module/hadoop-2.7.5
    export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
    export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
    export HADOOP_CLASSPATH=`hadoop classpath`
    这里必须保证设置了环境变量 HADOOP_CLASSPATH。
(3)启动 Hadoop 集群,包括 HDFS 和 YARN。
[atguigu@hadoop102 ~]$ start-dfs.sh
[atguigu@hadoop103 ~]$ start-yarn.sh

分别在 3 台节点服务器查看进程启动情况。
atguigu@hadoop102 ~]$ jps
5190 Jps
5062 NodeManager
4408 NameNode
4589 DataNode

[atguigu@hadoop103 ~]$ jps
5425 Jps
4680 ResourceManager
5241 NodeManager
4447 DataNode

[atguigu@hadoop104 ~]$ jps
4731 NodeManager
4333 DataNode
4861 Jps
4478 SecondaryNameNode

(4)进入 conf 目录,修改 flink-conf.yaml 文件,修改以下配置,这些配置项的含义在进行 Standalone 模式配置的时候进行过讲解,若在提交命令中不特定指明,这些配置将作为默认配置。
$ cd /opt/module/flink-1.13.0-yarn/conf/
$ vim flink-conf.yaml
    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 1728m
    taskmanager.numberOfTaskSlots: 8
    parallelism.default: 1

YARN会话模式部署
YARN 的会话模式与独立集群略有不同,需要首先申请一个 YARN 会话(YARN session)来启动 Flink 集群。具体步骤如下:
1. 启动集群

(1)启动 hadoop 集群(HDFS, YARN)。
(2)执行脚本命令向 YARN 集群申请资源,开启一个 YARN 会话,启动 Flink 集群。
    $ bin/yarn-session.sh -nm test
  可用参数解读:
  -d:分离模式,如果你不想让 Flink YARN 客户端一直前台运行,可以使用这个参数,即使关掉当前对话窗口, YARN session 也可以后台运行。
  -jm(--jobManagerMemory):配置 JobManager 所需内存,默认单位 MB。
  -nm(--name):配置在 YARN UI 界面上显示的任务名。
  -qu(--queue):指定 YARN 队列名。
  -tm(--taskManager):配置每个 TaskManager 所使用内存。
注意: Flink1.11.0 版本不再使用-n 参数和-s 参数分别指定 TaskManager 数量和 slot 数量,YARN 会按照需求动态分配 TaskManager 和 slot。所以从这个意义上讲, YARN 的会话模式也不会把集群资源固定,同样是动态分配的。

YARN Session 启动之后会给出一个 web UI 地址以及一个 YARN application ID,如下所示,用户可以通过 web UI 或者命令行两种方式提交作业。
2021-06-03 15:54:27,069 INFO org.apache.flink.yarn.YarnClusterDescriptor
[] - YARN application has been deployed successfully.
2021-06-03 15:54:27,070 INFO org.apache.flink.yarn.YarnClusterDescriptor
[] - Found Web Interface hadoop104:39735 of application
'application_1622535605178_0003'.
JobManager Web Interface: http://hadoop104:39735


2. 提交作业
(1)通过 Web UI 提交作业
这种方式比较简单,与上文所述 Standalone 部署模式基本相同。
(2)通过命令行提交作业
① 将 Standalone 模式讲解中打包好的任务运行 JAR 包上传至集群
② 执行以下命令将该任务提交到已经开启的 Yarn-Session 中运行。
$ bin/flink run -c com.atguigu.wc.StreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar
客户端可以自行确定 JobManager 的地址,也可以通过-m 或者-jobmanager 参数指定JobManager 的地址, JobManager 的地址在 YARN Session 的启动页面中可以找到。
③ 任务提交成功后,可在 YARN 的 Web UI 界面查看运行情况。
④也可以通过 Flink 的 Web UI 页面查看提交任务的运行情况。

YARN单作业模式部署
在 YARN 环境中,由于有了外部平台做资源调度,所以我们也可以直接向 YARN 提交一个单独的作业,从而启动一个 Flink 集群。
(1)执行命令提交作业
$ bin/flink run -d -t yarn-per-job -c com.atguigu.wc.StreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar
(2)在 YARN 的 ResourceManager 界面查看执行情况 ,点击可以打开 Flink Web UI 页面进行监控,
(3)可以使用命令行查看或取消作业,命令如下。
$ ./bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY<br /> $ ./bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY <jobId>
这里的 application_XXXX_YY 是当前应用的 ID, 是作业的 ID。注意如果取消作业,整个 Flink 集群也会停掉。

YARN应用模式部署:
应用模式同样非常简单,与单作业模式类似,直接执行 flink run-application 命令即可。

$ bin/flink run-application -t yarn-application -c com.atguigu.wc.StreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar
$ ./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
$ ./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY  <jobId>
$ ./bin/flink run-application -t yarn-application -Dyarn.provided.lib.dirs="hdfs://myhdfs/my-remote-flink-dist-dir" hdfs://myhdfs/jars/my-application.jar

# 这种方式下 jar 可以预先上传到 HDFS,而不需要单独发送到集群,这就使得作业提交更加轻量了。

YARN模式高可用
YARN 模式的高可用和独立模式(Standalone)的高可用原理不一样。
Standalone 模式中, 同时启动多个 JobManager, 一个为“领导者”(leader),其他为“后备”(standby) , 当 leader 挂了, 其他的才会有一个成为 leader。
YARN 的高可用是只启动一个 Jobmanager, 当这个 Jobmanager 挂了之后, YARN 会再次启动一个, 所以其实是利用的 YARN 的重试次数来实现的高可用。

(1)在 yarn-site.xml 中配置 
<property>
  <name>yarn.resourcemanager.am.max-attempts</name>
  <value>4</value>
  <description>
  The maximum number of application master execution attempts.
  </description>
</property>

注意: 配置完不要忘记分发, 和重启 YARN。
(2)在 flink-conf.yaml 中配置
yarn.application-attempts: 3
high-availability: zookeeper
high-availability.storageDir: hdfs://hadoop102:9820/flink/yarn/ha
high-availability.zookeeper.quorum:
hadoop102:2181,hadoop103:2181,hadoop104:2181
high-availability.zookeeper.path.root: /flink-yarn

( 3)启动 yarn-session。
( 4)杀死 JobManager, 查看复活情况。
注意: yarn-site.xml 中配置的是 JobManager 重启次数的上限, flink-conf.xml 中的次数应该小于这个值。


K8S 模式
容器化部署是如今业界流行的一项技术,基于 Docker 镜像运行能够让用户更加方便地对应用进行管理和运维。容器管理工具中最为流行的就是 Kubernetes(k8s),而 Flink 也在最近的版本中支持了 k8s 部署模式。基本原理与 YARN 是类似的。

总结
Flink 支持多种不同的部署模式,还可以和不同的资源管理平台方便地集成。本章从快速启动的示例入手,接着介绍了 Flink 中几种部署模式的区别,并进一步针对不同的资源提供者展开讲解了具体的部署操作。在这个过程中,我们不仅熟悉了 Flink 的使用方法,而且接触到
了很多内部运行原理的知识。
关于 Flink 运行时组件概念的作用,以及作业提交运行的流程架构,我们会在下一章进一步详细展开。