连接器

从文件系统读取

Flink内置了对以下文件系统的支持:

文件系统 方案 提示
Hadoop Distributed File System (HDFS) hdfs:// 所有都HDFS都支持
Amazon S3 s3:// 通过Hadoop文件系统实现支持(见下文)
MapR file system maprfs:// 用户必须手动将所需的JAR文件放在’lib/`dir中
Alluxio alluxio:// 通过Hadoop文件系统实现支持(见下文)

使用Hadoop文件系统实现

ApacheFlink允许用户使用任何实现’org.apache.hadoop.fs.file system’接口的文件系统。有hadoopfilesystem实现用于

为了在Flink中使用Hadoop文件系统,请确保

*’flink-conf.yaml’已将’fs.hdfs.hadoop conf’属性设置为Hadoop配置目录。对于自动测试或从IDE运行,可以通过定义“flink-conf-dir”环境变量来设置包含“flink-conf.yaml”的目录。

*Hadoop配置(在该目录中)在“core site.xml”文件中具有所需文件系统的条目。S3和Alluxio的示例如下所示。

*使用文件系统所需的类可以在Flink安装的“lib/”文件夹中找到(在所有运行Flink的计算机上)。如果无法将文件放入目录,Flink还将考虑使用“hadoop-classpath”环境变量将hadoop-jar文件添加到类路径。

Amazon S3

有关可用的S3文件系统实现、配置和所需的库,请参阅[部署和操作-部署-AWS-S3:简单存储服务](//ci.apache.org/projects/flink/flink-docs-release-1.7/ops/deployment/aws.html)。

Alluxio

对于Alluxio支持,请将以下条目添加到“core site.xml”文件中:

  1. <property>
  2. <name>fs.alluxio.impl</name>
  3. <value>alluxio.hadoop.FileSystem</value>
  4. </property>

使用Hadoop的输入/输出格式包装器连接到其他系统

ApacheFlink允许用户访问许多不同的系统作为数据源或接收器。系统的设计非常容易扩展。与ApacheHadoop类似,Flink也有所谓的“inputformat”和“outputformat”的概念。

这些“inputformat”的一个实现是“hadoopinputformat”。这是一个包装器,允许用户将所有现有的Hadoop输入格式与Flink一起使用。

本节展示了一些将Flink连接到其他系统的示例。[了解有关Flink中Hadoop兼容性的更多信息](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/batch/hadoop_compatibility.html)。

Flink中的Avro支持

Flink对[Apache Avro](http://avro.apache.org/)有广泛的内置支持。这使得用flink可以很容易地从avro文件中读取数据。此外,Flink的序列化框架能够处理从avro模式生成的类。请确保包含对项目pom.xml的Flink Avro依赖性。

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-avro</artifactId>
  4. <version>1.7.1</version>
  5. </dependency>

要从avro文件中读取数据,必须指定“AvroinputFormat”。

Example:

  1. AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
  2. DataSet<User> usersDS = env.createInput(users);

请注意,“user”是由avro生成的POJO。Flink还允许对这些pojos执行基于字符串的键选择。例如:

  1. usersDS.groupBy("name")

请注意,使用Flink可以使用“genericData.record”类型,但不推荐使用。因为记录包含完整的模式,所以它的数据非常密集,因此使用起来可能很慢。

Flink的pojo字段选择也适用于从avro生成的POJOS。但是,只有将字段类型正确写入生成的类中,才能使用。如果字段类型为“object”,则不能将该字段用作联接或分组键。像这样在avro中指定一个字段“”name“:”type_double_test“,”type“:”double“,”很好,但是将它指定为只有一个字段(”name“:”type_double_test“,”type“:[”double“],”)的联合类型将生成一个“object”类型的字段。请注意,可以指定可以为空的类型(“name”:“type_double_test”,“type”:[“null”,“double”],`)!

访问Microsoft Azure表存储

注:此示例从Flink 0.6开始工作-孵化

此示例使用“hadoop input format”包装器使用现有的hadoop输入格式实现访问[azure的表存储](https://azure.microsoft.com/en-us/documentation/articles/storage-introduction/)。

1.下载并编译“azure tables hadoop”项目。项目开发的输入格式在Maven Central中尚不可用,因此,我们必须自己构建项目。执行以下命令:

  1. git clone https://github.com/mooso/azure-tables-hadoop.git
  2. cd azure-tables-hadoop
  3. mvn clean install

2.使用快速入门设置新的Flink项目:

  1. curl https://flink.apache.org/q/quickstart.sh | bash

3.将以下依赖项(在“依赖项”部分)添加到您的“pom.xml”文件中:

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-hadoop-compatibility_2.11</artifactId>
  4. <version>1.7.1</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>com.microsoft.hadoop</groupId>
  8. <artifactId>microsoft-hadoop-azure</artifactId>
  9. <version>0.0.4</version>
  10. </dependency>

Flink Hadoop Compatibility是一个提供Hadoop输入格式包装器的Flink包。Microsoft Hadoop Azure正在将我们以前构建的项目添加到我们的项目中。

这个项目现在已经准备好开始编码了。我们建议将项目导入到一个IDE中,如Eclipse或Intellij。(作为Maven项目导入!)注意到“Java.java”文件的代码。这是一个为一个Flink的工作的空demo。

将以下代码粘贴到其中:

  1. import java.util.Map;
  2. import org.apache.flink.api.common.functions.MapFunction;
  3. import org.apache.flink.api.java.DataSet;
  4. import org.apache.flink.api.java.ExecutionEnvironment;
  5. import org.apache.flink.api.java.tuple.Tuple2;
  6. import org.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat;
  7. import org.apache.hadoop.io.Text;
  8. import org.apache.hadoop.mapreduce.Job;
  9. import com.microsoft.hadoop.azure.AzureTableConfiguration;
  10. import com.microsoft.hadoop.azure.AzureTableInputFormat;
  11. import com.microsoft.hadoop.azure.WritableEntity;
  12. import com.microsoft.windowsazure.storage.table.EntityProperty;
  13. public class AzureTableExample {
  14. public static void main(String[] args) throws Exception {
  15. // set up the execution environment
  16. final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  17. // create a AzureTableInputFormat, using a Hadoop input format wrapper
  18. HadoopInputFormat<Text, WritableEntity> hdIf = new HadoopInputFormat<Text, WritableEntity>(new AzureTableInputFormat(), Text.class, WritableEntity.class, new Job());
  19. // set the Account URI, something like: https://apacheflink.table.core.windows.net
  20. hdIf.getConfiguration().set(AzureTableConfiguration.Keys.ACCOUNT_URI.getKey(), "TODO");
  21. // set the secret storage key here
  22. hdIf.getConfiguration().set(AzureTableConfiguration.Keys.STORAGE_KEY.getKey(), "TODO");
  23. // set the table name here
  24. hdIf.getConfiguration().set(AzureTableConfiguration.Keys.TABLE_NAME.getKey(), "TODO");
  25. DataSet<Tuple2<Text, WritableEntity>> input = env.createInput(hdIf);
  26. // a little example how to use the data in a mapper.
  27. DataSet<String> fin = input.map(new MapFunction<Tuple2<Text,WritableEntity>, String>() {
  28. @Override
  29. public String map(Tuple2<Text, WritableEntity> arg0) throws Exception {
  30. System.err.println("--------------------------------\nKey = "+arg0.f0);
  31. WritableEntity we = arg0.f1;
  32. for(Map.Entry<String, EntityProperty> prop : we.getProperties().entrySet()) {
  33. System.err.println("key="+prop.getKey() + " ; value (asString)="+prop.getValue().getValueAsString());
  34. }
  35. return arg0.f0.toString();
  36. }
  37. });
  38. // emit result (this works only locally)
  39. fin.print();
  40. // execute program
  41. env.execute("Azure Example");
  42. }
  43. }

该示例演示了如何访问一个Azure表并将数据转换为Flink的“数据集”(更具体地说,集合的类型是“data set&lt;tuple2&lt;text,writableEntity&gt;gt;`)。使用“dataset”,可以将所有已知的转换应用到数据集。

访问MongoDB

This GitHub repository documents how to use MongoDB with Apache Flink (starting from 0.7-incubating).