Amazon 服务 (AWS)

Amazon Web Services 提供可以运行Flink的云计算服务。

EMR: Elastic MapReduce

Amazon Elastic MapReduce (Amazon EMR) 是一种Web服务,可以轻松快速地设置Hadoop集群。因为它负责设置所有内容,所以在AWS上运行Flink 非常推荐,使用这种方式 。

标准 EMR 安装

Flink是Amazon EMR上受支持的应用程序。 亚马逊的文档 描述了如何配置Flink、创建和监控集群以及处理作业。

自定义 EMR 安装

Amazon EMR 会定期更新到最新版本, 也可以在EMR集群中安装不同版本的Flink

创建 EMR 集群

EMR文档包含了 创建 EMR集群。通过该文档可以安装任何版本的EMR,不需要安装EMR中的All Applications部分,但是Core Hadoop是必须的。

注意 访问S3存储需要配置IAM角色,文档请参阅 配置 IAM角色

在EMR集群中安装Flink

在创建集群之后,连接到主节点并安装Flink,文档请参阅 连接到主节点

  1. 转到 下载页下载与EMR集群中Hadoop版本匹配的二进制版本 , 例如, Hadoop 2.7 for EMR releases 4.3.0, 4.4.0, or 4.5.0.
  2. 解压Flink,设置Hadoop配置目录后,可以运行Flink作业,文档请参阅 Flink jobs via YARN :
  1. HADOOP_CONF_DIR=/etc/hadoop/conf ./bin/flink run -m yarn-cluster -yn 1 examples/streaming/WordCount.jar

S3: 简单存储

Amazon Simple Storage Service (Amazon S3)为各种实例提供云对象存储,S3可以作为Flink的数据输入输出 ,或者是为 streaming state backends 、以及YARN提供对象存储。

通过以下格式指定路径来使用常规文件等S3对象:

  1. s3://<your-bucket>/<endpoint>

endpoint 是单个文件或目录,例如:

  1. // 从S3中读取
  2. env.readTextFile("s3://<bucket>/<endpoint>");
  3. // 写入到S3
  4. stream.writeAsText("s3://<bucket>/<endpoint>");
  5. // 使用S3作为 FsStatebackend
  6. env.setStateBackend(new FsStateBackend("s3://<your-bucket>/<endpoint>"));

上文的示例并没有列出全部情况,也可以在其他地方使用S3存储,包括 高可用安装RocksDBStateBackend等 任何Flink期望的文件地址。

多数情况下我们使用 flink-s3-fs-hadoopflink-s3-fs-presto 作为S3在Hadoop 上的实现 ,例如,使用S3作为YARN的资源存储目录,可能需要设置特定的Hadoop S3 FileSystem实现。两种方式如下所述。

Shaded Hadoop/Presto S3 文件系统 (推荐)

Note: 如果在EMR上运行Flink,则无需手动配置,文档可以参阅 Flink on EMR.

要使用 flink-s3-fs-hadoopflink-s3-fs-presto,co在启动Flink之前将相应的JAR文件从 opt 目录拷贝到Flink的 lib 目录下,例如

  1. cp ./opt/flink-s3-fs-presto-1.7.1.jar ./lib/

flink-s3-fs-hadoopflink-s3-fs-presto 注册了不同的文件URI s3:// 实现, flink-s3-fs-hadoop 还注册了 s3a://flink-s3-fs-presto 注册了 s3p://,可以同时使用这些不同的URI。

配置访问凭据

设置S3 文件系统实现之后,需要保证Flink程序可以访问S3存储。

身份和访问管理 (IAM) (推荐)

比较推荐使用身份和访问管理 (IAM)来设置Access key,可以通过IAM提供的功能来让Flink程序安全的访问S3存储。 详细可以参阅文档身份和访问管理 (IAM)。具体用户角色权限控制可以参考 IAM Roles.

如果正确设置此选项,则可以在AWS中管理对S3的访问,并且Flink访问S3不需要任何keys

Access Keys (不推荐)

还可以通过 access key来对S3进行访问。相关内容可以参阅 IAM的角色.

这种情况下,需要在Flink的 flink-conf.yaml文件中同时配置s3.access-keys3.secret-key

  1. s3.access-key: your-access-key
  2. s3.secret-key: your-secret-key

Hadoop提供的S3文件系统-手动设置

Note: 在EMR上运行Flink不需要配置,请参阅 Flink on EMR.

因为这个设置比较复杂,所以除非是有其他需求,否则,还是建议才用上面的方式来实现。例如,修改Hadoop的 core-site.xml文件中的 fs.defaultFS配置来把S3作为存储目录.

设置 S3 文件系统

S3相关内容可以参阅 Hadoop’s S3 FileSystem clients:

  1. S3AFileSystem (推荐 Hadoop 2.7 及其以上版本使用): 用于在内部使用Amazon SDK读取和写入常规文件的文件系统。没有最大文件大小并且与IAM角色一起使用。
  2. NativeS3FileSystem (Hadoop 2.6 及一起版本): 用于读写常规文件的文件系统。最大对象大小为5GB,不适用于IAM角色。
S3AFileSystem (推荐)

推荐使用的S3 FileSystem实现。它在内部使用Amazon的SDK并与IAM角色一起使用 (参阅 配置访问凭据).

需要修改Flink中Hadoop的配置,配置文件 core-site.xml:

  1. <configuration>
  2. <property>
  3. <name>fs.s3.impl</name>
  4. <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  5. </property>
  6. <!-- Comma separated list of local directories used to buffer
  7. large results prior to transmitting them to S3\. -->
  8. <property>
  9. <name>fs.s3a.buffer.dir</name>
  10. <value>/tmp</value>
  11. </property>
  12. </configuration>

这里注册 S3AFileSystem 作为 s3a:// URI开头的文件实现.

NativeS3FileSystem

此文件系统仅限于最大5GB的文件,并且不适用于IAM角色(参阅 配置访问凭据),所以需要在配置文件中配置AWS的access key。

需要修改Flink中Hadoop的配置,配置文件core-site.xml:

  1. <property>
  2. <name>fs.s3.impl</name>
  3. <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
  4. </property>

这里注册 NativeS3FileSystem 作为 s3:// URI开头文件的实现.

Hadoop 配置

可以采用如下两种方式指定 Hadoop 配置 把Flink指向Hadoop的配置目录

  • 设置环境变量HADOOP_CONF_DIR,
  • 设置 flink-conf.yaml文件中的 fs.hdfs.hadoopconf:
  1. fs.hdfs.hadoopconf: /path/to/etc/hadoop

/path/to/etc/hadoop 被注册为Hadoop的配置目录. Flink 会在目录下查找 core-site.xmlhdfs-site.xml 文件。

配置访问凭据

Note: 如果在EMR上直接运行Flink,则无需额外配置,请参阅 Flink on EMR.

Flink中使用 S3 存储时,需要保证Flink可以访问到S3存储。

身份和访问管理 (IAM) (推荐)

比较推荐使用身份和访问管理 (IAM)来设置Access key,可以通过IAM提供的功能来让Flink程序安全的访问S3存储。 详细可以参阅文档身份和访问管理 (IAM)。具体用户角色权限控制可以参考 IAM 角色介绍.

如果正确设置此选项,则可以在AWS中管理对S3的访问,并且Flink访问S3不需要任何keys

Access Keys (不推荐)

还可以通过 access key来对S3进行访问。相关内容可以参阅 IAM的角色.

请注意,这只适用于 S3AFileSystem 而不是 NativeS3FileSystem.

通过Access Keys 访问 S3AFileSystem (不推荐)

可以用过 Access Keys来授权对S3存储的访问。 但是这种操作并不推荐,请参阅 IAM 角色介绍.

对于 S3AFileSystem 需要在Hadoop的core-site.xml文件中同时配置 fs.s3a.access.keyfs.s3a.secret.key :

  1. <property>
  2. <name>fs.s3a.access.key</name>
  3. <value></value>
  4. </property>
  5. <property>
  6. <name>fs.s3a.secret.key</name>
  7. <value></value>
  8. </property>
通过Access Keys访问NativeS3FileSystem (不推荐)

可以用过 Access Keys来授权对S3存储的访问。 此文件系统已经过时,最好使用 S3AFileSystem来代替,详情参阅 IAM 角色要求.

对于NativeS3FileSystem 需要在Hadoop的 core-site.xml文件中同事配置 fs.s3.awsAccessKeyIdfs.s3.awsSecretAccessKey :

  1. <property>
  2. <name>fs.s3.awsAccessKeyId</name>
  3. <value></value>
  4. </property>
  5. <property>
  6. <name>fs.s3.awsSecretAccessKey</name>
  7. <value></value>
  8. </property>

提供 S3 FileSystem 依赖

Note: 在EMR上运行Flink则无需配置,请参阅 Flink on EMR.

Hadoop的S3 FileSystem客户端打包在 hadoop-aws jar中(Hadoop 2.6及其以后)。需要将JAR及其所有依赖项添加到Flink的类路径中,即Job和TaskManagers的类路径。根据使用的FileSystem实现以及使用的Flink和Hadoop版本,需要添加不同的依赖项(请参阅下文)。

有多种方法可以将JAR添加到Flink的类路径中,最简单的方法就是将JAR放在Flink的 lib 目录下。 拷贝 hadoop-aws JAR 和他的所有依赖复制到lib下,还可以把一个目录指定为 HADOOP_CLASSPATH 环境变量。

Flink for Hadoop 2.7

根据您使用的操作系统,请添加以下依赖项。可以在hadoop-2.7/share/hadoop/tools/lib找到一部分:

  • S3AFileSystem:
    • hadoop-aws-2.7.3.jar
    • aws-java-sdk-s3-1.11.183.jar 及其依赖:
      • aws-java-sdk-core-1.11.183.jar
      • aws-java-sdk-kms-1.11.183.jar
      • jackson-annotations-2.6.7.jar
      • jackson-core-2.6.7.jar
      • jackson-databind-2.6.7.jar
      • joda-time-2.8.1.jar
      • httpcore-4.4.4.jar
      • httpclient-4.5.3.jar
  • NativeS3FileSystem:
    • hadoop-aws-2.7.3.jar
    • guava-11.0.2.jar

注意 hadoop-common 是Flink的一部分, 但 Guava不是。

Flink for Hadoop 2.6

根据您使用的操作系统,请添加以下依赖项。可以在hadoop-2.6/share/hadoop/tools/lib找到一部分:

  • S3AFileSystem:
    • hadoop-aws-2.6.4.jar
    • aws-java-sdk-1.7.4.jar and its dependencies:
      • jackson-annotations-2.1.1.jar
      • jackson-core-2.1.1.jar
      • jackson-databind-2.1.1.jar
      • joda-time-2.2.jar
      • httpcore-4.2.5.jar
      • httpclient-4.2.5.jar
  • NativeS3FileSystem:
    • hadoop-aws-2.6.4.jar
    • guava-11.0.2.jar

注意 hadoop-common 是Flink的一部分, 但 Guava不是。

Flink for Hadoop 2.4及其以下版本

2.4及其以下版本只支持 NativeS3FileSystem相应依赖已经包含在hadoop-common中了,所以不需要额外添加依赖。

常见问题

下面列出了在AWS上使用Flink时的部分常见问题。

Missing S3 FileSystem Configuration

如果作业提交失败,并显示 No file system found with scheme s3 这说明S3文件系统的配置不正确,需要检查文件配置,参照 Shaded Hadoop/Presto S3 文件系统 或者 Hadoop基本配置 保证配置正确性

  1. org.apache.flink.client.program.ProgramInvocationException: The program execution failed:
  2. Failed to submit job cd927567a81b62d7da4c18eaa91c3c39 (WordCount Example) [...]
  3. Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error:
  4. No file system found with scheme s3, referenced in file URI 's3://<bucket>/<endpoint>'. [...]
  5. Caused by: java.io.IOException: No file system found with scheme s3,
  6. referenced in file URI 's3://<bucket>/<endpoint>'.
  7. at o.a.f.core.fs.FileSystem.get(FileSystem.java:296)
  8. at o.a.f.core.fs.Path.getFileSystem(Path.java:311)
  9. at o.a.f.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450)
  10. at o.a.f.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
  11. at o.a.f.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)

AWS Access Key ID and Secret Access Key Not Specified

如果作业失败并显示异常 AWS Access Key ID and Secret Access Key must be specified as the username or password, 未正确设置您的访问凭据。有关如何配置请参阅 shaded Hadoop/Presto or Hadoop基本配置

  1. org.apache.flink.client.program.ProgramInvocationException: The program execution failed:
  2. Failed to submit job cd927567a81b62d7da4c18eaa91c3c39 (WordCount Example) [...]
  3. Caused by: java.io.IOException: The given file URI (s3://<bucket>/<endpoint>) points to the
  4. HDFS NameNode at <bucket>, but the File System could not be initialized with that address:
  5. AWS Access Key ID and Secret Access Key must be specified as the username or password
  6. (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId
  7. or fs.s3n.awsSecretAccessKey properties (respectively) [...]
  8. Caused by: java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must
  9. be specified as the username or password (respectively) of a s3 URL, or by setting
  10. the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively) [...]
  11. at o.a.h.fs.s3.S3Credentials.initialize(S3Credentials.java:70)
  12. at o.a.h.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:80)
  13. at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  14. at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  15. at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  16. at java.lang.reflect.Method.invoke(Method.java:606)
  17. at o.a.h.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
  18. at o.a.h.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
  19. at o.a.h.fs.s3native.$Proxy6.initialize(Unknown Source)
  20. at o.a.h.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:330)
  21. at o.a.f.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:321)

ClassNotFoundException: NativeS3FileSystem/S3AFileSystem Not Found

看到此异常,表示S3 FileSystem不是Flink的类路径的一部分请参阅 S3 文件存储依赖

  1. Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3native.NativeS3FileSystem not found
  2. at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2186)
  3. at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopWrapperClassNameForFileSystem(HadoopFileSystem.java:460)
  4. at org.apache.flink.core.fs.FileSystem.getHadoopWrapperClassNameForFileSystem(FileSystem.java:352)
  5. at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:280)
  6. at org.apache.flink.core.fs.Path.getFileSystem(Path.java:311)
  7. at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450)
  8. at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
  9. at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
  10. ... 25 more
  11. Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3native.NativeS3FileSystem not found
  12. at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2154)
  13. at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2178)
  14. ... 32 more
  15. Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3native.NativeS3FileSystem not found
  16. at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2060)
  17. at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2152)
  18. ... 33 more

IOException: 400: Bad Request

如果您已正确配置所有内容,但是请求却显示 Bad Request 异常 and S3 bucket在 eu-central-1可用区, 可能是因为S3客户端不支持,请参阅Amazon’s signature version 4.

  1. [...]
  2. Caused by: java.io.IOException: s3://<bucket-in-eu-central-1>/<endpoint> : 400 : Bad Request [...]
  3. Caused by: org.jets3t.service.impl.rest.HttpException [...]

  1. com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400, AWS Service: Amazon S3, AWS Request ID: [...], AWS Error Code: null, AWS Error Message: Bad Request, S3 Extended Request ID: [...]

Hadoop/Presto S3 存储不会有问题,但是 Hadoop-provided S3 file systems会有。Hadoop versions 大于 2.7.2 运行于 NativeS3FileSystem (依赖 JetS3t 0.9.0 版本 >= 0.9.4) 都会受影响,部分 S3AFileSystem也可能出现该异常。

除了更改可用区之外,可以参阅亚马逊的 requesting signature version 4 for request authentication进行修改, 例如. 在 flink-conf.yaml 中添加JVM参数(参阅 配置):

  1. env.java.opts: -Dcom.amazonaws.services.s3.enableV4

NullPointerException at org.apache.hadoop.fs.LocalDirAllocator

此异常通常是由跳过本地缓冲区目录配置引起 S3AFileSystemfs.s3a.buffer.dir配置。参阅 S3A文件系统配置 来正确配置.

  1. [...]
  2. Caused by: java.lang.NullPointerException at
  3. o.a.h.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:268) at
  4. o.a.h.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:344) at
  5. o.a.h.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:416) at
  6. o.a.h.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:198) at
  7. o.a.h.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:87) at
  8. o.a.h.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:410) at
  9. o.a.h.fs.FileSystem.create(FileSystem.java:907) at
  10. o.a.h.fs.FileSystem.create(FileSystem.java:888) at
  11. o.a.h.fs.FileSystem.create(FileSystem.java:785) at
  12. o.a.f.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:404) at
  13. o.a.f.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:48) at
  14. ... 25 more