本地执行

FLink可以在单个机器上运行,甚至可以在单个Java虚拟机上运行。这允许用户在本地测试和调试Flink程序。本节概述了本地执行机制。

本地环境和执行器允许您在本地Java虚拟机中运行FLink程序,或者在任何JVM中运行现有程序的一部分。大多数例子都可以通过点击IDE的“运行”按钮在本地启动。

Flink支持两种不同类型的本地执行。“LocalExecutionEnvironment”正在启动完整的Flink运行时,包括JobManager和TaskManager。这些包括内存管理和在集群模式下执行的所有内部算法。

“收集环境”正在执行Java集合上的FLink程序。此模式不会启动完整的Flink运行时,因此执行开销非常低,而且很轻。例如,一个“DataSet.MaP()”-转换将通过将“MAP()”函数应用到Java列表中的所有元素来执行。

调试

如果您在本地运行FLink程序,也可以像任何其他Java程序一样调试程序。可以使用“system.out.println()”写出一些内部变量,也可以使用调试器。可以在’map()、'reduce()和所有其他方法中设置断点。还请参阅Java API文档中的[调试部分](//C.Apache .Org.Org/Opj.Org/Org/Forink -DOCS-Relay1.7/Dev/Palp/index)HTML HTML调试,以测试Java API中的测试和本地调试实用工具。

Maven依赖

如果您在Maven项目中开发您的程序,则必须使用此依赖项添加“Flink客户端”模块:

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-clients_2.11</artifactId>
  4. <version>1.7.1</version>
  5. </dependency>

本地环境

“LocalEnvironment”是flink程序本地执行的句柄。使用它来运行本地JVM中的程序-独立的或嵌入到其他程序中。

本地环境通过方法“ExecutionEnvironment.createLocalEnvironment()”进行实例化。默认情况下,它将使用与您的计算机具有CPU核心(硬件上下文)一样多的本地线程来执行。您也可以指定所需的并行性。本地环境可以配置为使用’enableLogging()/disableLogging()`’登录到控制台。

在大多数情况下,调用“ExecutionEnvironment.getExecutionEnvironment()”是更好的方法。当程序在本地启动(在命令行界面之外)时,该方法返回一个“LocalEnvironment”,当程序被[命令行界面]调用时,该方法返回一个预先配置的集群执行环境(//ci.apache.org/projects/flink/flink-docs-release-1.7/ops/cli.html)。

  1. public static void main(String[] args) throws Exception {
  2. ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
  3. DataSet<String> data = env.readTextFile("file:///path/to/file");
  4. data
  5. .filter(new FilterFunction<String>() {
  6. public boolean filter(String value) {
  7. return value.startsWith("http://");
  8. }
  9. })
  10. .writeAsText("file:///path/to/result");
  11. JobExecutionResult res = env.execute();
  12. }

执行完成后返回的“JobExecutionResult”对象包含程序运行时和累加器结果。

“LocalEnvironment”还允许将自定义配置值传递给flink。

  1. Configuration conf = new Configuration();
  2. conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.5f);
  3. final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);

Note: The local execution environments do not start any web frontend to monitor the execution.

收集环境

使用“收集环境”来执行Java集合是执行FLink程序的低开销方法。此模式的典型用例是自动测试、调试和代码重用。

用户可以使用为批处理实现的算法,也可以在更具交互的情况下使用。在Java应用服务器中可以使用FLink程序的稍微变化的变体来处理传入的请求。

基于集合的执行框架

  1. public static void main(String[] args) throws Exception {
  2. // initialize a new Collection-based execution environment
  3. final ExecutionEnvironment env = new CollectionEnvironment();
  4. DataSet<User> users = env.fromCollection( /* get elements from a Java Collection */);
  5. /* Data Set transformations ... */
  6. // retrieve the resulting Tuple2 elements into a ArrayList.
  7. Collection<...> result = new ArrayList<...>();
  8. resultDataSet.output(new LocalCollectionOutputFormat<...>(result));
  9. // kick off execution.
  10. env.execute();
  11. // Do some work with the resulting ArrayList (=Collection).
  12. for(... t : result) {
  13. System.err.println("Result = "+t);
  14. }
  15. }

“flink examples batch”模块包含一个完整的示例,称为“CollectionExecutionExample”。

请注意,只有适合于JVM堆的小数据才可以执行基于集合的Flink程序。集合上的执行不是多线程的,只使用一个线程。