spark 的 join 是怎么实现的?

当前 SparkSql 支持三种 join 算法:1、shuffle hash join, 2、broadcast hash join ,3、sort merge join。其中前两者是基于 hash join 的衍生的,只不过是在hash join 之前先进行broadcast 或则 shuffle,然后再进行 hash join,几十年前数据库就有了 hash join 的方法,spark 的这两种 join 只是加上了分布式的场景和思想而已。接下来,我们先说说什么是 hash join。

先说说 hash join

确定 Bulid Table 和 Probe Table: ,bulid table 使用 join key 构建 Hash Table,而 Probe Table 使用 join key 进行一个探测,探测成功就可以 join 在一起。
构建 Hash Table:通常情况下,小表会作为 bulid table,大表作为 probe table(探测表),依次读取 Bulid Table的数据,对于每一行数据根据 join key 进行 hash,hash 到对应的 bucket, 生成 Hash Table 中一条记录。数据缓存在内存里,如果内存放不下,需dump 到外存。
探测:再次扫描 probe table 的数据,使用相同的 hash 函数映射 HashTable 中的记录,映射成功之后再检查 join 的条件,如果匹配成功就可以将二者 join 在一起。
image.png
基本流程可以参考上图,这里有两个小问题需要关注:

  1. hash join 的性能如何? 很显然,hash join 是两个表,每个表只扫描一次,时间复杂度可认为是 O(a+b),较之最极端的 笛卡尔积 O(a*b),性能提高了不是一点半点。
  2. 为什么 bulid table 选择小表, probe table 选择大表? 因为在构建 hash table 时最好能将其全部加载到内存,这样才效率最高,这也说明了为什么 hash join 算法只适合至少有一个小表的场景,对于两个大表的join 场景并不适用,两个大表的 join ,可选用 sort merge join。

Hash join 总结: hash join 是传统数据库的单机 join 算法,在分布式环境下经过一定的分布式改造,说到底就是尽可能利用分布式计算资源进行分布式并行化计算,以提高整体的速度 。Hash join 的分布式改造一般有两种经典方案:

broadcast hash join

当有限维度表和事实表进行 join 时,为了避免 shuffle ,将有限维度表(不超过10M),也就是小表全部数据分发到大表所在的分区结点上,供事实表(大表)使用,分别并发地与大表上地分区记录进行 hash join。brocast hash join 使得excutor 上存储小表地全部数据,一定程度牺牲了空间,换取 shuffle 操作大量的耗时。

a. broadcast 阶段:将小表广播分发到大表所在的所有excutor 节点。广播算法有很多,最简单的先发给driver ,driver 在统一分发到所有的 excutor ,要不就是基于 bittorrete 的 p2p 思路。
b. hash join 阶段: 将每个 excutor 上执行单机版的 hash join,小表映射,大表试探。

shuffle hash join

一旦小表数据量较大,此时不在适合广播分发,这种情况下,利用 key 相同分区必然相同地原理,将两张表分别按照 join key 进行重新分区,这样相同 join key 的记录就会重分布到同一节点上,两张表中的数据会被重新分布到集群中的相应节点上(这个过程称为shuffle),再对表中的相应分区的数据分别进行 hash join ,这样即在一定程度上减少了 driver 广播一侧表的压力,也减少了 excutor 端拉取整张广播表的内存消耗,并充分利用了集群资源并行化。
image.png

Sort-Merge join

当两张表都非常大的时候,显然无论采用以上哪种 join 都会对计算内存造成很大压力,这是因为 join 时采用的hash join ,是将一侧表的数据完全加载到内存中,这时候就采用 sort-merge join 算法,主要分为三个阶段:

a. Shuffle 阶段:将两张大表根据 join key 重新分区,两张表的数据会分布到整个集群中,以便分布式的处理;
b. Sort 阶段:对单个分区节点的两表数据(一个节点上一个分区),分别进行排序;
c. Merge 阶段:对排序好的两张分区表数据执行 join 操作。 join 操作很简单,分别遍历两个有序序列,碰到相同 join key 就 Merge 输出,否则取更小一边继续往下遍历。

分析: 同样都需要shuffle ,sort-merge join还需要一个分区内排序过程,shuffle hash join 不是比 sort-merge join 更有效率吗?那为什么在两张大表的情况下使用 sort-merge join算法呢?这和 Spark 的shuffle 实现有关系,目前spark 的shuffle 实现都适用 sort-based shuffle 算法,因此在经过 hash shuffle 之后 partition 数据都是按照key 排序的,因此理论上可以认为是数据经过shuffle 之后不需要 sort的,可以直接 merge 。
经过以上分析,可以明确每种 join 算法都有自己的使用场景,数据库设计时最好避免大表与大表的 查询,SparkSQL 也可以根据内存资源、带宽资源适量将参数 spark.sql.autoBroadcastJoinThreshold(默认是 10M) 调大,让更多 join 实际执行为 broadcast hash join。

总结

Join 是传统数据库中的一个高级特性,尤其对于 mysql 而言更是,因为mysql 目前对 join 的支持还比较有限,只支持 Nested-Loop Join 算法,因此OLAP 场景下很少使用 mysql.不过目前Mysql 的新版本开始支持 hash join了,也许将来 Mysql 可以处理一些小规模的 OLAP 业务。
传统数据库单机模式做 join 的场景有限,所以也建议尽量少使用 join 。然而大数据领域不同,Join 是标配,OLAP 业务离不开表与表之间的关联,对 Join 的支持成熟度一定程度上决定了系统的性能,夸张点说,”得 join 者得天下 “