这里是Flink常见问题的解决办法
传递命令行参数并在Flink程序中使用
大部分Flink程序(无论是批处理还是流式计算)都依赖于外部的参数配置. 比如指定输入输出位置(路径或地址),系统参数(并行度和运行时配置),程序相关的参数配置。
Flink提供一个叫 ParameterTool
的工具类来解决此问题。 ParameterTool
并不是解决这个问题的唯一办法,其他框架,比如 Commons CLI 和 argparse4j 也可以解决这个问题.
通过 ParameterTool
来获取配置值
ParameterTool
工具提供了很多读取配置的静态方法, 工具类内部实现类似于 Map<String, String>
, 所以和项目代码风格不会有冲突
从 .properties
文件获取配置
下面是个读 Properties 文件并将内容转换成key-value的例子
String propertiesFilePath = "/home/sam/flink/myjob.properties";
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFilePath);
File propertiesFile = new File(propertiesFilePath);
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFile);
InputStream propertiesFileInputStream = new FileInputStream(file);
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFileInputStream);
从命令行中获取配置
下面是命令行参数的例子,例如在命令行后加入 --input hdfs:///mydata --elements 42
参数。
public static void main(String[] args) {
ParameterTool parameter = ParameterTool.fromArgs(args);
// .. regular code ..
从系统属性中获取配置
启动JVM时,可以把系统属性传递给程序,例如: -Dinput=hdfs:///mydata
。 可以通过 ParameterTool
来获取配置,例如:
ParameterTool parameter = ParameterTool.fromSystemProperties();
在Flink程序中使用配置参数
根据上文,我们已经获取了参数,下面是如何使用它们。
使用 ParameterTool
工具类
ParameterTool
工具类有直接访问值的方法,例如:
ParameterTool parameters = // ...
parameter.getRequired("input");
parameter.get("output", "myDefaultValue");
parameter.getLong("expectedCount", -1L);
parameter.getNumberOfParameters()
// .. there are more methods available.
您可以直接在 main()
中使用, 您可以通过如下代码来设置算子的并行度:
ParameterTool parameters = ParameterTool.fromArgs(args);
int parallelism = parameters.get("mapParallelism", 2);
DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).setParallelism(parallelism);
因为 ParameterTool
是可序列化的,所以你可以把它作为函数的参数:
ParameterTool parameters = ParameterTool.fromArgs(args);
DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer(parameters));
然后在函数内通过上文方法获取值.
注册全局参数
通过 ExecutionConfig
注册为全局参数后,可以被JobManager Web界面中的配置值和代码内的所有函数来访问.
注册为全局参数:
ParameterTool parameters = ParameterTool.fromArgs(args);
// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameters);
在代码中的函数内访问:
public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
ParameterTool parameters = (ParameterTool)
getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
parameters.getRequired("input");
// .. do more ..
超大TupleX类型命令
强烈推荐使用 POJO 来代替有很多字段的 TupleX
, POJO 可以代替多字段的 Tuple
类型.
Example
而不是使用:
Tuple11<String, String, ..., String> var = new ...;
把大型元组类型继承为自定义类型也会方便很多
CustomType var = new ...;
public static class CustomType extends Tuple11<String, String, ..., String> {
// constructor matching super
}
用Logback而不是Log4j
注意:本教程适用于Flink 0.10及其以上版本
Apache Flink 使用 slf4j 作为记录日志的抽象.,建议用户在使用事采用sfl4j来记录日志。
Sfl4j 是一个日志记录的抽象接口,可以在代码中使用不同的日志实现,例如 log4j 或者 Logback。
Flink 默认依赖于Log4j. 本页介绍如何使用在Flink中使用Logback. 用户报告说, 他们可以通过 Graylog来建立一个集中式日志收集处理系统.
以下代码可以获取Logger实例:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MyClass implements MapFunction {
private static final Logger LOG = LoggerFactory.getLogger(MyClass.class);
// ...
当通过 IDE 或者 Java程序运行Flink程序时如何使用Logback
在任何情况下,类都是通过依赖管理器(例如maven)创建的路径来执行, Flink 会把log4j的相关依赖拉入到classpath中.
因此, 需要排除掉Flink的log4j依赖. 下面的pom文件从 Flink quickstart创建的maven项目.
pom.xml
文件需要进行如下更改:
<dependencies>
<!-- Add the two required logback dependencies -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.1.3</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.1.3</version>
</dependency>
<!-- Add the log4j -> sfl4j (-> logback) bridge into the classpath
Hadoop is logging to log4j! -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<version>1.7.7</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.7.1</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.7.1</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.7.1</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<dependencies>
部分进行了以下更改:
- 从Flink中排除了所有的
log4j
依赖项: maven会忽略所有Flink对log4j的依赖传递。 - 从Flink中排除了所有的
slf4j-log4j12
依赖项: 因为我们要使用sl4j来进行logback绑定, 所以我们必须将slf4j从log4j中的绑定删除。 - 增加
logback-core
和logback-classic
的依赖项。 - 增加
log4j-over-slf4j
依赖项。log4j-over-slf4j
是一个可以通过Log4j API来使用Slf4j接口的工具。 Flink依赖于Hadoop,然而Hadoop直接使用Log4j进行日志记录。 因此, 我们需要将所有日志记录器调用从Log4j重定向到Slf4j,后者又记录到Logback。
请注意,您需要对新添加到pom中的Flink依赖都进行手动排除。
您还需要检查其他非Flink依赖项是不是引入了log4j绑定,可以通过mvn dependency:tree
来对项目依赖进行分析.
在集群上的Flink程序如何使用Logback
This tutorial is applicable when running Flink on YARN or as a standalone cluster.
本教程适用于在YARN上运行Flink或其他独立集群。
为了在Flink中使用Logback而不是Log4j, 您首先需要从 lib/
目录移除 log4j-1.2.xx.jar
和 sfl4j-log4j12-xxx.jar
。
然后, 您需要把以下Jar文件放到 lib/
目录下:
logback-classic.jar
logback-core.jar
log4j-over-slf4j.jar
: 此桥接器需要存在于classpath中,以便Hadoop将日志记录器调用从Log4j重定向到Slf4j。
请注意,此情况你需要在向YARN提交Flink作业时显式指定 lib/
目录。 例如:
./bin/flink run -yt $FLINK_HOME/lib <... remaining arguments ...>