提交 Job

注意⚠️

请在 flink 集群下的机器上进行操作

首先切换到 flink 安装路径

  1. cd $FLINK_HOME

Session 集群模式

  • 启动 flink 集群
  1. ./bin/yarn-session.sh --detached

此时会返回一个 application-id,后续查看 job 状态需要用到,如果忘记了,可以使用 yarn app -list 查看所有的 application

同时也会返回 flink 集群的 IP 和端口

Flink Job 提交 - 图1

  • 提交 job(需要在启动 flink 集群的节点上提交)
  1. ./bin/flink run ./examples/streaming/TopSpeedWindowing.jar

在其他机器上可以通过 -m 指定 flink 集群的地址进行提交

  1. ./bin/flink run -m node05:37633 ./examples/batch/WordCount.jar
  • 查看 application 列表,以及 flink 集群的 ip 地址和端口(Tracking-URL)
  1. $ yarn app -list
  2. 2022-02-11 14:16:03,640 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at node01/172.31.1.16:8032
  3. Total number of applications (application-types: [], states: [SUBMITTED, ACCEPTED, RUNNING] and tags: []):2
  4. Application-Id Application-Name Application-Type User Queue State Final-State Progress Tracking-URL
  5. application_1642755490153_0029 Flink per-job cluster Apache Flink root default RUNNING UNDEFINED 100% http://node01:38727
  6. application_1642755490153_0030 Flink session cluster Apache Flink root default RUNNING UNDEFINED 100% http://node02:42253
  • 查看 flink 集群运行的 job
  1. ./bin/flink list -Dyarn.application.id=application_XXXX_YY
  • 取消运行的 job
  1. ./bin/flink cancel $JOB_ID
  • 停止 Flink 集群
  1. echo "stop" | ./bin/yarn-session.sh -id application_XXXXX_XXX

Per-Job 集群模式(推荐)

  • 提交任务
  1. ./bin/flink run -t yarn-per-job --detached ./examples/streaming/TopSpeedWindowing.jar

此时会返回一个 application-id,后续查看 job 状态需要用到,如果忘记了,可以使用 yarn app -list 查看所有的 application

Flink Job 提交 - 图2

  • 查看 application 列表,以及 flink 集群的 ip 地址和端口(Tracking-URL)
  1. root@node01:~# yarn app -list
  2. 2022-02-11 14:16:03,640 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at node01/172.31.1.16:8032
  3. Total number of applications (application-types: [], states: [SUBMITTED, ACCEPTED, RUNNING] and tags: []):2
  4. Application-Id Application-Name Application-Type User Queue State Final-State Progress Tracking-URL
  5. application_1642755490153_0029 Flink per-job cluster Apache Flink root default RUNNING UNDEFINED 100% http://node01:38727
  6. application_1642755490153_0030 Flink session cluster Apache Flink root default RUNNING UNDEFINED 100% http://node02:42253
  • 查看 flink 集群运行的 job
  1. ./bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY
  • 取消运行的 job(job 停止后 flink 集群自动销毁)
  1. ./bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY <jobId>

Application 集群模式

  • 提交任务
  1. ./bin/flink run-application -t yarn-application ./examples/streaming/TopSpeedWindowing.jar

此时会返回一个 application-id,如果忘记了,可以使用 yarn app -list 查看所有的 application

Flink Job 提交 - 图3

  • 查看 application 列表,以及 flink 集群的 ip 地址和端口(Tracking-URL)
  1. root@node01:~# yarn app -list
  2. 2022-02-11 14:16:03,640 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at node01/172.31.1.16:8032
  3. Total number of applications (application-types: [], states: [SUBMITTED, ACCEPTED, RUNNING] and tags: []):2
  4. Application-Id Application-Name Application-Type User Queue State Final-State Progress Tracking-URL
  5. application_1642755490153_0029 Flink per-job cluster Apache Flink root default RUNNING UNDEFINED 100% http://node01:38727
  6. application_1642755490153_0030 Flink session cluster Apache Flink root default RUNNING UNDEFINED 100% http://node02:42253
  • 查看 flink 集群运行的 job
  1. ./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
  • 取消运行的 job(job 停止后 flink 集群自动销毁)
  1. ./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>

提交 PyFlink Job

注意⚠️: Python 版本3.6 +
  1. $ python --version
  2. # Python 版本必须 3.6+

提交 pylink 作业

  1. ./bin/flink run --python examples/python/table/word_count.py

Flink job 提交更详细内容可以参考

命令行界面 | Apache Flink

通过 Yarn 集群管理进入 Flink job 管理界面

  • Yarn 管理接口 8088

Flink Job 提交 - 图4

  • 可以查看 application 列表,以及每个 application 的状态

Flink Job 提交 - 图5 Flink Job 提交 - 图6

  • 跳转到 flink web 管理页面

点击 Tracking UI 下面对应 application 的 ApplicationMaster 链接跳转到 flink web 管理页面,此时使用了代理进行转发,不需要关心 flink 集群使用的 IP 和端口(flink 集群挂掉,重新启动新的集群,此时 flink 集群使用新的 IP 和端口)

Flink Job 提交 - 图7

  • 点击 application 链接,查看 application 详细信息

Flink Job 提交 - 图8

Flink Job 提交 - 图9

  • 可以在详情界面点击 kill application,杀掉 application

Flink Job 提交 - 图10

  • 可以直接使用 flink 集群的 IP 和端口进入flink web 管理页面

Flink Job 提交 - 图11