Flink包含8中分区策略,这8中分区策略(分区器)分别如下面所示,本文将从源码的角度一一解读每个分区器的实现方式。

  • GlobalPartitioner
  • ShufflePartitioner
  • RebalancePartitioner
  • RescalePartitioner
  • BroadcastPartitioner
  • ForwardPartitioner
  • KeyGroupStreamPartitioner
  • CustomPartitionerWrapper

继承关系图

接口

名称

ChannelSelector

实现

  1. public interface ChannelSelector<T extends IOReadableWritable> {
  2. /**
  3. * 初始化channels数量,channel可以理解为下游Operator的某个实例(并行算子的某个subtask).
  4. */
  5. void setup(int numberOfChannels);
  6. /**
  7. *根据当前的record以及Channel总数,
  8. *决定应将record发送到下游哪个Channel。
  9. *不同的分区策略会实现不同的该方法。
  10. */
  11. int selectChannel(T record);
  12. /**
  13. *是否以广播的形式发送到下游所有的算子实例
  14. */
  15. boolean isBroadcast();
  16. }

名称

StreamPartitioner

实现

  1. public abstract class StreamPartitioner<T> implements
  2. ChannelSelector<SerializationDelegate<StreamRecord<T>>>, Serializable {
  3. private static final long serialVersionUID = 1L;
  4. protected int numberOfChannels;
  5. @Override
  6. public void setup(int numberOfChannels) {
  7. this.numberOfChannels = numberOfChannels;
  8. }
  9. @Override
  10. public boolean isBroadcast() {
  11. return false;
  12. }
  13. public abstract StreamPartitioner<T> copy();
  14. }

继承关系图

Flink的八种分区策略源码解读 - 图1

GlobalPartitioner

简介

该分区器会将所有的数据都发送到下游的某个算子实例(subtask id = 0)

源码解读

  1. /**
  2. * 发送所有的数据到下游算子的第一个task(ID = 0)
  3. * @param <T>
  4. */
  5. @Internal
  6. public class GlobalPartitioner<T> extends StreamPartitioner<T> {
  7. private static final long serialVersionUID = 1L;
  8. @Override
  9. public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
  10. //只返回0,即只发送给下游算子的第一个task
  11. return 0;
  12. }
  13. @Override
  14. public StreamPartitioner<T> copy() {
  15. return this;
  16. }
  17. @Override
  18. public String toString() {
  19. return "GLOBAL";
  20. }
  21. }

图解

Flink的八种分区策略源码解读 - 图2

ShufflePartitioner

简介

随机选择一个下游算子实例进行发送

源码解读

  1. /**
  2. * 随机的选择一个channel进行发送
  3. * @param <T>
  4. */
  5. @Internal
  6. public class ShufflePartitioner<T> extends StreamPartitioner<T> {
  7. private static final long serialVersionUID = 1L;
  8. private Random random = new Random();
  9. @Override
  10. public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
  11. //产生[0,numberOfChannels)伪随机数,随机发送到下游的某个task
  12. return random.nextInt(numberOfChannels);
  13. }
  14. @Override
  15. public StreamPartitioner<T> copy() {
  16. return new ShufflePartitioner<T>();
  17. }
  18. @Override
  19. public String toString() {
  20. return "SHUFFLE";
  21. }
  22. }

图解

Flink的八种分区策略源码解读 - 图3

BroadcastPartitioner

简介

发送到下游所有的算子实例

源码解读

  1. /**
  2. * 发送到所有的channel
  3. */
  4. @Internal
  5. public class BroadcastPartitioner<T> extends StreamPartitioner<T> {
  6. private static final long serialVersionUID = 1L;
  7. /**
  8. * Broadcast模式是直接发送到下游的所有task,所以不需要通过下面的方法选择发送的通道
  9. */
  10. @Override
  11. public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
  12. throw new UnsupportedOperationException("Broadcast partitioner does not support select channels.");
  13. }
  14. @Override
  15. public boolean isBroadcast() {
  16. return true;
  17. }
  18. @Override
  19. public StreamPartitioner<T> copy() {
  20. return this;
  21. }
  22. @Override
  23. public String toString() {
  24. return "BROADCAST";
  25. }
  26. }

图解

Flink的八种分区策略源码解读 - 图4

RebalancePartitioner

简介

通过循环的方式依次发送到下游的task

源码解读

  1. /**
  2. *通过循环的方式依次发送到下游的task
  3. * @param <T>
  4. */
  5. @Internal
  6. public class RebalancePartitioner<T> extends StreamPartitioner<T> {
  7. private static final long serialVersionUID = 1L;
  8. private int nextChannelToSendTo;
  9. @Override
  10. public void setup(int numberOfChannels) {
  11. super.setup(numberOfChannels);
  12. //初始化channel的id,返回[0,numberOfChannels)的伪随机数
  13. nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels);
  14. }
  15. @Override
  16. public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
  17. //循环依次发送到下游的task,比如:nextChannelToSendTo初始值为0,numberOfChannels(下游算子的实例个数,并行度)值为2
  18. //则第一次发送到ID = 1的task,第二次发送到ID = 0的task,第三次发送到ID = 1的task上...依次类推
  19. nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
  20. return nextChannelToSendTo;
  21. }
  22. public StreamPartitioner<T> copy() {
  23. return this;
  24. }
  25. @Override
  26. public String toString() {
  27. return "REBALANCE";
  28. }
  29. }

图解

Flink的八种分区策略源码解读 - 图5

RescalePartitioner

简介

基于上下游Operator的并行度,将记录以循环的方式输出到下游Operator的每个实例。
举例: 上游并行度是2,下游是4,则上游一个并行度以循环的方式将记录输出到下游的两个并行度上;上游另一个并行度以循环的方式将记录输出到下游另两个并行度上。
若上游并行度是4,下游并行度是2,则上游两个并行度将记录输出到下游一个并行度上;上游另两个并行度将记录输出到下游另一个并行度上。

源码解读

  1. @Internal
  2. public class RescalePartitioner<T> extends StreamPartitioner<T> {
  3. private static final long serialVersionUID = 1L;
  4. private int nextChannelToSendTo = -1;
  5. @Override
  6. public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
  7. if (++nextChannelToSendTo >= numberOfChannels) {
  8. nextChannelToSendTo = 0;
  9. }
  10. return nextChannelToSendTo;
  11. }
  12. public StreamPartitioner<T> copy() {
  13. return this;
  14. }
  15. @Override
  16. public String toString() {
  17. return "RESCALE";
  18. }
  19. }

图解

Flink的八种分区策略源码解读 - 图6

尖叫提示

Flink 中的执行图可以分成四层:StreamGraph -> JobGraph -> ExecutionGraph -> 物理执行图。
StreamGraph:是根据用户通过 Stream API 编写的代码生成的最初的图。用来表示程序的拓扑结构。
JobGraph:StreamGraph经过优化后生成了 JobGraph,提交给 JobManager 的数据结构。主要的优化为,将多个符合条件的节点 chain 在一起作为一个节点,这样可以减少数据在节点之间流动所需要的序列化/反序列化/传输消耗。
ExecutionGraph:JobManager 根据 JobGraph 生成ExecutionGraph。ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构。
物理执行图:JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个TaskManager 上部署 Task 后形成的“图”,并不是一个具体的数据结构。
而StreamingJobGraphGenerator就是StreamGraph转换为JobGraph。在这个类中,把ForwardPartitioner和RescalePartitioner列为POINTWISE分配模式,其他的为ALL_TO_ALL分配模式。代码如下:

  1. if (partitioner instanceof ForwardPartitioner || partitioner instanceof RescalePartitioner) {
  2. jobEdge = downStreamVertex.connectNewDataSetAsInput(
  3. headVertex,
  4. // 上游算子(生产端)的实例(subtask)连接下游算子(消费端)的一个或者多个实例(subtask)
  5. DistributionPattern.POINTWISE,
  6. resultPartitionType);
  7. } else {
  8. jobEdge = downStreamVertex.connectNewDataSetAsInput(
  9. headVertex,
  10. // 上游算子(生产端)的实例(subtask)连接下游算子(消费端)的所有实例(subtask)
  11. DistributionPattern.ALL_TO_ALL,
  12. resultPartitionType);
  13. }

ForwardPartitioner

简介

发送到下游对应的第一个task,保证上下游算子并行度一致,即上有算子与下游算子是1:1的关系

源码解读

  1. /**
  2. * 发送到下游对应的第一个task
  3. * @param <T>
  4. */
  5. @Internal
  6. public class ForwardPartitioner<T> extends StreamPartitioner<T> {
  7. private static final long serialVersionUID = 1L;
  8. @Override
  9. public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
  10. return 0;
  11. }
  12. public StreamPartitioner<T> copy() {
  13. return this;
  14. }
  15. @Override
  16. public String toString() {
  17. return "FORWARD";
  18. }
  19. }

图解

Flink的八种分区策略源码解读 - 图7

尖叫提示

在上下游的算子没有指定分区器的情况下,如果上下游的算子并行度一致,则使用ForwardPartitioner,否则使用RebalancePartitioner,对于ForwardPartitioner,必须保证上下游算子并行度一致,否则会抛出异常

  1. //在上下游的算子没有指定分区器的情况下,如果上下游的算子并行度一致,则使用ForwardPartitioner,否则使用RebalancePartitioner
  2. if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
  3. partitioner = new ForwardPartitioner<Object>();
  4. } else if (partitioner == null) {
  5. partitioner = new RebalancePartitioner<Object>();
  6. }
  7. if (partitioner instanceof ForwardPartitioner) {
  8. //如果上下游的并行度不一致,会抛出异常
  9. if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
  10. throw new UnsupportedOperationException("Forward partitioning does not allow " +
  11. "change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
  12. ", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
  13. " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
  14. }
  15. }

KeyGroupStreamPartitioner

简介

根据key的分组索引选择发送到相对应的下游subtask

源码解读

  • org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner

    1. /**
    2. * 根据key的分组索引选择发送到相对应的下游subtask
    3. * @param <T>
    4. * @param <K>
    5. */
    6. @Internal
    7. public class KeyGroupStreamPartitioner<T, K> extends StreamPartitioner<T> implements ConfigurableStreamPartitioner {
    8. ...
    9. @Override
    10. public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    11. K key;
    12. try {
    13. key = keySelector.getKey(record.getInstance().getValue());
    14. } catch (Exception e) {
    15. throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
    16. }
    17. //调用KeyGroupRangeAssignment类的assignKeyToParallelOperator方法,代码如下所示
    18. return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);
    19. }
    20. ...
    21. }
  • org.apache.flink.runtime.state.KeyGroupRangeAssignment

    1. public final class KeyGroupRangeAssignment {
    2. ...
    3. /**
    4. * 根据key分配一个并行算子实例的索引,该索引即为该key要发送的下游算子实例的路由信息,
    5. * 即该key发送到哪一个task
    6. */
    7. public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
    8. Preconditions.checkNotNull(key, "Assigned key must not be null!");
    9. return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
    10. }
    11. /**
    12. *根据key分配一个分组id(keyGroupId)
    13. */
    14. public static int assignToKeyGroup(Object key, int maxParallelism) {
    15. Preconditions.checkNotNull(key, "Assigned key must not be null!");
    16. //获取key的hashcode
    17. return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
    18. }
    19. /**
    20. * 根据key分配一个分组id(keyGroupId),
    21. */
    22. public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
    23. //与maxParallelism取余,获取keyGroupId
    24. return MathUtils.murmurHash(keyHash) % maxParallelism;
    25. }
    26. //计算分区index,即该key group应该发送到下游的哪一个算子实例
    27. public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
    28. return keyGroupId * parallelism / maxParallelism;
    29. }
    30. ...

图解

Flink的八种分区策略源码解读 - 图8

CustomPartitionerWrapper

简介

通过Partitioner实例的partition方法(自定义的)将记录输出到下游。

  1. public class CustomPartitionerWrapper<K, T> extends StreamPartitioner<T> {
  2. private static final long serialVersionUID = 1L;
  3. Partitioner<K> partitioner;
  4. KeySelector<T, K> keySelector;
  5. public CustomPartitionerWrapper(Partitioner<K> partitioner, KeySelector<T, K> keySelector) {
  6. this.partitioner = partitioner;
  7. this.keySelector = keySelector;
  8. }
  9. @Override
  10. public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
  11. K key;
  12. try {
  13. key = keySelector.getKey(record.getInstance().getValue());
  14. } catch (Exception e) {
  15. throw new RuntimeException("Could not extract key from " + record.getInstance(), e);
  16. }
  17. //实现Partitioner接口,重写partition方法
  18. return partitioner.partition(key, numberOfChannels);
  19. }
  20. @Override
  21. public StreamPartitioner<T> copy() {
  22. return this;
  23. }
  24. @Override
  25. public String toString() {
  26. return "CUSTOM";
  27. }
  28. }

比如:

  1. public class CustomPartitioner implements Partitioner<String> {
  2. // key: 根据key的值来分区
  3. // numPartitions: 下游算子并行度
  4. @Override
  5. public int partition(String key, int numPartitions) {
  6. return key.length() % numPartitions;//在此处定义分区策略
  7. }
  8. }

小结

本文主要从源码层面对Flink的8中分区策略进行了一一分析,并对每一种分区策略给出了相对应的图示,方便快速理解源码。如果你觉得本文对你有用,可以关注我,了解更多精彩内容。