DataStream API教程

译者:flink.sojb.cn

在本指南中,我们将从头开始,从设置Flink项目到在Flink集群上运行流分析程序。

Wikipedia提供了一个IRC频道,其中记录了对Wiki的所有编辑。我们将在Flink中读取此通道,并计算每个用户在给定时间窗口内编辑的字节数。这很容易使用Flink在几分钟内实现,但它将为您提供一个良好的基础,从而开始自己构建更复杂的分析程序。

设置Maven项目

我们将使用Flink Maven Archetype来创建我们的项目结构。有关此内容的更多详细信息,请参阅Java API快速入门。出于我们的目的,运行命令是这样的:

  1. $ mvn archetype:generate \
  2. -DarchetypeGroupId=org.apache.flink \
  3. -DarchetypeArtifactId=flink-quickstart-java \
  4. -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \
  5. -DarchetypeVersion=1.7-SNAPSHOT \
  6. -DgroupId=wiki-edits \
  7. -DartifactId=wiki-edits \
  8. -Dversion=0.1 \
  9. -Dpackage=wikiedits \
  10. -DinteractiveMode=false

注意:对于Maven 3.0或更高版本,不再可以通过命令行指定存储库(-DarchetypeCatalog)。如果要使用SNAPSHOT存储库,则需要向settings.xml添加存储库条目。有关此更改的详细信息,请参阅Maven官方文档

您可以编辑groupIdartifactIdpackage如果你喜欢。使用上面的参数,Maven将创建一个如下所示的项目结构:

  1. $ tree wiki-edits
  2. wiki-edits/
  3. ├── pom.xml
  4. └── src
  5. └── main
  6. ├── java
  7. └── wikiedits
  8. ├── BatchJob.java
  9. ├── SocketTextStreamWordCount.java
  10. ├── StreamingJob.java
  11. └── WordCount.java
  12. └── resources
  13. └── log4j.properties

我们的pom.xml文件已经在根目录中添加了Flink依赖项,并且有几个示例Flink程序src/main/java。我们可以删除示例程序,因为我们将从头开始:

  1. $ rm wiki-edits/src/main/java/wikiedits/*.java

作为最后一步,我们需要将Flink Wikipedia连接器添加为依赖关系,以便我们可以在我们的程序中使用它。编辑它的dependencies部分pom.xml,使它看起来像这样:

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.apache.flink</groupId>
  4. <artifactId>flink-java</artifactId>
  5. <version>${flink.version}</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.apache.flink</groupId>
  9. <artifactId>flink-streaming-java_2.11</artifactId>
  10. <version>${flink.version}</version>
  11. </dependency>
  12. <dependency>
  13. <groupId>org.apache.flink</groupId>
  14. <artifactId>flink-clients_2.11</artifactId>
  15. <version>${flink.version}</version>
  16. </dependency>
  17. <dependency>
  18. <groupId>org.apache.flink</groupId>
  19. <artifactId>flink-connector-wikiedits_2.11</artifactId>
  20. <version>${flink.version}</version>
  21. </dependency>
  22. </dependencies>

注意flink-connector-wikiedits_2.11添加的依赖项。(此示例和Wikipedia连接器的灵感来自Apache Samza 的Hello Samza示例。)

编写Flink程序

这是编码时间。启动您喜欢的IDE并导入Maven项目或打开文本编辑器并创建文件src/main/java/wikiedits/WikipediaAnalysis.java

  1. package wikiedits;
  2. public class WikipediaAnalysis {
  3. public static void main(String[] args) throws Exception {
  4. }
  5. }

该计划现在非常基础,但我们会尽力填写。请注意,我不会在此处提供import语句,因为IDE可以自动添加它们。在本节结束时,如果您只想跳过并在编辑器中输入,我将使用import语句显示完整的代码。

Flink程序的第一步是创建一个StreamExecutionEnvironment (或者ExecutionEnvironment如果您正在编写批处理作业)。这可用于设置执行参数并创建从外部系统读取的源。所以让我们继续把它添加到main方法:

  1. StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();

接下来,我们将创建一个从Wikipedia IRC日志中读取的源:

  1. DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());

这创建了一个我们可以进一步处理DataStreamWikipediaEditEvent数据元。出于本示例的目的,我们感兴趣的是确定每个用户在特定时间窗口中添加或删除的字节数,比如说五秒。为此,我们首先必须指定我们要在用户名上键入流,也就是说此流上的 算子操作应考虑用户名。在我们的例子中,窗口中编辑的字节的总和应该是每个唯一的用户。对于键入流,我们必须提供一个KeySelector,如下所示:

  1. KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
  2. .keyBy(new KeySelector<WikipediaEditEvent, String>() {
  3. @Override
  4. public String getKey(WikipediaEditEvent event) {
  5. return event.getUser();
  6. }
  7. });

这为我们提供了一个WikipediaEditEvent具有StringKeys的用户名。我们现在可以指定我们希望在此流上加上窗口,并根据这些窗口中的数据元计算结果。窗口指定要在其上执行计算的Stream片。在无限的数据元流上计算聚合时需要Windows。在我们的例子中,我们将说我们想要每五秒聚合一次编辑的字节总和:

  1. DataStream<Tuple2<String, Long>> result = keyedEdits
  2. .timeWindow(Time.seconds(5))
  3. .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
  4. @Override
  5. public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
  6. acc.f0 = event.getUser();
  7. acc.f1 += event.getByteDiff();
  8. return acc;
  9. }
  10. });

第一个调用,.timeWindow()指定我们想要有五秒钟的翻滚(非重叠)窗口。第二个调用为每个唯一键指定每个窗口切片的折叠变换。在我们的例子中,我们从一个初始值开始,("", 0L)并在其中为用户添加该时间窗口中每个编辑的字节差异。生成的Stream现在包含Tuple2&lt;String, Long&gt;每五秒钟发出一次的用户。

剩下要做的就是将流打印到控制台并开始执行:

  1. result.print();
  2. see.execute();

最后一次调用是启动实际Flink作业所必需的。所有 算子操作(例如创建源,转换和接收器)仅构建内部 算子操作的图形。只有在execute()被调用时 才会在集群上抛出或在本地计算机上执行此 算子操作图。

到目前为止完整的代码是这样的:

  1. package wikiedits;
  2. import org.apache.flink.api.common.functions.FoldFunction;
  3. import org.apache.flink.api.java.functions.KeySelector;
  4. import org.apache.flink.api.java.tuple.Tuple2;
  5. import org.apache.flink.streaming.api.datastream.DataStream;
  6. import org.apache.flink.streaming.api.datastream.KeyedStream;
  7. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  8. import org.apache.flink.streaming.api.windowing.time.Time;
  9. import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
  10. import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;
  11. public class WikipediaAnalysis {
  12. public static void main(String[] args) throws Exception {
  13. StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
  14. DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
  15. KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
  16. .keyBy(new KeySelector<WikipediaEditEvent, String>() {
  17. @Override
  18. public String getKey(WikipediaEditEvent event) {
  19. return event.getUser();
  20. }
  21. });
  22. DataStream<Tuple2<String, Long>> result = keyedEdits
  23. .timeWindow(Time.seconds(5))
  24. .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
  25. @Override
  26. public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
  27. acc.f0 = event.getUser();
  28. acc.f1 += event.getByteDiff();
  29. return acc;
  30. }
  31. });
  32. result.print();
  33. see.execute();
  34. }
  35. }

您可以使用Maven在IDE或命令行上运行此示例:

  1. $ mvn clean package
  2. $ mvn exec:java -Dexec.mainClass=wikiedits.WikipediaAnalysis

第一个命令构建我们的项目,第二个命令执行我们的主类。输出应该类似于:

  1. 1> (Fenix down,114)
  2. 6> (AnomieBOT,155)
  3. 8> (BD2412bot,-3690)
  4. 7> (IgnorantArmies,49)
  5. 3> (Ckh3111,69)
  6. 5> (Slade360,0)
  7. 7> (Narutolovehinata5,2195)
  8. 6> (Vuyisa2001,79)
  9. 4> (Ms Sarah Welch,269)
  10. 4> (KasparBot,-245)

每行前面的数字告诉您输出生成的打印接收器的哪个并行实例。

这应该让您开始编写自己的Flink程序。要了解更多信息,您可以查看我们的基本概念指南和 DataStream API。如果您想了解如何在自己的机器上设置Flink群集并将结果写入Kafka,请坚持参加奖励练习。

奖金练习:在群集上运行并写入Kafka

请按照我们的本地安装教程在您的机器上设置Flink分发,并 在继续 算子操作之前参考Kafka快速入门以设置Kafka安装。

作为第一步,我们必须添加Flink Kafka连接器作为依赖关系,以便我们可以使用Kafka接收器。将其添加到pom.xml依赖项部分中的文件:

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
  4. <version>${flink.version}</version>
  5. </dependency>

接下来,我们需要修改我们的程序。我们将移除print()水槽,而是使用Kafka水槽。新代码如下所示:

  1. result
  2. .map(new MapFunction<Tuple2<String,Long>, String>() {
  3. @Override
  4. public String map(Tuple2<String, Long> tuple) {
  5. return tuple.toString();
  6. }
  7. })
  8. .addSink(new FlinkKafkaProducer08<>("localhost:9092", "wiki-result", new SimpleStringSchema()));

还需要导入相关的类:

  1. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
  2. import org.apache.flink.api.common.serialization.SimpleStringSchema;
  3. import org.apache.flink.api.common.functions.MapFunction;

注意我们是如何第一个转换的流Tuple2&lt;String, Long&gt;来流String使用MapFunction。我们这样做是因为将简单字符串写入Kafka更容易。然后,我们创建了一个Kafka水槽。您可能必须使主机名和端口适应您的设置。"wiki-result" 是运行我们的程序之前我们将要创建的Kafka流的名称。使用Maven构建项目因为我们需要jar文件在集群上运行:

  1. $ mvn clean package

生成的jar文件将位于target子文件夹中:target/wiki-edits-0.1.jar。我们稍后会用到它。

现在我们准备启动Flink集群并运行写入Kafka的程序。转到安装Flink的位置并启动本地群集:

  1. $ cd my/flink/directory
  2. $ bin/start-cluster.sh

我们还必须创建Kafka主题,以便我们的程序可以写入它:

  1. $ cd my/kafka/directory
  2. $ bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic wiki-results

现在我们准备在本地Flink集群上运行我们的jar文件:

  1. $ cd my/flink/directory
  2. $ bin/flink run -c wikiedits.WikipediaAnalysis path/to/wikiedits-0.1.jar

如果一切按计划进行,那么该命令的输出应该与此类似:

  1. 03/08/2016 15:09:27 Job execution switched to status RUNNING.
  2. 03/08/2016 15:09:27 Source: Custom Source(1/1) switched to SCHEDULED
  3. 03/08/2016 15:09:27 Source: Custom Source(1/1) switched to DEPLOYING
  4. 03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to SCHEDULED
  5. 03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to DEPLOYING
  6. 03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to RUNNING
  7. 03/08/2016 15:09:27 Source: Custom Source(1/1) switched to RUNNING

您可以看到各个算子如何开始运行。只有两个,因为出于性能原因,窗口之后的 算子操作被折叠成一个 算子操作。在Flink,我们称之为链接

您可以通过使用Kafka控制台使用者检查Kafka主题来观察程序的输出:

  1. bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic wiki-result

您还可以查看应在http:// localhost:8081上运行的Flink仪表板。您将获得群集资源和正在运行的作业的概述:

JobManager概述

如果单击正在运行的作业,您将获得一个视图,您可以在其中检查各个 算子操作,例如,查看已处理数据元的数量:

作业视图示例

这就结束了我们对Flink的小游览。如果您有任何疑问,请随时询问我们的邮件列表