使用 Java API 的样例工程

通过简单地几步来开始编写你的 Flink Java 程序。

要求

唯一的要求是需要安装 Maven 3.0.4 (或者更高)和 Java 7.x (或者更高)

创建工程

使用下面其中一个命令来创建Flink Java工程

bash $ mvn archetype:generate \ -DarchetypeGroupId=org.apache.flink \ -DarchetypeArtifactId=flink-quickstart-java \{% unless site.is_stable %} -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \{% endunless %} -DarchetypeVersion={{site.version}} 这种方式允许你为新创建的工程命名。而且会以交互式地方式询问你为 groupId, artifactId 以及 package 命名。
{% if site.is_stable %} $ curl https://flink.apache.org/q/quickstart.sh | bash {% else %} $ curl https://flink.apache.org/q/quickstart-SNAPSHOT.sh | bash {% endif %}

检查工程

运行完上面的命令会在当前工作目录下创建一个新目录。如果你使用了 curl 命令来创建 Flink Java 工程,这个目录的名称是 quickstart。否则,就是你输入的 artifactId 名字:

  1. $ tree quickstart/
  2. quickstart/
  3. ├── pom.xml
  4. └── src
  5. └── main
  6. ├── java
  7. └── org
  8. └── myorg
  9. └── quickstart
  10. ├── BatchJob.java
  11. ├── SocketTextStreamWordCount.java
  12. ├── StreamingJob.java
  13. └── WordCount.java
  14. └── resources
  15. └── log4j.properties

这个工程是一个 Maven 工程, 包含四个类。 StreamingJob 和 BatchJob 是基本的框架程序,SocketTextStreamWordCount 是一个 Streaming 示例;而 WordCount 是一个 Batch 示例。需要注意的是,所有这些类的 main 方法都允许你在开发/测试模式下启动Flink。

我们推荐将这个工程导入到你的IDE中,并进行开发和测试。 如果你用的是 Eclipse,可以使用 m2e 插件导入 Maven 工程。有些Eclipse发行版 默认嵌入了这个插件,其他的需要你手动去安装。IntelliJ IDE内置就提供了对 Maven 工程的支持。

给Mac OS X用户的建议:默认的 JVM 堆内存对 Flink 来说太小了,你必须手动增加内存。这里以 Eclipse 为例,依次选择 Run Configurations -> Arguments,然后在 VM Arguments 里写入:-Xmx800m

编译工程

如果你想要 编译你的工程 , 进入到工程所在目录,并输入 mvn clean install -Pbuild-jar 命令。 你将会找到 target/original-your-artifact-id-your-version.jar 文件,它可以在任意的 Flink 集群上运行。 还有一个 fat-jar,名为 target/your-artifact-id-your-version.jar ,包含了所有添加到 Maven 工程的依赖。

下一步

编写我们自己的程序!

Quickstart 工程包含了一个 WordCount 的实现,也就是大数据处理系统的 Hello World。WordCount 的目标是计算文本中单词出现的频率。比如: 单词 “the” 或者 “house” 在所有的Wikipedia文本中出现了多少次。

样本输入:

  1. big data is big

样本输出:

  1. big 2
  2. data 1
  3. is 1

下面的代码就是 Quickstart 工程的 WordCount 实现,它使用两种操作( FlatMap 和 Reduce )处理了一些文本,并且在标准输出中打印了单词的计数结果。

  1. public class WordCount {
  2. public static void main(String[] args) throws Exception {
  3. // set up the execution environment
  4. final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  5. // get input data
  6. DataSet<String> text = env.fromElements(
  7. "To be, or not to be,--that is the question:--",
  8. "Whether 'tis nobler in the mind to suffer",
  9. "The slings and arrows of outrageous fortune",
  10. "Or to take arms against a sea of troubles,"
  11. );
  12. DataSet<Tuple2<String, Integer>> counts =
  13. // split up the lines in pairs (2-tuples) containing: (word,1)
  14. text.flatMap(new LineSplitter())
  15. // group by the tuple field "0" and sum up tuple field "1"
  16. .groupBy(0)
  17. .sum(1);
  18. // execute and print result
  19. counts.print();
  20. }
  21. }

这些操作是在专门的类中定义的,下面是 LineSplitter 类的实现。

~~~java public static final class LineSplitter implements FlatMapFunction> {

@Override public void flatMap(String value, Collector> out) { // normalize and split the line String[] tokens = value.toLowerCase().split(“\W+”);

  1. // emit the pairs
  2. for (String token : tokens) {
  3. if (token.length() > 0) {
  4. out.collect(new Tuple2<String, Integer>(token, 1));
  5. }
  6. }

} }