Flink部署的基础组件

image.png

  • JobManager
    • 集群管理者
  • TaskManagers
    • worker进程(运行用户代码)
  • 检查点存储
    • 分布式文件系统,S3 or HDFS
  • JobManager故障恢复
    • Zookeeper
    • JobManager存储checkpoint
    • Ververica Platform 在 K8s 上实现 JobManager 故障转移,不需要 ZooKeeper
  • Metrics, Logging, CI/CD
    • 可选组件
    • 用户可配置1个或多个MetricsReporters来连接到监控系统中,如
      • Prometheus, JMX, Graphite, StatsD, etc.
    • 有了logging and metrics,就可以做很多东西

Attention:所有数据流:即source、sink和流数据,在task manager之间进行数据交换。

Distributed runtime

image.png
当应用的main()方法运行时,用户进行的API调用会在内存中构建一个数据流图,叫做 job graph。

在某些部署模式下(session模式和per-job模式),main() 方法在Flink外运行,在独立client进程中运行。在这些模式里,job graph和相关的用户代码被提交到Flink cluster执行。

application模式下,应用的main()方法在JobManager中执行,而不是在client或者是一些三方部署进程中。这种模式下,client或者其他部署进程不需要拉取和上传所有job的依赖包。

JobManager有许多内部组件。

  • Dispatcher
    • 集群的rest访问地址
    • 从Clients接收job,为每个job启动各种per-job components
  • scheduler
    • 将 job graph分割成并行任务,并安排执行
    • 从ResourceManager获取资源
  • checkpoint coordinator
    • 触发checkpoint
    • 跟踪worker的心跳
    • 根据需要重启

Task Managers

  • 用户代码实际运行处
  • Task Slot
    • scheduling/调度的单元
    • 为运行应用程序的一个并行切片提供资源
    • 1个/多个core,一些内存

部署模式

  • Mini cluster(开发/测试阶段,非生产)
    • 单个JVM,包括client, job manager, and task managers
    • 方便测试与在ide中调试
  • Session cluster
    • 长期运行集群,多个job
    • 作业之间没有隔离(TaskManagers是共享)
    • 非常适合运行短期作业(例如,ad-hoc即席查询)
    • 一个不规范的作业可能导致整个集群崩溃,或者访问到集群中的某个存在的安全认证
  • Job cluster
    • 单个作业,集群与该作业同生周
    • 非常适合连续运行作业
    • Job cluster非常适合长期作业的容器化部署
  • Application cluster(new in Flink 1.11)
    • 仅从一个 Flink 应用程序执行job
    • main() 运行在cluster,而不是client
    • application jar和dependencies (including Flink itself) 可以被提前上传
    • 可以部署Flink应用到yarn,像其他基于YARN的应用一样。

实际生产直接用Application模式即可,其他模式不用关注,慢慢会被替代掉。直接对标spark on yarn即可。
yarn模式下。

DataStream API and execution

image.png

The JobGraph(逻辑图)

image.png

The ExecutionGraph(物理图)

image.png

事件保持他们的相对顺序

image.png
image.png
image.png
image.png
注意观察A和(1,2)经过了不同的路径,发生了什么?原先的顺序是A21,现在是21A。同一个partition的顺序是一直保持的。

数据交换的策略

image.png

  • Forward
    • 当前的partition保持不变,每个记录还在原来的并行分支上
  • Key-based
    • 记录按照某个key来进行分区,key是根据某种业务逻辑或算法来计算出来的
    • 同一个key一定被分到同一个partition
  • Broadcast
    • 每个partition都得到全部
  • Round-robin
    • 当前记录会被轮流发送到下一个算子的各个partition
    • 在进行重新平衡以影响并行度的变化时使用

Job components and operator chaining

image.png
Chaining

  • 记录从一个用户定义的函数传递到另一个,而不涉及 Flink 的任何较低层转换(de/ser,网络堆栈)
  • 增加吞吐量,不涉及序列化及网络传输转换
  • 仅仅适用于forward类型数据传输
  • chaining也可以被禁用。通常我们只在调试的时候采取禁用策略,比如我们在2个算子间强制通过网络传输,为了看清两个算子之间的通信。
  • 一个算子链中的一个并行实例叫做task(某些地方也叫subtask)

image.png

  • 上图示例的作业中除了sink外,其他算子的并行度都是2。
  • 每个task manager中包含一个task slot。每个task slot会被分配2-3个task去执行。
  • 每个task对应java的一个线程
  • 每个task slot的2-3个task共享这个slot的资源
  • task slot只是用于调度的抽象(逻辑概念):它们不对应于任何物理含义。
  • Task Manager刚好拥有的所有资源都被该 TM 中运行的task slot使用。