TM为一个JVM进程

Slot

Slot平均分配TM内存。Slot 只对内存隔离,Slot只是个线程,没有对 CPU 隔离。 同一 JVM 中的任务共享 TCP 连接和心跳消息。Task Manager 的一个 Slot 代表一个可用线程,该线程具有固定的内存。注意 默认情况下,Flink 允许子任务共享 Slot,即使它们是不同 task 的 subtask,只要它们来自相同的 job。这种共享可以有更好的资源利用率。
默认情况下,Flink 允许subtasks共享slot,条件是它们都来自同一个Job的不同task的subtask。结果可能一个slot持有该job的整个pipeline。

SlotID:TM全生命周期每个Slot固定不变。
AllocationID:也是Slot的物理ID,但是用来给Job分配用的。
SlotRequestId
ResourceID 每个组件都有:RM、JM、TM。Dispatcher没有。

  1. val env = StreamExecutionEnvironment.getExecutionEnvironment
  2. env.setParallelism(10)

注意:这样设置的并行度是你整个程序的并行度,那么后面如果你的每个算子不单独设置并行度覆盖的话,那么后面每个算子的并行度就都是这里设置的并行度的值了。

这也说明优先级是:算子设置并行度 > env 设置并行度 > 配置文件默认并行度
Slot有两种实现SharedSlot和SimpleSlot。
slotNumber是在TM上的第几个槽位

Slot状态

Free、Releasing、Allocated、Active。

SlotTable

TimerService

allocate Slot的时候注册一个定时器,如果超时Slot将被释放变成Free状态。Slot变成Active将取消定时器。
TM管理Slot的组件

网络通信

TaskManger会和JobManager的jobmanager.rpc.port: 6123建立ESTABLISHED长连接image.png
TaskManager和JobManager 之间通过基于 Akka 的 RPC 通信,TaskManager 之间的网络协议栈依赖于更加底层的 Netty API。

不同任务(远程)之间的每个网络连接将在 Flink 的网络堆栈中获得自己的 TCP 通道。但是,如果同一任务的不同子任务被调度到同一个 TaskManager,则它们与同一个 TaskManager 的网络连接将多路复用并共享同一个 TCP 信道以减少资源使用。
image.png

反压

Credit-based 1.5之后
上游发送backlog,下游相应credit
在应用层模拟TCP流控机制。
1.8可是kafka source静态限流
调优
增大反压源头算子并行度。

内存管理

MemorySegment默认32KB,Flink最小内存分配单元。
TaskManager堆内存如下。
image.png
Network Buffers:TM启动默认分配2048个32KB的Buffer。
Memory Manager Pool:无数MemorySegment组成超大集合,默认占70%堆内存。
Free:给用户使用的。

序列化

Flink支持任意的Java或是Scala类型。Flink 在数据类型上有很大的进步,不需要实现一个特定的接口(像Hadoop中的org.apache.hadoop.io.Writable),Flink 能够自动识别数据类型。Flink 通过 Java Reflection 框架分析基于 Java 的 Flink 程序 UDF (User Define Function)的返回类型的类型信息,通过 Scala Compiler 分析基于 Scala 的 Flink 程序 UDF 的返回类型的类型信息。类型信息由 TypeInformation 类表示,TypeInformation 支持以下几种类型:

  • BasicTypeInfo: 任意Java 基本类型(装箱的)或 String 类型。
  • BasicArrayTypeInfo: 任意Java基本类型数组(装箱的)或 String 数组。
  • WritableTypeInfo: 任意 Hadoop Writable 接口的实现类。
  • TupleTypeInfo: 任意的 Flink Tuple 类型(支持Tuple1 to Tuple25)。Flink tuples 是固定长度固定类型的Java Tuple实现。
  • CaseClassTypeInfo: 任意的 Scala CaseClass(包括 Scala tuples)。
  • PojoTypeInfo: 任意的 POJO (Java or Scala),例如,Java对象的所有成员变量,要么是 public 修饰符定义,要么有 getter/setter 方法。
  • GenericTypeInfo: 任意无法匹配之前几种类型的类。

执行Task

对于TM来说,执行task就是把收到的TaskDeploymentDescriptor对象转换成一个task并执行的过程。