word-count

批处理

对于批处理而言,输入的应该是收集好的数据集。这里我们可以将要统计的文字,写入一 个文本文档,然后读取这个文件处理数据就可以了。
(1)在工程根目录下新建一个 input 文件夹,并在下面创建文本文件 words.txt
(2)在 words.txt 中输入一些文字,例如:

  1. hello world
  2. hello flink
  3. hello java

(3)在 com.why.wordCount 包下新建 Java 类 BatchWordCount,在静态 main 方法中编 写测试代码。
我们进行单词频次统计的基本思路是:
先逐行读入文件数据,然后将每一行文字拆分成单 词;接着按照单词分组,统计每组数据的个数,就是对应单词的频次。

  1. package com.why.wordCount;
  2. import org.apache.flink.api.common.typeinfo.Types;
  3. import org.apache.flink.api.java.ExecutionEnvironment;
  4. import org.apache.flink.api.java.operators.AggregateOperator;
  5. import org.apache.flink.api.java.operators.DataSource;
  6. import org.apache.flink.api.java.operators.FlatMapOperator;
  7. import org.apache.flink.api.java.operators.UnsortedGrouping;
  8. import org.apache.flink.api.java.tuple.Tuple2;
  9. import org.apache.flink.util.Collector;
  10. public class BatchWordCount {
  11. public static void main(String[] args) throws Exception {
  12. // 1. 创建执行环境
  13. ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  14. // 2. 从文件读取数据 按行读取(存储的元素就是每行的文本)
  15. DataSource<String> lineDS = env.readTextFile("input/word.txt");
  16. // 3. 转换数据格式
  17. FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = lineDS
  18. .flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
  19. String[] words = line.split(" ");
  20. for (String word : words) {
  21. out.collect(Tuple2.of(word, 1L));
  22. }
  23. }).returns(Types.TUPLE(Types.STRING, Types.LONG)); //当 Lambda 表达式使用 Java 泛型的时候, 由于泛型擦除的存在, 需要显示的声明类型信息
  24. // 4. 按照 word 进行分组
  25. UnsortedGrouping<Tuple2<String, Long>> wordAndOneUG = wordAndOne.groupBy(0);
  26. // 5. 分组内聚合统计
  27. AggregateOperator<Tuple2<String, Long>> sum = wordAndOneUG.sum(1);
  28. // 6. 打印结果
  29. sum.print();
  30. }
  31. }

代码说明和注意事项:
① Flink 在执行应用程序前应该获取执行环境对象,也就是运行时上下文环境。
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
② Flink 同时提供了 Java 和 Scala 两种语言的 API,有些类在两套 API 中名称是一样的。 所以在引入包时,如果有 Java 和 Scala 两种选择,要注意选用 Java 的包。
③ 直接调用执行环境的 readTextFile 方法,可以从文件中读取数据。
④我们的目标是将每个单词对应的个数统计出来,所以调用 flatmap 方法可以对一行文字 进行分词转换。将文件中每一行文字拆分成单词后,要转换成(word,count)形式的二元组,初始 count 都为 1。returns 方法指定的返回数据类型 Tuple2,就是 Flink 自带的二元组数据类型。
⑤ 在分组时调用了 groupBy 方法,它不能使用分组选择器,只能采用位置索引或属性名 称进行分组。

  1. // 使用索引定位
  2. dataStream.groupBy(0)
  3. // 使用类属性名称
  4. dataStream.groupBy("id")

⑤ 在分组之后调用 sum 方法进行聚合,同样只能指定聚合字段的位置索引或属性名称。
(4) 运行程序,控制台会打印出结果:

  1. (java,1)
  2. (flink,1)
  3. (world,1)
  4. (hello,3)

可以看到,我们将文档中的所有单词的频次,全部统计出来,以二元组的形式在控制台打印输出了。
需要注意的是,这种代码的实现方式,是基于 DataSet API 的,也就是我们对数据的处理 转换,是看作数据集来进行操作的。事实上 Flink 本身是流批统一的处理架构,批量的数据集
本质上也是流,没有必要用两套不同的 API 来实现。所以从 Flink 1.12 开始,官方推荐的做法 是直接使用 DataStream API,在提交任务时通过将执行模式设为 BATCH 来进行批处理:

  1. $ bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar

这样,DataSet API 就已经处于“软弃用”(soft deprecated)的状态,在实际应用中我们只 要维护一套 DataStream API 就可以了。这里只是为了方便大家理解,我们依然用 DataSet API 做了批处理的实现

流处理

我们已经知道,用 DataSet API 可以很容易地实现批处理;与之对应,流处理当然可以用 DataStream API 来实现。对于 Flink 而言,流才是整个处理逻辑的底层核心,所以流批统一之 后的 DataStream API 更加强大,可以直接处理批处理和流处理的所有场景。
DataStream API 作为“数据流”的处理接口,又怎样处理批数据呢? 回忆一下上一章中我们讲到的 Flink 世界观。在 Flink 的视角里,一切数据都可以认为是 流,流数据是无界流,而批数据则是有界流。所以批处理,其实就可以看作有界流的处理。
对于流而言,我们会在获取输入数据后立即处理,这个过程是连续不断的。当然,有时我 们的输入数据可能会有尽头,这看起来似乎就成了一个有界流;但是它跟批处理是截然不同的 ——在输入结束之前,我们依然会认为数据是无穷无尽的,处理模式也仍旧是连续逐个处理。
下面我们就针对不同类型的输入数据源,用具体的代码来实现流处理。
1. 读取文件
我们同样试图读取文档 words.txt 中的数据,并统计每个单词出现的频次。这是一个“有 界流”的处理,整体思路与之前的批处理非常类似,代码模式也基本一致。
(1) 在 com.atguigu.wc 包下新建 Java 类 BoundedStreamWordCount,在静态 main 方法中 编写测试代码。具体代码实现如下:

  1. package com.why.wordCount;
  2. import org.apache.flink.api.common.typeinfo.Types;
  3. import org.apache.flink.api.java.tuple.Tuple2;
  4. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  5. import org.apache.flink.streaming.api.datastream.KeyedStream;
  6. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  7. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  8. import org.apache.flink.util.Collector;
  9. import java.util.Arrays;
  10. public class BoundedStreamWordCount {
  11. public static void main(String[] args) throws Exception {
  12. // 1. 创建流式执行环境
  13. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  14. // 2. 读取文件
  15. DataStreamSource<String> lineDSS = env.readTextFile("input/word.txt");
  16. // 3. 转换数据格式
  17. SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDSS
  18. .flatMap((String line, Collector<String> words) -> {
  19. Arrays.stream(line.split(" ")).forEach(words::collect);
  20. })
  21. .returns(Types.STRING)
  22. .map(word -> Tuple2.of(word, 1L))
  23. .returns(Types.TUPLE(Types.STRING, Types.LONG));
  24. // 4. 分组
  25. KeyedStream<Tuple2<String, Long>, String> wordAndOneKS = wordAndOne.keyBy(t -> t.f0);
  26. // 5. 求和
  27. SingleOutputStreamOperator<Tuple2<String, Long>> result = wordAndOneKS
  28. .sum(1);
  29. // 6. 打印
  30. result.print();
  31. // 7. 执行
  32. env.execute();
  33. }
  34. }

主要观察与批处理程序 BatchWordCount 的不同:
⚫ 创建执行环境的不同,流处理程序使用的是 StreamExecutionEnvironment。
⚫ 每一步处理转换之后,得到的数据对象类型不同。
⚫ 分组操作调用的是 keyBy 方法,可以传入一个匿名函数作为键选择器 (KeySelector),指定当前分组的 key 是什么。
⚫ 代码末尾需要调用 env 的 execute 方法,开始执行任务。
(2) 运行程序,控制台输出结果如下:

  1. 4> (hello,1)
  2. 2> (java,1)
  3. 7> (world,1)
  4. 4> (hello,2)
  5. 4> (hello,3)
  6. 10> (flink,1)

我们可以看到,这与批处理的结果是完全不同的。批处理针对每个单词,只会输出一个最 终的统计个数;而在流处理的打印结果中,“hello”这个单词每出现一次,都会有一个频次统计 数据输出。这就是流处理的特点,数据逐个处理,每来一条数据就会处理输出一次。我们通过 打印结果,可以清晰地看到单词“hello”数量增长的过程。 看到这里大家可能又会有新的疑惑:我们读取文件,第一行应该是“hello flink”,怎么这 里输出的第一个单词是“world”呢?每个输出的结果二元组,前面都有一个数字,这又是什 么呢?
我们可以先做个简单的解释。Flink 是一个分布式处理引擎,所以我们的程序应该也是分 布式运行的。在开发环境里,会通过多线程来模拟 Flink 集群运行。所以这里结果前的数字, 其实就指示了本地执行的不同线程,对应着 Flink 运行时不同的并行资源。这样第一个乱序的 问题也就解决了:既然是并行执行,不同线程的输出结果,自然也就无法保持输入的顺序了。 另外需要说明,这里显示的编号为 1~4,是由于运行电脑的 CPU 是 4 核,所以默认模拟 的并行线程有 4 个。这段代码不同的运行环境,得到的结果会是不同的。关于 Flink 程序并行 执行的数量,可以通过设定“并行度”(Parallelism)来进行配置,我们会在后续章节详细讲解 这些内容。

读取文本流

在实际的生产环境中,真正的数据流其实是无界的,有开始却没有结束,这就要求我们需 要保持一个监听事件的状态,持续地处理捕获的数据。 为了模拟这种场景,我们就不再通过读取文件来获取数据了,而是监听数据发送端主机的 指定端口,统计发送来的文本数据中出现过的单词的个数。具体实现上,我们只要对 BoundedStreamWordCount 代码中读取数据的步骤稍做修改,就可以实现对真正无界流的处理。 (1)新建一个 Java 类 StreamWordCount,将 BoundedStreamWordCount 代码中读取文件 数据的 readTextFile 方法,替换成读取 socket 文本流的方法 socketTextStream。具体代码实现如 下:

  1. package com.why.wordCount;
  2. import org.apache.flink.api.common.typeinfo.Types;
  3. import org.apache.flink.api.java.tuple.Tuple2;
  4. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  5. import org.apache.flink.streaming.api.datastream.KeyedStream;
  6. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  7. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  8. import org.apache.flink.util.Collector;
  9. import java.util.Arrays;
  10. public class StreamWordCount {
  11. public static void main(String[] args) throws Exception {
  12. // 1. 创建流式执行环境
  13. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  14. // 2. 读取文本流
  15. DataStreamSource<String> lineDSS = env.socketTextStream("hadoop102",
  16. 7777);
  17. // 3. 转换数据格式
  18. SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDSS
  19. .flatMap((String line, Collector<String> words) -> {
  20. Arrays.stream(line.split(" ")).forEach(words::collect);
  21. })
  22. .returns(Types.STRING)
  23. .map(word -> Tuple2.of(word, 1L))
  24. .returns(Types.TUPLE(Types.STRING, Types.LONG));
  25. // 4. 分组
  26. KeyedStream<Tuple2<String, Long>, String> wordAndOneKS = wordAndOne.keyBy(t -> t.f0);
  27. // 5. 求和
  28. SingleOutputStreamOperator<Tuple2<String, Long>> result = wordAndOneKS.sum(1);
  29. // 6. 打印
  30. result.print();
  31. // 7. 执行
  32. env.execute();
  33. }
  34. }

代码说明和注意事项:
⚫ socket 文本流的读取需要配置两个参数:发送端主机名和端口号。这里代码 中指定了主机“hadoop102”的 7777 端口作为发送数据的 socket 端口,读者可以根据 测试环境自行配置。
⚫ 在实际项目应用中,主机名和端口号这类信息往往可以通过配置文件,或者 传入程序运行参数的方式来指定。
⚫ socket文本流数据的发送,可以通过Linux系统自带的netcat工具进行模拟。
(2)在 Linux 环境的主机 hadoop102 上,执行下列命令,发送数据进行测试:

  1. [atguigu@hadoop102 ~]$ nc -lk 7777

(3)启动 StreamWordCount 程序
我们会发现程序启动之后没有任何输出、也不会退出。这是正常的——因为 Flink 的流处 理是事件驱动的,当前程序会一直处于监听状态,只有接收到数据才会执行任务、输出统计结果:
(4)从 hadoop102 发送数据:

  1. hello flink
  2. hello world
  3. hello java

可以看到控制台输出结果如下:

  1. 4> (flink,1)
  2. 2> (hello,1)
  3. 3> (world,1)
  4. 2> (hello,2)
  5. 2> (hello,3)
  6. 1> (java,1)

我们会发现,输出的结果与之前读取文件的流处理非常相似。而且可以非常明显地看到, 每输入一条数据,就有一次对应的输出。具体对应关系是:输入“hello flink”,就会输出两条 统计结果(flink,1)和(hello,1);之后再输入“hello world”,同样会将 hello 和 world 的个 数统计输出,hello 的个数会对应增长为 2。

本章总结

本章主要实现一个 Flink 开发的入门程序——词频统计 WordCount。通过批处理和流处理 两种不同模式的实现,可以对 Flink 的 API 风格和编程方式有所熟悉,并且更加深刻地理解批 处理和流处理的不同。另外,通过读取有界数据(文件)和无界数据(socket 文本流)进行流 处理的比较,我们也可以更加直观地体会到 Flink 流处理的方式和特点。 这是我们 Flink 长征路上的第一步,是后续学习的基础。有了这番初体验,想必大家会发 现 Flink 提供了非常易用的 API,基于它进行开发并不是难事。之后我们会逐步深入展开,为 大家打开 Flink 神奇世界的大门。

flink部署

  1. 在上一章中,我们在集成开发环境里编写 Flink 代码,然后运行测试。细心的读者应该会发现:对于读取文本流的流处理程序,运行之后其实并不会去直接执行代码中定义好的操作—因为这时还没有数据;只有在输入数据之后,才会触发分词转换、分组统计的一系列处理操 作。可明明我们的代码顺序执行,会调用到 flatMapkeyBy sum 等一系列处理方法,这是 怎么回事呢? <br /> 这涉及 Flink 作业提交运行的原理。我们编写的代码,对应着在 Flink 集群上执行的一个 作业;所以我们在本地执行代码,其实是先模拟启动一个 Flink 集群,然后将作业提交到集群上,创建好要执行的任务等待数据输入。 <br /> 这里需要提到 Flink 中的几个关键组件:客户端(Client)、作业管理器(JobManager)和 任务管理器(TaskManager)。我们的代码,实际上是由客户端获取并做转换,之后提交给 JobManger 的。所以 JobManager 就是 Flink 集群里的“管事人”,对作业进行中央调度管理; 而它获取到要执行的作业后,会进一步处理转换,然后分发任务给众多的 TaskManager。这里的 TaskManager,就是真正“干活的人”,数据的处理操作都是它们来做的<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/2945099/1650272276559-401aa421-1acc-41d6-bc12-fe6df11ff9a9.png#clientId=u5535c7b7-d6c4-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=247&id=ufd98ddb9&margin=%5Bobject%20Object%5D&name=image.png&originHeight=309&originWidth=553&originalType=binary&ratio=1&rotation=0&showTitle=false&size=29053&status=done&style=none&taskId=u675f483f-3485-4205-9142-ed0db6e81bb&title=&width=442.4)

快速启动一个 Flink 集群

环境配置

Flink 是一个分布式的流处理框架,所以实际应用一般都需要搭建集群环境。我们在进行Flink 安装部署的学习时,需要准备 3 台 Linux 机器。具体要求如下:
⚫ 系统环境为 CentOS 7.5 版本。
⚫ 安装 Java 8。

  1. yum install -y java-1.8.0-openjdk-devel.x86_64

⚫ 安装 Hadoop 集群,Hadoop 建议选择 Hadoop 2.7.5 以上版本。
⚫ 配置集群节点服务器间时间同步以及免密登录,关闭防火墙。
本书中三台服务器的具体设置如下:
⚫ 节点服务器 1,IP 地址为 192.168.10.102,主机名为 hadoop102。
⚫ 节点服务器 2,IP 地址为 192.168.10.103,主机名为 hadoop103。
⚫ 节点服务器 3,IP 地址为 192.168.10.104,主机名为 hadoop104。

本地启动

最简单的启动方式,其实是不搭建集群,直接本地启动。本地部署非常简单,直接解压安 装包就可以使用,不用进行任何配置;一般用来做一些简单的测试。
具体安装步骤如下:

1. 下载安装包

进入 Flink 官网,下载 1.13.0 版本安装包 flink-1.13.0-bin-scala_2.12.tgz,注意此处选用对应 scala 版本为 scala 2.12 的安装包。

  1. https://dlcdn.apache.org/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.12.tgz

2. 解压

在 hadoop102 节点服务器上创建安装目录/opt/module,将 flink 安装包放在该目录下,并执行解压命令,解压至当前目录。

  1. $ tar -zxvf flink-1.13.0-bin-scala_2.12.tgz -C /opt/module/
  2. flink-1.13.0/
  3. flink-1.13.0/log/
  4. flink-1.13.0/LICENSE
  5. flink-1.

3. 启动

进入解压后的目录,执行启动命令,并查看进程。

  1. $ cd flink-1.13.0/
  2. $ bin/start-cluster.sh
  3. Starting cluster.
  4. Starting standalonesession daemon on host hadoop102.
  5. Starting taskexecutor daemon on host hadoop102.
  6. $ jps
  7. 10369 StandaloneSessionClusterEntrypoint
  8. 10680 TaskManagerRunner
  9. 10717 Jps

4. 访问 Web UI

启动成功后,访问 http://hadoop102:8081,可以对 flink 集群和任务进行监控管理,如图所示。
image.png

5. 关闭集群

如果想要让 Flink 集群停止运行,可以执行以下命令:

  1. $ bin/stop-cluster.sh
  2. Stopping taskexecutor daemon (pid: 10680) on host hadoop102.
  3. Stopping standalonesession daemon (pid: 10369) on host hadoop102.

集群启动

  1. 可以看到,Flink 本地启动非常简单,直接执行 start-cluster.sh 就可以了。如果我们想要扩 <br />展成集群,其实启动命令是不变的,主要是需要指定节点之间的主从关系。 <br /> Flink 是典型的 Master-Slave 架构的分布式数据处理框架,其中 Master 角色对应着JobManagerSlave 角色则对应 TaskManager。我们对三台节点服务器的角色分配<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/2945099/1650273046953-0252bd8a-fbcd-4230-bc2d-142257ce9503.png#clientId=u5535c7b7-d6c4-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=46&id=uf69c6dbd&margin=%5Bobject%20Object%5D&name=image.png&originHeight=58&originWidth=718&originalType=binary&ratio=1&rotation=0&showTitle=false&size=9782&status=done&style=none&taskId=u34bc4125-ad03-4a00-a46e-9da1d0b05ce&title=&width=574.4)<br />具体安装部署步骤如下:

1. 下载并解压安装包

具体操作与上节相同。

2. 修改集群配置

(1)进入 conf 目录下,修改 flink-conf.yaml 文件,修改 jobmanager.rpc.address 参数为hadoop102

  1. $ cd conf/
  2. $ vim flink-conf.yaml
  3. # JobManager 节点地址.
  4. jobmanager.rpc.address: hadoop102

这就指定了 hadoop102 节点服务器为 JobManager 节点。
(2)修改 workers 文件,将另外两台节点服务器添加为本 Flink 集群的 TaskManager 节点

  1. $ vim workers
  2. hadoop103
  3. hadoop104

这样就指定了 hadoop103 和 hadoop104 为 TaskManager 节点。
(3)另外,在 flink-conf.yaml 文件中还可以对集群中的 JobManager 和 TaskManager 组件进行优化配置,主要配置项如下:
⚫ jobmanager.memory.process.size:对 JobManager 进程可使用到的全部内存进行配置,包括JVM 元空间和其他开销,默认为 1600M,可以根据集群规模进行适当调整。
⚫ taskmanager.memory.process.size:对 TaskManager 进程可使用到的全部内存进行配置,包括JVM 元空间和其他开销,默认为 1600M,可以根据集群规模进行适当调整。
⚫ taskmanager.numberOfTaskSlots:对每个 TaskManager 能够分配的 Slot 数量进行配置,默认为1,可根据 TaskManager 所在的机器能够提供给 Flink 的 CPU 数量决定。所谓 Slot 就是 TaskManager 中具体运行一个任务所分配的计算资源。
⚫ parallelism.default:Flink 任务执行的默认并行度,优先级低于代码中进行的并行度配置和任务提交时使用参数指定的并行度数量。
关于 Slot 和并行度的概念,我们会在下一章做详细讲解。

3. 分发安装目录

配置修改完毕后,将 Flink 安装目录发给另外两个节点服务器。

  1. $ scp -r ./flink-1.13.0 atguigu@hadoop103:/opt/module
  2. $ scp -r ./flink-1.13.0 atguigu@hadoop104:/opt/module

4. 启动集群

(1)在 hadoop102 节点服务器上执行 start-cluster.sh 启动 Flink 集群:

  1. $ bin/start-cluster.sh
  2. Starting cluster.
  3. Starting standalonesession daemon on host hadoop102.
  4. Starting taskexecutor daemon on host hadoop103.
  5. Starting taskexecutor daemon on host hadoop104.

(2)查看进程情况:

  1. [atguigu@hadoop102 flink-1.13.0]$ jps
  2. 13859 Jps
  3. 13782 StandaloneSessionClusterEntrypoint
  4. [atguigu@hadoop103 flink-1.13.0]$ jps
  5. 12215 Jps
  6. 12124 TaskManagerRunner
  7. [atguigu@hadoop104 flink-1.13.0]$ jps
  8. 11602 TaskManagerRunner
  9. 11694 Jps

5. 访问 Web UI

启动成功后,同样可以访问 http://hadoop102:8081 对 flink 集群和任务进行监控管理
image.png

向集群提交作业

  1. 在上一章中,我们已经编写了词频统计的批处理和流处理的示例程序,并在开发环境的模拟集群上做了运行测试。现在既然已经有了真正的集群环境,那接下来我们就要把作业提交上 去执行了。 <br /> 本节我们将以流处理的程序为例,演示如何将任务提交到集群中进行执行。具体步骤如下。

1. 程序打包

(1)为方便自定义结构和定制依赖,我们可以引入插件 maven-assembly-plugin 进行打包。在 FlinkTutorial 项目的 pom.xml 文件中添加打包插件的配置,具体如下:

  1. <build>
  2. <plugins>
  3. <plugin>
  4. <groupId>org.apache.maven.plugins</groupId>
  5. <artifactId>maven-assembly-plugin</artifactId>
  6. <version>3.0.0</version>
  7. <configuration>
  8. <descriptorRefs>
  9. <descriptorRef>jar-with-dependencies</descriptorRef>
  10. </descriptorRefs>
  11. </configuration>
  12. <executions>
  13. <execution>
  14. <id>make-assembly</id>
  15. <phase>package</phase>
  16. <goals>
  17. <goal>single</goal>
  18. </goals>
  19. </execution>
  20. </executions>
  21. </plugin>
  22. </plugins>
  23. </build>

打 包 完 成 后 , 在 target 目 录 下 即 可 找 到 所 需 JAR 包 , JAR 包 会 有 两 个 ,FlinkTutorial-1.0-SNAPSHOT.jar 和 FlinkTutorial-1.0-SNAPSHOT-jar-with-dependencies.jar,因为集群中已经具备任务运行所需的所有依赖,所以建议使用 FlinkTutorial-1.0-SNAPSHOT.jar。

2. 在 Web UI 上提交作业

(1)任务打包完成后,我们打开 Flink 的 WEB UI 页面,在右侧导航栏点击“Submit New Job”,然后点击按钮“+ Add New”,选择要上传运行的 JAR 包
image.png
image.png
image.png

3. 命令行提交作业

  1. $ bin/start-cluster.sh
  1. $ nc -lk 7777
  1. $ bin/flink run -m hadoop102:8081 -c com.atguigu.wc.StreamWordCount ./FlinkTutorial-1.0-SNAPSHOT.jar

这里的参数 –m 指定了提交到的 JobManager,-c 指定了入口类。
(4)在浏览器中打开 Web UI,http://hadoop102:8081 查看应用执行情况
image.png
用 netcat 输入数据,可以在 TaskManager 的标准输出(Stdout)看到对应的统计结果。
(5)在 log 日志中,也可以查看执行结果,需要找到执行该数据任务的 TaskManager 节点查看日志。

  1. $ cat flink-atguigu-taskexecutor-0-hadoop102.out
  2. SLF4J: Class path contains multiple SLF4J bindings.
  3. SLF4J: Found binding in
  4. [jar:file:/opt/module/flink-1.13.0/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j
  5. /impl/StaticLoggerBinder.class]
  6. SLF4J: Found binding in
  7. [jar:file:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.
  8. 25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
  9. SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
  10. SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
  11. (hello,1)
  12. (hello,2)
  13. (flink,1)
  14. (hello,3)
  15. (scala,1)

部署模式

在一些应用场景中,对于集群资源分配和占用的方式,可能会有特定的需求。Flink 为各 种场景提供了不同的部署模式,主要有以下三种:
⚫ 会话模式(Session Mode)
⚫ 单作业模式(Per-Job Mode)
⚫ 应用模式(Application Mode)
它们的区别主要在于:
集群的生命周期以及资源的分配方式;以及应用的 main 方法到底在哪里执行—客户端(Client)还是 JobManager。接下来我们就做一个展开说明。

会话模式(Session Mode)

  1. 会话模式其实最符合常规思维。我们需要先启动一个集群,保持一个会话,在这个会话中通过客户端提交作业。集群启动时所有资源就都已经确定,所有提交的作业会竞争集群中的资源。<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/2945099/1650274362512-b8fbb0cd-e9bd-4398-97d5-f2440aa98208.png#clientId=u5535c7b7-d6c4-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=214&id=uf681686f&margin=%5Bobject%20Object%5D&name=image.png&originHeight=268&originWidth=703&originalType=binary&ratio=1&rotation=0&showTitle=false&size=19582&status=done&style=none&taskId=u235bd34e-4be9-4eb3-925a-8eddbb3a9f7&title=&width=562.4)<br /> 这样的好处很明显,我们只需要一个集群,就像一个大箱子,所有的作业提交之后都塞进去;集群的生命周期是超越于作业之上的,铁打的营盘流水的兵,作业结束了就释放资源,集群依然正常运行。当然缺点也是显而易见的:因为资源是共享的,所以资源不够了,提交新的作业就会失败。另外,同一个 TaskManager 上可能运行了很多作业,如果其中一个发生故障导 致 TaskManager 宕机,那么所有作业都会受到影响。<br /> 会话模式比较适合于单个规模小、执行时间短的大量作业。

单作业模式(Per-Job Mode)

会话模式因为资源共享会导致很多问题,所以为了更好地隔离资源,我们可以考虑为每个提交的作业启动一个集群,这就是所谓的单作业(Per-Job)模式
image.png
单作业模式也很好理解,就是严格的一对一,集群只为这个作业而生。同样由客户端运行应用程序,然后启动集群,作业被提交给 JobManager,进而分发给 TaskManager 执行。作业完成后,集群就会关闭,所有资源也会释放。这样一来,每个作业都有它自己的 JobManager管理,占用独享的资源,即使发生故障,它的 TaskManager 宕机也不会影响其他作业。
这些特性使得单作业模式在生产环境运行更加稳定,所以是实际应用的首选模式。
需要注意的是,Flink 本身无法直接这样运行,所以单作业模式一般需要借助一些资源管理框架来启动集群,比如 YARN、Kubernetes。

应用模式(Application Mode)

  1. 前面提到的两种模式下,应用代码都是在客户端上执行,然后由客户端提交给 JobManager的。但是这种方式客户端需要占用大量网络带宽,去下载依赖和把二进制数据发送给JobManager;加上很多情况下我们提交作业用的是同一个客户端,就会加重客户端所在节点的资源消耗。<br /> 所以解决办法就是,我们不要客户端了,直接把应用提交到 JobManger 上运行。而这也就代表着,我们需要为每一个提交的应用单独启动一个 JobManager,也就是创建一个集群。这个 JobManager 只为执行这一个应用而存在,执行结束之后 JobManager 也就关闭了,这就是所谓的应用模式,<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/2945099/1650274580805-064d5b60-54e9-41f0-851c-56d8a0e52f36.png#clientId=u5535c7b7-d6c4-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=161&id=u11ca6fe2&margin=%5Bobject%20Object%5D&name=image.png&originHeight=201&originWidth=720&originalType=binary&ratio=1&rotation=0&showTitle=false&size=26397&status=done&style=none&taskId=u0746256c-af90-4d78-ad59-f5fbfa20104&title=&width=576)<br /> 应用模式与单作业模式,都是提交作业之后才创建集群;单作业模式是通过客户端来提交的,客户端解析出的每一个作业对应一个集群;而应用模式下,是直接由 JobManager 执行应用程序的,并且即使应用包含了多个作业,也只创建一个集群。<br />总结一下,在会话模式下,集群的生命周期独立于集群上运行的任何作业的生命周期,并且提交的所有作业共享资源。而单作业模式为每个提交的作业创建一个集群,带来了更好的资源隔离,这时集群的生命周期与作业的生命周期绑定。最后,应用模式为每个应用程序创建一个会话集群,在 JobManager 上直接调用应用程序的 main()方法。<br /> 我们所讲到的部署模式,相对是比较抽象的概念。实际应用时,一般需要和资源管理平台结合起来,选择特定的模式来分配资源、部署应用。接下来,我们就针对不同的资源提供者(Resource Provider)的场景,具体介绍 Flink 的部署方式。

独立模式(Standalone)

独立模式(Standalone)是部署 Flink 最基本也是最简单的方式:所需要的所有 Flink 组件,都只是操作系统上运行的一个 JVM 进程。
独立模式是独立运行的,不依赖任何外部的资源管理平台;当然独立也是有代价的:如果 资源不足,或者出现故障,没有自动扩展或重分配资源的保证,必须手动处理。所以独立模式一般只用在开发测试或作业非常少的场景下。
另外,我们也可以将独立模式的集群放在容器中运行。Flink 提供了独立模式的容器化部署方式,可以在 Docker 或者 Kubernetes 上进行部署。

会话模式部署

可以发现,独立模式的特点是不依赖外部资源管理平台,而会话模式的特点是先启动集群、后提交作业。所以,我们在 以上例子 用的就是独立模式(Standalone)的会话模式部署。

单作业模式部署

Flink 本身无法直接以单作业方式启动集群,一般需要借助一些资 源管理平台。所以 Flink 的独(Standalone)集群并不支持单作业模式部署。

应用模式部署

应用模式下不会提前创建集群,所以不能调用 start-cluster.sh 脚本。我们可以使用同样在bin 目录下的 standalone-job.sh 来创建一个 JobManager。
具体步骤如下:

  1. $ cp ./FlinkTutorial-1.0-SNAPSHOT.jar lib/
  1. $ ./bin/standalone-job.sh start --job-classname com.atguigu.wc.StreamWordCount

这里我们直接指定作业入口类,脚本会到 lib 目录扫描所有的 jar 包。

  1. $ ./bin/taskmanager.sh start
  1. $ ./bin/standalone-job.sh stop
  2. $ ./bin/taskmanager.sh stop

高可用(High Availability )

分布式除了提供高吞吐,另一大好处就是有更好的容错性。对于 Flink 而言,因为一般会有多个 TaskManager,即使运行时出现故障,也不需要将全部节点重启,只要尝试重启故障节点就可以了。但是我们发现,针对一个作业而言,管理它的 JobManager 却只有一个,这同样有可能出现单点故障。为了实现更好的可用性,我们需要 JobManager 做一些主备冗余,这就是所谓的高可用(High Availability,简称 HA)。
我们可以通过配置,让集群在任何时候都有一个主 JobManager 和多个备用 JobManagers,这样主节点故障时就由备用节点来接管集群,接管后作业就可以继续正常运行。主备 JobManager 实例之间没有明显的区别,每个 JobManager 都可以充当主节点或者备节点
image.png
具体配置如下:

  1. high-availability: zookeeper
  2. high-availability.storageDir: hdfs://hadoop102:9820/flink/standalone/ha
  3. high-availability.zookeeper.quorum:
  4. hadoop102:2181,hadoop103:2181,hadoop104:2181
  5. high-availability.zookeeper.path.root: /flink-standalone
  6. high-availability.cluster-id: /cluster_atguigu
  1. hadoop102:8081
  2. hadoop103:8081

(3)分发修改后的配置文件到其他节点服务器。
(4)在/etc/profile.d/my_env.sh 中配置环境变量

  1. export HADOOP_CLASSPATH=`hadoop classpath`

注意:
⚫ 需要提前保证 HAOOP_HOME 环境变量配置成功
⚫ 分发到其他节点
具体部署方法如下:
(1)首先启动 HDFS 集群和 Zookeeper 集群。

  1. $ bin/start-cluster.sh
  1. http://hadoop102:8081
  2. http://hadoop103:8081
  1. [zk: localhost:2181(CONNECTED) 1] get
  2. /flink-standalone/cluster_atguigu/leader/rest_server_lock
  1. [zk: localhost:2181(CONNECTED) 7] get
  2. /flink-standalone/cluster_atguigu/leader/rest_server_lock

注意: 不管是不是 leader,从 WEB UI 上是看不到区别的, 都可以提交应用。

YARN 模式

独立(Standalone)模式由 Flink 自身提供资源,无需其他框架,这种方式降低了和其他
第三方资源框架的耦合性,独立性非常强。但我们知道,Flink 是大数据计算框架,不是资源
调度框架,这并不是它的强项;所以还是应该让专业的框架做专业的事,和其他资源调度框架
集成更靠谱。而在目前大数据生态中,国内应用最为广泛的资源管理平台就是 YARN 了。所
以接下来我们就将学习,在强大的 YARN 平台上 Flink 是如何集成部署的。
整体来说,YARN 上部署的过程是:客户端把 Flink 应用提交给 Yarn 的 ResourceManager,
Yarn 的 ResourceManager 会向 Yarn 的 NodeManager 申请容器。在这些容器上,Flink 会部署
JobManager 和 TaskManager 的实例,从而启动集群。Flink 会根据运行在 JobManger 上的作业
所需要的 Slot 数量动态分配 TaskManager 资源。

相关准备和配置

在 Flink1.8.0 之前的版本,想要以 YARN 模式部署 Flink 任务时,需要 Flink 是有 Hadoop
支持的。从 Flink 1.8 版本开始,不再提供基于 Hadoop 编译的安装包,若需要 Hadoop 的环境
支持,需要自行在官网下载 Hadoop 相关版本的组件 flink-shaded-hadoop-2-uber-2.7.5-10.0.jar,
并将该组件上传至 Flink 的 lib 目录下。在 Flink 1.11.0 版本之后,增加了很多重要新特性,其
中就包括增加了对Hadoop3.0.0以及更高版本Hadoop的支持,不再提供“flink-shaded-hadoop-
jar 包,而是通过配置环境变量完成与 YARN 集群的对接。
在将 Flink 任务部署至 YARN 集群之前,需要确认集群是否安装有 Hadoop,保证 Hadoop
版本至少在 2.2 以上,并且集群中安装有 HDFS 服务。
*具体配置步骤如下:

(1)按照 3.1 节所述,下载并解压安装包,并将解压后的安装包重命名为 flink-1.13.0-yarn,
本节的相关操作都将默认在此安装路径下执行。

  1. $ sudo vim /etc/profile.d/my_env.sh
  2. HADOOP_HOME=/opt/module/hadoop-2.7.5
  3. export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
  4. export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
  5. export HADOOP_CLASSPATH=`hadoop classpath`

这里必须保证设置了环境变量 HADOOP_CLASSPATH。

  1. [atguigu@hadoop102 ~]$ start-dfs.sh
  2. [atguigu@hadoop103 ~]$ start-yarn.sh
  1. [atguigu@hadoop102 ~]$ jps
  2. 5190 Jps
  3. 5062 NodeManager
  4. 4408 NameNode
  5. 4589 DataNode
  6. [atguigu@hadoop103 ~]$ jps
  7. 5425 Jps
  8. 4680 ResourceManager
  9. 5241 NodeManager
  10. 4447 DataNode
  11. [atguigu@hadoop104 ~]$ jps
  12. 4731 NodeManager
  13. 4333 DataNode
  14. 4861 Jps
  15. 4478 SecondaryNameNode

(4)进入 conf 目录,修改 flink-conf.yaml 文件,修改以下配置,这些配置项的含义在进行 Standalone 模式配置的时候进行过讲解,若在提交命令中不特定指明,这些配置将作为默认配置。

  1. $ cd /opt/module/flink-1.13.0-yarn/conf/
  2. $ vim flink-conf.yaml
  3. jobmanager.memory.process.size: 1600m
  4. taskmanager.memory.process.size: 1728m
  5. taskmanager.numberOfTaskSlots: 8
  6. parallelism.default: 1

会话模式部署

YARN 的会话模式与独立集群略有不同,需要首先申请一个 YARN 会话(YARN session)来启动 Flink 集群。具体步骤如下:
1. 启动集群
(1)启动 hadoop 集群(HDFS, YARN)。
(2)执行脚本命令向 YARN 集群申请资源,开启一个 YARN 会话,启动 Flink 集群。

  1. $ bin/yarn-session.sh -nm test

可用参数解读:
⚫ -d:分离模式,如果你不想让 Flink YARN 客户端一直前台运行,可以使用这个参数,即使关掉当前对话窗口,YARN session 也可以后台运行。
⚫ -jm(—jobManagerMemory):配置 JobManager 所需内存,默认单位 MB。
⚫ -nm(—name):配置在 YARN UI 界面上显示的任务名。
⚫ -qu(—queue):指定 YARN 队列名。
⚫ -tm(—taskManager):配置每个 TaskManager 所使用内存。
注意:Flink1.11.0 版本不再使用-n 参数和-s 参数分别指定 TaskManager 数量和 slot 数量,YARN 会按照需求动态分配 TaskManager 和 slot。所以从这个意义上讲,YARN 的会话模式也不会把集群资源固定,同样是动态分配的。YARN Session 启动之后会给出一个 web UI 地址以及一个 YARN application ID,如下所示, 用户可以通过 web UI 或者命令行两种方式提交作业。

  1. 2021-06-03 15:54:27,069 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - YARN application has been deployed successfully.
  2. 2021-06-03 15:54:27,070 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface hadoop104:39735 of application
  3. 'application_1622535605178_0003'.JobManager Web Interface: http://hadoop104:39735
  1. 提交作业
    (1)通过 Web UI 提交作业
    这种方式比较简单,与上文所述 Standalone 部署模式基本相同。
    (2)通过命令行提交作业
    ① 将 Standalone 模式讲解中打包好的任务运行 JAR 包上传至集群
    ② 执行以下命令将该任务提交到已经开启的 Yarn-Session 中运行。
    1. $ bin/flink run
    2. -c com.atguigu.wc.StreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar
    客户端可以自行确定 JobManager 的地址,也可以通过-m 或者-jobmanager 参数指定JobManager 的地址,JobManager 的地址在 YARN Session 的启动页面中可以找到。
    ③ 任务提交成功后,可在 YARN 的 Web UI 界面查看运行情况。
    image.png
    从图中可以看到我们创建的 Yarn-Session 实际上是一个 Yarn 的Application,并且有唯一的 Application ID。
    ④也可以通过 Flink 的 Web UI 页面查看提交任务的运行情况,如图 3-15 所示。
    image.png

    3.4.3 单作业模式部署

    在 YARN 环境中,由于有了外部平台做资源调度,所以我们也可以直接向 YARN 提交一个单独的作业,从而启动一个 Flink 集群。
    1. $ bin/flink run -d -t yarn-per-job -c com.atguigu.wc.StreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar
    1. $ bin/flink run -m yarn-cluster -c com.atguigu.wc.StreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar
    注意这里是通过参数-m yarn-cluster 指定向 YARN 集群提交任务。
    (2)在 YARN 的 ResourceManager 界面查看执行情况。
    image.png
    点击可以打开 Flink Web UI 页面进行监控
    image.png
    1. $ ./bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY
    2. $ ./bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY
    3. <jobId>
    这里的 application_XXXX_YY 是当前应用的 ID,是作业的 ID。注意如果取消作业,整个 Flink 集群也会停掉。

    应用模式部署

    应用模式同样非常简单,与单作业模式类似,直接执行 flink run-application 命令即可。
    1. $ bin/flink run-application -t yarn-application -c com.atguigu.wc.StreamWordCount
    2. FlinkTutorial-1.0-SNAPSHOT.jar
    1. $ ./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
    2. $ ./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
    1. $ ./bin/flink run-application -t yarn-application
    2. -Dyarn.provided.lib.dirs="hdfs://myhdfs/my-remote-flink-dist-dir"
    3. hdfs://myhdfs/jars/my-application.jar
    这种方式下 jar 可以预先上传到 HDFS,而不需要单独发送到集群,这就使得作业提交更加轻量了。

    高可用

    YARN 模式的高可用和独立模式(Standalone)的高可用原理不一样。
    Standalone 模式中, 同时启动多个 JobManager, 一个为“领导者”(leader),其他为“后备” (standby), 当 leader 挂了, 其他的才会有一个成为 leader。
    而 YARN 的高可用是只启动一个 Jobmanager, 当这个 Jobmanager 挂了之后, YARN 会再次启动一个, 所以其实是利用的 YARN 的重试次数来实现的高可用。
    1. <property>
    2. <name>yarn.resourcemanager.am.max-attempts</name>
    3. <value>4</value>
    4. <description>
    5. The maximum number of application master execution attempts.
    6. </description>
    7. </property>
    注意: 配置完不要忘记分发, 和重启 YARN。
    1. yarn.application-attempts: 3
    2. high-availability: zookeeper
    3. high-availability.storageDir: hdfs://hadoop102:9820/flink/yarn/ha
    4. high-availability.zookeeper.quorum:
    5. hadoop102:2181,hadoop103:2181,hadoop104:2181
    6. high-availability.zookeeper.path.root: /flink-yarn
    (3)启动 yarn-session。
    (4)杀死 JobManager, 查看复活情况。
    注意: yarn-site.xml 中配置的是 JobManager 重启次数的上限, flink-conf.xml 中的次数应该小于这个值

    K8S 模式