1、逻辑分区

比如在聚合算子时的keyBy操作,它就是一种按照键的哈希值来进行重新分区的操作。只不过这种操作只能保证数据按key分开,至于分得均匀与否、每一个key的数据会分到那个区这些完全取法控制,所以keyBy这种操作是一种逻辑分区,也叫“软分区”。

2、物理分区

我们能正真控制的分区策略,能精准的分配数据,告诉每一个数据应该区、去那个分区。 物理分区与 keyBy 的区别在于keyBy 之后得到的是一个 KeyedStream,而物理分 区之后结果仍是 DataStream,且流中元素数据类型保持不变。从这一点也可以看出分区算子 并不对数据进行转换处理,只是定义了数据的传输方式。常见的物理分区策略 :

  • 随机分区(shuffle)
  • 轮询分配(Round-Robin)
  • 重缩放(Rescale)
  • 广播(broadcast)

    1. public class TransformPartition {
    2. public static void main(String[] args) throws Exception{
    3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    4. env.setParallelism(1);
    5. DataStreamSource<Event> streamSource = env.fromElements(new Event("Mary", "./home", 1000L),
    6. new Event("Bob", "./cat", 2000L),
    7. new Event("Alice", "./prod?id=100", 3000L),
    8. new Event("Bob", "./prod?id=1", 3300L),
    9. new Event("Alice", "./prod?id=200", 3000L),
    10. new Event("Bob", "./home", 3500L),
    11. new Event("Bob", "./prod?id=2", 3800L),
    12. new Event("Bob", "./prod?id=3", 4200L)
    13. );
    14. // 调用物理分区
    15. // 1、随机分区(洗牌)
    16. // streamSource.shuffle().print().setParallelism(4);
    17. // 2、轮询分区(发牌)在所有的并行子任务之间发牌
    18. // streamSource.rebalance().print().setParallelism(4);
    19. // 3、重缩放分区
    20. env.addSource(new RichParallelSourceFunction<Integer>() {
    21. @Override
    22. public void run(SourceContext<Integer> ctx) throws Exception {
    23. for (int i = 0; i < 8; i++) {
    24. // 将奇数偶数分别发送到0号和1号并行分区
    25. if(i % 2 == getRuntimeContext().getIndexOfThisSubtask()){
    26. ctx.collect(i);
    27. }
    28. }
    29. }
    30. @Override
    31. public void cancel() {
    32. }
    33. }).setParallelism(2).rescale().print().setParallelism(4);
    34. // 4、广播
    35. streamSource.broadcast().print().setParallelism(4);
    36. // 5、全局分区
    37. streamSource.global().print().setParallelism(4);
    38. // 6、自定义分区
    39. env.fromElements(1,2,3,4,5,6,7,8)
    40. .partitionCustom(new Partitioner<Integer>() {
    41. @Override
    42. public int partition(Integer key, int numPartitions) {
    43. return key % 2;
    44. }
    45. }, new KeySelector<Integer, Integer>() {
    46. @Override
    47. public Integer getKey(Integer value) throws Exception {
    48. return value;
    49. }
    50. }).print().setParallelism(4);
    51. env.execute("TransformPartition");
    52. }
    53. }