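1. Create the Project
Create a Maven project and add the following properties and dependencies to `pom.xml`: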
```xml
<properties>
    <flink.version>1.13.0</flink.version>
    <java.version>1.8</java.version>
    <scala.binary.version>2.12</scala.binary.version>
    <slf4j.version>1.7.30</slf4j.version>
</properties>
...
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
```
2. Write the Code
2.1 Batch Processing
In batch mode, the whole file is read first and then processed in one go.
```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWordCount {
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment --> DataSet API (batch processing)
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 2. Read the file line by line (each element is one line of text)
        DataSource<String> lineDS = env.readTextFile("input/words.txt");
        // 3. Transform each line into (word, 1) tuples
        FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = lineDS
                .flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
                    String[] words = line.split(" ");
                    for (String word : words) {
                        out.collect(Tuple2.of(word, 1L));
                    }
                })
                // When a lambda uses Java generics, type erasure hides the output type,
                // so the type information must be declared explicitly
                .returns(Types.TUPLE(Types.STRING, Types.LONG));
        // 4. Group by word (tuple field 0)
        UnsortedGrouping<Tuple2<String, Long>> wordAndOneUG = wordAndOne.groupBy(0);
        // 5. Sum the counts (tuple field 1) within each group
        AggregateOperator<Tuple2<String, Long>> sum = wordAndOneUG.sum(1);
        // 6. Print the result (in the DataSet API, print() also triggers execution)
        sum.print();
    }
}
```

Output:

```
(java,1)
(flink,1)
(world,1)
(hello,3)
```
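The `.returns(...)` call above exists because of Java type erasure. The plain-Java sketch below (no Flink dependency; the class and method names are made up for illustration) shows what reflection can see: an anonymous class records its generic interface `Function<String, Integer>` in its class file, while the runtime-generated class behind a lambda implements only the raw `Function`. This is, roughly, why Flink's type extraction can work out the output type of an anonymous `FlatMapFunction<String, Tuple2<String, Long>>` but needs a hint for a lambda whose output goes through a generic `Collector<Tuple2<String, Long>>`.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

public class ErasureDemo {
    // Returns the first interface the object's class declares, as seen by reflection.
    static Type firstGenericInterface(Object o) {
        return o.getClass().getGenericInterfaces()[0];
    }

    // Anonymous class: the compiler stores Function<String, Integer> in the class file.
    static Function<String, Integer> anonymous() {
        return new Function<String, Integer>() {
            @Override
            public Integer apply(String s) {
                return s.length();
            }
        };
    }

    // Lambda: the class generated at runtime implements only the raw Function interface.
    static Function<String, Integer> lambda() {
        return s -> s.length();
    }

    public static boolean anonymousKeepsTypes() {
        return firstGenericInterface(anonymous()) instanceof ParameterizedType;
    }

    public static boolean lambdaKeepsTypes() {
        return firstGenericInterface(lambda()) instanceof ParameterizedType;
    }

    public static void main(String[] args) {
        System.out.println(anonymousKeepsTypes()); // true
        System.out.println(lambdaKeepsTypes());    // false
    }
}
```

Following the same reasoning, writing the flatMap as an anonymous `FlatMapFunction<String, Tuple2<String, Long>>` instead of a lambda lets Flink read the types from the class signature, so the `.returns(...)` hint is no longer required.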
2.2 Stream Processing
The input is read one line at a time, and each line is processed as it is read.
2.2.1 Reading a File (Bounded Stream)
```java
package com.atguigu.chapter02;

/**
 * Copyright (c) 2020-2030 尚硅谷 All Rights Reserved
 * <p>
 * Project: FlinkTutorial
 * <p>
 * Created by wushengran
 */

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

public class BoundedStreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1. Create the streaming execution environment --> DataStream API
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Read the file
        DataStreamSource<String> lineDSS = env.readTextFile("input/words.txt");
        // 3. Split each line into words, then map each word to (word, 1)
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDSS
                .flatMap((String line, Collector<String> words) -> {
                    Arrays.stream(line.split(" ")).forEach(words::collect);
                })
                .returns(Types.STRING)
                .map(word -> Tuple2.of(word, 1L))
                .returns(Types.TUPLE(Types.STRING, Types.LONG));
        // 4. Group by word
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKS = wordAndOne.keyBy(t -> t.f0);
        // 5. Sum the counts
        SingleOutputStreamOperator<Tuple2<String, Long>> result = wordAndOneKS.sum(1);
        // 6. Print
        result.print();
        // 7. Execute (a DataStream program does nothing until execute() is called)
        env.execute();
    }
}
```

Output:

```
3> (world,1)
2> (hello,1)
4> (flink,1)
2> (hello,2)
2> (hello,3)
1> (java,1)
```

Tip: after keyBy, all records with the same key are processed by the same thread; Flink runs the job in parallel on multiple threads.
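Compare the two outputs: the batch job prints one final `(hello,3)`, while the streaming job prints `(hello,1)`, `(hello,2)`, `(hello,3)`. The plain-Java sketch below (no Flink dependency; names are hypothetical) mimics both behaviors on a single thread, assuming `input/words.txt` holds the three lines `hello flink`, `hello world`, `hello java`: the batch version reads everything and reports once, the streaming version keeps a running count per word and emits the updated value after every record.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class IncrementalSumDemo {
    // Batch style: consume all input first, then report one final count per word.
    public static Map<String, Long> batchCount(List<String> lines) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    // Streaming style: keep per-word state and emit the updated count after every word.
    public static List<String> streamingCount(List<String> lines) {
        Map<String, Long> state = new LinkedHashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                long updated = state.merge(word, 1L, Long::sum);
                emitted.add("(" + word + "," + updated + ")");
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Assumed file contents (see the lead-in)
        List<String> input = Arrays.asList("hello flink", "hello world", "hello java");
        System.out.println(batchCount(input));     // {hello=3, flink=1, world=1, java=1}
        System.out.println(streamingCount(input)); // [(hello,1), (flink,1), (hello,2), (world,1), (hello,3), (java,1)]
    }
}
```

In the real job the lines for different words interleave differently from run to run because they come from different threads; only the order within one word is fixed.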
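Flink is a distributed processing engine, so its programs are meant to run distributed as well. In the development environment, a Flink cluster is simulated with multiple threads. The number in front of each result therefore identifies the local thread that produced it, which corresponds to one of the parallel resources of the Flink runtime. This also explains why the output is out of order: since the job runs in parallel, the outputs of different threads cannot preserve the input order. Within a single key, however, the order is kept, because one thread handles all records of that key: `(hello,1)`, `(hello,2)` and `(hello,3)` all come from thread 2.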
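Why does one key always land on the same thread? keyBy routes each record by a hash of its key, so equal keys always go to the same parallel subtask. The sketch below is a deliberate simplification (hypothetical names, plain `hashCode` modulo the parallelism); Flink itself first hashes keys into "key groups" and then assigns key groups to subtasks, but the property illustrated here, same key means same subtask, is the same. The printed `N>` prefix mirrors Flink's print sink, which shows the 1-based subtask index.

```java
import java.util.Arrays;
import java.util.List;

public class KeyRoutingDemo {
    // Simplified routing: hash the key and take it modulo the parallelism.
    // NOTE: not Flink's actual algorithm, which goes through key groups first.
    public static int subtaskFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 4; // hypothetical: e.g. a machine with 4 CPU cores
        List<String> words = Arrays.asList("hello", "flink", "hello", "world", "hello", "java");
        for (String w : words) {
            // Every "hello" gets the same prefix, whatever the order of arrival
            System.out.println((subtaskFor(w, parallelism) + 1) + "> " + w);
        }
    }
}
```

In local execution the default parallelism equals the number of CPU cores, which is why prefixes 1 to 4 appear above on a 4-core machine. Calling `env.setParallelism(1)` in the Flink program runs everything on one thread, so the output keeps the input order and the prefix is dropped.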
2.2.2 Reading a Text Stream (Unbounded Stream)
```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1. Create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Read a text stream from a socket (hostname and port of the sender)
        DataStreamSource<String> lineDSS = env.socketTextStream("hadoop102", 7777);
        // 3. Split each line into words, then map each word to (word, 1)
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDSS
                .flatMap((String line, Collector<String> words) -> {
                    Arrays.stream(line.split(" ")).forEach(words::collect);
                })
                .returns(Types.STRING)
                .map(word -> Tuple2.of(word, 1L))
                .returns(Types.TUPLE(Types.STRING, Types.LONG));
        // 4. Group by word
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKS = wordAndOne.keyBy(t -> t.f0);
        // 5. Sum the counts
        SingleOutputStreamOperator<Tuple2<String, Long>> result = wordAndOneKS.sum(1);
        // 6. Print
        result.print();
        // 7. Execute; the job keeps running and processes new lines as they arrive
        env.execute();
    }
}
```

Send data from the server:

```
hello flink
hello world
hello java
```

The console output is as follows:

```
4> (flink,1)
2> (hello,1)
3> (world,1)
2> (hello,2)
2> (hello,3)
1> (java,1)
```
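Notes on the code:
- Reading a socket text stream requires two parameters: the hostname and the port of the sender. The code above uses port 7777 on host "hadoop102" as the socket that sends the data; adjust both to match your test environment.
- In real projects, settings such as the hostname and port are usually supplied through a configuration file or as program arguments rather than hard-coded.
- Sending data to the socket text stream can be simulated with netcat (`nc`), which comes with Linux.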
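- On the Linux host hadoop102, run the following command (`-l` listens on the port, `-k` keeps listening after a client disconnects), then type lines into it to send test data. Start `nc` before the Flink program: the socket source connects on startup, and the job fails with a connection error if nothing is listening.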
```shell
$ nc -lk 7777
```
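As a sketch of the second note above (passing the hostname and port as program arguments), the helper below parses `--host <name> --port <number>` pairs and falls back to defaults. The class name `SocketArgs` and the option names are hypothetical, chosen for this illustration only.

```java
public class SocketArgs {
    public final String host;
    public final int port;

    private SocketArgs(String host, int port) {
        this.host = host;
        this.port = port;
    }

    // Parses hypothetical "--host <name> --port <number>" pairs; missing options keep the defaults.
    public static SocketArgs parse(String[] args, String defaultHost, int defaultPort) {
        if (args.length % 2 != 0) {
            throw new IllegalArgumentException("Options must come in --name value pairs");
        }
        String host = defaultHost;
        int port = defaultPort;
        for (int i = 0; i < args.length; i += 2) {
            String value = args[i + 1];
            switch (args[i]) {
                case "--host":
                    host = value;
                    break;
                case "--port":
                    // NumberFormatException is a subclass of IllegalArgumentException
                    port = Integer.parseInt(value);
                    break;
                default:
                    throw new IllegalArgumentException("Unknown option: " + args[i]);
            }
        }
        return new SocketArgs(host, port);
    }

    public static void main(String[] args) {
        SocketArgs conf = parse(args, "hadoop102", 7777);
        System.out.println(conf.host + ":" + conf.port);
    }
}
```

In `StreamWordCount`, step 2 could then read `SocketArgs conf = SocketArgs.parse(args, "hadoop102", 7777);` followed by `env.socketTextStream(conf.host, conf.port)`, and the program could be started with `--host localhost --port 7777`. Flink also ships `org.apache.flink.api.java.utils.ParameterTool`, whose `fromArgs(args)` accepts the same `--key value` style and offers `get(key, default)` and `getInt(key, default)`, which avoids hand-written parsing.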
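3. Summary
This chapter implemented an introductory Flink program: the word count (WordCount). Implementing it in both batch and streaming modes gives a first feel for Flink's API style and programming model, and a deeper understanding of how batch and stream processing differ. In addition, comparing stream processing of bounded data (a file) with unbounded data (a socket text stream) makes the way Flink processes streams, and its characteristics, more concrete.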
