自 Tony Hoare 在 1961 年发表了快速排序,其优异表现迅速得到开发者青睐,并被评为20世纪十大算法之一。但目前工业界使用大部分的 unstable 排序算法,已经从过去单一的排序算法转变成混合排序算法,以应对实践场景中各式各样的序列。本文将从应用场景入手,分析两种优化快速排序的思路,并尝试量化优化算法的效果。
快速排序采用了分治的思想,具体的过程是在一个 array 中选定一个 pivot(锚点),根据这个 pivot 可以把 array 分成两个的 sub-arrays,左 sub-array 的元素都小于 pivot,右 sub-array 的元素都大于 pivot。对于这些 sub-array,我们可以做同样的处理:选出 pivot 并进一步切分出两个 sub-arrays。当某个 sub-array 只有一个元素时,其本身有序,此时便可以退出循环。如此反复,最后得到整体的有序。
如果以 head 作为 pivot,一行 Scala 代码即可完成快速排序的 out-place 实现:

  1. def quickSort(arr: Array[Int]): Array[Int] = if arr.length < 2 then arr else quickSort(arr.filter(_ < arr.head)) :+ arr.head :++ quickSort(arr.filter(_ > arr.head))

但生产中更常用的是 in-place 的快排实现,同样用 Scala 编写:

  1. def quickSort(arr: Array[Int], l: Int, r: Int): Unit = if l < r then
  2. val pivot = choosePivot(arr, l, r)
  3. val cut = partition(arr, l, r, pivot)
  4. quickSort(arr, l, cut - 1)
  5. quickSort(arr, cut, r)

无论哪种快排实现,第 1、4、5 行都是必要的,只有 choosePivot 和 partition 有优化的空间,需要重点关注。

改进 choosePivot

Hore 实现的 choosePivot 以 head 作为 pivot,有可能会遇到数据倾斜,即二分中的其中一个子数组长度为0,同时另一个子数组包含了上轮递归中所有的元素。这是快速排序的 worst case,时间复杂度会退化到 O(n^2)。

  1. def choosePivot(arr: Array[Int], from: Int, to: Int): Int = arr(head)

为避免这种情况发生,最理想是选择数列中的中位数 median 作为 pivot。

median

寻找无序数组的中位数有两个思路:分治、最大堆 + 最小堆。先看看分治思路的实现:

  1. import scala.annotation.tailrec
  2. @tailrec def findKMedian(arr: Array[Double], k: Int)(implicit choosePivot: Array[Double] => Double): Double = {
  3. val a = choosePivot(arr)
  4. val (s, b) = arr partition (a >)
  5. if s.size == k then a
  6. else if s.isEmpty then {
  7. val (s, b) = arr partition (a ==)
  8. if s.size > k then a
  9. else findKMedian(b, k - s.size)
  10. } else if s.size < k then findKMedian(b, k - s.size)
  11. else findKMedian(s, k)
  12. }
  13. def findMedian(arr: Array[Double])(implicit choosePivot: Array[Double] => Double) = findKMedian(arr, (arr.size - 1) / 2)
  14. def choosePivot(arr: Array[Double]): Double = arr(scala.util.Random.nextInt(arr.size))

这里我们实现了一个 out-place 的分治寻找中位数算法。可以发现 choosePivot 依然要求选出 pivot,我们的实现中选择了 random,但同样不可避免数据倾斜。
如果数组规模不大,我们完全可以用空间换时间:

  1. class Median() {
  2. val (hi,lo) = (PriorityQueue.empty[Int].reverse, PriorityQueue.empty[Int])
  3. def add(num: Int): Median = {
  4. lo.enqueue(num)
  5. val highestLow = lo.dequeue()
  6. hi.enqueue(highestLow)
  7. if hi.size > lo.size then {
  8. val lowestHigh = hi.dequeue()
  9. lo.enqueue(lowestHigh)
  10. }
  11. this
  12. }
  13. def find(): Int = {
  14. if lo.size > hi.size then lo.head else (hi.head + lo.head) / 2.0
  15. }
  16. }

对整个数组寻找中位数是个昂贵的操作,因此我们并不寻求找到真正的 median,只要找到一个接近中位数的元素即可缓解数据倾斜的问题。

median of three

为了寻找接近的中位数,可以通过采样的方式。median of three 顾名思义是选取3个值,并且计算其中位数作为 pivot。例如 [3,9,7,5,1],可以对 3 7 1 计算出的中位数 3 作为 pivot。这种方式相比于首个元素的方式会更加合理,因为采样了多个元素,不容易受到极端情况的影响,往往会比首个元素的方式有更好的效果。

  1. def choosePivot(arr: Array[Int], from: Int, to: Int): Int = findMedian(Array(arr(from), arr((from + to) / 2),arr(to)))

Tukey’s ninther

此方法由 John Tukey 提出,本质是在 median of three 的基础上,增加3倍采样率。median of nine 会取三个元素及其相邻两个元素的 median,然后再取这个三个 medians 的 median。比如 [1,2,3,4,5,6,7,8,9],先对 [1,2,3],[4,5,6],[7,8,9] 分别取中位数 2,5,8,然后在此基础上取中位数 5。

median of medians

该方法采样步骤先将 array 分为 n/5 个 sub-arrays,n 为 array 的长度。然后将这些 sub-arrays 的 medians 都取出,选取这些 medians 中的 median,重复以上步骤,最后得到一个 median of medians 作为最后的 pivot。

  1. def medianOfMedians(arr: Array[Double]): Double = {
  2. val medians = arr grouped 5 map medianUpTo5 toArray;
  3. if (medians.size <= 5) medianUpTo5 (medians)
  4. else medianOfMedians(medians)
  5. }
  6. def medianUpTo5(five: Array[Double]): Double = {
  7. def order2(a: Array[Double], i: Int, j: Int) =
  8. if a(i) > a(j) then val t = a(i); a(i) = a(j); a(j) = t
  9. def pairs(a: Array[Double], i: Int, j: Int, k: Int, l: Int) =
  10. if a(i) < a(k) then order2(a,j,k); a(j) else order2(a,i,l); a(i)
  11. if five.length < 2 then return five(0)
  12. order2(five,0,1)
  13. if five.length < 4 then return (
  14. if (five.length==2 || five(2) < five(0)) five(0)
  15. else if (five(2) > five(1)) five(1)
  16. else five(2)
  17. )
  18. order2(five,2,3)
  19. if five.length < 5 then pairs(five,0,1,2,3)
  20. else if five(0) < five(2) then order2(five,1,4); pairs(five,1,4,2,3)
  21. else order2(five,3,4); pairs(five,0,1,3,4)
  22. }

该方案同样是对采样率的取舍,由于加大了 pivot 的采样范围,在 array 长度较长的情况下理论表现会更好。

改进 partition

Hore’s 的 partition 实现中,使用左右指针遍历数组,并依据 pivot 交换数组中的值,当左右指针碰撞时结束循环,此时的数组局部有序。

  1. def partition(arr: Array[Int], from: Int, to: Int, pivot: Int): Int = {
  2. var (l, r) = (from, to)
  3. while l < r do
  4. while arr(l) < pivot do l += 1
  5. while arr(r) > pivot do r -= 1
  6. if l < r then swap(arr, l, r); l += 1; r -= 1
  7. l
  8. }

第 4、5 行根据 pivot 移动左右指针,是实现 partition 必要的步骤,无优化的空间。因此需要关注第6行。

分支预测

在具有长流水线的现代处理器上(Intel Haswell、Broad-well、Skylake、Kaby Lake 处理器为 14 个阶段,较早的 Pentium 4 处理器甚至是两倍以上),每个分支预测错误都会导致执行的相当长的中断,因为流水线必须清空,然后重新写入。
Kaligosi 和 Sanders 在 2006 年的论文中分析了分支错误预测对 Quicksort 的影响。他们测试了不同的简单分支预测方案,包含 static prediction、1-bit, 2-bit 预测器,在这些方案中,使用随机元素作为 pivot 的快速排序会遇到平均 cn log n + O(n) 个分支错误预测。极端情况下,使用随机 pivot 每四次比较就会出现一个错误的预测。
于是优化的方向有了:减少分支预测。Stefan Edelkamp 和 Armin Weiß 在 2016 年的 ACM 论文中提出 BlockQuickSort,原理是在 partition 一个长序列的时候,先存储需要交换的元素,后续统一放到真正的序列中。

  1. def blockPartition(arr: Array[Int], from: Int, to: Int, pivot: Int): Int = {
  2. var (l, r) = (from, to)
  3. val offsetsL, offsetsR = new Array[Int](B)
  4. var startL, startR, numL, numR = 0
  5. while r - l + 1 > 2 * B do
  6. if numL == 0 then
  7. startL = 0
  8. for i <- 0 until B do
  9. offsetsL(numL) = i
  10. numL += (if pivot >= arr(l + i) then 1 else 0)
  11. if numR == 0 then
  12. startR = 0
  13. for i <- 0 until B do
  14. offsetsR(numR) = i
  15. numR += (if pivot <= arr(l + i) then 1 else 0)
  16. val num = numL min numR
  17. for j <- 0 until num do swap(arr, l + offsetsL(startL + j), r - offsetsR(startR + j))
  18. numL -= num
  19. numR -= num
  20. if numL == 0 then l += B
  21. if numR == 0 then r -= B
  22. partition(arr, l, r, pivot)
  23. }

此处的 B 是一个常数,我们可以根据实际序列长度调优,因为当 r - l + 1 > 2 * B 条件不满足的情况下,算法会退化到 Hore’s Partition。

基准测试

场景

日常实践中遇到的序列大多是有规律的,如果按频率从大到小排序,可以得到以下场景:

basically sorted

基本有序,数列整体单调,但局部非单调。可以通过交换有序数列中的部分数字模拟生成。

unsorted

无序。不要求真随机的情况下,可以使用 Random.nextInt()模拟。

partially sorted

部分有序,数列整体非单调,但局部有序。比如 1,3,5,7,2,4,6,8,可以把有序数列中的元素 mod 8 得到。

sorted

整体有序,整体单调递增、递减。出现频率较低,直接使用 Range 生成有序序列即可。

all equal

全部元素相同。出现频率极低,直接生成相同的数字数列即可。

结论

快速排序有 in-place 和 out-place 实现,对于生产最常用的 in-place 实现我们有两个优化方向:选择 pivot 和 优化分区效率。
其中选择 pivot 可以通过采样手段寻找一个接近的中位数,众多优化算法无外乎调整采样率,实际测试下来效果最显著的是 median of three。
对于优化分区效率,需要认识到现代处理器的超长流水线(14步)导致分支预测失败代价高昂。通过缓存可以把元素交换滞后,并批量处理,从而降低分治预测失败次数。

参考文献

排序基准测试
CPU流水线
BlockQuicksort: Avoiding Branch Mispredictions in Quicksort