1、逻辑分区
比如在聚合算子时的keyBy操作,它就是一种按照键的哈希值来进行重新分区的操作。只不过这种操作只能保证数据按key分开,至于分得均匀与否、每一个key的数据会分到那个区这些完全取法控制,所以keyBy这种操作是一种逻辑分区,也叫“软分区”。
2、物理分区
我们能正真控制的分区策略,能精准的分配数据,告诉每一个数据应该区、去那个分区。 物理分区与 keyBy 的区别在于keyBy 之后得到的是一个 KeyedStream,而物理分 区之后结果仍是 DataStream,且流中元素数据类型保持不变。从这一点也可以看出分区算子 并不对数据进行转换处理,只是定义了数据的传输方式。常见的物理分区策略 :
- 随机分区(shuffle)
- 轮询分配(Round-Robin)
- 重缩放(Rescale)
广播(broadcast)
public class TransformPartition {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Event> streamSource = env.fromElements(new Event("Mary", "./home", 1000L),
new Event("Bob", "./cat", 2000L),
new Event("Alice", "./prod?id=100", 3000L),
new Event("Bob", "./prod?id=1", 3300L),
new Event("Alice", "./prod?id=200", 3000L),
new Event("Bob", "./home", 3500L),
new Event("Bob", "./prod?id=2", 3800L),
new Event("Bob", "./prod?id=3", 4200L)
);
// 调用物理分区
// 1、随机分区(洗牌)
// streamSource.shuffle().print().setParallelism(4);
// 2、轮询分区(发牌)在所有的并行子任务之间发牌
// streamSource.rebalance().print().setParallelism(4);
// 3、重缩放分区
env.addSource(new RichParallelSourceFunction<Integer>() {
@Override
public void run(SourceContext<Integer> ctx) throws Exception {
for (int i = 0; i < 8; i++) {
// 将奇数偶数分别发送到0号和1号并行分区
if(i % 2 == getRuntimeContext().getIndexOfThisSubtask()){
ctx.collect(i);
}
}
}
@Override
public void cancel() {
}
}).setParallelism(2).rescale().print().setParallelism(4);
// 4、广播
streamSource.broadcast().print().setParallelism(4);
// 5、全局分区
streamSource.global().print().setParallelism(4);
// 6、自定义分区
env.fromElements(1,2,3,4,5,6,7,8)
.partitionCustom(new Partitioner<Integer>() {
@Override
public int partition(Integer key, int numPartitions) {
return key % 2;
}
}, new KeySelector<Integer, Integer>() {
@Override
public Integer getKey(Integer value) throws Exception {
return value;
}
}).print().setParallelism(4);
env.execute("TransformPartition");
}
}