QuickSelect with BFPRT to Conquer Top-K Problem


 

QuickSelect with BFPRT to Conquer Top-K Problem

使用QuickSelect结合BDPRT算法解决Top-K问题

0x01. 简介

top-K问题:在一堆数中求其前 k 大或前 k 小的问题,简称 TOP-K 问题,例如在LeetCode上就有这么一道题:215. Kth Largest Element in an Array。在首次接触 TOP-K 问题时,第一反应估计是用快速排序对所有数据进行一次排序,然后取其前 k 即可,但是这么做有两个问题:

  1. 快速排序的平均复杂度为 O(nlgn),但最坏时间复杂度为 O(n2),不能始终保证较好的复杂度;
  2. 我们只需要前 k 大的,而对其余不需要的数也进行了排序,浪费了大量排序时间。

除此之外,也可以选择堆排序,维护一个大小为 k 的堆,时间复杂度为 O(nlogk)。那是否还存在更有效的方法呢?目前用于解决TOP-K问题较为普遍的算法是带有BFPRT 算法QuickSelect算法

QuickSelect算法:用于在一个乱序的集合中找到第k小的元素,即用于解决top-K问题。与快速排序算法一样,作者也是Tony Hoare,同时它在实际应用中也十分高效,有着良好的平均性能,不过也有着较差的最坏性能。

大体上,quickselect算法使用了与快排一样的过程,选择一个集合元素作为中轴点,基于中轴点把数据分成相应地大于或小于中轴点的两块,然而不同于快排在两个分块中递归排序,quickselect算法在确定要查找的元素在哪一块后,会只在那一块中递归选择,这样就将平均复杂度从 O(log n) 降到了 O(n),尽管最坏复杂度还是 O(n2)。

与快排一样,quickselect算法通常也是被实现为原地算法,除了用于解决top-K问题,它还可以用于局部排序,本文不作讨论。

BFPRT 算法:又称为中位数的中位数算法(Median of medians),该算法由 Blum、Floyd、Pratt、Rivest、Tarjan 提出,最坏时间复杂度为O(n),是一种非精确的选择算法,主要用于为quickselect算法其他选择算法提供一个良好的中轴点,从而解决top-k问题BFPRT 算法能在有限的线性级的时间内找到一个大致合适的中位数,当该中位数在quickselect算法中用作一个优化过的中轴点,quickselect算法在最坏情况下的复杂度将从平方级降到线性级。

0x02. 实现与解决

为了保证算法的线性运行时间,我们需要设计另一种方法,使所选的主元保证“接近”给定数字的中值。通过选择一个接近中值的主元,我们可以保证,在每次迭代中,我们将元素的搜索数量减少(几乎)一半。

  1. n个元素划为⌊n / 5⌋组,每组5个,至多只有一组由n mod 5个元素组成。
  2. 寻找这⌈n / 5⌉个组中每一个组的中位数,这个过程可以用插入排序。
  3. 对步骤2中的⌈n / 5⌉个中位数,重复步骤1和步骤2,递归下去,直到剩下一个数字。
  4. 最终剩下的数字即为pivot,把大于它的数全放左边,小于等于它的数全放右边。
  5. 判断pivot的位置与k的大小,有选择的对左边或右边递归。

上面的步骤中1~3步为BFPRT 算法的过程,用于为quickselect算法选择一个好的中轴点,而4~5步则是quickselect算法的内容,其实BFPRT 算法就相当于是quickselect算法的一个辅助插件一般。下面先介绍QuickSelect算法的分区与选择过程。

1. 分区

在快排中有一个子过程叫partition,它可以在线性时间内将数据分成大于和小于中轴点的两块,下面这个算法实现自维基百科中Quickselect词条中的partition函数,它会根据中轴点的最初下标将数据分界,确定中轴点最终排定位置。

/**
 * 利用主元下标 pivotIndex 进行对数组 arr[left, right] 划分,并返回
 * 划分后的分界线下标。
 * 类似快速排序的逻辑,都是将数组的元素与中轴点进行比较,但是并没有进行交换操作,
 * 只是分界后确定中轴点在数组中排定的位置
 */
private static int partition(int[] arr, int left, int right, int pivotIndex) {
    swap(arr, pivotIndex, right); // 把主元放置于末尾
    int partitionIndex = left; // 跟踪划分的分界线
    for (int i = left; i < right; i++) {
        if (arr[i] > arr[right]) {  // 如果求第k小元素,将>改为<
            swap(arr, partitionIndex++, i); // 比主元大的都放在左侧
        }
    }
    swap(arr, partitionIndex, right); // 最后把主元换回来
    return partitionIndex;
}

这就是Lomuto分区策略,它比Hoare的原始分区策略简单而高效。

2. 选择

在快排中,我们需要对左右两个分区进行递归排序,这带来了 O(n log n) 的最好时间复杂度,然而,当使用类似的方法进行选择时,由于中轴点已经排定,尽管其前后的元素都是乱序的,我们可以据此知道需要查找的元素位于哪一块,所以只需要在待查找元素所在的分区这种进行递归查找即可,由此可以写出下面的实现。

/**
 * 返回数组 arr[left, right] 的第 k 小数的下标
 * @param arr 待处理的数组
 * @param left 待处理的数列在数组中的起始索引
 * @param right 待处理的数列在数组中的终止索引
 * @param k 第k小
 * @return 第k小的数在数组中的索引
 */
public static int select(int[] arr, int left, int right, int k) {
    // 得到中位数的中位数下标(即主元下标)
    int pivotIndex = getPivotIndex(arr, left, right);
    // 进行划分,返回划分边界,即枢轴点的位置
    int partitionIndex = partition(arr, left, right, pivotIndex);
    // 由于枢轴点已经排定,如果其位置与待选择的k位置一样,即返回,否则继续递归查找
    int num = partitionIndex - left + 1;
    if (num == k)
        return partitionIndex;
    else if (num > k)  // 往左边继续找
        return select(arr, left, partitionIndex - 1, k);
    else    // 往右边继续找
        return select(arr, partitionIndex + 1, right, k - num);
}

注意与快速排序的相似之处,正如基于最小值的选择算法是部分选择排序一样,这也是部分快速排序,只生成和划分O(log n) / O(n)个分区。这个简单的过程具有预期的线性性能,并且像快速排序一样,在实践中具有相当好的性能。同时它也是一种原地算法,如果能够优化函数尾部递归调用,或者使用循环消除尾部递归,则只需要常量级内存开销:

/**
 * 返回数组 arr[left, right] 的第 k 小数的下标
 * @param arr 待处理的数组
 * @param left 待处理的数列在数组中的起始索引
 * @param right 待处理的数列在数组中的终止索引
 * @param k 第k小
 * @return 第k小的数在数组中的索引
 */
public int select(int[] arr, int left, int right, int k) {
    int pivotIndex, partitionIndex, num;
    for (;;) {
        if (left == right) return left;
        // 得到中位数的中位数下标(即主元下标)
        pivotIndex = getPivotIndex(arr, left, right);
        // 进行划分,返回划分边界,即枢轴点的位置
        partitionIndex = partition(arr, left, right, pivotIndex);
        // 由于枢轴点已经排定,如果其位置与待选择的k位置一样,即返回,否则继续递归查找
        num = partitionIndex - left + 1;

        if (num == k) return partitionIndex;
        else if (num > k)  // 往左边继续找
            right = partitionIndex - 1;
        else  { // 往右边继续找
            k -= num;
            left = partitionIndex + 1;
        }
    }
}

与快速排序类似,快速选择算法在平均状况下有着不错的表现,但是对于中轴点的选择十分敏感。如果中轴点选择上佳,搜索范围每次能够指数级减少,这样一来所耗时间是线性的(即O(n))。但如果中轴点选择非常不好,使得每次只能将搜索范围减少一个元素,则最糟的总体时间复杂度是平方级的(O(n2)):例如对一个升序排列的数组搜索其最大值,而每次都选择第一个元素作为中轴点。

根据中轴点的选择算法的差异,快速排序衍变出很多变体,其中最简单的快速排序变体是每次随机选择中轴点,这样可以达到近乎线性的复杂度。更为确定的做法是采用取三者中位数的中轴点选择策略,这样对已部分排序的数据依然能够达到线性复杂度。但是,特定人为设置的数组在此方法下仍然会导致最差时间复杂度,如大卫·穆塞尔所描述的取三者中位数杀手数列,这成为他发表反省式选择算法的动机。

利用中位数的中位数算法,可以在最坏情形下依然保证线性时间复杂度。但是这一方法中的基准值计算十分复杂,实际应用中并不常见。改进方法是在快速选择算法的基础上,使用中位数的中位数算法处理极端特例,这样可以保证平均状态与最差情形下的时间复杂度都是线性的,这也是反省式选择算法的做法。下面我们先来看中位数的中位数算法如何实现。

3. BFPRT 算法

下面的子过程是实际的BFPRT 算法,即中位数的中位数算法,由于它用于确定一个好的中轴点的位置,所以此处命名为getPivotIndex。可以看到,它将输入数组(长度为n)分组,每组最多有5个元素,用子过程subroutine计算每组的中位数,然后在这n/5个中位数中递归计算它们的中位数,即计算中位数的中位数。

/**
 * 中位数的中位数算法
 * 数组 arr[left, right] 每五个元素作为一组,并计算每组的中位数,\
 * 最后返回这些中位数的中位数下标(即主元下标)。
 *
 * 末尾返回语句最后一个参数多加一个 1 的作用其实就是向上取整的意思,
 * 这样可以始终保持 k 大于 0。
 * @param arr 待处理的数组
 * @param left 待处理的数列在数组中的起始索引
 * @param right 待处理的数列在数组中的终止索引
 * @return 中位数的中位数下标
 */
private static int getPivotIndex(int[] arr, int left, int right) {
    if (right - left < 5)
        return insertionSort(arr, left, right);
    int subRight = left - 1;
    // 每五个作为一组,求出中位数,并把这些中位数全部依次移动到数组左边
    for (int i = left; i + 4 <= right; i += 5) {
        int index = insertionSort(arr, i, i + 4);
        swap(arr, ++subRight, index);
    }
    // 利用 select 得到这些中位数的中位数下标(即主元下标)
    return select(arr, left, subRight, ((subRight - left + 1) >> 1) + 1);
}

可以看到在medianOfMedians的函数的最后调用了quickselect,而在quickselect中又调用了medianOfMedians,这就是所谓的 Mutual recursion。其中用于计算每组中位数的子过程subroutine可以使用插入排序,即先对每组的元素进行排序,然后返回排好序的子集中的中位数,下面的实现只返回子集的中位数索引。

/**
 * 对数组 array[left, right] 进行插入排序(从大到小)
 * @param arr 待处理的数组
 * @param left 待处理的数列在数组中的起始索引
 * @param right 待处理的数列在数组中的终止索引
 * @return [left, right] 的中位数。
 */
private static int insertionSort(int[] arr, int left, int right) {
    for (int i = left + 1; i <= right; i++) {
        // 如果求第k小元素,将>改为<
        for (int j = i; j > left && arr[j] > arr[j-1]; j--) {
            swap(arr, j-1, j);
        }
    }
    return ((right - left) >> 1) + left;
}

0x03. 测试与比较

为了检验BFPRT算法QuickSelect算法组合的效率,我将之与单轴快排Dual-Pivot Quick Sort进行了比较,下面是测试代码,先创建len长的数组,数组的每个元素初值与其索引一致,然后使用《Algorithm》第二章中用到的的一个工具 StdRandom.java 对数组进行打乱重排,这样做主要是为了不予考虑数据重复的情况,因为当数据存在重复时,排序跟选择的结果很可能会不一致。接着将原数组拷贝一份,用于后面的单轴排序,因为和双轴快排一样,它也是一个原地算法。

@Test
public void compareEfficiency() {
    int k = 10, len = 60000000; // 1 <= k <= arr.size
    int[] arr = new int[len], arr1 = new int[len];

    for (int i = 0; i < len; i++) {
        arr[i] = i;
    }

    StdRandom.shuffle(arr);
    System.arraycopy(arr, 0, arr1, 0, len);

    System.out.println("数组长度:" + len + "\t\tK:" + k);
    System.out.println(String.format("%-12s%-12s%-8s", "算法", "用时", "第k大的数"));
    // 因为是以 k 为划分,所以还可以求出第 k 大值
    long start1 = System.currentTimeMillis();
    int topK1 = arr[QuickSelectWithBFPRT.select(arr, 0, len-1, k)];
    long end1 = System.currentTimeMillis();
    System.out.println(String.format("%-12s%-12s%-8s", "快速选择", (end1 - start1), topK1));

    long start2 = System.currentTimeMillis();
    Arrays.sort(arr);
    int topK2 = arr[arr.length - k];
    long end2 = System.currentTimeMillis();
    System.out.println(String.format("%-12s%-12s%-8s", "双轴快排", (end2 - start2), topK2));

    long start3 = System.currentTimeMillis();
    SinglePivotQuick.sort(arr1);
    int topK3 = arr1[k - 1];
    long end3 = System.currentTimeMillis();
    System.out.println(String.format("%-12s%-12s%-8s", "单轴快排", (end3 - start3), topK3));
}

下面是分别使用快速选择算法、双轴快排和单轴快排在100万、5000万和6000万条乱序数据中找到第k小元素的时间对比。

数组长度:1000000		K:3
  算法         用时        第k大的数   
快速选择        75          999997  
双轴快排        211         999997  
单轴快排        190         999997  

数组长度:50000000		K:10
  算法          用时        第k大的数   
快速选择        1717        49999990
双轴快排        5670        49999990
单轴快排        7311        49999990

数组长度:60000000		K:10
 算法          用时         第k大的数   
快速选择        2395        59999990
双轴快排        7067        59999990
单轴快排        9214        59999990

通过比较上面的输出序列,可以看到两点

  • 相比于快速排序算法,结合了中位数的中位数的快速选择算法的效率明显更高。
  • 当待排序数组的元素个数较少时,双轴快排的效率往往会低于单轴快排,只有当待排序数组的元素个数提升到一定数量级时,双轴快排的优势才得以发挥。

除了具有更高效率外,快速选择算法也具有较好的空间分配,下面是使用快速选择结合中位数的中枢算法提交解决 215. Kth Largest Element in an Array 的空间分配情况(当然机器在每个时刻处理速度存在差异):

img

0x04. 参考

0x05. 相关文章

Java, Security developer https://jordonyang.github.io/ Guangzhou, China 本站所有文章如未说明均为原创,请勿随意转载,如需转载请联系我 (linfengit@qq.com)