最佳实践

这里是Flink常见问题的解决办法

传递命令行参数并在Flink程序中使用

大部分Flink程序(无论是批处理还是流式计算)都依赖于外部的参数配置. 比如指定输入输出位置(路径或地址),系统参数(并行度和运行时配置),程序相关的参数配置。

Flink提供一个叫 ParameterTool 的工具类来解决此问题。 ParameterTool 并不是解决这个问题的唯一办法,其他框架,比如 Commons CLIargparse4j 也可以解决这个问题.

通过 ParameterTool 来获取配置值

ParameterTool 工具提供了很多读取配置的静态方法, 工具类内部实现类似于 Map<String, String>, 所以和项目代码风格不会有冲突

.properties 文件获取配置

下面是个读 Properties 文件并将内容转换成key-value的例子

  1. String propertiesFilePath = "/home/sam/flink/myjob.properties";
  2. ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFilePath);
  3. File propertiesFile = new File(propertiesFilePath);
  4. ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFile);
  5. InputStream propertiesFileInputStream = new FileInputStream(file);
  6. ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFileInputStream);

从命令行中获取配置

下面是命令行参数的例子,例如在命令行后加入 --input hdfs:///mydata --elements 42 参数。

  1. public static void main(String[] args) {
  2. ParameterTool parameter = ParameterTool.fromArgs(args);
  3. // .. regular code ..

从系统属性中获取配置

启动JVM时,可以把系统属性传递给程序,例如: -Dinput=hdfs:///mydata。 可以通过 ParameterTool 来获取配置,例如:

  1. ParameterTool parameter = ParameterTool.fromSystemProperties();

在Flink程序中使用配置参数

根据上文,我们已经获取了参数,下面是如何使用它们。

使用 ParameterTool 工具类

ParameterTool 工具类有直接访问值的方法,例如:

  1. ParameterTool parameters = // ...
  2. parameter.getRequired("input");
  3. parameter.get("output", "myDefaultValue");
  4. parameter.getLong("expectedCount", -1L);
  5. parameter.getNumberOfParameters()
  6. // .. there are more methods available.

您可以直接在 main() 中使用, 您可以通过如下代码来设置算子的并行度:

  1. ParameterTool parameters = ParameterTool.fromArgs(args);
  2. int parallelism = parameters.get("mapParallelism", 2);
  3. DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).setParallelism(parallelism);

因为 ParameterTool 是可序列化的,所以你可以把它作为函数的参数:

  1. ParameterTool parameters = ParameterTool.fromArgs(args);
  2. DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer(parameters));

然后在函数内通过上文方法获取值.

注册全局参数

通过 ExecutionConfig 注册为全局参数后,可以被JobManager Web界面中的配置值和代码内的所有函数来访问.

注册为全局参数:

  1. ParameterTool parameters = ParameterTool.fromArgs(args);
  2. // set up the execution environment
  3. final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  4. env.getConfig().setGlobalJobParameters(parameters);

在代码中的函数内访问:

  1. public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
  2. @Override
  3. public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
  4. ParameterTool parameters = (ParameterTool)
  5. getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
  6. parameters.getRequired("input");
  7. // .. do more ..

超大TupleX类型命令

强烈推荐使用 POJO 来代替有很多字段的 TupleX, POJO 可以代替多字段的 Tuple类型.

Example

而不是使用:

  1. Tuple11<String, String, ..., String> var = new ...;

把大型元组类型继承为自定义类型也会方便很多

  1. CustomType var = new ...;
  2. public static class CustomType extends Tuple11<String, String, ..., String> {
  3. // constructor matching super
  4. }

用Logback而不是Log4j

注意:本教程适用于Flink 0.10及其以上版本

Apache Flink 使用 slf4j 作为记录日志的抽象.,建议用户在使用事采用sfl4j来记录日志。

Sfl4j 是一个日志记录的抽象接口,可以在代码中使用不同的日志实现,例如 log4j 或者 Logback

Flink 默认依赖于Log4j. 本页介绍如何使用在Flink中使用Logback. 用户报告说, 他们可以通过 Graylog来建立一个集中式日志收集处理系统.

以下代码可以获取Logger实例:

  1. import org.slf4j.Logger;
  2. import org.slf4j.LoggerFactory;
  3. public class MyClass implements MapFunction {
  4. private static final Logger LOG = LoggerFactory.getLogger(MyClass.class);
  5. // ...

当通过 IDE 或者 Java程序运行Flink程序时如何使用Logback

在任何情况下,类都是通过依赖管理器(例如maven)创建的路径来执行, Flink 会把log4j的相关依赖拉入到classpath中.

因此, 需要排除掉Flink的log4j依赖. 下面的pom文件从 Flink quickstart创建的maven项目.

pom.xml文件需要进行如下更改:

  1. <dependencies>
  2. <!-- Add the two required logback dependencies -->
  3. <dependency>
  4. <groupId>ch.qos.logback</groupId>
  5. <artifactId>logback-core</artifactId>
  6. <version>1.1.3</version>
  7. </dependency>
  8. <dependency>
  9. <groupId>ch.qos.logback</groupId>
  10. <artifactId>logback-classic</artifactId>
  11. <version>1.1.3</version>
  12. </dependency>
  13. <!-- Add the log4j -> sfl4j (-> logback) bridge into the classpath
  14. Hadoop is logging to log4j! -->
  15. <dependency>
  16. <groupId>org.slf4j</groupId>
  17. <artifactId>log4j-over-slf4j</artifactId>
  18. <version>1.7.7</version>
  19. </dependency>
  20. <dependency>
  21. <groupId>org.apache.flink</groupId>
  22. <artifactId>flink-java</artifactId>
  23. <version>1.7.1</version>
  24. <exclusions>
  25. <exclusion>
  26. <groupId>log4j</groupId>
  27. <artifactId>*</artifactId>
  28. </exclusion>
  29. <exclusion>
  30. <groupId>org.slf4j</groupId>
  31. <artifactId>slf4j-log4j12</artifactId>
  32. </exclusion>
  33. </exclusions>
  34. </dependency>
  35. <dependency>
  36. <groupId>org.apache.flink</groupId>
  37. <artifactId>flink-streaming-java_2.11</artifactId>
  38. <version>1.7.1</version>
  39. <exclusions>
  40. <exclusion>
  41. <groupId>log4j</groupId>
  42. <artifactId>*</artifactId>
  43. </exclusion>
  44. <exclusion>
  45. <groupId>org.slf4j</groupId>
  46. <artifactId>slf4j-log4j12</artifactId>
  47. </exclusion>
  48. </exclusions>
  49. </dependency>
  50. <dependency>
  51. <groupId>org.apache.flink</groupId>
  52. <artifactId>flink-clients_2.11</artifactId>
  53. <version>1.7.1</version>
  54. <exclusions>
  55. <exclusion>
  56. <groupId>log4j</groupId>
  57. <artifactId>*</artifactId>
  58. </exclusion>
  59. <exclusion>
  60. <groupId>org.slf4j</groupId>
  61. <artifactId>slf4j-log4j12</artifactId>
  62. </exclusion>
  63. </exclusions>
  64. </dependency>
  65. </dependencies>

&lt;dependencies&gt; 部分进行了以下更改:

  • 从Flink中排除了所有的 log4j 依赖项: maven会忽略所有Flink对log4j的依赖传递。
  • 从Flink中排除了所有的 slf4j-log4j12 依赖项: 因为我们要使用sl4j来进行logback绑定, 所以我们必须将slf4j从log4j中的绑定删除。
  • 增加 logback-corelogback-classic的依赖项。
  • 增加 log4j-over-slf4j依赖项。 log4j-over-slf4j 是一个可以通过Log4j API来使用Slf4j接口的工具。 Flink依赖于Hadoop,然而Hadoop直接使用Log4j进行日志记录。 因此, 我们需要将所有日志记录器调用从Log4j重定向到Slf4j,后者又记录到Logback。

请注意,您需要对新添加到pom中的Flink依赖都进行手动排除。

您还需要检查其他非Flink依赖项是不是引入了log4j绑定,可以通过mvn dependency:tree来对项目依赖进行分析.

在集群上的Flink程序如何使用Logback

This tutorial is applicable when running Flink on YARN or as a standalone cluster. 本教程适用于在YARN上运行Flink或其他独立集群。

为了在Flink中使用Logback而不是Log4j, 您首先需要从 lib/ 目录移除 log4j-1.2.xx.jarsfl4j-log4j12-xxx.jar

然后, 您需要把以下Jar文件放到 lib/ 目录下:

  • logback-classic.jar
  • logback-core.jar
  • log4j-over-slf4j.jar: 此桥接器需要存在于classpath中,以便Hadoop将日志记录器调用从Log4j重定向到Slf4j。

请注意,此情况你需要在向YARN提交Flink作业时显式指定 lib/ 目录。 例如:

./bin/flink run -yt $FLINK_HOME/lib &lt;... remaining arguments ...&gt;