typora-copy-images-to: 大数据技术栈实践私教级教程-13.Flink typora-root-url: 大数据技术栈实践私教级教程-13.Flink

Flink概述

Flink是什么

Apache Flink是一个框架分布式处理引擎,用于对无界有界数据流进行状态计算。

国内使用Flink的企业

image-20220101204304105.png

为什么选择Flink

  • 流数据更真实的反应了我们的生活方式
  • 传统的数据架构是基于有限数据集的
  • 我们的数据

    • 低延迟
    • 高吞吐
    • 结果的准确性和良好的容错性

      需要处理流数据的行业

  • 电商和市场营销

    • 数据报表、广告投放、业务流程需要
  • 物联网
    • 传感器实时数据的采集和显示、实时报警
  • 电信
    • 基站流量调配
  • 银行金融

    • 实时结算和通知推送,实时检测异常行为

      数据处理架构发展演变

      传统数据处理架构

  • 事务处理架构

image-20220101211253753.png
特点是:实时性很好,但是能够同时处理的数据有限

  • 分析处理架构

image-20220101211658859.png
将数据从业务数据库复制到数仓,再进行分析和查询
特点:从不同地方提取数据,处理海量数据。过程很慢。

第一代流处理器(Strom)

image-20220101212037818.png

将事务处理架构和分析处理架构进行结合,事务数据不需要放在关系型数据库放在本地内存里面,把数据保存为本地状态,结合状态做计算,状态自身根据需要进行更新。对本地内存进行周期性检查存储在硬盘。这样可以做到低延迟高吞吐还具有良好的可扩展性。
第一代流处理器就是这么设计的,但是有个问题就是不能满足数据的有序性。

第二代流处理器Lambda架构

image-20220101212455530.png
用两套系统(流处理、批处理),同时保证低延迟和结果准确。先快速得到一个近似结果(流处理),最后给出一个精准结果(批处理)。
问题是实现一个需求就需要实现和维护两个系统。

第三代流处理器架构

image-20220104095334302.png
Flink:低延迟、高吞吐、数据有序、操作简单。

Flink的主要特点

事件驱动

image-20220104095648328.png

基于流的世界观

在Flink的世界中,一切都是由流组成的,离线数据是有界流,实时数据是无界流
image-20220104100105243.png

分层API

image-20220104101059878.png

  • 越顶层越抽象,表达含义越简明,使用越方便
  • 越底层越具体,表达含义越丰富,使用越灵活

    其他特点

  • 支持事件时间(event-time)和处理时间(processing-time)语义

  • 精确一次(exactly-once)的状态一致性保证
  • 低延迟,每秒处理数百万个事件,毫秒级延迟
  • 与众多常用存储系统的连接
  • 高可用,动态扩展,实现7*24小时全天候运行

    Flink VS SparkStreaming

    image-20220104105024491.png

  • 数据模型

    • spark采用RDD模型,spark streaming的DStream实际上也就是一组组小批数据RDD的集合
    • flink基本数据模型是数据流,以及事件(Event)序列
  • 运行架构

    • spark是批计算,将DAG划分为不同的stage,一个完成后可以计算下一个
    • flink是标准的执行流模式,一个事件在一个节点处理完成后可以直接发望下一个节点进行处理

      Flink开发

      批处理开发

  • pom文件

    1. <dependencies>
    2. <dependency>
    3. <groupId>org.apache.flink</groupId>
    4. <artifactId>flink-java</artifactId>
    5. <version>1.10.1</version>
    6. </dependency>
    7. <dependency>
    8. <groupId>org.apache.flink</groupId>
    9. <artifactId>flink-streaming-java_2.12</artifactId>
    10. <version>1.10.1</version>
    11. </dependency>
    12. </dependencies>
  • WordCount

    public class WordCount {
    
      public static void main(String[] args) throws Exception {
          // 创建执行环境
          ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
          // 从文件读取数据
          String inputPath = "E:\\坚果云同步\\我的坚果云\\大数据\\尚硅谷系列博客\\代码\\flink\\src\\main\\resources\\hello.txt";
          DataSource<String> inputDataSet = env.readTextFile(inputPath);
    
          // 对数据集进行处理,按照空格隔开
          AggregateOperator<Tuple2<String, Integer>> result = inputDataSet.flatMap(new MyFlatMapper())
                  // 按照第一个位置的word分组
                  .groupBy(0)
                  // 按照第二个位置上的数据求和
                  .sum(1);
    
          result.print();
      }
    
      public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {
    
          @Override
          public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
              // 按空格分词
              String[] words = s.split(" ");
              for (String word : words) {
                  collector.collect(new Tuple2<>(word, 1));
              }
          }
      }
    }
    
  • hello.txt

    hello 1
    hello 2
    hello 3
    hello 4
    hello 5
    hello 6
    hello 7
    hello 8
    
  • 执行结果

image-20220104111713298.png

流处理开发(有界)

  • pom

    <dependencies>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-java</artifactId>
          <version>1.10.1</version>
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-java_2.12</artifactId>
          <version>1.10.1</version>
      </dependency>
    </dependencies>
    
  • StreamWordCount ```java public class StreamWordCount {

    public static void main(String[] args) throws Exception {

      // 创建流处理执行环境
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      // 设置并行数
      env.setParallelism(1);
    
      // 从文件读取数据
       String inputPath = "E:\\坚果云同步\\我的坚果云\\大数据\\尚硅谷系列博客\\代码\\flink\\src\\main\\resources\\hello.txt";
       DataStreamSource<String> inputDataSet = env.readTextFile(inputPath);
      // 从socket读取文本流数据
      //DataStreamSource<String> inputDataSet = env.socketTextStream("127.0.0.1", 7777);
    
      // 基于数据流进行转换计算
      SingleOutputStreamOperator<Tuple2<String, Integer>> result = inputDataSet.flatMap(new MyFlatMapper())
              // 按照第一个位置的word分组
              .keyBy(0)
              // 按照第二个位置上的数据求和
              .sum(1);
    
      result.print();
    
      // 执行任务
      env.execute("开始执行");
    

    }

    public static class MyFlatMapper implements FlatMapFunction> {

      @Override
      public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
          // 按空格分词
          String[] words = s.split(" ");
          for (String word : words) {
              collector.collect(new Tuple2<>(word, 1));
          }
      }
    

    }

}


- 执行结果

![image-20220104112057303.png](https://cdn.nlark.com/yuque/0/2022/png/12468834/1641347442543-12becdf2-69cc-443d-abb3-5ca9b9e65090.png#clientId=u24d87621-714d-4&from=drop&id=u5eab27d3&margin=%5Bobject%20Object%5D&name=image-20220104112057303.png&originHeight=525&originWidth=130&originalType=binary&ratio=1&size=16923&status=done&style=none&taskId=ufd98275b-11c4-4d52-ad71-87bcc8694a9)

<a name="lteT0"></a>
## 流处理开发(无界)
连接127.0.0.1的7777端口获取数据。
```java
final ParameterTool parameterTool = ParameterTool.fromArgs(args);
String host = parameterTool.get("host");
int port = parameterTool.getInt("port");

DataStreamSource<String> inputDataSet = env.socketTextStream(host,port);

Flink部署

下载地址:Index of /dist/flink/flink-1.10.1 (apache.org)
版本选择:1.10.1

tar -xvzf flink-1.10.1-bin-scala_2.12.tgz -C /opt/module/

Standalone模式

  • 配置flink-conf.yaml ```java jobmanager.rpc.address: localhost

jobmanager.rpc.port: 6123

jobmanager.heap.size: 1024m

taskmanager.memory.process.size: 1728m

表示1个TaskManager节点只有1个槽位,只有1个线程池。代表了并行最大的能力

taskmanager.numberOfTaskSlots: 1

默认并行度,真实执行时候执行的线程数量

parallelism.default: 1


- 修改flink-conf.yaml

`jobmanager.rpc.address: hadoop102 `

- 修改slaves文件

`hadoop103 hadoop104`

- 分发到hadoop103、hadoop104

`xsync /opt/module/flink-1.10.1/`

- 启动集群

`. /opt/module/flink-1.10.1/bin/start-cluster.sh`

- 查看进程
- 查看web

访问hadoop102:8081
<a name="Be5Qj"></a>
## Web提交任务
```java
com.example.wordcount.StreamWordCount
 --host hadoop102 --port 7777

hadoop102启动7777端口:
nc -l 7777
输入你好 Flink回车

命令行提交任务

  • 提交命令 ```cpp ./bin/flink run -c com.example.wordcount.StreamWordCount -p 3 /opt/module/flink-1.10.1/data/flink-1.0-SNAPSHOT.jar —host hadoop102 —port 7777

- 查看job
```java
./bin/flink list ============================ Waiting for response... ------------------ Running/Restarting Jobs ------------------- 04.01.2022 15:39:44 : d9001f548a0bc25fb7201c184cc6e749 : 开始执行 (RUNNING) -------------------------------------------------------------- No scheduled jobs.
  • 停止job

./bin/flink cancel d9001f548a0bc25fb7201c184cc6e749 =========================== Cancelling job d9001f548a0bc25fb7201c184cc6e749. Cancelled job d9001f548a0bc25fb7201c184cc6e749.

Slot分配不够处理方式

  • 调大集群资源

taskmanager.numberOfTaskSlots = 4

  • 并行度调低

    Yarn模式

    以Yarn模式部署Flink时候,要求Flink是由Hadoop支持的版本,Hadoop环境要求保证版本在2.2以上,并且集群中安装由HDFS服务。
    Flink提供了两种yarn上运行的模式,分别为Session-Cluster和Per-Job-Cluster模式。

    Session-Cluster模式

    Session-Cluster模式需要先启动集群,然后再提交作业,接着会向yarn申请一些空间后,资源保持不变。如果资源满了,写一个作业无法提交,只能等到yarn中其中一个作业执行完毕后,释放资源,下个作业才能正常提交。所有作业共享Dispatcher和ResourceManager;共享资源;适合规模小执行时间短的作业。
    在yarn中初始化一个flink集群,开辟指定的资源,以后提交任务都是向这里提交,这个flink集群会常驻在yarn集群中,除非手动停止。

    创建Session

  • 启动Hadoop集群

  • 启动yarn-session

./yarn-session.sh -n 2 -s 2 -jm 1024 -tm 1024 -nm test -d
-n(—container): TaskManager的数量,不指定的话,动态分配,根据yarn的资源决定上限。
-s(—slot):每个TaskManager的slot数量,默认一个slot一个core,默认每个taskManager的slot个数为1,有时可以多一些taskManager,做冗余。
-jm:JobManager的内存
-tm:TaskManager的内存
-nm:yarn的appManager(yarn的ui上的名字)

执行任务

Flink创建了yarn的Session,提交的任务默认在Session上执行。没有yarn的Session,找Standalone默认的集群。
./bin/flink run -c com.example.wordcount.StreamWordCount -p 3 /opt/module/flink-1.10.1/data/flink-1.0-SNAPSHOT.jar —host hadoop102 —port 7777

取消Session

yarn application —kill application_1212313123123121_0001

Per-Job-Cluster模式

一个Job会对应一个集群,每提交一个作业会根据自身情况,都会单独向yarn申请资源,申请作业执行完成,一个作业的失败与否并不会影响下一个作业的正常提交和运行。独享Dispatcher和ResourceManager;按需要接收资源申请;适合规模大长时间运行的作业。
每次提交都会创建一个新的Flink集群,任务之间相互独立,互补影响,方便管理。任务执行完成之后创建的集群也会消失。

启动Hadoop集群

不启动Session,直接执行job(添加-m yarm-cluster)

./bin/flink run -m yarm-cluster -c com.example.wordcount.StreamWordCount -p 3 /opt/module/flink-1.10.1/data/flink-1.0-SNAPSHOT.jar —host hadoop102 —port 7777

Flink架构

Flink运行时组件

  • JobManager作业管理器
    • 控制一个应用程序的主进程,每一个应用程序都被不同的JobManager所控制执行
    • 会先接收要执行的应用程序,这个应用程序包括:作业图JobGraph逻辑数据流图logical dataflow graph打包了多有类、库和其他资源的jar包
    • JobManager会把所有的JobGraph转换成一个物理层面的数据流图,这个图被叫做“执行图”,包含了所有可以并发执行的任务
    • JobManager会向ResourceManager请求执行任务需要的资源,也就是任务管理器上的slot,一旦获取到了足够的slot,就会将执行图分发到真正运行他们的TaskManager上。运行过程中,JobManager会负责所有需要中央协调的操作,比如 检查点的协调
  • TaskManager任务管理器
    • Flink的工作进程,通常在Flink中会有多个TaskManager的运行,每一个TaskManager都包含了一定数量的Slot。Slot的数量限制了TaskManager能够执行的任务数量。
    • 启动之后,TaskManager会将资源管理器注册到他的插槽,收到资源管理器的指令后,TaskManager会将一个或者多个插槽提供给JobManager调用。JobManager就可以向插槽分配任务来执行。
    • 在执行过程中,一个TaskManager可以根据其他运行同一应用程序的TaskManager交换数据。
  • ResourceManager资源管理器
    • 主要负责管理任务管理器的slot,TaskManager插槽是Flink中定义的处理资源单元
    • Flink为不同环境和资源管理工具提供了不同的资源管理器,比如yarn、Mesos、K8s以及Standalone部署
    • 当JobManager申请插槽资源时,ResourceManager会将由空闲的插槽的TaskManager分配给JobManager。如果ResourceManager没有足够的插槽来满足JobManager的请求,它还可以向资源提供平台发起会话,以提供启动TaskManager进程的容器。
  • Dispacher分发器

    • 可以跨作业运行,它为应用提供了Rest接口
    • 当一个应用被提交执行时,分发器将会启动并将应用移交给一个JobManager
    • Dispatcher也会启动一个web UI,用来方便地展示和监控作业执行地信息
    • Dispatcher在架构中可能并不是必需地,这取决于应用提交运行地方式

      任务提交流程

      通用提交流程

      yarn提交流程

      任务调度原理

      思考

  • 怎么实现并行计算

    • 设置并行度,拆成几个并行的Task,分配到不同的slot上,多线程执行
  • 并行的任务,需要占用多少slot
    • 在代码中和提交任务-p参数时候都可以设置并行度。
    • 并行度:当前算子的子任务的个数被称为并行度。
    • 一个stream的并行度取决于所有算子中最大的并行度
  • 一个流处理程序,到底包含多少个任务
    • Flink中每一个TaskManager都是一个JVM进程,独立运行一个或多个子任务
    • 为了控制一个TaskManager能够接收多少个Task,TaskManager通过task slot来进行控制。
    • 默认情况下,Flink允许子任务共享slot,即便他们是不同任务的子任务,这样的结果是一个slot可以保存作业的整个管道。可以通过.slotSharingGroup(“2”);配置不同的共享组,不同的共享组一定占用不同的slot。
    • Task slot是静态的概念,是指TaskManager具有的并发执行能力。