DataStream API 教程

在本指南中,我们将从头开始,从设置Flink项目到在Flink集群上运行流分析程序。

Wikipedia提供了一个IRC通道,其中记录了对wiki的所有编辑。我们将在Flink中读取这个通道,并计算每个用户在给定时间窗口内编辑的字节数。这很容易使用Flink在几分钟内实现,但是它将为您自己开始构建更复杂的分析程序提供良好的基础。

构建 Maven 项目

我们将使用Flink Maven原型来创建项目结构。请参见Java API Quickstart 了解更多细节。出于我们的目的,运行的命令如下:

  1. $ mvn archetype:generate \
  2. -DarchetypeGroupId=org.apache.flink \
  3. -DarchetypeArtifactId=flink-quickstart-java \
  4. -DarchetypeVersion=1.7.1 \
  5. -DgroupId=wiki-edits \
  6. -DartifactId=wiki-edits \
  7. -Dversion=0.1 \
  8. -Dpackage=wikiedits \
  9. -DinteractiveMode=false

如果您愿意,可以编辑groupId, artifactIdpackage。使用上述参数,Maven将创建一个如下所示的项目结构:

  1. $ tree wiki-edits
  2. wiki-edits/
  3. ├── pom.xml
  4. └── src
  5. └── main
  6. ├── java
  7. └── wikiedits
  8. ├── BatchJob.java
  9. └── StreamingJob.java
  10. └── resources
  11. └── log4j.properties

这是我们的pom.xml。已经在根目录中添加了Flink依赖项的xml文件,以及src/main/java中的几个示例Flink程序。我们可以删除示例程序,因为我们要从头开始:

  1. $ rm wiki-edits/src/main/java/wikiedits/*.java

最后一步,我们需要将Flink Wikipedia连接器作为依赖项添加,以便在程序中使用它。编辑pom.xml 中的dependencies部分。是这样的:

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.apache.flink</groupId>
  4. <artifactId>flink-java</artifactId>
  5. <version>${flink.version}</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.apache.flink</groupId>
  9. <artifactId>flink-streaming-java_2.11</artifactId>
  10. <version>${flink.version}</version>
  11. </dependency>
  12. <dependency>
  13. <groupId>org.apache.flink</groupId>
  14. <artifactId>flink-clients_2.11</artifactId>
  15. <version>${flink.version}</version>
  16. </dependency>
  17. <dependency>
  18. <groupId>org.apache.flink</groupId>
  19. <artifactId>flink-connector-wikiedits_2.11</artifactId>
  20. <version>${flink.version}</version>
  21. </dependency>
  22. </dependencies>

注意添加的flink-connector-wikiedits_2.11依赖项。(这个示例和Wikipedia连接器的灵感来自Apache Samza的 Hello Samza 示例。)

编写一个Flink程序

它的编码时间。启动您最喜欢的IDE并导入Maven项目,或者打开文本编辑器并创建文件src/main/java/wikiedits/WikipediaAnalysis.java:

  1. package wikiedits;
  2. public class WikipediaAnalysis {
  3. public static void main(String[] args) throws Exception {
  4. }
  5. }

这个程序现在很简单,但我们会边走边填。注意,我不会在这里给出import语句,因为ide可以自动添加它们。在本节的最后,我将展示带有import语句的完整代码,如果您只是想跳过它并在编辑器中输入它的话。

Flink程序的第一步是创建一个StreamExecutionEnvironment(如果您正在编写批处理作业,则为ExecutionEnvironment)。这可以用于设置执行参数和创建用于从外部系统读取的源。让我们把这个添加到主方法中:

  1. StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();

接下来,我们将创建一个源代码,从维基百科IRC日志:

  1. DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());

这将创建一个WikipediaEditEvent元素的DataStream,我们可以进一步处理它。对于本例的目的,我们感兴趣的是确定每个用户在某个时间窗口(假设为5秒)中添加或删除的字节数。为此,我们首先必须指定我们想要在用户名上键入流,也就是说,该流上的操作应该考虑到用户名。在我们的示例中,窗口中已编辑字节的总和应该是每个惟一用户的。要键入一个流,我们必须提供一个“键选择器”,像这样:

  1. KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
  2. .keyBy(new KeySelector<WikipediaEditEvent, String>() {
  3. @Override
  4. public String getKey(WikipediaEditEvent event) {
  5. return event.getUser();
  6. }
  7. });

这给了我们一个WikipediaEditEvent流,它有一个String键,用户名。现在,我们可以指定希望将窗口应用于此流,并基于这些窗口中的元素计算结果。窗口指定要对其执行计算的流片。在计算无限元素流上的聚合时,需要使用Windows。在我们的例子中,我们会说我们想要每5秒聚合编辑字节的总和:

  1. DataStream<Tuple2<String, Long>> result = keyedEdits
  2. .timeWindow(Time.seconds(5))
  3. .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
  4. @Override
  5. public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
  6. acc.f0 = event.getUser();
  7. acc.f1 += event.getByteDiff();
  8. return acc;
  9. }
  10. });

第一个调用.timeWindow()指定我们希望滚动(非重叠)窗口的时间为5秒。第二个调用为每个惟一键在每个窗口切片上指定 Fold transformation。在我们的示例中,我们从一个初始值("", 0L) 开始,并将用户在该时间窗口中的每次编辑的字节差添加到该值中。生成的流现在为每5秒发出一次的每个用户包含一个Tuple2&lt;String, Long&gt;

唯一要做的就是将流打印到控制台并开始执行:

  1. result.print();
  2. see.execute();

最后一次调用是启动实际Flink作业所必需的。所有的操作,例如创建源、转换和接收器,只构建内部操作的图。只有在调用execute() 时,才会在集群上抛出或在本地机器上执行此操作图。

目前完整的代码是:

  1. package wikiedits;
  2. import org.apache.flink.api.common.functions.FoldFunction;
  3. import org.apache.flink.api.java.functions.KeySelector;
  4. import org.apache.flink.api.java.tuple.Tuple2;
  5. import org.apache.flink.streaming.api.datastream.DataStream;
  6. import org.apache.flink.streaming.api.datastream.KeyedStream;
  7. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  8. import org.apache.flink.streaming.api.windowing.time.Time;
  9. import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
  10. import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;
  11. public class WikipediaAnalysis {
  12. public static void main(String[] args) throws Exception {
  13. StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
  14. DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
  15. KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
  16. .keyBy(new KeySelector<WikipediaEditEvent, String>() {
  17. @Override
  18. public String getKey(WikipediaEditEvent event) {
  19. return event.getUser();
  20. }
  21. });
  22. DataStream<Tuple2<String, Long>> result = keyedEdits
  23. .timeWindow(Time.seconds(5))
  24. .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
  25. @Override
  26. public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
  27. acc.f0 = event.getUser();
  28. acc.f1 += event.getByteDiff();
  29. return acc;
  30. }
  31. });
  32. result.print();
  33. see.execute();
  34. }
  35. }

您可以在IDE或命令行上运行这个示例,使用Maven:

  1. $ mvn clean package
  2. $ mvn exec:java -Dexec.mainClass=wikiedits.WikipediaAnalysis

第一个命令构建我们的项目,第二个命令执行我们的主类。输出应该类似于这样:

  1. 1> (Fenix down,114)
  2. 6> (AnomieBOT,155)
  3. 8> (BD2412bot,-3690)
  4. 7> (IgnorantArmies,49)
  5. 3> (Ckh3111,69)
  6. 5> (Slade360,0)
  7. 7> (Narutolovehinata5,2195)
  8. 6> (Vuyisa2001,79)
  9. 4> (Ms Sarah Welch,269)
  10. 4> (KasparBot,-245)

每行前面的数字告诉您输出是在打印接收器的哪个并行实例上生成的。

这将使您开始编写自己的Flink程序。要了解更多信息,您可以查看我们关于basic conceptsDataStream API的指南。如果您想了解如何在自己的机器上设置Flink集群,并将结果写入Kafka,请继续进行额外的练习。

额外的练习:在集群上运行并写信给Kafka

请按照我们的local setup教程在您的机器上设置Flink发行版,并在继续之前参考Kafka quickstart设置Kafka安装。

作为第一步,我们必须将Flink Kafka连接器作为依赖项添加,以便能够使用Kafka接收器。把这个加到pom.xml中。依赖项部分中的xml ‘文件:

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
  4. <version>${flink.version}</version>
  5. </dependency>

接下来,我们需要修改我们的程序。我们将删除print() 接收器,而是使用Kafka接收器。新代码如下:

  1. result
  2. .map(new MapFunction<Tuple2<String,Long>, String>() {
  3. @Override
  4. public String map(Tuple2<String, Long> tuple) {
  5. return tuple.toString();
  6. }
  7. })
  8. .addSink(new FlinkKafkaProducer011<>("localhost:9092", "wiki-result", new SimpleStringSchema()));

相关类也需要导入:

  1. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
  2. import org.apache.flink.api.common.serialization.SimpleStringSchema;
  3. import org.apache.flink.api.common.functions.MapFunction;

注意我们如何首先使用MapFunction将Tuple2&lt;String, Long&gt;的流转换为String 的流。我们这样做是因为向Kafka编写普通字符串更容易。然后,我们创建一个卡夫卡水槽。您可能需要根据您的设置调整主机名和端口。"wiki-result"是我们在运行程序之前要创建的Kafka流的名称。使用Maven构建项目,因为我们需要jar文件在集群上运行:

  1. $ mvn clean package

生成的jar文件将位于target子文件夹中:target/wiki-edits-0.1.jar。我们以后会用到这个。

现在,我们准备启动一个Flink集群并运行在其上写入Kafka的程序。转到您安装Flink的位置并启动本地集群:

  1. $ cd my/flink/directory
  2. $ bin/start-cluster.sh

我们还需要创建Kafka主题,这样我们的程序就可以写:

  1. $ cd my/kafka/directory
  2. $ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wiki-results

现在我们可以在本地Flink集群上运行jar文件了:

  1. $ cd my/flink/directory
  2. $ bin/flink run -c wikiedits.WikipediaAnalysis path/to/wikiedits-0.1.jar

如果一切按照计划进行,那么该命令的输出应该类似于以下内容:

  1. 03/08/2016 15:09:27 Job execution switched to status RUNNING.
  2. 03/08/2016 15:09:27 Source: Custom Source(1/1) switched to SCHEDULED
  3. 03/08/2016 15:09:27 Source: Custom Source(1/1) switched to DEPLOYING
  4. 03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to SCHEDULED
  5. 03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to DEPLOYING
  6. 03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to RUNNING
  7. 03/08/2016 15:09:27 Source: Custom Source(1/1) switched to RUNNING

您可以看到各个操作符是如何开始运行的。只有两个操作,因为出于性能原因,窗口之后的操作被折叠成一个操作。在Flink中,我们称之为 chaining

您可以通过使用Kafka控制台消费者查看Kafka主题来观察程序的输出:

  1. bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic wiki-result

您还可以查看应该在http://localhost:8081上运行的Flink仪表板。您将获得集群资源和正在运行的作业的概述:

JobManager Overview

如果您单击正在运行的作业,您将看到一个视图,您可以在其中检查各个操作,例如,查看处理的元素的数量:

Example Job View

我们的Flink之旅到此结束。如果您有任何问题,请毫不犹豫地向我们的 Mailing Lists提问。