banner.webp

1.认识Flink

  1. FlinkApache软件基金会旗下的一个开源大数据处理框架。目前Flink已经成为各大公司大数据实时处理的发力重点,国内以阿里巴巴为代表的一些大型互联网企业都在全力投入,贡献了大量源码。如今,Flink已被很多人认为是大数据实时处理的方向与未来。
  2. Flink起源于一个叫做Stratosphere的项目,它是由3所地处柏林的大学和欧洲其他一些大学共同进行的研究项目,由柏林工业大学的教授沃可尔·马尔科(Volker Markl)领衔开发。20144月,Stratosphere的代码被复制并捐赠给了Apache软件基金会,Flink就是在此基础上被重新设计出来的。

Flink的发展时间线

  • 2014年8月,Flink的第一个版本0.6正式发布(之前的版本都在Stratosphere名下)。与此同时,Flink的几位核心开发者创办了Data Artisans公司,主要做Flink的商业应用,帮助企业部署大规模数据处理解决方案
  • 2014年12月,Flink项目完成了孵化,一跃成为Apache软件基金会的顶级项目
  • 2015年4月,Flink发布了里程碑式的重要版本0.9.0,很多国内外大公司也是从这时开始关注参与Flink社区的建设的。
  • 2019年1月,长期对Flink投入研发的阿里巴巴以9000万欧元收购了Data Artisans公司;之后又将自己的内部版本Blink开源,继而与8月发布的Flink1.9.0版本进行了合并。自此之后,Flink遍地开花,称为了当前最火的新一代大数据处理框架之一。

使用Flink的各大企业
1.png

2.Flink基本Demo的代码实现

  1. 上面我们了解了一下Flink的诞生基本概念,核心概念我们边用边了解,先来手过一遍一个基本Demo--wordcount
  2. 我们创建一个maven项目,pom.xml如下所示
  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <modelVersion>4.0.0</modelVersion>
  6. <groupId>com.flink</groupId>
  7. <artifactId>springboot_flink</artifactId>
  8. <version>1.0</version>
  9. <properties>
  10. <flink.version>1.13.0</flink.version>
  11. <java.version>1.8</java.version>
  12. <scala.binary.version>2.12</scala.binary.version>
  13. <slf4j.version>1.7.30</slf4j.version>
  14. </properties>
  15. <dependencies>
  16. <!--fink相关依赖-->
  17. <dependency>
  18. <groupId>org.apache.flink</groupId>
  19. <artifactId>flink-java</artifactId>
  20. <version>${flink.version}</version>
  21. </dependency>
  22. <dependency>
  23. <groupId>org.apache.flink</groupId>
  24. <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
  25. <version>${flink.version}</version>
  26. </dependency>
  27. <dependency>
  28. <groupId>org.apache.flink</groupId>
  29. <artifactId>flink-clients_${scala.binary.version}</artifactId>
  30. <version>${flink.version}</version>
  31. </dependency>
  32. <!--日志相关依赖-->
  33. <dependency>
  34. <groupId>org.slf4j</groupId>
  35. <artifactId>slf4j-api</artifactId>
  36. <version>${slf4j.version}</version>
  37. </dependency>
  38. <dependency>
  39. <groupId>org.slf4j</groupId>
  40. <artifactId>slf4j-log4j12</artifactId>
  41. <version>${slf4j.version}</version>
  42. </dependency>
  43. <dependency>
  44. <groupId>org.apache.logging.log4j</groupId>
  45. <artifactId>log4j-to-slf4j</artifactId>
  46. <version>2.14.0</version>
  47. </dependency>
  48. </dependencies>
  49. <build>
  50. <plugins>
  51. <plugin>
  52. <groupId>org.apache.maven.plugins</groupId>
  53. <artifactId>maven-compiler-plugin</artifactId>
  54. <version>3.8.1</version>
  55. <configuration>
  56. <source>${java.version}</source>
  57. <target>${java.version}</target>
  58. </configuration>
  59. </plugin>
  60. </plugins>
  61. </build>
  62. </project>

在src/main/resource目录下创建文件log4j.properties

  1. log4j.rootLogger=error,stdout
  2. log4j.appender.stdout=org.apache.log4j.ConsoleAppender
  3. log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
  4. log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

创建一个名为BatchWordCount的类

  1. import org.apache.flink.api.common.typeinfo.Types;
  2. import org.apache.flink.api.java.ExecutionEnvironment;
  3. import org.apache.flink.api.java.operators.AggregateOperator;
  4. import org.apache.flink.api.java.operators.DataSource;
  5. import org.apache.flink.api.java.operators.FlatMapOperator;
  6. import org.apache.flink.api.java.operators.UnsortedGrouping;
  7. import org.apache.flink.api.java.tuple.Tuple2;
  8. import org.apache.flink.util.Collector;
  9. public class BatchWordCount {
  10. public static void main(String[] args) throws Exception {
  11. //1.创建执行环境
  12. ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  13. //2.从文件读取数据:按行读取(存储的元素就是每行的文本)
  14. DataSource<String> line = env.readTextFile("input/words.txt");
  15. //3.转换数据格式
  16. FlatMapOperator<String, Tuple2<String,Long>> wordAndOne = line.flatMap(
  17. (String lineWord, Collector<Tuple2<String,Long>> out) -> {
  18. String[] words = lineWord.split(" ");
  19. for (String word : words) {
  20. out.collect(Tuple2.of(word,1L));
  21. }
  22. }
  23. //当lambda表达式使用Java泛型的时候,由于泛型擦除的存在而需要显示的声明类型信息
  24. ).returns(Types.TUPLE(Types.STRING,Types.LONG));
  25. //4.按照word分组
  26. UnsortedGrouping<Tuple2<String,Long>> wordAndOneUG = wordAndOne.groupBy(0);
  27. //5.分组内聚合统计
  28. AggregateOperator<Tuple2<String,Long>> sum = wordAndOneUG.sum(1);
  29. sum.print();
  30. }
  31. }

然后在项目根目录创建input文件夹,在文件夹中新建words.txt文件,内容如下

  1. hello world
  2. hello java
  3. hello flink

然后我们运行main方法,得到如下结果
2.png
代码说明和注意事项如下:

  1. Flink同时提供了Java和Scala两种语言的API,有些类在两套API名称是一样的,在导包时要选用你使用的语言的包
  2. 我们的目标是将每个单词的对应个数统计出来,为此调用flatMap()方法可以对一行文字进行分词转换。将文件中的每行文字拆分成单词后,要转换成(word,count)形式的二元组,初始count都为1。return()方法指定的返回数据类型Tuple2就是Flink自带的二元组数据类型
  3. 在分组时调用了groupBy()方法,它不能使用分组选择器,只能采用位置索引或类属性名称进行分组,sum()方法也同样

    需要注意的是,这种代码的实现方式是基于DataSet API的,即我们对数据的处理转换是看作数据集来进行操作的。事实上,Flink本身是流批统一的处理架构,批量的数据集本质上也是流,没必要用两套不同的API来实现。因此从Flink1.12开始,官方推荐的做法是直接使用DataStream API,在提交任务时,通过将执行模式设为BATCH来进行批处理

  1. flink run -Dexecution.runtime-mode-mode=BATCH BatchWordCount.jar
  1. 这样,DataSet API就已经处于软弃用的状态了,在实际应用只要维护一套DataStream API就可以了,这里是为了便于理解,依然用了DataSet API实现了批处理。
  2. 那么我们再使用流批统一后的DataStream API再来实现一遍
  1. import org.apache.flink.api.common.typeinfo.Types;
  2. import org.apache.flink.api.java.tuple.Tuple2;
  3. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  4. import org.apache.flink.streaming.api.datastream.KeyedStream;
  5. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  6. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  7. import org.apache.flink.util.Collector;
  8. import java.util.Arrays;
  9. public class BoundedStreamWordCount {
  10. public static void main(String[] args) throws Exception {
  11. //1.创建流式执行环境
  12. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  13. //2.读取文件
  14. DataStreamSource<String> lineDSS = env.readTextFile("input/words.txt");
  15. //3.转换数据格式
  16. SingleOutputStreamOperator<Tuple2<String,Long>> wordAndOne = lineDSS.flatMap(
  17. (String line, Collector<String> words) -> {
  18. Arrays.stream(line.split(" ")).forEach(words::collect);
  19. }
  20. ).returns(Types.STRING).map(word -> Tuple2.of(word,1L)).returns(Types.TUPLE(Types.STRING,Types.LONG));
  21. //4.分组
  22. KeyedStream<Tuple2<String, Long>, String> wordAndOneKs = wordAndOne.keyBy(t -> t.f0);
  23. //5.求和
  24. SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordAndOneKs.sum(1);
  25. //6.打印
  26. sum.print();
  27. //7.执行
  28. env.execute();
  29. }
  30. }

结果:
3.png
我们可以看到,这与批处理的结果是完全不同的。批处理针对每个单词,只会输出一个最终的统计个数;而在流处理的打印结果中,“hello”这个单词每出现一次,都会有一个频次统计 数据输出。这就是流处理的特点,数据逐个处理,每来一条数据就会处理输出一次。我们通过打印结果,可以清晰地看到单词“hello”数量增长的过程。

  1. 关于输出语句乱序与前面数字的疑问,我们要知道,Flink 是一个分布式处理引擎,所以我们的程序应该也是分 布式运行的。在开发环境里,会通过多线程来模拟 Flink 集群运行。所以这里结果前的数字, 其实就指示了本地执行的不同线程,对应着 Flink 运行时不同的并行资源。这样第一个乱序的问题也就解决了,既然是并行执行,不同线程的输出结果,自然也就无法保持输入的顺序了。 然后前面的数字代表是哪个线程执行的,由于我的电脑有12线程,所以默认并行线程有12个。这段代码不同的运行环境,得到的结果是不同的。

3.Flink在实际环境中的流处理

  1. 在实际的生产环境中,真正的数据流其实是无界的,有开始没有结束,这就需要我们保持一个监听事件的状态,持续地处理捕获的数据。为了模拟这种场景,我们就不通过文件获取数据了,而是监听数据发送端主机的指定端口,统计发送来的文本数据中出现过单词的个数。
  1. import org.apache.flink.api.common.typeinfo.Types;
  2. import org.apache.flink.api.java.tuple.Tuple2;
  3. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  4. import org.apache.flink.streaming.api.datastream.KeyedStream;
  5. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  6. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  7. import org.apache.flink.util.Collector;
  8. import java.util.Arrays;
  9. public class StreamWordCount {
  10. public static void main(String[] args) throws Exception {
  11. //1.创建流式执行环境
  12. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  13. //2.读取文件
  14. DataStreamSource<String> lineDSS = env.socketTextStream("hadoop0",9999);
  15. //3.转换数据格式
  16. SingleOutputStreamOperator<Tuple2<String,Long>> wordAndOne = lineDSS.flatMap(
  17. (String line, Collector<String> words) -> {
  18. Arrays.stream(line.split(" ")).forEach(words::collect);
  19. }
  20. ).returns(Types.STRING).map(word -> Tuple2.of(word,1L)).returns(Types.TUPLE(Types.STRING,Types.LONG));
  21. //4.分组
  22. KeyedStream<Tuple2<String, Long>, String> wordAndOneKs = wordAndOne.keyBy(t -> t.f0);
  23. //5.求和
  24. SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordAndOneKs.sum(1);
  25. //6.打印
  26. sum.print();
  27. //7.执行
  28. env.execute();
  29. }
  30. }

然后我们利用一下liunx的一个小工具netcat来测试一下,首先启动netcat

  1. nc -lk 9999

然后执行main方法,结果如下:
flink.gif
通过这三个例子我们不难看出,Flink为我们提供了非常简便的API,基于它进行开发并不是难事,以后会逐步升入展开的去了解Flink