YARN 配置

快速开始

启动YARN

启动一个有 4 Task Managers (每个4 GB堆内存)的YARN session命令:

  1. # get the hadoop2 package from the Flink download page at
  2. # http://flink.apache.org/downloads.html
  3. curl -O <flink_hadoop2_download_url>
  4. tar xvzf flink-1.7.1-bin-hadoop2.tgz
  5. cd flink-1.7.1/
  6. ./bin/yarn-session.sh -n 4 -jm 1024m -tm 4096m

-s参数可以指定每个TaskManager的处理槽数。建议将插槽数设置为每台计算机的处理器数。

YARN启动后,可以通过 ./bin/flink 工具来提交Flink作业.

在YARN上运行Flink作业

  1. # get the hadoop2 package from the Flink download page at
  2. # http://flink.apache.org/downloads.html
  3. curl -O <flink_hadoop2_download_url>
  4. tar xvzf flink-1.7.1-bin-hadoop2.tgz
  5. cd flink-1.7.1/
  6. ./bin/flink run -m yarn-cluster -yn 4 -yjm 1024m -ytm 4096m ./examples/batch/WordCount.jar

Flink YARN Session

Apache Hadoop YARN 是一个集群资源管理框架。它允许在群集上运行各种分布式应用程序。Flink在YARN上运行。如果已经有YARN,用户不必安装其他任何东西。

系统配置需求

  • 版本不低于Apache Hadoop 2.2
  • HDFS(Hadoop分布式文件系统)(或Hadoop支持的其他分布式文件系统)

如果在使用Flink YARN时遇到问题,请查阅 FAQ section.

启动 Flink 会话

以下说明了解如何在YARN群集中启动Flink会话。

会话将启动Flink程序运行所必须的服务 (JobManager 和 TaskManagers) 这样就可以把程序提交到集群中.需要注意的是,可以在会话中运行多个程序

Download Flink

下载页下载Hadoop版本>2的安装包. 它包含所需的文件。

通过下面命令解压:

  1. tar xvzf flink-1.7.1-bin-hadoop2.tgz
  2. cd flink-1.7.1/

启动一个会话

通过以下命令启动一个session

  1. ./bin/yarn-session.sh

命令将显示以下概述:

  1. Usage:
  2. Required
  3. -n,--container <arg> Number of YARN container to allocate (=Number of Task Managers)
  4. Optional
  5. -D <arg> Dynamic properties
  6. -d,--detached Start detached
  7. -jm,--jobManagerMemory <arg> Memory for JobManager Container with optional unit (default: MB)
  8. -nm,--name Set a custom name for the application on YARN
  9. -q,--query Display available YARN resources (memory, cores)
  10. -qu,--queue <arg> Specify YARN queue.
  11. -s,--slots <arg> Number of slots per TaskManager
  12. -tm,--taskManagerMemory <arg> Memory per TaskManager Container with optional unit (default: MB)
  13. -z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths for HA mode

需要注意的是,客户端需要 YARN_CONF_DIRHADOOP_CONF_DIR 环境变量来保证可以读取 YARN 和 HDFS 配置.

举例: 以下命令可以分配10个TaskManager,每个TaskManager有8 GB内存和32个处理插槽

  1. ./bin/yarn-session.sh -n 10 -tm 8192 -s 32

系统会使用 conf/flink-conf.yaml文件中的配置. 需要更改配置可以参考 configuration guide

YARN上的Flink将覆盖以下配置参数 jobmanager.rpc.address (因为JobManager可能会分配到不同的机器上运行), taskmanager.tmp.dirs (我们使用YARN给出的tmp目录) 以及 parallelism.default 如果已经指定插槽数

I如果不希望通过更改配置文件来设置配置参数,则可以选择通过-D标志传递动态属性。例如通过如下方式传递参数: -Dfs.overwrite-files=true -Dtaskmanager.network.memory.min=536346624.

例子中启动了11个容器(即使只请求了10个容器)因为ApplicationMaster和Job Manager还有一个额外的容器。

在YARN群集中部署Flink后,YARN会显示JobManager的详细信息。

按下CTRL + C 或者输入 ‘stop’可以停止YARN会话。

如果群集上资源从租,Flink会按照请求来启动容器。大多数YARN调度账户考虑所请求容器的内存量,还有些账户会思考vcores的数量。 默认情况下,vcores的数量等于处理slots (-s) 参数。 调整 yarn.containers.vcores 可以自定义vcores的数值。 为了使此参数起作用,需要在群集中启用CPU调度。

分离YARN 会话

如果不想让Flink程序一直运行,可以启动一个 分离 YARN 会话。参数称为 -d or --detached

在这种情况下,Flink YARN客户端将仅向群集提交Flink,然后自行关闭。需要注意的是,在这种情况下,无法使用Flink停止YARN会话,需要使用 YARN 工具 (yarn application -kill &lt;appId&gt;) 来停止YARN会话。

附加到已有Session

使用以下命令启动会话

  1. ./bin/yarn-session.sh

命令将会显示一下内容

  1. Usage:
  2. Required
  3. -id,--applicationId <yarnAppId> YARN application Id

正如前面所述, 必须设置YARN_CONF_DIRHADOOP_CONF_DIR 环境变量来确保可以读取YARN和HDFS配置。

举例: 输入以下命令以附加到正在运行的Flink YARN会话 application_1463870264508_0029:

  1. ./bin/yarn-session.sh -id application_1463870264508_0029

附加到正在运行的会话使用YARN ResourceManager来指定JobManagerRPC端口。

按下CTRL + C 或者输入 ‘stop’可以停止YARN会话。

向Flink提交作业

使用以下命令将Flink程序提交到YARN群集:

  1. ./bin/flink

参数可以参阅 command-line client文档。

该命令将会按照如下显示:

  1. [...]
  2. Action "run" compiles and runs a program.
  3. Syntax: run [OPTIONS] <jar-file> <arguments>
  4. "run" action arguments:
  5. -c,--class <classname> Class with the program entry point ("main"
  6. method or "getPlan()" method. Only needed
  7. if the JAR file does not specify the class
  8. in its manifest.
  9. -m,--jobmanager <host:port> Address of the JobManager (master) to
  10. which to connect. Use this flag to connect
  11. to a different JobManager than the one
  12. specified in the configuration.
  13. -p,--parallelism <parallelism> The parallelism with which to run the
  14. program. Optional flag to override the
  15. default value specified in the
  16. configuration

使用 run 命令把作业提交给YARN。客户端能够确定JobManager的地址。 少数情况下可以通过 -m 参数来指定JobManager的地址。JobManager可以在YARN的控制台上可见

示例

  1. wget -O LICENSE-2.0.txt http://www.apache.org/licenses/LICENSE-2.0.txt
  2. hadoop fs -copyFromLocal LICENSE-2.0.txt hdfs:/// ...
  3. ./bin/flink run ./examples/batch/WordCount.jar \
  4. hdfs:///..../LICENSE-2.0.txt hdfs:///.../wordcount-result.txt

如果出现以下错误,请检查所有TaskManagers都已启动:

  1. Exception in thread "main" org.apache.flink.compiler.CompilerException:
  2. Available instances could not be determined from job manager: Connection timed out.

可以在JobManager Web界面中检查TaskManagers的数量。接口的地址打印在YARN会话控制台中。

如果一分钟后TaskManagers没有出现,则需要使用日志文件确认问题。

在 YARN上运行单作业

上面的文档描述了如何在YARN环境中启动Flink集群。也可以通过YARN中启动Flink来执行单个作业。

需要注意客户端可以通过 -yn 参数来指定 TaskManagers的数量

示例:

  1. ./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar

YARN会话的命令行选项也可用于 ./bin/flink.以 y 或者 yarn 为前缀

注意: 可以为每个作业使用不同的配置目录FLINK_CONF_DIR,需要这样配置时,请拷贝 conf目录,然后分发到不同节点上,并修改对应的配置

注意:当对一个分离的YARN会话(-yd)使用-m yarn-cluster参数时,应用程序将不会从ExecutionEnvironment.execute()调用获得任何累加器结果或异常!

jars 和 Classpath

默认情况下,Flink将在运行单个作业时将用户jar包含到系统类路径中。可以使用 yarn.per-job-cluster.include-user-jar 参数控制.

当设置为 DISABLED 时,Flink会使用用户类路径中的jar包

可以通过以下之一来控制类路径中的user-jar位置:

  • ORDER: (默认) 根据字典顺序将jar添加到系统类路径。
  • FIRST: 将jar添加到系统类路径的开头。
  • LAST: 将jar添加到系统类路径的末尾。

Flink 在 YARN上的故障恢复

运行在YARN上的Flink可以通过配置参数来控制发生故障时的行为,相关配置可以在 conf/flink-conf.yaml 文件配置或使用 -D 参数指定。

  • yarn.reallocate-failed: Flink是否应重新分配失败的TaskManager容器。默认值:true。
  • yarn.maximum-failed-containers: ApplicationMaster在YARN会话失败之前接受的最大失败容器数,默认值:最初请求的TaskManagers (-n)的数量。
  • yarn.application-attempts: ApplicationMaster (+ TaskManager ) 尝试次数,如果此值设置为1(默认值),则当Application master失败时,整个YARN会话将失败。较高的值指定YARN重新启动ApplicationMaster的次数。

调试失败的YARN会话

Flink YARN会话部署失败的原因有很多。Hadoop配置错误(HDFS权限,YARN配置),版本不兼容(在Cloudera Hadoop上运行Flink与vanilla Hadoop依赖关系)或其他错误。

日志文件

在部署期间失败,调试时比较有用的一个手段就是YARN日志聚合,参阅 YARN 日志聚合. 在yarn-site.xml 中配置 yarn.log-aggregation-enable 为true可以启用配置。 启用后,用户可以使用以下命令检索(失败的)YARN会话的所有日志文件。

  1. yarn logs -applicationId <application ID>

需要注意的是,会话结束后需要几秒钟才会显示日志。

YARN Client 终端 和 Web 界面

如果在运行期间发生错误,Flink YARN客户端还会在终端中打印错误消息(例如,如果TaskManager在一段时间后停止工作)。

除此之外,还有YARN Resource Manager Web界面(默认情况下在端口8088上)可以看到详情。端口由yarn.resourcemanager.webapp.address配置指定。

界面可以看到日志文件以运行YARN应用程序,显示失败应用程序和一些错误信息。

构建特定版本的YARN客户端

使用Hortonworks,Cloudera或MapR等公司的Hadoop发行版的用户可能必须针对其特定版本的Hadoop(HDFS)和YARN构建Flink。相关内容可以查阅 构建指南

在防火墙后面的YARN上运行Flink

某些YARN群集使用防火墙来控制群集与网络其余部分之间的网络流量。在这些设置中,Flink作业只能从群集网络内(防火墙后面)提交到YARN会话。这种情况下需要放开Flink应用的相关端口。

目前两个与提交作业相关的服务:

  • JobManager (ApplicationMaster in YARN)
  • 在JobManager中运行的BlobServer。

向Flink提交作业时,BlobServer会将带有用户代码的jar分发给所有工作节点(TaskManagers)。JobManager自己接收作业并触发执行。

用于指定端口的两个配置参数如下:

  • yarn.application-master.port
  • blob.server.port

这两个配置选项可以配置,单个端口(例如:“50010”),范围(“50000-50025”)或两者的组合(“50010,50011,50020-50025,50050-50075”)。

(Hadoop使用类似的机制,比如配置yarn.app.mapreduce.am.job.client.port-range.)

Flink和YARN的内部交互机制

本节简要介绍Flink和YARN如何交互。

YARN 配置 - 图1

ARN客户端需要访问Hadoop配置用来连接到YARN资源管理器和HDFS。使用以下策略确定Hadoop配置:

  • 按顺序测试 YARN_CONF_DIR, HADOOP_CONF_DIR 或者 HADOOP_CONF_PATH 是否存在,如果其中一个变量存在,则使用配置中的路径来读取配置。
  • 如果上面的都不存在 (YARN设置正确不应该这样),则客户端正在使用 HADOOP_HOME 环境变量。 如果设置了则尝试访问 $HADOOP_HOME/etc/hadoop (Hadoop 2) 和 $HADOOP_HOME/conf (Hadoop 1)路径来读取配置。

启动新的Flink YARN会话时,客户端首先检查所请求的资源(容器和内存)是否可用。之后,将包含Flink和配置的jar上传到HDFS(步骤1)。

客户端的下一步是请求(步骤2)YARN容器以启动 ApplicationMaster (步骤 3)。由于客户端将配置和jar文件注册为容器的资源,因此在该特定机器上运行的YARN的NodeManager将负责准备容器(例如,下载文件)。完成后,将启动 ApplicationMaster (AM) 。

JobManager 和 AM 在同一容器内,运行一旦它们成功启动,AM就知道JobManager(它自己的主机)的地址。它正在为TaskManagers生成一个新的Flink配置文件(以便它们可以连接到JobManager)。该文件也自动上传到HDFS中, AM 还提供了Flink的WEB界面。YARN代码分配的所有端口都是 临时端口,这样用户可以并行执行多个Flink YARN会话。

之后,AM开始为Flink的TaskManagers分配容器,这将从HDFS下载jar文件和修改后的配置。完成这些步骤后,即可建立Flink并准备接受作业。