1.认识Flink
Flink是Apache软件基金会旗下的一个开源大数据处理框架。目前Flink已经成为各大公司大数据实时处理的发力重点,国内以阿里巴巴为代表的一些大型互联网企业都在全力投入,贡献了大量源码。如今,Flink已被很多人认为是大数据实时处理的方向与未来。
Flink起源于一个叫做Stratosphere的项目,它是由3所地处柏林的大学和欧洲其他一些大学共同进行的研究项目,由柏林工业大学的教授沃可尔·马尔科(Volker Markl)领衔开发。2014年4月,Stratosphere的代码被复制并捐赠给了Apache软件基金会,Flink就是在此基础上被重新设计出来的。
Flink的发展时间线
- 2014年8月,Flink的第一个版本0.6正式发布(之前的版本都在Stratosphere名下)。与此同时,Flink的几位核心开发者创办了Data Artisans公司,主要做Flink的商业应用,帮助企业部署大规模数据处理解决方案
- 2014年12月,Flink项目完成了孵化,一跃成为Apache软件基金会的顶级项目
- 2015年4月,Flink发布了里程碑式的重要版本0.9.0,很多国内外大公司也是从这时开始关注参与Flink社区的建设的。
- 2019年1月,长期对Flink投入研发的阿里巴巴以9000万欧元收购了Data Artisans公司;之后又将自己的内部版本Blink开源,继而与8月发布的Flink1.9.0版本进行了合并。自此之后,Flink遍地开花,称为了当前最火的新一代大数据处理框架之一。
2.Flink基本Demo的代码实现
上面我们了解了一下Flink的诞生基本概念,核心概念我们边用边了解,先来手过一遍一个基本Demo--wordcount
我们创建一个maven项目,pom.xml如下所示
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.flink</groupId>
<artifactId>springboot_flink</artifactId>
<version>1.0</version>
<properties>
<flink.version>1.13.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
<slf4j.version>1.7.30</slf4j.version>
</properties>
<dependencies>
<!--fink相关依赖-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!--日志相关依赖-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.14.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
在src/main/resource目录下创建文件log4j.properties
log4j.rootLogger=error,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
创建一个名为BatchWordCount的类
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class BatchWordCount {
public static void main(String[] args) throws Exception {
//1.创建执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//2.从文件读取数据:按行读取(存储的元素就是每行的文本)
DataSource<String> line = env.readTextFile("input/words.txt");
//3.转换数据格式
FlatMapOperator<String, Tuple2<String,Long>> wordAndOne = line.flatMap(
(String lineWord, Collector<Tuple2<String,Long>> out) -> {
String[] words = lineWord.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word,1L));
}
}
//当lambda表达式使用Java泛型的时候,由于泛型擦除的存在而需要显示的声明类型信息
).returns(Types.TUPLE(Types.STRING,Types.LONG));
//4.按照word分组
UnsortedGrouping<Tuple2<String,Long>> wordAndOneUG = wordAndOne.groupBy(0);
//5.分组内聚合统计
AggregateOperator<Tuple2<String,Long>> sum = wordAndOneUG.sum(1);
sum.print();
}
}
然后在项目根目录创建input文件夹,在文件夹中新建words.txt文件,内容如下
hello world
hello java
hello flink
然后我们运行main方法,得到如下结果
代码说明和注意事项如下:
- Flink同时提供了Java和Scala两种语言的API,有些类在两套API名称是一样的,在导包时要选用你使用的语言的包
- 我们的目标是将每个单词的对应个数统计出来,为此调用flatMap()方法可以对一行文字进行分词转换。将文件中的每行文字拆分成单词后,要转换成(word,count)形式的二元组,初始count都为1。return()方法指定的返回数据类型Tuple2就是Flink自带的二元组数据类型
在分组时调用了groupBy()方法,它不能使用分组选择器,只能采用位置索引或类属性名称进行分组,sum()方法也同样
需要注意的是,这种代码的实现方式是基于DataSet API的,即我们对数据的处理转换是看作数据集来进行操作的。事实上,Flink本身是流批统一的处理架构,批量的数据集本质上也是流,没必要用两套不同的API来实现。因此从Flink1.12开始,官方推荐的做法是直接使用DataStream API,在提交任务时,通过将执行模式设为BATCH来进行批处理
flink run -Dexecution.runtime-mode-mode=BATCH BatchWordCount.jar
这样,DataSet API就已经处于软弃用的状态了,在实际应用只要维护一套DataStream API就可以了,这里是为了便于理解,依然用了DataSet API实现了批处理。
那么我们再使用流批统一后的DataStream API再来实现一遍
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Arrays;
public class BoundedStreamWordCount {
public static void main(String[] args) throws Exception {
//1.创建流式执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.读取文件
DataStreamSource<String> lineDSS = env.readTextFile("input/words.txt");
//3.转换数据格式
SingleOutputStreamOperator<Tuple2<String,Long>> wordAndOne = lineDSS.flatMap(
(String line, Collector<String> words) -> {
Arrays.stream(line.split(" ")).forEach(words::collect);
}
).returns(Types.STRING).map(word -> Tuple2.of(word,1L)).returns(Types.TUPLE(Types.STRING,Types.LONG));
//4.分组
KeyedStream<Tuple2<String, Long>, String> wordAndOneKs = wordAndOne.keyBy(t -> t.f0);
//5.求和
SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordAndOneKs.sum(1);
//6.打印
sum.print();
//7.执行
env.execute();
}
}
结果:
我们可以看到,这与批处理的结果是完全不同的。批处理针对每个单词,只会输出一个最终的统计个数;而在流处理的打印结果中,“hello”这个单词每出现一次,都会有一个频次统计 数据输出。这就是流处理的特点,数据逐个处理,每来一条数据就会处理输出一次。我们通过打印结果,可以清晰地看到单词“hello”数量增长的过程。
关于输出语句乱序与前面数字的疑问,我们要知道,Flink 是一个分布式处理引擎,所以我们的程序应该也是分 布式运行的。在开发环境里,会通过多线程来模拟 Flink 集群运行。所以这里结果前的数字, 其实就指示了本地执行的不同线程,对应着 Flink 运行时不同的并行资源。这样第一个乱序的问题也就解决了,既然是并行执行,不同线程的输出结果,自然也就无法保持输入的顺序了。 然后前面的数字代表是哪个线程执行的,由于我的电脑有12线程,所以默认并行线程有12个。这段代码不同的运行环境,得到的结果是不同的。
3.Flink在实际环境中的流处理
在实际的生产环境中,真正的数据流其实是无界的,有开始没有结束,这就需要我们保持一个监听事件的状态,持续地处理捕获的数据。为了模拟这种场景,我们就不通过文件获取数据了,而是监听数据发送端主机的指定端口,统计发送来的文本数据中出现过单词的个数。
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Arrays;
public class StreamWordCount {
public static void main(String[] args) throws Exception {
//1.创建流式执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.读取文件
DataStreamSource<String> lineDSS = env.socketTextStream("hadoop0",9999);
//3.转换数据格式
SingleOutputStreamOperator<Tuple2<String,Long>> wordAndOne = lineDSS.flatMap(
(String line, Collector<String> words) -> {
Arrays.stream(line.split(" ")).forEach(words::collect);
}
).returns(Types.STRING).map(word -> Tuple2.of(word,1L)).returns(Types.TUPLE(Types.STRING,Types.LONG));
//4.分组
KeyedStream<Tuple2<String, Long>, String> wordAndOneKs = wordAndOne.keyBy(t -> t.f0);
//5.求和
SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordAndOneKs.sum(1);
//6.打印
sum.print();
//7.执行
env.execute();
}
}
然后我们利用一下liunx的一个小工具netcat来测试一下,首先启动netcat
nc -lk 9999
然后执行main方法,结果如下:
通过这三个例子我们不难看出,Flink为我们提供了非常简便的API,基于它进行开发并不是难事,以后会逐步升入展开的去了解Flink