一、基本概念

1、Flink简介
Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。
Flink是一个有状态的分布式计算框架,处理对象是有界的和无界的数据流
任何数据都可以转化为有界流或无界流来处理

    1. 有界流(流处理):有始无终
  • 2.无界流(批处理):有始有终

Flink使用教程 - 图1
2、Flink Application
了解 Flink 应用开发需要先理解 Flink 的 Streams、State、Time 等基础处理语义以及 Flink 兼顾灵活性和方便性的多层次 API。

  • Streams:流,分为有限数据流与无限数据流,unbounded stream 是有始无终的数据流,即无限数据流;而 bounded stream 是限定大小的有始有终的数据集合,即有限数据流,二者的区别在于无限数据流的数据会随时间的推演而持续增加,计算持续进行且不存在结束的状态,相对的有限数据流数据大小固定,计算最终会完成并处于结束的状态。
  • State,状态是计算过程中的数据信息,在容错恢复和 Checkpoint 中有重要的作用,流计算在本质上是 Incremental Processing,因此需要不断查询保持状态;另外,为了确保 Exactly- once 语义,需要数据能够写入到状态中;而持久化存储,能够保证在整个分布式系统运行失败或者挂掉的情况下做到 Exactly- once,这是状态的另外一个价值。
  • Time,分为 Event time、Ingestion time、Processing time,Flink 的无限数据流是一个持续的过程,时间是我们判断业务状态是否滞后,数据处理是否及时的重要依据。
  • API,API 通常分为三层,由上而下可分为 SQL / Table API、DataStream API、ProcessFunction 三层,API 的表达能力及业务抽象能力都非常强大,但越接近 SQL 层,表达能力会逐步减弱,抽象能力会增强,反之,ProcessFunction 层 API 的表达能力非常强,可以进行多种灵活方便的操作,但抽象能力也相对越小。

二、架构

Flink使用教程 - 图2

  • 分层API

Flink使用教程 - 图3

三、使用

3.1 flink程序编程步骤

  • 设置运行环境
  • 配置数据源,获取数据
  • 一系列操作转换数据
  • 配置数据输出,保存数据
  • 提交执行

3.2 数据的操作概览

image.png

3.3 DataSream的基本转换

image.png

3.4 基本分组策略

image.png

3.5 数据类型

  • 类型系统

image.png

  • Flink支持的数据类型
  1. Java Tuple
    Java API 提供了从Tuple1到Tuple25的类
  1. DataStream<Tuple2<String, Integer>> wordCounts = env.fromElements(
  2. new Tuple2<String, Integer>("hello", 1),
  3. new Tuple2<String, Integer>("world", 2));
  4. wordCounts.map(new MapFunction<Tuple2<String, Integer>, Integer>() {
  5. @Override
  6. public Integer map(Tuple2<String, Integer> value) throws Exception {
  7. return value.f1;
  8. }
  9. });
  10. wordCounts.keyBy(0); // also valid .keyBy("f0")
  1. Java POJOs

java的类在满足以下条件的时候会被Flink视为特殊的POJO数据类型:

  • 这个类必须是公有的
  • 它必须有一个公有的无参构造函数
  • 所有参数都需要是公有的或提供get/set方法
  • 参数的数据类型必须是Flink支持的
public class WordWithCount {
    public String word;
    public int count;
    public WordWithCount() {}
    public WordWithCount(String word, int count) {
        this.word = word;
        this.count = count;
    }
}
DataStream<WordWithCount> wordCounts = env.fromElements(
    new WordWithCount("hello", 1),
    new WordWithCount("world", 2));
wordCounts.keyBy("word"); // key by field expression "word"
  1. Primitive Types

Flink支持Java所有的基本数据类型,例如Integer、String、Double

  1. values

Flink 支持大部分的Java类

  1. Hadoop Writables
  2. Special Types

3.6 批处理(DataSet)-有界流数据(bounded)

  • 小例子-worldcount

    //1、创建有界流(批处理)的执行环境
      val env = ExecutionEnvironment.getExecutionEnvironment
      val params = ParameterTool.fromArgs(args)
          //2、配置数据源
      val inputPath = params.get("D:\\workspace\\idea\\flinkdemo\\src\\main\\resources\\wc.txt")
        //3、获取数据并转换数据
      val result = env.readTextFile(inputPath)
          .flatMap(_.split(" "))
          .map((_,1))
          .groupBy(0)
          .sum(1)
    
        //4、配置数据输出
        //    result.print()
        //    result.print
              //数据转换+数据输出(打印到控制台)
          result.setParallelism(1).sortPartition(_._2,Order.DESCENDING).print
              //提交执行
        //env.execute("wordcount")
    
  • 懒执行(Lazy Evaluation)

当Flink程序主方法执行的时候,数据加载和数据转换并没有立即执行,当execute()触发后执行

3.7 流处理(DataStream) - 无界流数据(unbounded)

  • worldcount
    //1、创建无界流(流处理)的执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
      //    从外部接收参数
    val params = ParameterTool.fromArgs(args)
    if(args.length <= 2){
     println("输入参数 [host] [port]")
     println("--host 监听ip地址")
     println("--port 监听端口号")
    }else{
    //2、配置输入源
     val host = params.get("host")
     val port = params.getInt("port")
    //获取数据
     val textDataStream = env.socketTextStream(host,port)
      //3、操作数据
     val wcStream = textDataStream.flatMap(_.split("\\s")).filter(_.nonEmpty).map((_, 1)).keyBy(0).sum(1)
      //4、配置数据输出
     wcStream.print
    //5、提交执行
     env.execute("wc")
    

3.8 Table API & SQL、

1、Create a TableEnvironment

  • 方式一:FLINK STREAMING QUERY ```scala import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.EnvironmentSettings import org.apache.flink.table.api.scala.StreamTableEnvironment

val fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build() val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment val fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings) // or val fsTableEnv = TableEnvironment.create(fsSettings)


- 方式二:FLINK BATCH QUERY
```scala
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.scala.BatchTableEnvironment

val fbEnv = ExecutionEnvironment.getExecutionEnvironment
val fbTableEnv = BatchTableEnvironment.create(fbEnv)
  • 方式三:BLINK STREAMING QUERY ```scala import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.EnvironmentSettings import org.apache.flink.table.api.scala.StreamTableEnvironment

val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings) // or val bsTableEnv = TableEnvironment.create(bsSettings)


- 方法四:BLINK BATCH QUERY
```scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val bbTableEnv = TableEnvironment.create(bbSettings)

如果/ lib目录中只有一个规划器jar,则可以使用useAnyPlanner(用于python的use_any_planner)来创建特定的EnvironmentSettings。

2、Register Tables in the Catalog

  • Register a Table
//扫描内存中/应用中已注册的表
// table is the result of a simple projection query 
val projTable: Table = tableEnv.scan("X").select(...)

// register the Table projTable as table "projectedTable"
tableEnv.registerTable("projectedTable", projTable)
  • Register a TableSource
//创建数据源,可以为文本(CSV, Apache [Parquet, Avro, ORC], …)、数据库(MySQL, HBase, …)、消息队列(Apache Kafka, RabbitMQ, …)
// create a TableSource
val csvSource: TableSource = new CsvTableSource("/path/to/file", ...)

// register the TableSource as table "CsvTable"
tableEnv.registerTableSource("CsvTable", csvSource)

四、部署

4.1 Standalone

环境要求:

  • Java 1.8.x or higher,

    4.1.1 安装jdk8+,配置java环境变量

  • 下载

链接:https://share.weiyun.com/5WPXMUQ

  • 上传服务器并安装
# 安装,默认安装目录/usr/local/java
rpm -iv jdk-8u221-linux-x64.rpm
  • 配置环境变量
export JAVA_HOME=/usr/java/jdk1.8.0_221-amd64
export CLASSPATH=.:${JAVA_HOME}/jre/lib/rt.jar:${JAVA_HOME}/lib/dt.jar:${JAVA_HOME}/lib/tools.jar
export PATH=$PATH:${JAVA_HOME}/bin

4.1.2 安装flink

  • 下载
wget http://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.9.1/flink-1.9.1-bin-scala_2.11.tgz
  • 解压
tar xzf flink-1.9.1-bin-scala_2.11.tgz
  • 配置(默认配置即可)

配置在flink-1.9.1/conf下面修改,

cd flink-1.9.1/conf

4.1.3 启动和停止集群

  • 启动
    bin/start-cluster.sh
    

启动集群后可以在webui上看到flink监控平台
url为ip:8081

  • 停止
bin/stop-cluster.sh

4.1.4 提交任务

  • webui提交
  • shell命令提交
./flink run -c com.atguigu.flink.app.BatchWcApp  /ext/flink0503-1.0-SNAPSHOT.jar  
--input /applog/flink/input.txt --output /applog/flink/output.csv

4.1.5 添加JobManager/TaskManager到正在运行的集群中

  • Adding a JobManager
bin/jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all
  • Adding a TaskManager
bin/taskmanager.sh start|start-foreground|stop|stop-all

4.2 Flink cluster on YARN

4.3 on Mesos

4.4 on Doker

4.5 on Kubernetes

五、运行时架构

5.1 flink运行时的组件

image.png

5.1.1 任务管理器

image.png

5.1.2 资源管理器

image.png

5.1.3 分发器

image.png

5.2 作业提交流程

image.png
image.png

5.3 任务调度原理

image.png

  • TaskManager和Slot

image.png

  • 执行图

image.png