Amazon 服务 (AWS)
Amazon Web Services 提供可以运行Flink的云计算服务。
EMR: Elastic MapReduce
Amazon Elastic MapReduce (Amazon EMR) 是一种Web服务,可以轻松快速地设置Hadoop集群。因为它负责设置所有内容,所以在AWS上运行Flink 非常推荐,使用这种方式 。
标准 EMR 安装
Flink是Amazon EMR上受支持的应用程序。 亚马逊的文档 描述了如何配置Flink、创建和监控集群以及处理作业。
自定义 EMR 安装
Amazon EMR 会定期更新到最新版本, 也可以在EMR集群中安装不同版本的Flink
创建 EMR 集群
EMR文档包含了 创建 EMR集群。通过该文档可以安装任何版本的EMR,不需要安装EMR中的All Applications部分,但是Core Hadoop是必须的。
注意 访问S3存储需要配置IAM角色,文档请参阅 配置 IAM角色 。
在EMR集群中安装Flink
在创建集群之后,连接到主节点并安装Flink,文档请参阅 连接到主节点 。
- 转到 下载页 并 下载与EMR集群中Hadoop版本匹配的二进制版本 , 例如, Hadoop 2.7 for EMR releases 4.3.0, 4.4.0, or 4.5.0.
- 解压Flink,设置Hadoop配置目录后,可以运行Flink作业,文档请参阅 Flink jobs via YARN :
HADOOP_CONF_DIR=/etc/hadoop/conf ./bin/flink run -m yarn-cluster -yn 1 examples/streaming/WordCount.jar
S3: 简单存储
Amazon Simple Storage Service (Amazon S3)为各种实例提供云对象存储,S3可以作为Flink的数据输入 或 输出 ,或者是为 streaming state backends 、以及YARN提供对象存储。
通过以下格式指定路径来使用常规文件等S3对象:
s3://<your-bucket>/<endpoint>
endpoint 是单个文件或目录,例如:
// 从S3中读取
env.readTextFile("s3://<bucket>/<endpoint>");
// 写入到S3
stream.writeAsText("s3://<bucket>/<endpoint>");
// 使用S3作为 FsStatebackend
env.setStateBackend(new FsStateBackend("s3://<your-bucket>/<endpoint>"));
上文的示例并没有列出全部情况,也可以在其他地方使用S3存储,包括 高可用安装 或 RocksDBStateBackend等 任何Flink期望的文件地址。
多数情况下我们使用 flink-s3-fs-hadoop
和 flink-s3-fs-presto
作为S3在Hadoop 上的实现 ,例如,使用S3作为YARN的资源存储目录,可能需要设置特定的Hadoop S3 FileSystem实现。两种方式如下所述。
Shaded Hadoop/Presto S3 文件系统 (推荐)
Note: 如果在EMR上运行Flink,则无需手动配置,文档可以参阅 Flink on EMR.
要使用 flink-s3-fs-hadoop
或 flink-s3-fs-presto
,co在启动Flink之前将相应的JAR文件从 opt
目录拷贝到Flink的 lib
目录下,例如
cp ./opt/flink-s3-fs-presto-1.7.1.jar ./lib/
flink-s3-fs-hadoop
和 flink-s3-fs-presto
注册了不同的文件URI s3://
实现, flink-s3-fs-hadoop
还注册了 s3a://
, flink-s3-fs-presto
注册了 s3p://
,可以同时使用这些不同的URI。
配置访问凭据
设置S3 文件系统实现之后,需要保证Flink程序可以访问S3存储。
身份和访问管理 (IAM) (推荐)
比较推荐使用身份和访问管理 (IAM)来设置Access key,可以通过IAM提供的功能来让Flink程序安全的访问S3存储。 详细可以参阅文档身份和访问管理 (IAM)。具体用户角色权限控制可以参考 IAM Roles.
如果正确设置此选项,则可以在AWS中管理对S3的访问,并且Flink访问S3不需要任何keys
Access Keys (不推荐)
还可以通过 access key来对S3进行访问。相关内容可以参阅 IAM的角色.
这种情况下,需要在Flink的 flink-conf.yaml
文件中同时配置s3.access-key
和 s3.secret-key
s3.access-key: your-access-key
s3.secret-key: your-secret-key
Hadoop提供的S3文件系统-手动设置
Note: 在EMR上运行Flink不需要配置,请参阅 Flink on EMR.
因为这个设置比较复杂,所以除非是有其他需求,否则,还是建议才用上面的方式来实现。例如,修改Hadoop的 core-site.xml
文件中的 fs.defaultFS
配置来把S3作为存储目录.
设置 S3 文件系统
S3相关内容可以参阅 Hadoop’s S3 FileSystem clients:
S3AFileSystem
(推荐 Hadoop 2.7 及其以上版本使用): 用于在内部使用Amazon SDK读取和写入常规文件的文件系统。没有最大文件大小并且与IAM角色一起使用。NativeS3FileSystem
(Hadoop 2.6 及一起版本): 用于读写常规文件的文件系统。最大对象大小为5GB,不适用于IAM角色。
S3AFileSystem
(推荐)
推荐使用的S3 FileSystem实现。它在内部使用Amazon的SDK并与IAM角色一起使用 (参阅 配置访问凭据).
需要修改Flink中Hadoop的配置,配置文件 core-site.xml
:
<configuration>
<property>
<name>fs.s3.impl</name>
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
<!-- Comma separated list of local directories used to buffer
large results prior to transmitting them to S3\. -->
<property>
<name>fs.s3a.buffer.dir</name>
<value>/tmp</value>
</property>
</configuration>
这里注册 S3AFileSystem
作为 s3a://
URI开头的文件实现.
NativeS3FileSystem
此文件系统仅限于最大5GB的文件,并且不适用于IAM角色(参阅 配置访问凭据),所以需要在配置文件中配置AWS的access key。
需要修改Flink中Hadoop的配置,配置文件core-site.xml
:
<property>
<name>fs.s3.impl</name>
<value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
</property>
这里注册 NativeS3FileSystem
作为 s3://
URI开头文件的实现.
Hadoop 配置
可以采用如下两种方式指定 Hadoop 配置 把Flink指向Hadoop的配置目录
- 设置环境变量
HADOOP_CONF_DIR
, - 设置
flink-conf.yaml
文件中的fs.hdfs.hadoopconf
:
fs.hdfs.hadoopconf: /path/to/etc/hadoop
/path/to/etc/hadoop
被注册为Hadoop的配置目录. Flink 会在目录下查找 core-site.xml
和 hdfs-site.xml
文件。
配置访问凭据
Note: 如果在EMR上直接运行Flink,则无需额外配置,请参阅 Flink on EMR.
Flink中使用 S3 存储时,需要保证Flink可以访问到S3存储。
身份和访问管理 (IAM) (推荐)
比较推荐使用身份和访问管理 (IAM)来设置Access key,可以通过IAM提供的功能来让Flink程序安全的访问S3存储。 详细可以参阅文档身份和访问管理 (IAM)。具体用户角色权限控制可以参考 IAM 角色介绍.
如果正确设置此选项,则可以在AWS中管理对S3的访问,并且Flink访问S3不需要任何keys
Access Keys (不推荐)
还可以通过 access key来对S3进行访问。相关内容可以参阅 IAM的角色.
请注意,这只适用于 S3AFileSystem
而不是 NativeS3FileSystem
.
通过Access Keys 访问 S3AFileSystem
(不推荐)
可以用过 Access Keys来授权对S3存储的访问。 但是这种操作并不推荐,请参阅 IAM 角色介绍.
对于 S3AFileSystem
需要在Hadoop的core-site.xml
文件中同时配置 fs.s3a.access.key
和 fs.s3a.secret.key
:
<property>
<name>fs.s3a.access.key</name>
<value></value>
</property>
<property>
<name>fs.s3a.secret.key</name>
<value></value>
</property>
通过Access Keys访问NativeS3FileSystem
(不推荐)
可以用过 Access Keys来授权对S3存储的访问。 此文件系统已经过时,最好使用 S3AFileSystem
来代替,详情参阅 IAM 角色要求.
对于NativeS3FileSystem
需要在Hadoop的 core-site.xml
文件中同事配置 fs.s3.awsAccessKeyId
和 fs.s3.awsSecretAccessKey
:
<property>
<name>fs.s3.awsAccessKeyId</name>
<value></value>
</property>
<property>
<name>fs.s3.awsSecretAccessKey</name>
<value></value>
</property>
提供 S3 FileSystem 依赖
Note: 在EMR上运行Flink则无需配置,请参阅 Flink on EMR.
Hadoop的S3 FileSystem客户端打包在 hadoop-aws
jar中(Hadoop 2.6及其以后)。需要将JAR及其所有依赖项添加到Flink的类路径中,即Job和TaskManagers的类路径。根据使用的FileSystem实现以及使用的Flink和Hadoop版本,需要添加不同的依赖项(请参阅下文)。
有多种方法可以将JAR添加到Flink的类路径中,最简单的方法就是将JAR放在Flink的 lib
目录下。 拷贝 hadoop-aws
JAR 和他的所有依赖复制到lib下,还可以把一个目录指定为 HADOOP_CLASSPATH
环境变量。
Flink for Hadoop 2.7
根据您使用的操作系统,请添加以下依赖项。可以在hadoop-2.7/share/hadoop/tools/lib
找到一部分:
S3AFileSystem
:hadoop-aws-2.7.3.jar
aws-java-sdk-s3-1.11.183.jar
及其依赖:aws-java-sdk-core-1.11.183.jar
aws-java-sdk-kms-1.11.183.jar
jackson-annotations-2.6.7.jar
jackson-core-2.6.7.jar
jackson-databind-2.6.7.jar
joda-time-2.8.1.jar
httpcore-4.4.4.jar
httpclient-4.5.3.jar
NativeS3FileSystem
:hadoop-aws-2.7.3.jar
guava-11.0.2.jar
注意 hadoop-common
是Flink的一部分, 但 Guava不是。
Flink for Hadoop 2.6
根据您使用的操作系统,请添加以下依赖项。可以在hadoop-2.6/share/hadoop/tools/lib
找到一部分:
S3AFileSystem
:hadoop-aws-2.6.4.jar
aws-java-sdk-1.7.4.jar
and its dependencies:jackson-annotations-2.1.1.jar
jackson-core-2.1.1.jar
jackson-databind-2.1.1.jar
joda-time-2.2.jar
httpcore-4.2.5.jar
httpclient-4.2.5.jar
NativeS3FileSystem
:hadoop-aws-2.6.4.jar
guava-11.0.2.jar
注意 hadoop-common
是Flink的一部分, 但 Guava不是。
Flink for Hadoop 2.4及其以下版本
2.4及其以下版本只支持 NativeS3FileSystem
相应依赖已经包含在hadoop-common
中了,所以不需要额外添加依赖。
常见问题
下面列出了在AWS上使用Flink时的部分常见问题。
Missing S3 FileSystem Configuration
如果作业提交失败,并显示 No file system found with scheme s3
这说明S3文件系统的配置不正确,需要检查文件配置,参照 Shaded Hadoop/Presto S3 文件系统 或者 Hadoop基本配置 保证配置正确性
org.apache.flink.client.program.ProgramInvocationException: The program execution failed:
Failed to submit job cd927567a81b62d7da4c18eaa91c3c39 (WordCount Example) [...]
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error:
No file system found with scheme s3, referenced in file URI 's3://<bucket>/<endpoint>'. [...]
Caused by: java.io.IOException: No file system found with scheme s3,
referenced in file URI 's3://<bucket>/<endpoint>'.
at o.a.f.core.fs.FileSystem.get(FileSystem.java:296)
at o.a.f.core.fs.Path.getFileSystem(Path.java:311)
at o.a.f.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450)
at o.a.f.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
at o.a.f.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
AWS Access Key ID and Secret Access Key Not Specified
如果作业失败并显示异常 AWS Access Key ID and Secret Access Key must be specified as the username or password
, 未正确设置您的访问凭据。有关如何配置请参阅 shaded Hadoop/Presto or Hadoop基本配置 。
org.apache.flink.client.program.ProgramInvocationException: The program execution failed:
Failed to submit job cd927567a81b62d7da4c18eaa91c3c39 (WordCount Example) [...]
Caused by: java.io.IOException: The given file URI (s3://<bucket>/<endpoint>) points to the
HDFS NameNode at <bucket>, but the File System could not be initialized with that address:
AWS Access Key ID and Secret Access Key must be specified as the username or password
(respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId
or fs.s3n.awsSecretAccessKey properties (respectively) [...]
Caused by: java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must
be specified as the username or password (respectively) of a s3 URL, or by setting
the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively) [...]
at o.a.h.fs.s3.S3Credentials.initialize(S3Credentials.java:70)
at o.a.h.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:80)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at o.a.h.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
at o.a.h.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at o.a.h.fs.s3native.$Proxy6.initialize(Unknown Source)
at o.a.h.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:330)
at o.a.f.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:321)
ClassNotFoundException: NativeS3FileSystem/S3AFileSystem Not Found
看到此异常,表示S3 FileSystem不是Flink的类路径的一部分请参阅 S3 文件存储依赖 。
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3native.NativeS3FileSystem not found
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2186)
at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopWrapperClassNameForFileSystem(HadoopFileSystem.java:460)
at org.apache.flink.core.fs.FileSystem.getHadoopWrapperClassNameForFileSystem(FileSystem.java:352)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:280)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:311)
at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450)
at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
... 25 more
Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3native.NativeS3FileSystem not found
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2154)
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2178)
... 32 more
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3native.NativeS3FileSystem not found
at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2060)
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2152)
... 33 more
IOException: 400: Bad Request
如果您已正确配置所有内容,但是请求却显示 Bad Request
异常 and S3 bucket在 eu-central-1
可用区, 可能是因为S3客户端不支持,请参阅Amazon’s signature version 4.
[...]
Caused by: java.io.IOException: s3://<bucket-in-eu-central-1>/<endpoint> : 400 : Bad Request [...]
Caused by: org.jets3t.service.impl.rest.HttpException [...]
或
com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400, AWS Service: Amazon S3, AWS Request ID: [...], AWS Error Code: null, AWS Error Message: Bad Request, S3 Extended Request ID: [...]
Hadoop/Presto S3 存储不会有问题,但是 Hadoop-provided S3 file systems会有。Hadoop versions 大于 2.7.2 运行于 NativeS3FileSystem
(依赖 JetS3t 0.9.0
版本 >= 0.9.4) 都会受影响,部分 S3AFileSystem
也可能出现该异常。
除了更改可用区之外,可以参阅亚马逊的 requesting signature version 4 for request authentication进行修改, 例如. 在 flink-conf.yaml
中添加JVM参数(参阅 配置):
env.java.opts: -Dcom.amazonaws.services.s3.enableV4
NullPointerException at org.apache.hadoop.fs.LocalDirAllocator
此异常通常是由跳过本地缓冲区目录配置引起 S3AFileSystem
的fs.s3a.buffer.dir
配置。参阅 S3A文件系统配置 来正确配置.
[...]
Caused by: java.lang.NullPointerException at
o.a.h.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:268) at
o.a.h.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:344) at
o.a.h.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:416) at
o.a.h.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:198) at
o.a.h.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:87) at
o.a.h.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:410) at
o.a.h.fs.FileSystem.create(FileSystem.java:907) at
o.a.h.fs.FileSystem.create(FileSystem.java:888) at
o.a.h.fs.FileSystem.create(FileSystem.java:785) at
o.a.f.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:404) at
o.a.f.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:48) at
... 25 more