Flink提供命令行界面(CLI)来运行打包为JAR文件的程序,并控制它们的执行。CLI是任何Flink设置的一部分,可在本地单节点设置和分布式设置中使用。它位于<flink-home>/bin/flink 默认情况下,并连接到从同一安装目录启动的正在运行的Flink主服务器(JobManager)。
使用命令行界面的先决条件是Flink主机(JobManager)已启动(通过 <flink-home>/bin/start-cluster.sh)或YARN环境可用。
命令行可用于
例子
- 运行没有参数的示例程序:
./bin/flink run ./examples/batch/WordCount.jar
- 使用输入和结果文件的参数运行示例程序:
./bin/flink run ./examples/batch/WordCount.jar \--input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
- 运行带有并行性的示例程序16以及输入和结果文件的参数:
./bin/flink run -p 16 ./examples/batch/WordCount.jar \--input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
- 运行禁用flink log输出的示例程序:
./bin/flink run -q ./examples/batch/WordCount.jar
- 以分离模式运行示例程序:
./bin/flink run -d ./examples/batch/WordCount.jar
- 在特定的JobManager上运行示例程序:
./bin/flink run -m myJMHost:8081 \./examples/batch/WordCount.jar \--input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
- 以特定类作为入口点运行示例程序:
./bin/flink run -c org.apache.flink.examples.java.wordcount.WordCount \./examples/batch/WordCount.jar \--input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
- 使用具有2个TaskManagers 的每作业YARN群集运行示例程序:
./bin/flink run -m yarn-cluster -yn 2 \./examples/batch/WordCount.jar \--input hdfs:///user/hamlet.txt --output hdfs:///user/wordcount_out
- 以Word为单位显示WordCount示例程序的优化执行计划:
./bin/flink info ./examples/batch/WordCount.jar \--input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
- 列出计划和正在运行的作业(包括其JobID):
./bin/flink list
- 列出预定作业(包括其作业ID):
./bin/flink list -s
- 列出正在运行的作业(包括其作业ID):
./bin/flink list -r
- 列出所有现有工作(包括其作业ID):
./bin/flink list -a
- 列出在Flink YARN会话中运行Flink作业:
./bin/flink list -m yarn-cluster -yid <yarnApplicationID> -r
- 取消工作:
./bin/flink cancel <jobID>
- 使用保存点取消作业:
./bin/flink cancel -s [targetDirectory] <jobID>
- 停止工作(仅限流处理工作):
./bin/flink stop <jobID>
- 修改正在运行的作业(仅限流式处理作业):. / bin/flink modify
-p
注意:取消和停止(流处理)作业的区别如下:
在取消呼叫中,作业中的算子立即接收cancel()方法调用以尽快取消它们。如果算子在取消呼叫后没有停止,Flink将开始定期中断线程,直到它停止。
“停止”呼叫是一种更优雅的方式来停止正在运行的流处理作业。Stop仅适用于使用实现StoppableFunction接口的源的作业。当用户请求停止作业时,所有源都将接收stop()方法调用。该工作将继续运行,直到所有资源正常关闭。这允许作业完成处理所有飞行数据。
保存点
保存点通过命令行客户端控制:
触发保存点
./bin/flink savepoint <jobId> [savepointDirectory]
这将触发具有ID的作业的保存点jobId,并返回创建的保存点的路径。您需要此路径来还原和部署保存点。
此外,您可以选择指定目标文件系统目录以存储保存点。该目录需要可由JobManager访问。
如果未指定目标目录,则需要配置默认目录。否则,触发保存点将失败。
使用YARN触发保存点
./bin/flink savepoint <jobId> [savepointDirectory] -yid <yarnAppId>
这将触发具有ID jobId和YARN应用程序ID 的作业的保存点yarnAppId,并返回创建的保存点的路径。
其他所有内容与上面触发保存点部分中描述的相同。
使用保存点取消
您可以自动触发保存点并取消作业。
./bin/flink cancel -s [savepointDirectory] <jobID>
如果未配置保存点目录,则需要为Flink安装配置默认保存点目录(请参阅保存点)。
只有保存点成功,才会取消该作业。
恢复保存点
./bin/flink run -s <savepointPath> ...
run命令有一个保存点标志来提交作业,该作业从保存点恢复其状态。savepoint trigger命令返回保存点路径。
默认情况下,我们尝试将所有保存点状态与正在提交的作业进行匹配。如果要允许跳过无法使用新作业恢复的保存点状态,可以设置allowNonRestoredState标志。如果在触发保存点并且仍想使用保存点时从程序中删除了作为程序一部分的 算子,则需要允许此 算子操作。
./bin/flink run -s <savepointPath> -n ...
如果您的程序删除了属于保存点的 算子,这将非常有用。
配置保存点
./bin/flink savepoint -d <savepointPath>
在给定路径处理保存点。savepoint trigger命令返回保存点路径。
如果使用自定义状态实例(例如自定义还原状态或RocksDB状态),则必须指定触发保存点的程序JAR的路径,以便使用用户代码类加载器处理保存点:
./bin/flink savepoint -d <savepointPath> -j <jarFile>
否则,你会遇到一个ClassNotFoundException。
用法
命令行语法如下:
./flink <ACTION> [OPTIONS] [ARGUMENTS]The following actions are available:Action "run" compiles and runs a program.Syntax: run [OPTIONS] <jar-file> <arguments>"run" action options:-c,--class <classname> Class with the program entry point("main" method or "getPlan()" method.Only needed if the JAR file does notspecify the class in its manifest.-C,--classpath <url> Adds a URL to each user codeclassloader on all nodes in thecluster. The paths must specify aprotocol (e.g. file://) and beaccessible on all nodes (e.g. by meansof a NFS share). You can use thisoption multiple times for specifyingmore than one URL. The protocol mustbe supported by the {@linkjava.net.URLClassLoader}.-d,--detached If present, runs the job in detachedmode-n,--allowNonRestoredState Allow to skip savepoint state thatcannot be restored. You need to allowthis if you removed an operator fromyour program that was part of theprogram when the savepoint wastriggered.-p,--parallelism <parallelism> The parallelism with which to run theprogram. Optional flag to override thedefault value specified in theconfiguration.-q,--sysoutLogging If present, suppress logging output tostandard out.-s,--fromSavepoint <savepointPath> Path to a savepoint to restore the jobfrom (for examplehdfs:///flink/savepoint-1537).Options for yarn-cluster mode:-d,--detached If present, runs the job in detachedmode-m,--jobmanager <arg> Address of the JobManager (master) towhich to connect. Use this flag toconnect to a different JobManager thanthe one specified in theconfiguration.-yD <property=value> use value for given property-yd,--yarndetached If present, runs the job in detachedmode (deprecated; use non-YARNspecific option instead)-yh,--yarnhelp Help for the Yarn session CLI.-yid,--yarnapplicationId <arg> Attach to running YARN session-yj,--yarnjar <arg> Path to Flink jar file-yjm,--yarnjobManagerMemory <arg> Memory for JobManager Containerwith optional unit (default: MB)-yn,--yarncontainer <arg> Number of YARN container to allocate(=Number of Task Managers)-ynm,--yarnname <arg> Set a custom name for the applicationon YARN-yq,--yarnquery Display available YARN resources(memory, cores)-yqu,--yarnqueue <arg> Specify YARN queue.-ys,--yarnslots <arg> Number of slots per TaskManager-yst,--yarnstreaming Start Flink in streaming mode-yt,--yarnship <arg> Ship files in the specified directory(t for transfer)-ytm,--yarntaskManagerMemory <arg> Memory per TaskManager Containerwith optional unit (default: MB)-yz,--yarnzookeeperNamespace <arg> Namespace to create the Zookeepersub-paths for high availability mode-ynl,--yarnnodeLabel <arg> Specify YARN node label for the YARN application-z,--zookeeperNamespace <arg> Namespace to create the Zookeepersub-paths for high availability modeOptions for default mode:-m,--jobmanager <arg> Address of the JobManager (master) to whichto connect. Use this flag to connect to adifferent JobManager than the one specifiedin the configuration.-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-pathsfor high availability modeAction "info" shows the optimized execution plan of the program (JSON).Syntax: info [OPTIONS] <jar-file> <arguments>"info" action options:-c,--class <classname> Class with the program entry point ("main"method or "getPlan()" method. Only neededif the JAR file does not specify the classin its manifest.-p,--parallelism <parallelism> The parallelism with which to run theprogram. Optional flag to override thedefault value specified in theconfiguration.Action "list" lists running and scheduled programs.Syntax: list [OPTIONS]"list" action options:-r,--running Show only running programs and their JobIDs-s,--scheduled Show only scheduled programs and their JobIDsOptions for yarn-cluster mode:-m,--jobmanager <arg> Address of the JobManager (master) towhich to connect. Use this flag to connectto a different JobManager than the onespecified in the configuration.-yid,--yarnapplicationId <arg> Attach to running YARN session-z,--zookeeperNamespace <arg> Namespace to create the Zookeepersub-paths for high availability modeOptions for default mode:-m,--jobmanager <arg> Address of the JobManager (master) to whichto connect. Use this flag to connect to adifferent JobManager than the one specifiedin the configuration.-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-pathsfor high availability modeAction "stop" stops a running program (streaming jobs only).Syntax: stop [OPTIONS] <Job ID>"stop" action options:Options for yarn-cluster mode:-m,--jobmanager <arg> Address of the JobManager (master) towhich to connect. Use this flag to connectto a different JobManager than the onespecified in the configuration.-yid,--yarnapplicationId <arg> Attach to running YARN session-z,--zookeeperNamespace <arg> Namespace to create the Zookeepersub-paths for high availability modeOptions for default mode:-m,--jobmanager <arg> Address of the JobManager (master) to whichto connect. Use this flag to connect to adifferent JobManager than the one specifiedin the configuration.-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-pathsfor high availability modeAction "cancel" cancels a running program.Syntax: cancel [OPTIONS] <Job ID>"cancel" action options:-s,--withSavepoint <targetDirectory> Trigger savepoint and cancel job.The target directory is optional. Ifno directory is specified, theconfigured default directory(state.savepoints.dir) is used.Options for yarn-cluster mode:-m,--jobmanager <arg> Address of the JobManager (master) towhich to connect. Use this flag to connectto a different JobManager than the onespecified in the configuration.-yid,--yarnapplicationId <arg> Attach to running YARN session-z,--zookeeperNamespace <arg> Namespace to create the Zookeepersub-paths for high availability modeOptions for default mode:-m,--jobmanager <arg> Address of the JobManager (master) to whichto connect. Use this flag to connect to adifferent JobManager than the one specifiedin the configuration.-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-pathsfor high availability modeAction "savepoint" triggers savepoints for a running job or disposes existing ones.Syntax: savepoint [OPTIONS] <Job ID> [<target directory>]"savepoint" action options:-d,--dispose <arg> Path of savepoint to dispose.-j,--jarfile <jarfile> Flink program JAR file.Options for yarn-cluster mode:-m,--jobmanager <arg> Address of the JobManager (master) towhich to connect. Use this flag to connectto a different JobManager than the onespecified in the configuration.-yid,--yarnapplicationId <arg> Attach to running YARN session-z,--zookeeperNamespace <arg> Namespace to create the Zookeepersub-paths for high availability modeOptions for default mode:-m,--jobmanager <arg> Address of the JobManager (master) to whichto connect. Use this flag to connect to adifferent JobManager than the one specifiedin the configuration.-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-pathsfor high availability modeAction "modify" modifies a running job (e.g. change of parallelism).Syntax: modify <Job ID> [OPTIONS]"modify" action options:-h,--help Show the help message for the CLIFrontend or the action.-p,--parallelism <newParallelism> New parallelism for the specified job.-v,--verbose This option is deprecated.Options for yarn-cluster mode:-m,--jobmanager <arg> Address of the JobManager (master) towhich to connect. Use this flag to connectto a different JobManager than the onespecified in the configuration.-yid,--yarnapplicationId <arg> Attach to running YARN session-z,--zookeeperNamespace <arg> Namespace to create the Zookeepersub-paths for high availability modeOptions for default mode:-m,--jobmanager <arg> Address of the JobManager (master) to whichto connect. Use this flag to connect to adifferent JobManager than the one specifiedin the configuration.-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-pathsfor high availability mode
