1.环境配置

  1. Flink 是一个分布式的流处理框架,所以实际应用一般都需要搭建集群环境。我们在进行 Flink 安装部署的学习时,需要准备 3 Linux 机器。具体要求如下:
  • 系统环境为 CentOS 7.5 版本。
  • 安装 Java 8。
  • 安装 Hadoop 集群,Hadoop 建议选择 Hadoop 2.7.5 以上版本。
  • 配置集群节点服务器间时间同步以及免密登录,关闭防火墙。

本书中三台服务器的具体设置如下:

  • 节点服务器 1,IP 地址为 192.168.10.102,主机名为 hadoop102。
  • 节点服务器 2,IP 地址为 192.168.10.103,主机名为 hadoop103。
  • 节点服务器 3,IP 地址为 192.168.10.104,主机名为 hadoop104。

2.本地启动

下载安装包

进入 Flink 官网,下载 1.13.0 版本安装包 flink-1.13.0-bin-scala_2.12.tgz,注意此处选用对 应 scala 版本为 scala 2.12 的安装包。

解压

  1. hadoop102 节点服务器上创建安装目录/opt/module,将 flink 安装包放在该目录下,并 执行解压命令,解压至当前目录。
  1. $ tar -zxvf flink-1.13.0-bin-scala_2.12.tgz -C /opt/module/

启动

  1. 进入解压后的目录,执行启动命令,并查看进程。
  1. cd flink-1.13.0/
  2. $ bin/start-cluster.sh
  3. Starting cluster.
  4. Starting standalonesession daemon on host hadoop102.
  5. Starting taskexecutor daemon on host hadoop102.
  6. $ jps
  7. 10369 StandaloneSessionClusterEntrypoint
  8. 10680 TaskManagerRunner
  9. 10717 Jps

关闭集群

  1. $ bin/stop-cluster.sh
  2. Stopping taskexecutor daemon (pid: 10680) on host hadoop102.
  3. Stopping standalonesession daemon (pid: 10369) on host hadoop102.

3.集群启动

可以看到,Flink 本地启动非常简单,直接执行 start-cluster.sh 就可以了。如果我们想要扩 展成集群,其实启动命令是不变的,主要是需要指定节点之间的主从关系。 Flink 是典型的 Master-Slave 架构的分布式数据处理框架,其中 Master 角色对应着 JobManager,Slave 角色则对应 TaskManager。我们对三台节点服务器的角色分配如表 3-1 所示。 启动Flink集群 - 图1

具体安装部署步骤如下:

修改集群配置

进入 conf 目录下,修改 flink-conf.yaml 文件,修改 jobmanager.rpc.address 参数为 hadoop102

  1. $ vi conf/flink-conf.yaml
  2. # JobManager 节点地址.
  3. jobmanager.rpc.address: hadoop102

修改 workers 文件,将另外两台节点服务器添加为本 Flink 集群的 TaskManager 节点

  1. $ vim workers
  2. hadoop103
  3. hadoop104

另外,在 flink-conf.yaml 文件中还可以对集群中的 JobManager 和 TaskManager 组件 进行优化配置,主要配置项如下:

  • jobmanager.memory.process.size:对 JobManager 进程可使用到的全部内存进行配置, 包括 JVM 元空间和其他开销,默认为 1600M,可以根据集群规模进行适当调整。
  • taskmanager.memory.process.size:对 TaskManager 进程可使用到的全部内存进行配置, 包括 JVM 元空间和其他开销,默认为 1600M,可以根据集群规模进行适当调整。
  • taskmanager.numberOfTaskSlots:对每个 TaskManager 能够分配的 Slot 数量进行配置, 默认为 1,可根据 TaskManager 所在的机器能够提供给 Flink 的 CPU 数量决定。所谓 Slot 就是 TaskManager 中具体运行一个任务所分配的计算资源。
  • parallelism.default:Flink 任务执行的默认并行度,优先级低于代码中进行的并行度配 置和任务提交时使用参数指定的并行度数量。

分发安装目录

  1. 配置修改完毕后,将 Flink 安装目录发给另外两个节点服务器。
  1. $ xsync /opt/module/flink-1.13.0

启动集群

在 hadoop102 节点服务器上执行 start-cluster.sh 启动 Flink 集群

  1. $ bin/start-cluster.sh

访问 Web UI

启动成功后,同样可以访问 http://hadoop102:8081 对 flink 集群和任务进行监控管理

启动Flink集群 - 图2

4.提交作业

程序打包

为方便自定义结构和定制依赖,我们可以引入插件 maven-assembly-plugin 进行打包。 在 FlinkTutorial 项目的 pom.xml 文件中添加打包插件的配置,具体如下:

  1. <build>
  2. <plugins>
  3. <plugin>
  4. <groupId>org.apache.maven.plugins</groupId>
  5. <artifactId>maven-assembly-plugin</artifactId>
  6. <version>3.0.0</version>
  7. <configuration>
  8. <descriptorRefs>
  9. <descriptorRef>jar-with-dependencies</descriptorRef>
  10. </descriptorRefs>
  11. </configuration>
  12. <executions>
  13. <execution>
  14. <id>make-assembly</id>
  15. <phase>package</phase>
  16. <goals>
  17. <goal>single</goal>
  18. </goals>
  19. </execution>
  20. </executions>
  21. </plugin>
  22. </plugins>
  23. </build>

插件配置完毕后,可以使用 IDEA 的 Maven 工具执行 package 命令,出现如下提示即 表示打包成功。

启动Flink集群 - 图3

webUI提交作业

任务打包完成后,我们打开 Flink 的 WEB UI 页面,在右侧导航栏点击“Submit New Job”,然后点击按钮“+ Add New”,选择要上传运行的 JAR 包

启动Flink集群 - 图4

启动Flink集群 - 图5

  • 主要配置程序入口主类的全类名,任务运行的并行度,任务运行所需的配置参数和保存点 路径等,如图 3-6 所示,配置完成后,即可点击按钮“Submit”,将任务提交到集群运行。

启动Flink集群 - 图6

  • 任务提交成功之后,可点击左侧导航栏的“Running Jobs”查看程序运行列表情况

启动Flink集群 - 图7

  • 点击该任务,可以查看任务运行的具体情况,也可以通过点击“Cancel Job”结束任务运行

启动Flink集群 - 图8

命令行提交

除了通过 WEB UI 界面提交任务之外,也可以直接通过命令行来提交任务。这里为方便 起见,我们可以先把 jar 包直接上传到目录 flink-1.13.0 下

(1)首先需要启动集群。 $ bin/start-cluster.sh

(2)在 hadoop102 中执行以下命令启动 netcat。 $ nc -lk 7777

(3)进入到 Flink 的安装路径下,在命令行使用 flink run 命令提交作业。

  1. $ bin/flink run -m hadoop102:8081 -c com.atguigu.wc.StreamWordCount ./FlinkTutorial-1.0-SNAPSHOT.jar

这里的参数 –m 指定了提交到的 JobManager,-c 指定了入口类。

(4)在浏览器中打开 Web UI,http://hadoop102:8081 查看应用执行情况 .

(5)在 log 日志中,也可以查看执行结果,需要找到执行该数据任务的 TaskManager 节点 查看日志。