1.认识Flink
Flink是Apache软件基金会旗下的一个开源大数据处理框架。目前Flink已经成为各大公司大数据实时处理的发力重点,国内以阿里巴巴为代表的一些大型互联网企业都在全力投入,贡献了大量源码。如今,Flink已被很多人认为是大数据实时处理的方向与未来。Flink起源于一个叫做Stratosphere的项目,它是由3所地处柏林的大学和欧洲其他一些大学共同进行的研究项目,由柏林工业大学的教授沃可尔·马尔科(Volker Markl)领衔开发。2014年4月,Stratosphere的代码被复制并捐赠给了Apache软件基金会,Flink就是在此基础上被重新设计出来的。
Flink的发展时间线
- 2014年8月,Flink的第一个版本0.6正式发布(之前的版本都在Stratosphere名下)。与此同时,Flink的几位核心开发者创办了Data Artisans公司,主要做Flink的商业应用,帮助企业部署大规模数据处理解决方案
 - 2014年12月,Flink项目完成了孵化,一跃成为Apache软件基金会的顶级项目
 - 2015年4月,Flink发布了里程碑式的重要版本0.9.0,很多国内外大公司也是从这时开始关注参与Flink社区的建设的。
 - 2019年1月,长期对Flink投入研发的阿里巴巴以9000万欧元收购了Data Artisans公司;之后又将自己的内部版本Blink开源,继而与8月发布的Flink1.9.0版本进行了合并。自此之后,Flink遍地开花,称为了当前最火的新一代大数据处理框架之一。
 
2.Flink基本Demo的代码实现
上面我们了解了一下Flink的诞生基本概念,核心概念我们边用边了解,先来手过一遍一个基本Demo--wordcount我们创建一个maven项目,pom.xml如下所示
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.flink</groupId><artifactId>springboot_flink</artifactId><version>1.0</version><properties><flink.version>1.13.0</flink.version><java.version>1.8</java.version><scala.binary.version>2.12</scala.binary.version><slf4j.version>1.7.30</slf4j.version></properties><dependencies><!--fink相关依赖--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!--日志相关依赖--><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-to-slf4j</artifactId><version>2.14.0</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.1</version><configuration><source>${java.version}</source><target>${java.version}</target></configuration></plugin></plugins></build></project>
在src/main/resource目录下创建文件log4j.properties
log4j.rootLogger=error,stdoutlog4j.appender.stdout=org.apache.log4j.ConsoleAppenderlog4j.appender.stdout.layout=org.apache.log4j.PatternLayoutlog4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
创建一个名为BatchWordCount的类
import org.apache.flink.api.common.typeinfo.Types;import org.apache.flink.api.java.ExecutionEnvironment;import org.apache.flink.api.java.operators.AggregateOperator;import org.apache.flink.api.java.operators.DataSource;import org.apache.flink.api.java.operators.FlatMapOperator;import org.apache.flink.api.java.operators.UnsortedGrouping;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.util.Collector;public class BatchWordCount {public static void main(String[] args) throws Exception {//1.创建执行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//2.从文件读取数据:按行读取(存储的元素就是每行的文本)DataSource<String> line = env.readTextFile("input/words.txt");//3.转换数据格式FlatMapOperator<String, Tuple2<String,Long>> wordAndOne = line.flatMap((String lineWord, Collector<Tuple2<String,Long>> out) -> {String[] words = lineWord.split(" ");for (String word : words) {out.collect(Tuple2.of(word,1L));}}//当lambda表达式使用Java泛型的时候,由于泛型擦除的存在而需要显示的声明类型信息).returns(Types.TUPLE(Types.STRING,Types.LONG));//4.按照word分组UnsortedGrouping<Tuple2<String,Long>> wordAndOneUG = wordAndOne.groupBy(0);//5.分组内聚合统计AggregateOperator<Tuple2<String,Long>> sum = wordAndOneUG.sum(1);sum.print();}}
然后在项目根目录创建input文件夹,在文件夹中新建words.txt文件,内容如下
hello worldhello javahello flink
然后我们运行main方法,得到如下结果
代码说明和注意事项如下:
- Flink同时提供了Java和Scala两种语言的API,有些类在两套API名称是一样的,在导包时要选用你使用的语言的包
 - 我们的目标是将每个单词的对应个数统计出来,为此调用flatMap()方法可以对一行文字进行分词转换。将文件中的每行文字拆分成单词后,要转换成(word,count)形式的二元组,初始count都为1。return()方法指定的返回数据类型Tuple2就是Flink自带的二元组数据类型
 在分组时调用了groupBy()方法,它不能使用分组选择器,只能采用位置索引或类属性名称进行分组,sum()方法也同样
需要注意的是,这种代码的实现方式是基于DataSet API的,即我们对数据的处理转换是看作数据集来进行操作的。事实上,Flink本身是流批统一的处理架构,批量的数据集本质上也是流,没必要用两套不同的API来实现。因此从Flink1.12开始,官方推荐的做法是直接使用DataStream API,在提交任务时,通过将执行模式设为BATCH来进行批处理
flink run -Dexecution.runtime-mode-mode=BATCH BatchWordCount.jar
这样,DataSet API就已经处于软弃用的状态了,在实际应用只要维护一套DataStream API就可以了,这里是为了便于理解,依然用了DataSet API实现了批处理。那么我们再使用流批统一后的DataStream API再来实现一遍
import org.apache.flink.api.common.typeinfo.Types;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.KeyedStream;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.util.Collector;import java.util.Arrays;public class BoundedStreamWordCount {public static void main(String[] args) throws Exception {//1.创建流式执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2.读取文件DataStreamSource<String> lineDSS = env.readTextFile("input/words.txt");//3.转换数据格式SingleOutputStreamOperator<Tuple2<String,Long>> wordAndOne = lineDSS.flatMap((String line, Collector<String> words) -> {Arrays.stream(line.split(" ")).forEach(words::collect);}).returns(Types.STRING).map(word -> Tuple2.of(word,1L)).returns(Types.TUPLE(Types.STRING,Types.LONG));//4.分组KeyedStream<Tuple2<String, Long>, String> wordAndOneKs = wordAndOne.keyBy(t -> t.f0);//5.求和SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordAndOneKs.sum(1);//6.打印sum.print();//7.执行env.execute();}}
结果:
    我们可以看到,这与批处理的结果是完全不同的。批处理针对每个单词,只会输出一个最终的统计个数;而在流处理的打印结果中,“hello”这个单词每出现一次,都会有一个频次统计 数据输出。这就是流处理的特点,数据逐个处理,每来一条数据就会处理输出一次。我们通过打印结果,可以清晰地看到单词“hello”数量增长的过程。
关于输出语句乱序与前面数字的疑问,我们要知道,Flink 是一个分布式处理引擎,所以我们的程序应该也是分 布式运行的。在开发环境里,会通过多线程来模拟 Flink 集群运行。所以这里结果前的数字, 其实就指示了本地执行的不同线程,对应着 Flink 运行时不同的并行资源。这样第一个乱序的问题也就解决了,既然是并行执行,不同线程的输出结果,自然也就无法保持输入的顺序了。 然后前面的数字代表是哪个线程执行的,由于我的电脑有12线程,所以默认并行线程有12个。这段代码不同的运行环境,得到的结果是不同的。
3.Flink在实际环境中的流处理
在实际的生产环境中,真正的数据流其实是无界的,有开始没有结束,这就需要我们保持一个监听事件的状态,持续地处理捕获的数据。为了模拟这种场景,我们就不通过文件获取数据了,而是监听数据发送端主机的指定端口,统计发送来的文本数据中出现过单词的个数。
import org.apache.flink.api.common.typeinfo.Types;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.KeyedStream;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.util.Collector;import java.util.Arrays;public class StreamWordCount {public static void main(String[] args) throws Exception {//1.创建流式执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2.读取文件DataStreamSource<String> lineDSS = env.socketTextStream("hadoop0",9999);//3.转换数据格式SingleOutputStreamOperator<Tuple2<String,Long>> wordAndOne = lineDSS.flatMap((String line, Collector<String> words) -> {Arrays.stream(line.split(" ")).forEach(words::collect);}).returns(Types.STRING).map(word -> Tuple2.of(word,1L)).returns(Types.TUPLE(Types.STRING,Types.LONG));//4.分组KeyedStream<Tuple2<String, Long>, String> wordAndOneKs = wordAndOne.keyBy(t -> t.f0);//5.求和SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordAndOneKs.sum(1);//6.打印sum.print();//7.执行env.execute();}}
然后我们利用一下liunx的一个小工具netcat来测试一下,首先启动netcat
nc -lk 9999
然后执行main方法,结果如下:
通过这三个例子我们不难看出,Flink为我们提供了非常简便的API,基于它进行开发并不是难事,以后会逐步升入展开的去了解Flink

