1. 创建项目

  1. <properties>
  2. <flink.version>1.13.0</flink.version>
  3. <java.version>1.8</java.version>
  4. <scala.binary.version>2.12</scala.binary.version>
  5. <slf4j.version>1.7.30</slf4j.version>
  6. </properties>
  7. ...
  8. <dependency>
  9. <groupId>org.apache.flink</groupId>
  10. <artifactId>flink-java</artifactId>
  11. <version>${flink.version}</version>
  12. </dependency>
  13. <dependency>
  14. <groupId>org.apache.flink</groupId>
  15. <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
  16. <version>${flink.version}</version>
  17. </dependency>
  18. <dependency>
  19. <groupId>org.apache.flink</groupId>
  20. <artifactId>flink-clients_${scala.binary.version}</artifactId>
  21. <version>${flink.version}</version>
  22. </dependency>

2. 编写代码

2.1 批处理

批处理读取到文件后,统一进行处理

  1. import org.apache.flink.api.common.typeinfo.Types;
  2. import org.apache.flink.api.java.ExecutionEnvironment;
  3. import org.apache.flink.api.java.operators.AggregateOperator;
  4. import org.apache.flink.api.java.operators.DataSource;
  5. import org.apache.flink.api.java.operators.FlatMapOperator;
  6. import org.apache.flink.api.java.operators.UnsortedGrouping;
  7. import org.apache.flink.api.java.tuple.Tuple2;
  8. import org.apache.flink.util.Collector;
  9. public class BatchWordCount {
  10. public static void main(String[] args) throws Exception {
  11. // 1. 创建执行环境 --> DataSet 级别 API(批处理)
  12. ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  13. // 2. 从文件读取数据 按行读取(存储的元素就是每行的文本)
  14. DataSource<String> lineDS = env.readTextFile("input/words.txt");
  15. // 3. 转换数据格式
  16. FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = lineDS
  17. .flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
  18. String[] words = line.split(" ");
  19. for (String word : words) {
  20. out.collect(Tuple2.of(word, 1L));
  21. }
  22. })
  23. .returns(Types.TUPLE(Types.STRING, Types.LONG)); //当Lambda表达式使用 Java 泛型的时候, 由于泛型擦除的存在, 需要显示的声明类型信息
  24. // 4. 按照 word 进行分组
  25. UnsortedGrouping<Tuple2<String, Long>> wordAndOneUG = wordAndOne.groupBy(0);
  26. // 5. 分组内聚合统计
  27. AggregateOperator<Tuple2<String, Long>> sum = wordAndOneUG.sum(1);
  28. // 6. 打印结果
  29. sum.print();
  30. }
  31. }
  32. ------------------------------
  33. (java,1)
  34. (flink,1)
  35. (world,1)
  36. (hello,3)

2.2 流处理

一行一行读取输入的内容,一行一行的处理

2.2.1 读取文件(有界流)

  1. package com.atguigu.chapter02;
  2. /**
  3. * Copyright (c) 2020-2030 尚硅谷 All Rights Reserved
  4. * <p>
  5. * Project: FlinkTutorial
  6. * <p>
  7. * Created by wushengran
  8. */
  9. import org.apache.flink.api.common.typeinfo.Types;
  10. import org.apache.flink.api.java.tuple.Tuple2;
  11. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  12. import org.apache.flink.streaming.api.datastream.KeyedStream;
  13. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  14. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  15. import org.apache.flink.util.Collector;
  16. import java.util.Arrays;
  17. public class BoundedStreamWordCount {
  18. public static void main(String[] args) throws Exception {
  19. // 1. 创建流式执行环境 --> DataStream 级别 API
  20. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  21. // 2. 读取文件
  22. DataStreamSource<String> lineDSS = env.readTextFile("input/words.txt");
  23. // 3. 转换数据格式
  24. SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDSS
  25. .flatMap((String line, Collector<String> words) -> {
  26. Arrays.stream(line.split(" ")).forEach(words::collect);
  27. })
  28. .returns(Types.STRING)
  29. .map(word -> Tuple2.of(word, 1L))
  30. .returns(Types.TUPLE(Types.STRING, Types.LONG));
  31. // 4. 分组
  32. KeyedStream<Tuple2<String, Long>, String> wordAndOneKS = wordAndOne
  33. .keyBy(t -> t.f0);
  34. // 5. 求和
  35. SingleOutputStreamOperator<Tuple2<String, Long>> result = wordAndOneKS
  36. .sum(1);
  37. // 6. 打印
  38. result.print();
  39. // 7. 执行
  40. env.execute();
  41. }
  42. }
  43. ------------------------------
  44. Tipgroup by 后,同一个分组同一个线程处理;flink使用多线程进行异步处理。
  45. 3> (world,1)
  46. 2> (hello,1)
  47. 4> (flink,1)
  48. 2> (hello,2)
  49. 2> (hello,3)
  50. 1> (java,1)

Flink 是一个分布式处理引擎,所以我们的程序应该也是分布式运行的。在开发环境里,会通过多线程来模拟 Flink 集群运行。所以这里结果前的数字,其实就指示了本地执行的不同线程,对应着 Flink 运行时不同的并行资源。这样第一个乱序的问题也就解决了:既然是并行执行,不同线程的输出结果,自然也就无法保持输入的顺序了。

2.2.2 读取文本流(无界流)

  1. import org.apache.flink.api.common.typeinfo.Types;
  2. import org.apache.flink.api.java.tuple.Tuple2;
  3. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  4. import org.apache.flink.streaming.api.datastream.KeyedStream;
  5. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  6. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  7. import org.apache.flink.util.Collector;
  8. import java.util.Arrays;
  9. public class StreamWordCount {
  10. public static void main(String[] args) throws Exception {
  11. // 1. 创建流式执行环境
  12. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  13. // 2. 读取文本流
  14. DataStreamSource<String> lineDSS = env.socketTextStream("hadoop102", 7777);
  15. // 3. 转换数据格式
  16. SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDSS
  17. .flatMap((String line, Collector<String> words) -> {
  18. Arrays.stream(line.split(" ")).forEach(words::collect);
  19. })
  20. .returns(Types.STRING)
  21. .map(word -> Tuple2.of(word, 1L))
  22. .returns(Types.TUPLE(Types.STRING, Types.LONG));
  23. // 4. 分组
  24. KeyedStream<Tuple2<String, Long>, String> wordAndOneKS = wordAndOne
  25. .keyBy(t -> t.f0);
  26. // 5. 求和
  27. SingleOutputStreamOperator<Tuple2<String, Long>> result = wordAndOneKS
  28. .sum(1);
  29. // 6. 打印
  30. result.print();
  31. // 7. 执行
  32. env.execute();
  33. }
  34. }
  35. ------------------------------
  36. 服务器 发送数据:
  37. hello flink
  38. hello world
  39. hello java
  40. 可以看到控制台输出结果如下:
  41. 4> (flink,1)
  42. 2> (hello,1)
  43. 3> (world,1)
  44. 2> (hello,2)
  45. 2> (hello,3)
  46. 1> (java,1)
  1. 代码说明和注意事项:
  • socket 文本流的读取需要配置两个参数:发送端主机名和端口号。这里代码中指定了主机“hadoop102”的 7777 端口作为发送数据的 socket 端口,读者可以根据测试环境自行配置;
  • 在实际项目应用中,主机名和端口号这类信息往往可以通过配置文件,或者传入程序运行参数的方式来指定;
  • socket文本流数据的发送,可以通过Linux系统自带的netcat工具进行模拟。
  1. 在 Linux 环境的主机 hadoop102 上,执行下列命令,发送数据进行测试:
    1. $ nc -lk 7777

3. 总结

本章主要实现一个 Flink 开发的入门程序——词频统计 WordCount。通过批处理和流处理两种不同模式的实现,可以对 Flink 的 API 风格和编程方式有所熟悉,并且更加深刻地理解批处理和流处理的不同。另外,通过读取有界数据(文件)和无界数据(socket 文本流)进行流处理的比较,我们也可以更加直观地体会到 Flink 流处理的方式和特点。