这里是Flink常见问题的解决办法
传递命令行参数并在Flink程序中使用
大部分Flink程序(无论是批处理还是流式计算)都依赖于外部的参数配置. 比如指定输入输出位置(路径或地址),系统参数(并行度和运行时配置),程序相关的参数配置。
Flink提供一个叫 ParameterTool 的工具类来解决此问题。 ParameterTool 并不是解决这个问题的唯一办法,其他框架,比如 Commons CLI 和 argparse4j 也可以解决这个问题.
通过 ParameterTool 来获取配置值
ParameterTool 工具提供了很多读取配置的静态方法, 工具类内部实现类似于 Map<String, String>, 所以和项目代码风格不会有冲突
从 .properties 文件获取配置
下面是个读 Properties 文件并将内容转换成key-value的例子
String propertiesFilePath = "/home/sam/flink/myjob.properties";ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFilePath);File propertiesFile = new File(propertiesFilePath);ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFile);InputStream propertiesFileInputStream = new FileInputStream(file);ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFileInputStream);
从命令行中获取配置
下面是命令行参数的例子,例如在命令行后加入 --input hdfs:///mydata --elements 42 参数。
public static void main(String[] args) {ParameterTool parameter = ParameterTool.fromArgs(args);// .. regular code ..
从系统属性中获取配置
启动JVM时,可以把系统属性传递给程序,例如: -Dinput=hdfs:///mydata。 可以通过 ParameterTool 来获取配置,例如:
ParameterTool parameter = ParameterTool.fromSystemProperties();
在Flink程序中使用配置参数
根据上文,我们已经获取了参数,下面是如何使用它们。
使用 ParameterTool 工具类
ParameterTool 工具类有直接访问值的方法,例如:
ParameterTool parameters = // ...parameter.getRequired("input");parameter.get("output", "myDefaultValue");parameter.getLong("expectedCount", -1L);parameter.getNumberOfParameters()// .. there are more methods available.
您可以直接在 main() 中使用, 您可以通过如下代码来设置算子的并行度:
ParameterTool parameters = ParameterTool.fromArgs(args);int parallelism = parameters.get("mapParallelism", 2);DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).setParallelism(parallelism);
因为 ParameterTool 是可序列化的,所以你可以把它作为函数的参数:
ParameterTool parameters = ParameterTool.fromArgs(args);DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer(parameters));
然后在函数内通过上文方法获取值.
注册全局参数
通过 ExecutionConfig 注册为全局参数后,可以被JobManager Web界面中的配置值和代码内的所有函数来访问.
注册为全局参数:
ParameterTool parameters = ParameterTool.fromArgs(args);// set up the execution environmentfinal ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();env.getConfig().setGlobalJobParameters(parameters);
在代码中的函数内访问:
public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) {ParameterTool parameters = (ParameterTool)getRuntimeContext().getExecutionConfig().getGlobalJobParameters();parameters.getRequired("input");// .. do more ..
超大TupleX类型命令
强烈推荐使用 POJO 来代替有很多字段的 TupleX, POJO 可以代替多字段的 Tuple类型.
Example
而不是使用:
Tuple11<String, String, ..., String> var = new ...;
把大型元组类型继承为自定义类型也会方便很多
CustomType var = new ...;public static class CustomType extends Tuple11<String, String, ..., String> {// constructor matching super}
用Logback而不是Log4j
注意:本教程适用于Flink 0.10及其以上版本
Apache Flink 使用 slf4j 作为记录日志的抽象.,建议用户在使用事采用sfl4j来记录日志。
Sfl4j 是一个日志记录的抽象接口,可以在代码中使用不同的日志实现,例如 log4j 或者 Logback。
Flink 默认依赖于Log4j. 本页介绍如何使用在Flink中使用Logback. 用户报告说, 他们可以通过 Graylog来建立一个集中式日志收集处理系统.
以下代码可以获取Logger实例:
import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class MyClass implements MapFunction {private static final Logger LOG = LoggerFactory.getLogger(MyClass.class);// ...
当通过 IDE 或者 Java程序运行Flink程序时如何使用Logback
在任何情况下,类都是通过依赖管理器(例如maven)创建的路径来执行, Flink 会把log4j的相关依赖拉入到classpath中.
因此, 需要排除掉Flink的log4j依赖. 下面的pom文件从 Flink quickstart创建的maven项目.
pom.xml文件需要进行如下更改:
<dependencies><!-- Add the two required logback dependencies --><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-core</artifactId><version>1.1.3</version></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId><version>1.1.3</version></dependency><!-- Add the log4j -> sfl4j (-> logback) bridge into the classpathHadoop is logging to log4j! --><dependency><groupId>org.slf4j</groupId><artifactId>log4j-over-slf4j</artifactId><version>1.7.7</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.7.1</version><exclusions><exclusion><groupId>log4j</groupId><artifactId>*</artifactId></exclusion><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>1.7.1</version><exclusions><exclusion><groupId>log4j</groupId><artifactId>*</artifactId></exclusion><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.11</artifactId><version>1.7.1</version><exclusions><exclusion><groupId>log4j</groupId><artifactId>*</artifactId></exclusion><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion></exclusions></dependency></dependencies>
<dependencies> 部分进行了以下更改:
- 从Flink中排除了所有的
log4j依赖项: maven会忽略所有Flink对log4j的依赖传递。 - 从Flink中排除了所有的
slf4j-log4j12依赖项: 因为我们要使用sl4j来进行logback绑定, 所以我们必须将slf4j从log4j中的绑定删除。 - 增加
logback-core和logback-classic的依赖项。 - 增加
log4j-over-slf4j依赖项。log4j-over-slf4j是一个可以通过Log4j API来使用Slf4j接口的工具。 Flink依赖于Hadoop,然而Hadoop直接使用Log4j进行日志记录。 因此, 我们需要将所有日志记录器调用从Log4j重定向到Slf4j,后者又记录到Logback。
请注意,您需要对新添加到pom中的Flink依赖都进行手动排除。
您还需要检查其他非Flink依赖项是不是引入了log4j绑定,可以通过mvn dependency:tree来对项目依赖进行分析.
在集群上的Flink程序如何使用Logback
This tutorial is applicable when running Flink on YARN or as a standalone cluster.
本教程适用于在YARN上运行Flink或其他独立集群。
为了在Flink中使用Logback而不是Log4j, 您首先需要从 lib/ 目录移除 log4j-1.2.xx.jar 和 sfl4j-log4j12-xxx.jar。
然后, 您需要把以下Jar文件放到 lib/ 目录下:
logback-classic.jarlogback-core.jarlog4j-over-slf4j.jar: 此桥接器需要存在于classpath中,以便Hadoop将日志记录器调用从Log4j重定向到Slf4j。
请注意,此情况你需要在向YARN提交Flink作业时显式指定 lib/ 目录。 例如:
./bin/flink run -yt $FLINK_HOME/lib <... remaining arguments ...>
