Command-Line Interface

Translator: flink.sojb.cn

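Flink provides a command-line interface (CLI) to run programs that are packaged as JAR files and to control their execution. The CLI is part of any Flink setup and is available in local single-node setups as well as in distributed setups. It is located at <flink-home>/bin/flink and, by default, connects to the running Flink master (JobManager) that was started from the same installation directory.

A prerequisite for using the command-line interface is that the Flink master (JobManager) has been started (via <flink-home>/bin/start-cluster.sh) or that a YARN environment is available.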

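The command line can be used to:

  • submit jobs for execution,
  • cancel or stop a running job,
  • show the optimized execution plan of a program,
  • list running and scheduled jobs,
  • trigger and dispose savepoints, and
  • modify a running job.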

Examples

  • Run the example program without arguments:

    ./bin/flink run ./examples/batch/WordCount.jar
  • Run the example program with arguments for the input and result files:

    ./bin/flink run ./examples/batch/WordCount.jar \
        --input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
  • Run the example program with a parallelism of 16 and arguments for the input and result files:

    ./bin/flink run -p 16 ./examples/batch/WordCount.jar \
        --input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
  • Run the example program with Flink log output disabled:

    ./bin/flink run -q ./examples/batch/WordCount.jar
  • Run the example program in detached mode:

    ./bin/flink run -d ./examples/batch/WordCount.jar
  • Run the example program on a specific JobManager:

    ./bin/flink run -m myJMHost:8081 \
        ./examples/batch/WordCount.jar \
        --input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
  • Run the example program with a specific class as the entry point:

    ./bin/flink run -c org.apache.flink.examples.java.wordcount.WordCount \
        ./examples/batch/WordCount.jar \
        --input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
  • Run the example program using a per-job YARN cluster with 2 TaskManagers:

    ./bin/flink run -m yarn-cluster -yn 2 \
        ./examples/batch/WordCount.jar \
        --input hdfs:///user/hamlet.txt --output hdfs:///user/wordcount_out
  • Display the optimized execution plan of the WordCount example program as JSON:

    ./bin/flink info ./examples/batch/WordCount.jar \
        --input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
  • List scheduled and running jobs (including their job IDs):

    ./bin/flink list
  • List scheduled jobs (including their job IDs):

    ./bin/flink list -s
  • List running jobs (including their job IDs):

    ./bin/flink list -r
  • List all existing jobs (including their job IDs):

    ./bin/flink list -a
  • List the Flink jobs running in a Flink YARN session:

    ./bin/flink list -m yarn-cluster -yid <yarnApplicationID> -r
  • Cancel a job:

    ./bin/flink cancel <jobID>
  • Cancel a job with a savepoint:

    ./bin/flink cancel -s [targetDirectory] <jobID>
  • Stop a job (streaming jobs only):

    ./bin/flink stop <jobID>
  • Modify a running job (streaming jobs only):

    ./bin/flink modify <jobID> -p <newParallelism>
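
Most of the commands above take a job ID, which the list action prints next to each job. The following sketch pulls the IDs out of that output so they can be passed to cancel, stop, or modify. It assumes that a job ID is printed as a 32-character lowercase hexadecimal string; the helper name extract_job_ids is illustrative and not part of Flink.

```shell
# Hypothetical helper: read `flink list` output on stdin and print one job ID per line.
# Assumption: job IDs appear as 32-character lowercase hexadecimal strings.
extract_job_ids() {
  grep -Eo '[0-9a-f]{32}'
}

# Possible usage (requires a running cluster):
#   ./bin/flink list -r | extract_job_ids
```

Matching on the ID pattern rather than on column positions keeps the helper independent of the exact layout of the list output.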

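Note: The difference between cancelling and stopping a (streaming) job is the following:

On a cancel call, the operators in a job immediately receive a cancel() method call to cancel them as soon as possible. If an operator does not stop after the cancel call, Flink starts interrupting its thread periodically until it stops.

A stop call is a more graceful way of stopping a running streaming job. Stop is only available for jobs that use sources implementing the StoppableFunction interface. When the user requests to stop a job, all sources receive a stop() method call. The job keeps running until all sources have shut down properly. This allows the job to finish processing all in-flight data.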
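
Given this distinction, one possible pattern is to try the graceful stop first and fall back to cancel only if the job cannot be stopped. The sketch below assumes that the stop action exits with a non-zero status when stopping is not possible (for example, when the sources do not implement StoppableFunction); verify this for your Flink version. FLINK_BIN and stop_or_cancel are hypothetical names introduced for illustration.

```shell
# Path to the flink launcher; FLINK_BIN is a hypothetical override used by this sketch.
FLINK_BIN="${FLINK_BIN:-./bin/flink}"

# Try a graceful stop first; if that fails, cancel the job instead.
# Assumption: `flink stop` returns a non-zero exit status when the job cannot be stopped.
stop_or_cancel() {
  local job_id="$1"
  if "$FLINK_BIN" stop "$job_id"; then
    echo "stopped $job_id"
  else
    echo "stop failed, cancelling $job_id" >&2
    "$FLINK_BIN" cancel "$job_id"
  fi
}

# Possible usage:
#   stop_or_cancel <jobID>
```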

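Savepoints

Savepoints are controlled via the command-line client:

Trigger a savepoint

  ./bin/flink savepoint <jobId> [savepointDirectory]

This triggers a savepoint for the job with ID jobId and returns the path of the created savepoint. You need this path to restore and dispose savepoints.

You can optionally specify a target file system directory in which to store the savepoint. The directory must be accessible by the JobManager.

If you do not specify a target directory, a default directory must be configured. Otherwise, triggering the savepoint fails.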
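
Because the returned path is needed later for restoring and disposing, scripts often capture it when the savepoint is triggered. A minimal sketch, assuming the CLI reports success with a line that contains "Path: " followed by the savepoint path (an assumption about the output format, which may differ between versions); extract_savepoint_path is a hypothetical helper name.

```shell
# Hypothetical helper: read the output of `flink savepoint` on stdin and print the savepoint path.
# Assumption: the CLI prints a line containing "Path: <savepointPath>" on success.
extract_savepoint_path() {
  sed -n 's/.*Path: \([^[:space:]]*\).*/\1/p' | head -n 1
}

# Possible usage (requires a running job):
#   savepoint_path=$(./bin/flink savepoint <jobId> | extract_savepoint_path)
```

If the output contains no such line, the helper prints nothing, so callers should check that the captured value is non-empty before using it.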

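Trigger a savepoint with YARN

  ./bin/flink savepoint <jobId> [savepointDirectory] -yid <yarnAppId>

This triggers a savepoint for the job with ID jobId and YARN application ID yarnAppId, and returns the path of the created savepoint.

Everything else is the same as described in the "Trigger a savepoint" section above.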

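Cancel with a savepoint

You can trigger a savepoint and cancel a job in a single step.

  ./bin/flink cancel -s [savepointDirectory] <jobID>

If no savepoint directory is given on the command line, a default savepoint directory must be configured for the Flink installation (see Savepoints).

The job is cancelled only if the savepoint succeeds.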
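
Before relying on the default directory, it can help to check that one is actually configured. The sketch below looks for the state.savepoints.dir key named in the CLI help text. It assumes the key is set in a YAML-style configuration file such as conf/flink-conf.yaml (the file location is an assumption about your installation); default_savepoint_dir is a hypothetical helper name.

```shell
# Hypothetical helper: print the value of state.savepoints.dir from a Flink configuration file.
# Returns a non-zero status if the key is missing or commented out.
default_savepoint_dir() {
  local conf_file="$1"
  local value
  value=$(sed -n 's/^[[:space:]]*state\.savepoints\.dir:[[:space:]]*//p' "$conf_file" | head -n 1)
  [ -n "$value" ] && printf '%s\n' "$value"
}

# Possible usage:
#   default_savepoint_dir conf/flink-conf.yaml || echo "no default savepoint directory configured"
```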

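Restore a savepoint

  ./bin/flink run -s <savepointPath> ...

The run command has a savepoint flag for submitting a job that restores its state from a savepoint. The savepoint path is returned by the savepoint trigger command.

By default, Flink tries to match all savepoint state to the job being submitted. To allow skipping savepoint state that cannot be restored with the new job, set the allowNonRestoredState flag. You need to allow this if you removed an operator from your program that was part of the program when the savepoint was triggered, and you still want to use the savepoint.

  ./bin/flink run -s <savepointPath> -n ...

This is useful if your program dropped an operator that was part of the savepoint.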
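
The two restore variants differ only in the -n flag, so a small wrapper can make that choice explicit. This is a sketch under stated assumptions: FLINK_BIN and run_from_savepoint are hypothetical names, and the wrapper only uses the -s and -n options documented above.

```shell
# Path to the flink launcher; FLINK_BIN is a hypothetical override used by this sketch.
FLINK_BIN="${FLINK_BIN:-./bin/flink}"

# Hypothetical wrapper: submit a job that restores its state from a savepoint.
# Usage: run_from_savepoint <savepointPath> <yes|no> <jar-file> [program arguments...]
# The second argument says whether savepoint state without a matching operator may be skipped.
run_from_savepoint() {
  local savepoint_path="$1"
  local allow_non_restored="$2"
  shift 2
  if [ "$allow_non_restored" = "yes" ]; then
    # -n is the short form of --allowNonRestoredState
    "$FLINK_BIN" run -s "$savepoint_path" -n "$@"
  else
    "$FLINK_BIN" run -s "$savepoint_path" "$@"
  fi
}
```

Requiring an explicit yes or no makes dropping state a deliberate choice rather than a default.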

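Dispose a savepoint

  ./bin/flink savepoint -d <savepointPath>

Disposes the savepoint at the given path. The savepoint path is returned by the savepoint trigger command.

If you use custom state instances (for example custom reducing state or RocksDB state), you have to specify the path to the program JAR with which the savepoint was triggered, so that the savepoint can be disposed with the user code class loader:

  ./bin/flink savepoint -d <savepointPath> -j <jarFile>

Otherwise, you will run into a ClassNotFoundException.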
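
Both dispose variants can be folded into one helper that adds the -j option only when a JAR is supplied. A minimal sketch; FLINK_BIN and dispose_savepoint are hypothetical names, and only the -d and -j options shown above are used.

```shell
# Path to the flink launcher; FLINK_BIN is a hypothetical override used by this sketch.
FLINK_BIN="${FLINK_BIN:-./bin/flink}"

# Hypothetical helper: dispose a savepoint, optionally passing the program JAR
# so that custom state classes can be loaded with the user code class loader.
# Usage: dispose_savepoint <savepointPath> [jarFile]
dispose_savepoint() {
  local savepoint_path="$1"
  local jar_file="${2:-}"
  if [ -n "$jar_file" ]; then
    "$FLINK_BIN" savepoint -d "$savepoint_path" -j "$jar_file"
  else
    "$FLINK_BIN" savepoint -d "$savepoint_path"
  fi
}
```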

Usage

The command-line syntax is as follows:

  ./flink <ACTION> [OPTIONS] [ARGUMENTS]

  The following actions are available:

  Action "run" compiles and runs a program.

    Syntax: run [OPTIONS] <jar-file> <arguments>
    "run" action options:
       -c,--class <classname>               Class with the program entry point
                                            ("main" method or "getPlan()" method.
                                            Only needed if the JAR file does not
                                            specify the class in its manifest.
       -C,--classpath <url>                 Adds a URL to each user code
                                            classloader on all nodes in the
                                            cluster. The paths must specify a
                                            protocol (e.g. file://) and be
                                            accessible on all nodes (e.g. by means
                                            of a NFS share). You can use this
                                            option multiple times for specifying
                                            more than one URL. The protocol must
                                            be supported by the {@link
                                            java.net.URLClassLoader}.
       -d,--detached                        If present, runs the job in detached
                                            mode
       -n,--allowNonRestoredState           Allow to skip savepoint state that
                                            cannot be restored. You need to allow
                                            this if you removed an operator from
                                            your program that was part of the
                                            program when the savepoint was
                                            triggered.
       -p,--parallelism <parallelism>       The parallelism with which to run the
                                            program. Optional flag to override the
                                            default value specified in the
                                            configuration.
       -q,--sysoutLogging                   If present, suppress logging output to
                                            standard out.
       -s,--fromSavepoint <savepointPath>   Path to a savepoint to restore the job
                                            from (for example
                                            hdfs:///flink/savepoint-1537).
    Options for yarn-cluster mode:
       -d,--detached                        If present, runs the job in detached
                                            mode
       -m,--jobmanager <arg>                Address of the JobManager (master) to
                                            which to connect. Use this flag to
                                            connect to a different JobManager than
                                            the one specified in the
                                            configuration.
       -yD <property=value>                 use value for given property
       -yd,--yarndetached                   If present, runs the job in detached
                                            mode (deprecated; use non-YARN
                                            specific option instead)
       -yh,--yarnhelp                       Help for the Yarn session CLI.
       -yid,--yarnapplicationId <arg>       Attach to running YARN session
       -yj,--yarnjar <arg>                  Path to Flink jar file
       -yjm,--yarnjobManagerMemory <arg>    Memory for JobManager Container
                                            with optional unit (default: MB)
       -yn,--yarncontainer <arg>            Number of YARN container to allocate
                                            (=Number of Task Managers)
       -ynm,--yarnname <arg>                Set a custom name for the application
                                            on YARN
       -yq,--yarnquery                      Display available YARN resources
                                            (memory, cores)
       -yqu,--yarnqueue <arg>               Specify YARN queue.
       -ys,--yarnslots <arg>                Number of slots per TaskManager
       -yst,--yarnstreaming                 Start Flink in streaming mode
       -yt,--yarnship <arg>                 Ship files in the specified directory
                                            (t for transfer)
       -ytm,--yarntaskManagerMemory <arg>   Memory per TaskManager Container
                                            with optional unit (default: MB)
       -yz,--yarnzookeeperNamespace <arg>   Namespace to create the Zookeeper
                                            sub-paths for high availability mode
       -ynl,--yarnnodeLabel <arg>           Specify YARN node label for the YARN application
       -z,--zookeeperNamespace <arg>        Namespace to create the Zookeeper
                                            sub-paths for high availability mode
    Options for default mode:
       -m,--jobmanager <arg>           Address of the JobManager (master) to which
                                       to connect. Use this flag to connect to a
                                       different JobManager than the one specified
                                       in the configuration.
       -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths
                                       for high availability mode

  Action "info" shows the optimized execution plan of the program (JSON).

    Syntax: info [OPTIONS] <jar-file> <arguments>
    "info" action options:
       -c,--class <classname>           Class with the program entry point ("main"
                                        method or "getPlan()" method. Only needed
                                        if the JAR file does not specify the class
                                        in its manifest.
       -p,--parallelism <parallelism>   The parallelism with which to run the
                                        program. Optional flag to override the
                                        default value specified in the
                                        configuration.

  Action "list" lists running and scheduled programs.

    Syntax: list [OPTIONS]
    "list" action options:
       -r,--running     Show only running programs and their JobIDs
       -s,--scheduled   Show only scheduled programs and their JobIDs
    Options for yarn-cluster mode:
       -m,--jobmanager <arg>            Address of the JobManager (master) to
                                        which to connect. Use this flag to connect
                                        to a different JobManager than the one
                                        specified in the configuration.
       -yid,--yarnapplicationId <arg>   Attach to running YARN session
       -z,--zookeeperNamespace <arg>    Namespace to create the Zookeeper
                                        sub-paths for high availability mode
    Options for default mode:
       -m,--jobmanager <arg>           Address of the JobManager (master) to which
                                       to connect. Use this flag to connect to a
                                       different JobManager than the one specified
                                       in the configuration.
       -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths
                                       for high availability mode

  Action "stop" stops a running program (streaming jobs only).

    Syntax: stop [OPTIONS] <Job ID>
    "stop" action options:
    Options for yarn-cluster mode:
       -m,--jobmanager <arg>            Address of the JobManager (master) to
                                        which to connect. Use this flag to connect
                                        to a different JobManager than the one
                                        specified in the configuration.
       -yid,--yarnapplicationId <arg>   Attach to running YARN session
       -z,--zookeeperNamespace <arg>    Namespace to create the Zookeeper
                                        sub-paths for high availability mode
    Options for default mode:
       -m,--jobmanager <arg>           Address of the JobManager (master) to which
                                       to connect. Use this flag to connect to a
                                       different JobManager than the one specified
                                       in the configuration.
       -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths
                                       for high availability mode

  Action "cancel" cancels a running program.

    Syntax: cancel [OPTIONS] <Job ID>
    "cancel" action options:
       -s,--withSavepoint <targetDirectory>   Trigger savepoint and cancel job.
                                              The target directory is optional. If
                                              no directory is specified, the
                                              configured default directory
                                              (state.savepoints.dir) is used.
    Options for yarn-cluster mode:
       -m,--jobmanager <arg>            Address of the JobManager (master) to
                                        which to connect. Use this flag to connect
                                        to a different JobManager than the one
                                        specified in the configuration.
       -yid,--yarnapplicationId <arg>   Attach to running YARN session
       -z,--zookeeperNamespace <arg>    Namespace to create the Zookeeper
                                        sub-paths for high availability mode
    Options for default mode:
       -m,--jobmanager <arg>           Address of the JobManager (master) to which
                                       to connect. Use this flag to connect to a
                                       different JobManager than the one specified
                                       in the configuration.
       -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths
                                       for high availability mode

  Action "savepoint" triggers savepoints for a running job or disposes existing ones.

    Syntax: savepoint [OPTIONS] <Job ID> [<target directory>]
    "savepoint" action options:
       -d,--dispose <arg>       Path of savepoint to dispose.
       -j,--jarfile <jarfile>   Flink program JAR file.
    Options for yarn-cluster mode:
       -m,--jobmanager <arg>            Address of the JobManager (master) to
                                        which to connect. Use this flag to connect
                                        to a different JobManager than the one
                                        specified in the configuration.
       -yid,--yarnapplicationId <arg>   Attach to running YARN session
       -z,--zookeeperNamespace <arg>    Namespace to create the Zookeeper
                                        sub-paths for high availability mode
    Options for default mode:
       -m,--jobmanager <arg>           Address of the JobManager (master) to which
                                       to connect. Use this flag to connect to a
                                       different JobManager than the one specified
                                       in the configuration.
       -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths
                                       for high availability mode

  Action "modify" modifies a running job (e.g. change of parallelism).

    Syntax: modify <Job ID> [OPTIONS]
    "modify" action options:
       -h,--help                           Show the help message for the CLI
                                           Frontend or the action.
       -p,--parallelism <newParallelism>   New parallelism for the specified job.
       -v,--verbose                        This option is deprecated.
    Options for yarn-cluster mode:
       -m,--jobmanager <arg>            Address of the JobManager (master) to
                                        which to connect. Use this flag to connect
                                        to a different JobManager than the one
                                        specified in the configuration.
       -yid,--yarnapplicationId <arg>   Attach to running YARN session
       -z,--zookeeperNamespace <arg>    Namespace to create the Zookeeper
                                        sub-paths for high availability mode
    Options for default mode:
       -m,--jobmanager <arg>           Address of the JobManager (master) to which
                                       to connect. Use this flag to connect to a
                                       different JobManager than the one specified
                                       in the configuration.
       -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths
                                       for high availability mode