MapReduce的Shuffle

在shuffle的时候,需要将各个节点里面的相同的key拉取到同一个节点进行task处理,例如join 、group by,如果某个key对应的数据量非常大,那么必然这个key对应的数据进行处理的时候就会产生数据倾斜。
image.png

每个 Map 任务的计算结果都会写入到本地文件系统,等 Map 任务快要计算完成的时候,MapReduce 计算框架会启动 shuffle 过程,在 Map 任务进程调用一个 Partitioner 接口,对 Map 产生的每个 进行 Reduce 分区选择,然后通过 HTTP 通信发送给对应的 Reduce 进程。这样不管 Map 位于哪个服务器节点,相同的 Key 一定会被发送给相同的 Reduce 进程。
Reduce 任务进程对收到的 进行排序和合并,相同的 Key 放在一起,组成一个 传递给 Reduce 执行。

map 输出的 shuffle 到哪个 Reduce 进程是这里的关键,它是由Partitioner 来实现,MapReduce 框架默认的 Partitioner 用 Key 的哈希值对 Reduce 任务数量取模,相同的 Key 一定会落在相同的 Reduce 任务 ID 上。从实现上来看的话,这样的 Partitioner 代码只需要一行。

  1. /** Use {@link Object#hashCode()} to partition. */
  2. public int getPartition(K2 key, V2 value, int numReduceTasks) {
  3. return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  4. }

对 shuffle 的理解,只需要记住这一点:分布式计算需要将不同服务器上的相关数据合并到一起进行下一步计算,这就是 shuffle。

离线计算的Shuffle - 图2

Spark中的Shuffle

image.png