译者:flink.sojb.cn

Amazon Web Services提供可以运行Flink的云计算服务。

EMR:弹性MapReduce

Amazon Elastic MapReduce(Amazon EMR)是一种Web服务,可以轻松快速地设置Hadoop集群。这是在AWS上运行Flink 的推荐方法,因为它负责设置所有内容。

标准EMR安装

Flink是Amazon EMR上受支持的应用程序。亚马逊的文档 描述了配置Flink,创建和监控集群以及处理作业。

自定义EMR安装

Amazon EMR服务会定期更新到新版本,但可以在库存EMR群集中手动安装不可用的Flink版本。

创建EMR集群

EMR文档包含显示如何启动EMR群集的示例。您可以按照该指南安装任何EMR版本。您不需要安装EMR版本的All Applications部分,但可以坚持使用Core Hadoop

注意 访问S3存储区需要 在创建EMR集群时配置IAM角色

在EMR群集上安装Flink

创建群集后,您可以连接到主节点并安装Flink:

  1. 转到下载页面下载与您的EMR集群的Hadoop版本匹配的Flink二进制版本,例如Hadoop 2.7 for EMR版本4.3.0,4.4.0或4.5.0。
  2. 解压缩Flink发行版,您可以在设置Hadoop配置目录通过YARN部署Flink作业
  1. HADOOP_CONF_DIR=/etc/hadoop/conf ./bin/flink run -m yarn-cluster -yn 1 examples/streaming/WordCount.jar

S3:简单存储服务

Amazon Simple Storage Service(Amazon S3)为各种用例提供云对象存储。您可以将S3与Flink一起用于读取写入数据以及状态后台甚至作为YARN对象存储。

您可以通过以下格式指定路径来使用常规文件等S3对象:

  1. s3://<your-bucket>/<endpoint>

端点可以是单个文件或目录,例如:

  1. // Read from S3 bucket
  2. env.readTextFile("s3://<bucket>/<endpoint>");
  3. // Write to S3 bucket
  4. stream.writeAsText("s3://<bucket>/<endpoint>");
  5. // Use S3 as FsStatebackend
  6. env.setStateBackend(new FsStateBackend("s3://<your-bucket>/<endpoint>"));

请注意,这些示例并非详尽无遗,您也可以在其他地方使用S3,包括高可用性设置RocksDBStateBackend ; Flink期望文件系统URI到处都是。

对于大多数用例,您可以使用我们的shaded flink-s3-fs-hadoopflink-s3-fs-prestoS3文件系统打包器之一,这些打包器非常容易设置。但是,对于某些情况,例如,使用S3作为YARN的资源存储目录,可能需要设置特定的Hadoop S3 FileSystem实现。两种方式如下所述。

Shaded Hadoop / Presto S3文件系统(推荐)

注意:如果在EMR上运行Flink,则无需手动配置。

要使用flink-s3-fs-hadoopflink-s3-fs-presto,请在启动Flink之前将相应的JAR文件从opt目录复制 到libFlink分发的目录,例如

  1. cp ./opt/flink-s3-fs-presto-1.7-SNAPSHOT.jar ./lib/

双方flink-s3-fs-hadoopflink-s3-fs-presto注册与URI的默认文件系统的打包s3://方案,flink-s3-fs-hadoop也注册了s3a://

配置访问凭据

设置S3 FileSystem打包器后,您需要确保允许Flink访问您的S3存储桶。

身份和访问管理(IAM)(推荐)

在AWS上设置凭据的推荐方法是通过身份和访问管理(IAM)。您可以使用IAM函数为Flink实例安全地提供他们访问S3存储桶所需的凭据。有关如何执行此 算子操作的详细信息超出了本文档的范围。请参阅AWS用户指南。您正在寻找的是IAM角色

如果您正确设置此选项,则可以在AWS中管理对S3的访问,并且不需要将任何访问Keys分发给Flink。

访问Keys(气馁)

可以通过您的访问和Keys对授予对S3的访问权限。请注意,自从引入IAM角色以来,不鼓励这样做。

您需要同时配置s3.access-keys3.secret-key 在Flink的 flink-conf.yaml

  1. s3.access-key: your-access-key
  2. s3.secret-key: your-secret-key

Hadoop提供的S3文件系统 - 手动设置

注意:如果在EMR上运行Flink,则无需手动配置。

这个设置有点复杂,我们建议使用我们的阴影Hadoop / Presto文件系统(见上文),除非另有要求,例如通过fs.defaultFSHadoop中的配置属性将S3用作YARN的资源存储目录core-site.xml

设置S3 FileSystem

与S3的交互通过Hadoop的S3 FileSystem客户端之一进行

  1. S3AFileSystem推荐用于Hadoop 2.7及更高版本):用于在内部使用Amazon SDK读取和写入常规文件的文件系统。没有最大文件大小并且与IAM角色一起使用。
  2. NativeS3FileSystem(对于Hadoop 2.6及更早版本):用于读写常规文件的文件系统。最大对象大小为5GB,不适用于IAM角色。

S3AFileSystem (推荐的)

这是推荐使用的S3 FileSystem实现。它在内部使用Amazon的SDK并与IAM角色一起使用(请参阅配置访问凭据)。

您需要将Flink指向有效的Hadoop配置,该配置包含以下属性core-site.xml

  1. <configuration>
  2. <property>
  3. <name>fs.s3.impl</name>
  4. <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  5. </property>
  6. <!-- Comma separated list of local directories used to buffer
  7. large results prior to transmitting them to S3\. -->
  8. <property>
  9. <name>fs.s3a.buffer.dir</name>
  10. <value>/tmp</value>
  11. </property>
  12. </configuration>

这将注册S3AFileSystem为具有该s3a://方案的URI的默认文件系统。

NativeS3FileSystem

此文件系统仅限于最大5GB的文件,并且不适用于IAM角色(请参阅配置访问凭据),这意味着您必须在Hadoop配置文件中手动配置AWS凭据。

您需要将Flink指向有效的Hadoop配置,该配置包含以下属性core-site.xml

  1. <property>
  2. <name>fs.s3.impl</name>
  3. <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
  4. </property>

这将注册NativeS3FileSystem为具有该s3://方案的URI的默认文件系统。

Hadoop配置

例如,您可以通过各种方式指定Hadoop配置,将Flink指向Hadoop配置目录的路径

  • 通过设置环境变量HADOOP_CONF_DIR,或
  • 通过fs.hdfs.hadoopconfflink-conf.yaml以下位置设置配置选项:
  1. fs.hdfs.hadoopconf: /path/to/etc/hadoop

这将/path/to/etc/hadoop在Flink中注册为Hadoop的配置目录。Flink将查找指定目录中的core-site.xmlhdfs-site.xml文件。

配置访问凭据

注意:如果在EMR上运行Flink,则无需手动配置。

设置S3 FileSystem后,您需要确保允许Flink访问您的S3存储桶。

身份和访问管理(IAM)(推荐)

使用时S3AFileSystem,在AWS上设置凭据的推荐方法是通过身份和访问管理(IAM)。您可以使用IAM函数为Flink实例安全地提供他们访问S3存储桶所需的凭据。有关如何执行此 算子操作的详细信息超出了本文档的范围。请参阅AWS用户指南。您正在寻找的是IAM角色

如果您正确设置此选项,则可以在AWS中管理对S3的访问,并且不需要将任何访问Keys分发给Flink。

请注意,这只适用于S3AFileSystem而不是NativeS3FileSystem

访问KeysS3AFileSystem(不鼓励)

可以通过您的访问和Keys对授予对S3的访问权限。请注意,自从引入IAM角色以来,不鼓励这样做。

对于S3AFileSystem您需要配置fs.s3a.access.keyfs.s3a.secret.key 在Hadoop的 core-site.xml

  1. <property>
  2. <name>fs.s3a.access.key</name>
  3. <value></value>
  4. </property>
  5. <property>
  6. <name>fs.s3a.secret.key</name>
  7. <value></value>
  8. </property>

访问KeysNativeS3FileSystem(不鼓励)

可以通过您的访问和Keys对授予对S3的访问权限。但是这是不鼓励的,你应该使用S3AFileSystem 所需的IAM角色

对于NativeS3FileSystem您需要配置fs.s3.awsAccessKeyIdfs.s3.awsSecretAccessKey 在Hadoop的 core-site.xml

  1. <property>
  2. <name>fs.s3.awsAccessKeyId</name>
  3. <value></value>
  4. </property>
  5. <property>
  6. <name>fs.s3.awsSecretAccessKey</name>
  7. <value></value>
  8. </property>

提供S3 FileSystem依赖关系

注意:如果在EMR上运行Flink,则无需手动配置。

Hadoop的S3 FileSystem客户端打包在hadoop-aws工件中(Hadoop 2.6及更高版本)。需要将此JAR及其所有依赖项添加到Flink的类路径中,即Job和TaskManagers的类路径。根据您使用的FileSystem实现以及您使用的Flink和Hadoop版本,您需要提供不同的依赖项(请参阅下文)。

有多种方法可以将JAR添加到Flink的类路径中,最简单的方法就是将JAR放在Flink的lib文件夹中。您需要复制hadoop-aws具有所有依赖项的JAR。您还可以HADOOP_CLASSPATH在所有计算机上将包含这些JAR的目录导出为环境变量的一部分。

Flink for Hadoop 2.7

根据您使用的文件系统,请添加以下依赖项。您可以在以下位置找到这些作为Hadoop二进制文件的一部分hadoop-2.7/share/hadoop/tools/lib

  • S3AFileSystem

    • hadoop-aws-2.7.3.jar
    • aws-java-sdk-s3-1.11.183.jar 及其依赖:

      • aws-java-sdk-core-1.11.183.jar
      • aws-java-sdk-kms-1.11.183.jar
      • jackson-annotations-2.6.7.jar
      • jackson-core-2.6.7.jar
      • jackson-databind-2.6.7.jar
      • joda-time-2.8.1.jar
      • httpcore-4.4.4.jar
      • httpclient-4.5.3.jar
  • NativeS3FileSystem

    • hadoop-aws-2.7.3.jar
    • guava-11.0.2.jar

请注意,hadoop-common它可作为Flink的一部分提供,但番石榴被Flink遮蔽。

Flink for Hadoop 2.6

根据您使用的文件系统,请添加以下依赖项。您可以在以下位置找到这些作为Hadoop二进制文件的一部分hadoop-2.6/share/hadoop/tools/lib

  • S3AFileSystem

    • hadoop-aws-2.6.4.jar
    • aws-java-sdk-1.7.4.jar 及其依赖:

      • jackson-annotations-2.1.1.jar
      • jackson-core-2.1.1.jar
      • jackson-databind-2.1.1.jar
      • joda-time-2.2.jar
      • httpcore-4.2.5.jar
      • httpclient-4.2.5.jar
  • NativeS3FileSystem

    • hadoop-aws-2.6.4.jar
    • guava-11.0.2.jar

请注意,hadoop-common它可作为Flink的一部分提供,但番石榴被Flink遮蔽。

Flink for Hadoop 2.4及更早版本

这些Hadoop版本只支持NativeS3FileSystem。这是预装了Flink for Hadoop 2的一部分hadoop-common。您不需要向类路径添加任何内容。

常见问题

以下部分列出了在AWS上使用Flink时的常见问题。

缺少S3文件系统配置

如果您的作业提交失败,并显示异常消息,指出No file system found with scheme s3这意味着没有为S3配置FileSystem。有关如何正确配置的详细信息,请查看我们的着色Hadoop / Presto通用Hadoop文件系统的配置部分。

  1. org.apache.flink.client.program.ProgramInvocationException: The program execution failed:
  2. Failed to submit job cd927567a81b62d7da4c18eaa91c3c39 (WordCount Example) [...]
  3. Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error:
  4. No file system found with scheme s3, referenced in file URI 's3://<bucket>/<endpoint>'. [...]
  5. Caused by: java.io.IOException: No file system found with scheme s3,
  6. referenced in file URI 's3://<bucket>/<endpoint>'.
  7. at o.a.f.core.fs.FileSystem.get(FileSystem.java:296)
  8. at o.a.f.core.fs.Path.getFileSystem(Path.java:311)
  9. at o.a.f.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450)
  10. at o.a.f.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
  11. at o.a.f.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)

未指定AWS访问KeysID和密钥访问Keys

如果您发现您的作业失败并显示异常AWS Access Key ID and Secret Access Key must be specified as the username or password,并且未正确设置您的访问凭据。有关如何配置此信息的详细信息,请参阅我们的着色Hadoop / Presto通用Hadoop文件系统的访问凭据部分。

  1. org.apache.flink.client.program.ProgramInvocationException: The program execution failed:
  2. Failed to submit job cd927567a81b62d7da4c18eaa91c3c39 (WordCount Example) [...]
  3. Caused by: java.io.IOException: The given file URI (s3://<bucket>/<endpoint>) points to the
  4. HDFS NameNode at <bucket>, but the File System could not be initialized with that address:
  5. AWS Access Key ID and Secret Access Key must be specified as the username or password
  6. (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId
  7. or fs.s3n.awsSecretAccessKey properties (respectively) [...]
  8. Caused by: java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must
  9. be specified as the username or password (respectively) of a s3 URL, or by setting
  10. the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively) [...]
  11. at o.a.h.fs.s3.S3Credentials.initialize(S3Credentials.java:70)
  12. at o.a.h.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:80)
  13. at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  14. at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  15. at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  16. at java.lang.reflect.Method.invoke(Method.java:606)
  17. at o.a.h.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
  18. at o.a.h.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
  19. at o.a.h.fs.s3native.$Proxy6.initialize(Unknown Source)
  20. at o.a.h.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:330)
  21. at o.a.f.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:321)

ClassNotFoundException:找不到NativeS3FileSystem / S3AFileSystem

如果您看到此异常,则S3 FileSystem不是Flink的类路径的一部分。有关如何正确配置的详细信息,请参阅S3 FileSystem相关性部分

  1. Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3native.NativeS3FileSystem not found
  2. at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2186)
  3. at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopWrapperClassNameForFileSystem(HadoopFileSystem.java:460)
  4. at org.apache.flink.core.fs.FileSystem.getHadoopWrapperClassNameForFileSystem(FileSystem.java:352)
  5. at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:280)
  6. at org.apache.flink.core.fs.Path.getFileSystem(Path.java:311)
  7. at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450)
  8. at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
  9. at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
  10. ... 25 more
  11. Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3native.NativeS3FileSystem not found
  12. at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2154)
  13. at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2178)
  14. ... 32 more
  15. Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3native.NativeS3FileSystem not found
  16. at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2060)
  17. at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2152)
  18. ... 33 more

IOException异常: 400: Bad Request

如果您已正确配置所有内容但获得Bad Request异常 S3存储桶位于区域中eu-central-1,则可能正在运行S3客户端,该客户端不支持Amazon的签名版本4

  1. [...]
  2. Caused by: java.io.IOException: s3://<bucket-in-eu-central-1>/<endpoint> : 400 : Bad Request [...]
  3. Caused by: org.jets3t.service.impl.rest.HttpException [...]

要么

  1. com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400, AWS Service: Amazon S3, AWS Request ID: [...], AWS Error Code: null, AWS Error Message: Bad Request, S3 Extended Request ID: [...]

这不应该适用于我们的阴影Hadoop / Presto S3文件系统,但可以适用于Hadoop提供的S3文件系统。特别是,所有运行高达2.7.2的Hadoop版本NativeS3FileSystem(取决于JetS3t 0.9.0版本> = 0.9.4)都会受到影响,但用户也会报告这种情况S3AFileSystem

除了更改存储区域之外,您还可以通过请求用于请求身份验证的签名版本4来解决此问题 ,例如将其添加到Flink的JVM选项中flink-conf.yaml(请参阅 配置):

  1. env.java.opts: -Dcom.amazonaws.services.s3.enableV4

org.apache.hadoop.fs.LocalDirAllocator中的NullPointerException

此异常通常是由跳过本地缓冲区目录配置引起fs.s3a.buffer.dirS3AFileSystem。请参阅S3AFileSystem配置部分以了解如何S3AFileSystem正确配置。

  1. [...]
  2. Caused by: java.lang.NullPointerException at
  3. o.a.h.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:268) at
  4. o.a.h.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:344) at
  5. o.a.h.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:416) at
  6. o.a.h.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:198) at
  7. o.a.h.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:87) at
  8. o.a.h.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:410) at
  9. o.a.h.fs.FileSystem.create(FileSystem.java:907) at
  10. o.a.h.fs.FileSystem.create(FileSystem.java:888) at
  11. o.a.h.fs.FileSystem.create(FileSystem.java:785) at
  12. o.a.f.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:404) at
  13. o.a.f.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:48) at
  14. ... 25 more