These methods determine how the records of an upstream stream are distributed across the downstream channels.
Overview
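| Method | How records are distributed |
|---|---|
| rebalance | Round-robin over the downstream channels. |
| rescale | Round-robin over the downstream channels. Unlike rebalance, which sends records to all downstream tasks, rescale sends records to only a subset of the downstream tasks. For example, with upstream parallelism 2 and downstream parallelism 4, upstream subtask 1 sends only to two of the downstream channels and upstream subtask 2 sends only to the other two. |
| shuffle | Sends each record to a downstream channel chosen at random. |
| keyBy | Computes the key's hashCode, rehashes it with murmurHash to spread the values further, and takes hash % maxParallelism to get the key group. The key group is then mapped to a downstream channel as keyGroup * parallelism / maxParallelism. |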
Let's look at the source code, which makes this clear at a glance.
rebalance
Round-robin over the downstream channels, starting from a randomly chosen channel.
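```java
package org.apache.flink.streaming.runtime.partitioner;

import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import java.util.concurrent.ThreadLocalRandom;

public class RebalancePartitioner<T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;

    // Index of the channel that received the previous record.
    private int nextChannelToSendTo;

    @Override
    public void setup(int numberOfChannels) {
        super.setup(numberOfChannels);
        // Start from a random channel, presumably so that parallel upstream subtasks
        // do not all send their first record to the same downstream channel.
        nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels);
    }

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        // Advance to the next channel, wrapping around at numberOfChannels.
        nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
        return nextChannelToSendTo;
    }

    // copy(), toString() and other overrides omitted.
}
```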
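To see the round-robin behavior without a Flink cluster, here is a minimal standalone sketch that copies the selectChannel arithmetic from RebalancePartitioner. The class RoundRobinSketch and its methods are hypothetical names; the random start that Flink draws in setup() is passed in as a parameter so the results are reproducible.

```java
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical standalone model of RebalancePartitioner's round-robin logic (no Flink dependency).
class RoundRobinSketch {
    private final int numberOfChannels;
    private int nextChannelToSendTo;

    RoundRobinSketch(int numberOfChannels, int initialChannel) {
        this.numberOfChannels = numberOfChannels;
        // Flink draws this value randomly in setup(); the first record goes to
        // (initialChannel + 1) % numberOfChannels.
        this.nextChannelToSendTo = initialChannel;
    }

    int selectChannel() {
        // Same arithmetic as RebalancePartitioner.selectChannel.
        nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
        return nextChannelToSendTo;
    }

    // Counts how many of `records` records land on each channel.
    static int[] distribute(int numberOfChannels, int initialChannel, int records) {
        RoundRobinSketch partitioner = new RoundRobinSketch(numberOfChannels, initialChannel);
        int[] counts = new int[numberOfChannels];
        for (int i = 0; i < records; i++) {
            counts[partitioner.selectChannel()]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int start = ThreadLocalRandom.current().nextInt(4);
        System.out.println(Arrays.toString(distribute(4, start, 10)));
    }
}
```

Whatever the starting channel, 10 records over 4 channels split as 3/3/2/2 in some order, so rebalance keeps the per-channel load within one record of each other.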
rescale
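Round-robin over the downstream channels, starting from channel 0.
The difference from rebalance is that rebalance sends records to all downstream tasks, while rescale sends records to only a subset of them.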
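The partitioner code itself is almost identical to rebalance (apart from always starting at channel 0 instead of a random channel), so the "subset" does not come from selectChannel. It comes from how the edge is wired: rescale is a pointwise partitioner, so Flink connects the two operators with DistributionPattern.POINTWISE instead of ALL_TO_ALL. Each upstream subtask is therefore connected to only a subset of the downstream subtasks, and numberOfChannels inside RescalePartitioner is the size of that subset, not the full downstream parallelism.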
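```java
package org.apache.flink.streaming.runtime.partitioner;

import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class RescalePartitioner<T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;

    // Starts at -1 so that the first record goes to channel 0.
    private int nextChannelToSendTo = -1;

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        // Round-robin over this subtask's channels; numberOfChannels only counts
        // the downstream subtasks this upstream subtask is wired to.
        if (++nextChannelToSendTo >= numberOfChannels) {
            nextChannelToSendTo = 0;
        }
        return nextChannelToSendTo;
    }

    // copy(), toString() and other overrides omitted.
}
```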
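The following minimal sketch shows why rescale reaches only part of the downstream tasks. Rescale connects operators pointwise; the sketch additionally assumes a simplified wiring in which, when upstream parallelism is at most downstream parallelism, upstream subtask i (0-based) is wired to the contiguous block of downstream subtasks [i * down / up, (i + 1) * down / up). That block formula is an illustration, not Flink's exact scheduling code. Inside the block, routing uses the same counter logic as RescalePartitioner. RescaleSketch, connectedDownstream and route are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of rescale: pointwise wiring plus round-robin inside the wired subset.
class RescaleSketch {

    // Assumed wiring: global indices of the downstream subtasks wired to one upstream subtask.
    static int[] connectedDownstream(int upstreamIndex, int upstreamParallelism, int downstreamParallelism) {
        if (upstreamParallelism > downstreamParallelism) {
            throw new IllegalArgumentException("sketch only covers upstream <= downstream parallelism");
        }
        int start = upstreamIndex * downstreamParallelism / upstreamParallelism;
        int end = (upstreamIndex + 1) * downstreamParallelism / upstreamParallelism;
        int[] targets = new int[end - start];
        for (int k = 0; k < targets.length; k++) {
            targets[k] = start + k;
        }
        return targets;
    }

    // Routes `records` records from one upstream subtask; returns the global downstream index per record.
    static List<Integer> route(int upstreamIndex, int upstreamParallelism, int downstreamParallelism, int records) {
        int[] channels = connectedDownstream(upstreamIndex, upstreamParallelism, downstreamParallelism);
        // As in RescalePartitioner, numberOfChannels is only the size of the local subset.
        int numberOfChannels = channels.length;
        int nextChannelToSendTo = -1;
        List<Integer> destinations = new ArrayList<>();
        for (int r = 0; r < records; r++) {
            if (++nextChannelToSendTo >= numberOfChannels) {
                nextChannelToSendTo = 0;
            }
            // Translate the local channel index to a global downstream subtask index.
            destinations.add(channels[nextChannelToSendTo]);
        }
        return destinations;
    }

    public static void main(String[] args) {
        // Upstream parallelism 2, downstream parallelism 4.
        System.out.println(route(0, 2, 4, 4)); // [0, 1, 0, 1]
        System.out.println(route(1, 2, 4, 4)); // [2, 3, 2, 3]
    }
}
```

With parallelism 2 to 4, upstream subtask 0 only ever reaches downstream subtasks 0 and 1, and upstream subtask 1 only reaches 2 and 3, which matches the example in the overview table (there numbered from 1). Because each upstream subtask talks to fewer channels, rescale avoids the full all-to-all connection that rebalance needs.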
shuffle
Sends each record to a downstream channel chosen at random.
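```java
package org.apache.flink.streaming.runtime.partitioner;

import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import java.util.Random;

public class ShufflePartitioner<T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;

    private Random random = new Random();

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        // Uniformly random channel in [0, numberOfChannels), independent of the record.
        return random.nextInt(numberOfChannels);
    }

    // copy(), toString() and other overrides omitted.
}
```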
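To contrast shuffle with the round-robin partitioners, this minimal sketch draws a channel per record with java.util.Random#nextInt, the same call ShufflePartitioner makes. ShuffleSketch is a hypothetical name, and the fixed seed is an addition for reproducibility (ShufflePartitioner uses an unseeded Random).

```java
import java.util.Arrays;
import java.util.Random;

// Hypothetical standalone model of ShufflePartitioner: each record independently picks a random channel.
class ShuffleSketch {

    static int[] distribute(int numberOfChannels, int records, long seed) {
        // Seeded only so the sketch is reproducible.
        Random random = new Random(seed);
        int[] counts = new int[numberOfChannels];
        for (int i = 0; i < records; i++) {
            counts[random.nextInt(numberOfChannels)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Counts are close to 2500 each, but unlike round-robin they are not guaranteed to be equal.
        System.out.println(Arrays.toString(distribute(4, 10_000, 42L)));
    }
}
```

Over many records each channel gets close to an equal share, but unlike rebalance the counts can drift apart, most noticeably over short bursts of records.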
keyBy
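Computes the key's hashCode, rehashes it with murmurHash to spread the values further, and takes hash % maxParallelism to get the key group. The key group, not the hash itself, is then mapped to the downstream channel index as keyGroup * parallelism / maxParallelism, where parallelism is the number of downstream channels.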
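```java
// KeyGroupStreamPartitioner.java
package org.apache.flink.streaming.runtime.partitioner;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Preconditions;

public class KeyGroupStreamPartitioner<T, K> extends StreamPartitioner<T>
        implements ConfigurableStreamPartitioner {
    private static final long serialVersionUID = 1L;

    private final KeySelector<T, K> keySelector;
    private int maxParallelism;

    public KeyGroupStreamPartitioner(KeySelector<T, K> keySelector, int maxParallelism) {
        this.keySelector = Preconditions.checkNotNull(keySelector);
        this.maxParallelism = maxParallelism;
    }

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        K key;
        try {
            // Extract the key with the KeySelector passed to keyBy.
            key = keySelector.getKey(record.getInstance().getValue());
        } catch (Exception e) {
            throw new RuntimeException(
                    "Could not extract key from " + record.getInstance().getValue(), e);
        }
        // key -> key group -> downstream channel index.
        return KeyGroupRangeAssignment.assignKeyToParallelOperator(
                key, maxParallelism, numberOfChannels);
    }

    // configure(), copy(), toString() and other overrides omitted.
}

// KeyGroupRangeAssignment.java
package org.apache.flink.runtime.state;

import org.apache.flink.util.MathUtils;
import org.apache.flink.util.Preconditions;

public final class KeyGroupRangeAssignment {

    /** Assigns the given key to a parallel operator index (a downstream channel). */
    public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
        Preconditions.checkNotNull(key, "Assigned key must not be null!");
        return computeOperatorIndexForKeyGroup(
                maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
    }

    /** Assigns the given key to a key-group index. */
    public static int assignToKeyGroup(Object key, int maxParallelism) {
        Preconditions.checkNotNull(key, "Assigned key must not be null!");
        return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
    }

    /** Rehashes the key's hashCode with murmurHash and maps it to a key group in [0, maxParallelism). */
    public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
        return MathUtils.murmurHash(keyHash) % maxParallelism;
    }

    /** Maps a key group to an operator index; each operator owns a contiguous range of key groups. */
    public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }

    // Other methods omitted.
}
```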
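The chain key.hashCode() -> murmurHash -> key group -> operator index can be reproduced without Flink. The sketch below is a hypothetical standalone class (KeyGroupSketch). Its murmurHash method is a port of what Flink's MathUtils.murmurHash is believed to do (a MurmurHash3-style mix of a single int, folded to a non-negative value), so treat exact hash values as an assumption and use the real MathUtils if they matter. The operator-index formula keyGroupId * parallelism / maxParallelism is the one Flink's KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup uses.

```java
// Hypothetical standalone sketch of keyBy routing; murmurHash is an assumed port of Flink's MathUtils.murmurHash.
class KeyGroupSketch {

    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        // Final avalanche mixing.
        code ^= code >>> 16;
        code *= 0x85ebca6b;
        code ^= code >>> 13;
        code *= 0xc2b2ae35;
        code ^= code >>> 16;
        // Fold to a non-negative int so the modulo below never goes negative.
        if (code >= 0) {
            return code;
        } else if (code != Integer.MIN_VALUE) {
            return -code;
        } else {
            return 0;
        }
    }

    // Key group in [0, maxParallelism).
    static int keyGroupFor(Object key, int maxParallelism) {
        return murmurHash(key.hashCode()) % maxParallelism;
    }

    // Operator (downstream channel) index in [0, parallelism).
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }

    static int operatorIndexFor(Object key, int maxParallelism, int parallelism) {
        return operatorIndexForKeyGroup(maxParallelism, parallelism, keyGroupFor(key, maxParallelism));
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 4;
        for (String key : new String[] {"a", "b", "user-42"}) {
            int keyGroup = keyGroupFor(key, maxParallelism);
            System.out.println(key + " -> key group " + keyGroup
                    + " -> subtask " + operatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup));
        }
    }
}
```

Because the operator index is keyGroup * parallelism / maxParallelism, each downstream subtask owns a contiguous range of key groups: with maxParallelism 128 and parallelism 4, subtask 0 gets groups 0-31, subtask 1 gets 32-63, and so on. A key always lands in the same key group regardless of parallelism, which is what lets Flink redistribute keyed state in whole key groups when the job is rescaled.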
