These methods decide how records from an upstream stream are distributed across the downstream channels.

Overview

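rebalance: sends records to the downstream channels in round-robin order.
rescale: also sends records to the downstream channels in round-robin order.
The difference is that rebalance distributes records to all downstream tasks, whereas rescale distributes them to only a subset.
For example, if the upstream parallelism is 2 and the downstream parallelism is 4, records from upstream channel 1 go to two of the downstream channels, and records from upstream channel 2 go to the other two.
shuffle: picks a downstream channel at random for each record.
keyBy: takes the key's hashCode, applies murmurHash to scatter it further, and computes hash % maxParallelism to get the key group; the key group is then mapped to the downstream channel index as keyGroup * parallelism / maxParallelism.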

On to the code, which makes all of this clear at a glance.

rebalance

Sends records to the downstream channels in round-robin order.

    // Excerpt from Flink's RebalancePartitioner (copy(), toString() and other overrides omitted).
    public class RebalancePartitioner<T> extends StreamPartitioner<T> {
        private static final long serialVersionUID = 1L;

        private int nextChannelToSendTo;

        @Override
        public void setup(int numberOfChannels) {
            super.setup(numberOfChannels);
            // Start from a random channel so that parallel senders do not all hit the same channel first.
            nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels);
        }

        @Override
        public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
            // Advance to the next channel, wrapping around after the last one.
            nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
            return nextChannelToSendTo;
        }
    }
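
To see the effect without a Flink cluster, the same selection logic can be pulled into a small standalone class. This is only a sketch: RoundRobinSketch and countPerChannel are made-up names for illustration, not part of Flink.

```java
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;

/** Standalone sketch of round-robin channel selection (hypothetical class, not Flink's). */
final class RoundRobinSketch {
    private final int numberOfChannels;
    private int nextChannelToSendTo;

    RoundRobinSketch(int numberOfChannels) {
        if (numberOfChannels <= 0) {
            throw new IllegalArgumentException("numberOfChannels must be positive");
        }
        this.numberOfChannels = numberOfChannels;
        // Random starting channel, mirroring what setup() does in the excerpt above.
        this.nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels);
    }

    /** Returns the channel for the next record, cycling through all channels. */
    int selectChannel() {
        nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
        return nextChannelToSendTo;
    }

    /** Routes the given number of records and counts how many land on each channel. */
    static int[] countPerChannel(int numberOfChannels, int records) {
        RoundRobinSketch partitioner = new RoundRobinSketch(numberOfChannels);
        int[] counts = new int[numberOfChannels];
        for (int i = 0; i < records; i++) {
            counts[partitioner.selectChannel()]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(countPerChannel(4, 400)));
    }
}
```

Running main prints [100, 100, 100, 100]: whatever the random starting channel is, 400 records over 4 channels come out perfectly even.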

rescale

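Sends records to the downstream channels in round-robin order.
The difference from rebalance is that rebalance distributes records to all downstream tasks, whereas rescale distributes them to only a subset.
The selectChannel logic below is essentially the same round-robin as in rebalance, so the subset is not chosen inside the partitioner. It comes from how the job graph is wired: as far as I can tell, StreamingJobGraphGenerator connects a RescalePartitioner edge with DistributionPattern.POINTWISE, while rebalance, shuffle and keyBy edges use ALL_TO_ALL. Each upstream subtask therefore only has channels to some of the downstream subtasks, and numberOfChannels covers just that subset.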

    // Excerpt from Flink's RescalePartitioner (copy(), toString() and other overrides omitted).
    public class RescalePartitioner<T> extends StreamPartitioner<T> {
        private static final long serialVersionUID = 1L;

        private int nextChannelToSendTo = -1;

        @Override
        public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
            // Plain round-robin over the channels this subtask is connected to.
            if (++nextChannelToSendTo >= numberOfChannels) {
                nextChannelToSendTo = 0;
            }
            return nextChannelToSendTo;
        }
    }
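
The upstream 2, downstream 4 example can be sketched as follows. This is a simplification under stated assumptions: RescaleSketch, reachableDownstream and route are hypothetical names, the downstream parallelism is assumed to be a multiple of the upstream parallelism, and the contiguous assignment (upstream 0 to downstream 0 and 1, and so on) is an assumed wiring for illustration rather than something taken from the Flink source.

```java
import java.util.ArrayList;
import java.util.List;

/** Standalone sketch of rescale-style routing (hypothetical class, not Flink's). */
final class RescaleSketch {

    /** Downstream subtasks that one upstream subtask is connected to (assumed contiguous blocks). */
    static List<Integer> reachableDownstream(int upstreamIndex, int upstreamParallelism, int downstreamParallelism) {
        if (upstreamIndex < 0 || upstreamIndex >= upstreamParallelism) {
            throw new IllegalArgumentException("upstreamIndex out of range");
        }
        if (downstreamParallelism % upstreamParallelism != 0) {
            throw new IllegalArgumentException("sketch only covers downstream = k * upstream");
        }
        int perUpstream = downstreamParallelism / upstreamParallelism;
        List<Integer> targets = new ArrayList<>();
        for (int i = 0; i < perUpstream; i++) {
            targets.add(upstreamIndex * perUpstream + i);
        }
        return targets;
    }

    /** Round-robin over the local channels only, then map each local channel to its downstream subtask. */
    static List<Integer> route(int upstreamIndex, int upstreamParallelism, int downstreamParallelism, int records) {
        List<Integer> targets = reachableDownstream(upstreamIndex, upstreamParallelism, downstreamParallelism);
        int numberOfChannels = targets.size(); // only the connected subset, not all downstream subtasks
        int nextChannelToSendTo = -1;
        List<Integer> destinations = new ArrayList<>();
        for (int r = 0; r < records; r++) {
            if (++nextChannelToSendTo >= numberOfChannels) {
                nextChannelToSendTo = 0;
            }
            destinations.add(targets.get(nextChannelToSendTo));
        }
        return destinations;
    }

    public static void main(String[] args) {
        System.out.println(route(0, 2, 4, 4));
        System.out.println(route(1, 2, 4, 4));
    }
}
```

Running main prints [0, 1, 0, 1] and then [2, 3, 2, 3]: each upstream subtask cycles through its own two downstream subtasks and never touches the other two.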

shuffle

Picks a downstream channel at random for each record.

    // Excerpt from Flink's ShufflePartitioner (copy(), toString() and other overrides omitted).
    public class ShufflePartitioner<T> extends StreamPartitioner<T> {
        private static final long serialVersionUID = 1L;

        private Random random = new Random();

        @Override
        public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
            // Uniformly random channel in [0, numberOfChannels).
            return random.nextInt(numberOfChannels);
        }
    }
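
For comparison with round-robin, here is a small standalone sketch of random selection. ShuffleSketch and countPerChannel are hypothetical names, and the fixed seed is only there to make runs repeatable; the Flink class above uses an unseeded Random.

```java
import java.util.Arrays;
import java.util.Random;

/** Standalone sketch of random channel selection (hypothetical class, not Flink's). */
final class ShuffleSketch {

    /** Routes the given number of records at random and counts how many land on each channel. */
    static int[] countPerChannel(int numberOfChannels, int records, long seed) {
        Random random = new Random(seed); // seeded only for repeatability of this sketch
        int[] counts = new int[numberOfChannels];
        for (int i = 0; i < records; i++) {
            counts[random.nextInt(numberOfChannels)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(countPerChannel(4, 4000, 42L)));
    }
}
```

Unlike round-robin, the per-channel counts are only approximately equal: the load evens out over many records, but nothing guarantees an exact split.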

keyBy

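Takes the key's hashCode, applies murmurHash to scatter it further, and computes hash % maxParallelism to get the key group index. The downstream channel index is then derived from the key group by computeOperatorIndexForKeyGroup, which in the Flink source is keyGroup * parallelism / maxParallelism.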

    // Excerpt from Flink's KeyGroupStreamPartitioner (constructor and other methods omitted).
    public class KeyGroupStreamPartitioner<T, K> extends StreamPartitioner<T> implements ConfigurableStreamPartitioner {
        private static final long serialVersionUID = 1L;

        private final KeySelector<T, K> keySelector;
        private int maxParallelism;

        @Override
        public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
            K key;
            try {
                key = keySelector.getKey(record.getInstance().getValue());
            } catch (Exception e) {
                throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
            }
            // numberOfChannels is the downstream parallelism here.
            return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);
        }
    }

    // Excerpt from Flink's KeyGroupRangeAssignment (computeOperatorIndexForKeyGroup and other methods omitted).
    public final class KeyGroupRangeAssignment {
        /**
         * Assigns the given key to a parallel operator index.
         * @return the index of the parallel operator to which the given key should be routed.
         */
        public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
            Preconditions.checkNotNull(key, "Assigned key must not be null!");
            return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
        }

        /**
         * Assigns the given key to a key-group index.
         * @return the key-group to which the given key is assigned
         */
        public static int assignToKeyGroup(Object key, int maxParallelism) {
            Preconditions.checkNotNull(key, "Assigned key must not be null!");
            return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
        }

        /**
         * Assigns the given key to a key-group index.
         * @return the key-group to which the given key is assigned
         */
        public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
            return MathUtils.murmurHash(keyHash) % maxParallelism;
        }
    }
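
The two-step mapping (key to key group, key group to operator index) can be tried out in isolation. The sketch below rests on some assumptions: KeyGroupSketch is a hypothetical class, mix is a stand-in bit mixer and not Flink's MathUtils.murmurHash, and computeOperatorIndexForKeyGroup follows the keyGroup * parallelism / maxParallelism formula as I understand it from the Flink source.

```java
import java.util.Objects;

/** Standalone sketch of key -> key group -> operator index (hypothetical class, not Flink's). */
final class KeyGroupSketch {

    /** Stand-in for MathUtils.murmurHash: scatters the bits and returns a non-negative int. */
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h & Integer.MAX_VALUE; // clear the sign bit so that % never goes negative
    }

    /** Step 1: key -> key group in [0, maxParallelism). */
    static int assignToKeyGroup(Object key, int maxParallelism) {
        Objects.requireNonNull(key, "Assigned key must not be null!");
        return mix(key.hashCode()) % maxParallelism;
    }

    /** Step 2: key group -> operator index in [0, parallelism), in contiguous ranges. */
    static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }

    /** Both steps combined: key -> downstream operator (channel) index. */
    static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
        return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 4;
        for (String key : new String[] {"user-1", "user-2", "user-3"}) {
            int keyGroup = assignToKeyGroup(key, maxParallelism);
            int operator = computeOperatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup);
            System.out.println(key + " -> key group " + keyGroup + " -> operator " + operator);
        }
    }
}
```

With maxParallelism 128 and parallelism 4, key groups 0 to 31 go to operator 0, 32 to 63 to operator 1, and so on. The point of the intermediate key group seems to be that a key's group never changes when the parallelism does; only the range of groups owned by each operator moves.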