连接器
从文件系统读取
Flink内置了对以下文件系统的支持:
文件系统 | 方案 | 提示 |
---|---|---|
Hadoop Distributed File System (HDFS) | hdfs:// |
所有都HDFS都支持 |
Amazon S3 | s3:// |
通过Hadoop文件系统实现支持(见下文) |
MapR file system | maprfs:// |
用户必须手动将所需的JAR文件放在’lib/`dir中 |
Alluxio | alluxio:// |
通过Hadoop文件系统实现支持(见下文) |
使用Hadoop文件系统实现
ApacheFlink允许用户使用任何实现’org.apache.hadoop.fs.file system’接口的文件系统。有hadoopfilesystem
实现用于
- S3 (tested)
- Google Cloud Storage Connector for Hadoop (tested)
- Alluxio (tested)
- XtreemFS (tested)
- FTP via Hftp (not tested)
- 还有更多.
为了在Flink中使用Hadoop文件系统,请确保
*’flink-conf.yaml’已将’fs.hdfs.hadoop conf’属性设置为Hadoop配置目录。对于自动测试或从IDE运行,可以通过定义“flink-conf-dir”环境变量来设置包含“flink-conf.yaml”的目录。
*Hadoop配置(在该目录中)在“core site.xml”文件中具有所需文件系统的条目。S3和Alluxio的示例如下所示。
*使用文件系统所需的类可以在Flink安装的“lib/”文件夹中找到(在所有运行Flink的计算机上)。如果无法将文件放入目录,Flink还将考虑使用“hadoop-classpath”环境变量将hadoop-jar文件添加到类路径。
Amazon S3
有关可用的S3文件系统实现、配置和所需的库,请参阅[部署和操作-部署-AWS-S3:简单存储服务](//ci.apache.org/projects/flink/flink-docs-release-1.7/ops/deployment/aws.html)。
Alluxio
对于Alluxio支持,请将以下条目添加到“core site.xml”文件中:
<property>
<name>fs.alluxio.impl</name>
<value>alluxio.hadoop.FileSystem</value>
</property>
使用Hadoop的输入/输出格式包装器连接到其他系统
ApacheFlink允许用户访问许多不同的系统作为数据源或接收器。系统的设计非常容易扩展。与ApacheHadoop类似,Flink也有所谓的“inputformat”和“outputformat”的概念。
这些“inputformat”的一个实现是“hadoopinputformat”。这是一个包装器,允许用户将所有现有的Hadoop输入格式与Flink一起使用。
本节展示了一些将Flink连接到其他系统的示例。[了解有关Flink中Hadoop兼容性的更多信息](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/batch/hadoop_compatibility.html)。
Flink中的Avro支持
Flink对[Apache Avro](http://avro.apache.org/)有广泛的内置支持。这使得用flink可以很容易地从avro文件中读取数据。此外,Flink的序列化框架能够处理从avro模式生成的类。请确保包含对项目pom.xml的Flink Avro依赖性。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>1.7.1</version>
</dependency>
要从avro文件中读取数据,必须指定“AvroinputFormat”。
Example:
AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
DataSet<User> usersDS = env.createInput(users);
请注意,“user”是由avro生成的POJO。Flink还允许对这些pojos执行基于字符串的键选择。例如:
usersDS.groupBy("name")
请注意,使用Flink可以使用“genericData.record”类型,但不推荐使用。因为记录包含完整的模式,所以它的数据非常密集,因此使用起来可能很慢。
Flink的pojo字段选择也适用于从avro生成的POJOS。但是,只有将字段类型正确写入生成的类中,才能使用。如果字段类型为“object”,则不能将该字段用作联接或分组键。像这样在avro中指定一个字段“”name“:”type_double_test“,”type“:”double“,”很好,但是将它指定为只有一个字段(
”name“:”type_double_test“,”type“:[”double“],”)的联合类型将生成一个“object”类型的字段。请注意,可以指定可以为空的类型(
“name”:“type_double_test”,“type”:[“null”,“double”],`)!
访问Microsoft Azure表存储
注:此示例从Flink 0.6开始工作-孵化
此示例使用“hadoop input format”包装器使用现有的hadoop输入格式实现访问[azure的表存储](https://azure.microsoft.com/en-us/documentation/articles/storage-introduction/)。
1.下载并编译“azure tables hadoop”项目。项目开发的输入格式在Maven Central中尚不可用,因此,我们必须自己构建项目。执行以下命令:
git clone https://github.com/mooso/azure-tables-hadoop.git
cd azure-tables-hadoop
mvn clean install
2.使用快速入门设置新的Flink项目:
curl https://flink.apache.org/q/quickstart.sh | bash
3.将以下依赖项(在“依赖项”部分)添加到您的“pom.xml”文件中:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_2.11</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>com.microsoft.hadoop</groupId>
<artifactId>microsoft-hadoop-azure</artifactId>
<version>0.0.4</version>
</dependency>
Flink Hadoop Compatibility
是一个提供Hadoop输入格式包装器的Flink包。Microsoft Hadoop Azure
正在将我们以前构建的项目添加到我们的项目中。
这个项目现在已经准备好开始编码了。我们建议将项目导入到一个IDE中,如Eclipse或Intellij。(作为Maven项目导入!)注意到“Java.java”文件的代码。这是一个为一个Flink的工作的空demo。
将以下代码粘贴到其中:
import java.util.Map;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import com.microsoft.hadoop.azure.AzureTableConfiguration;
import com.microsoft.hadoop.azure.AzureTableInputFormat;
import com.microsoft.hadoop.azure.WritableEntity;
import com.microsoft.windowsazure.storage.table.EntityProperty;
public class AzureTableExample {
public static void main(String[] args) throws Exception {
// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// create a AzureTableInputFormat, using a Hadoop input format wrapper
HadoopInputFormat<Text, WritableEntity> hdIf = new HadoopInputFormat<Text, WritableEntity>(new AzureTableInputFormat(), Text.class, WritableEntity.class, new Job());
// set the Account URI, something like: https://apacheflink.table.core.windows.net
hdIf.getConfiguration().set(AzureTableConfiguration.Keys.ACCOUNT_URI.getKey(), "TODO");
// set the secret storage key here
hdIf.getConfiguration().set(AzureTableConfiguration.Keys.STORAGE_KEY.getKey(), "TODO");
// set the table name here
hdIf.getConfiguration().set(AzureTableConfiguration.Keys.TABLE_NAME.getKey(), "TODO");
DataSet<Tuple2<Text, WritableEntity>> input = env.createInput(hdIf);
// a little example how to use the data in a mapper.
DataSet<String> fin = input.map(new MapFunction<Tuple2<Text,WritableEntity>, String>() {
@Override
public String map(Tuple2<Text, WritableEntity> arg0) throws Exception {
System.err.println("--------------------------------\nKey = "+arg0.f0);
WritableEntity we = arg0.f1;
for(Map.Entry<String, EntityProperty> prop : we.getProperties().entrySet()) {
System.err.println("key="+prop.getKey() + " ; value (asString)="+prop.getValue().getValueAsString());
}
return arg0.f0.toString();
}
});
// emit result (this works only locally)
fin.print();
// execute program
env.execute("Azure Example");
}
}
该示例演示了如何访问一个Azure表并将数据转换为Flink的“数据集”(更具体地说,集合的类型是“data set<;tuple2<;text,writableEntity>;gt;`)。使用“dataset”,可以将所有已知的转换应用到数据集。
访问MongoDB
This GitHub repository documents how to use MongoDB with Apache Flink (starting from 0.7-incubating).