1、逻辑分区
比如在聚合算子时的keyBy操作,它就是一种按照键的哈希值来进行重新分区的操作。只不过这种操作只能保证数据按key分开,至于分得均匀与否、每一个key的数据会分到那个区这些完全取法控制,所以keyBy这种操作是一种逻辑分区,也叫“软分区”。
2、物理分区
我们能正真控制的分区策略,能精准的分配数据,告诉每一个数据应该区、去那个分区。 物理分区与 keyBy 的区别在于keyBy 之后得到的是一个 KeyedStream,而物理分 区之后结果仍是 DataStream,且流中元素数据类型保持不变。从这一点也可以看出分区算子 并不对数据进行转换处理,只是定义了数据的传输方式。常见的物理分区策略 :
- 随机分区(shuffle)
- 轮询分配(Round-Robin)
- 重缩放(Rescale)
广播(broadcast)
public class TransformPartition {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Event> streamSource = env.fromElements(new Event("Mary", "./home", 1000L),new Event("Bob", "./cat", 2000L),new Event("Alice", "./prod?id=100", 3000L),new Event("Bob", "./prod?id=1", 3300L),new Event("Alice", "./prod?id=200", 3000L),new Event("Bob", "./home", 3500L),new Event("Bob", "./prod?id=2", 3800L),new Event("Bob", "./prod?id=3", 4200L));// 调用物理分区// 1、随机分区(洗牌)// streamSource.shuffle().print().setParallelism(4);// 2、轮询分区(发牌)在所有的并行子任务之间发牌// streamSource.rebalance().print().setParallelism(4);// 3、重缩放分区env.addSource(new RichParallelSourceFunction<Integer>() {@Overridepublic void run(SourceContext<Integer> ctx) throws Exception {for (int i = 0; i < 8; i++) {// 将奇数偶数分别发送到0号和1号并行分区if(i % 2 == getRuntimeContext().getIndexOfThisSubtask()){ctx.collect(i);}}}@Overridepublic void cancel() {}}).setParallelism(2).rescale().print().setParallelism(4);// 4、广播streamSource.broadcast().print().setParallelism(4);// 5、全局分区streamSource.global().print().setParallelism(4);// 6、自定义分区env.fromElements(1,2,3,4,5,6,7,8).partitionCustom(new Partitioner<Integer>() {@Overridepublic int partition(Integer key, int numPartitions) {return key % 2;}}, new KeySelector<Integer, Integer>() {@Overridepublic Integer getKey(Integer value) throws Exception {return value;}}).print().setParallelism(4);env.execute("TransformPartition");}}
