Skip to content

第20章 高级排序算法

学习目标

  • 掌握混合排序算法的设计原理
  • 理解TimSort的自适应特性与正确性证明
  • 掌握内省排序的递归深度控制策略
  • 理解外部排序的理论基础与实现
  • 掌握并行排序算法的基本原理

20.1 混合排序算法理论

20.1.1 设计原则

定义 20.1(混合排序) 混合排序算法结合多种基本排序策略,在不同输入规模或数据特性下切换策略,以获得最优整体性能。

定理 20.1(混合策略有效性) 若算法 $A_1$ 在规模 $n < n_0$ 时优于 $A_2$,而 $A_2$ 在大规模时更优,则混合算法可达到两者优势。

证明:设 $T_1(n)$ 和 $T_2(n)$ 分别为两算法的时间复杂度。混合算法:

$$T_{\text{hybrid}}(n) = \begin{cases} T_1(n) & n < n_0 \ T_2(n) & n \geq n_0 \end{cases}$$

满足 $T_{\text{hybrid}}(n) \leq \min(T_1(n), T_2(n))$ 对所有 $n$ 成立。 ∎

20.1.2 策略切换点

定义 20.2(切换阈值) 切换阈值 $n_0$ 是算法从一种策略切换到另一种策略的输入规模临界点。

定理 20.2(最优切换点) 对于插入排序与快速排序的混合,最优切换点 $n_0 \approx 10 \sim 20$。

证明:插入排序对小数组的常数因子更优。设插入排序比较次数为 $\frac{n(n-1)}{2}$,快速排序递归开销为 $c \cdot n \log n + d$。令两者相等:

$$\frac{n_0^2}{2} = c \cdot n_0 \log n_0 + d$$

实验表明 $n_0 \approx 16$ 时性能最优。 ∎


20.2 TimSort算法

20.2.1 算法概述

定义 20.3(TimSort) TimSort是由Tim Peters于2002年设计的自适应、稳定、混合排序算法,结合归并排序和插入排序的优点。

算法 20.1(TimSort)

TimSort(A):
    minrun = CalculateMinRun(len(A))
    runs = []
    
    // 找到自然运行序列
    i = 0
    while i < len(A):
        run = FindNaturalRun(A, i)
        if len(run) < minrun:
            run = ExtendRun(A, i, minrun)
            BinaryInsertionSort(run)
        runs.append(run)
        i += len(run)
    
    // 归并运行序列
    while len(runs) > 1:
        MaintainInvariants(runs)
    
    return A

20.2.2 自然运行序列

定义 20.4(运行序列) 运行序列是数组中已排序(递增或严格递减)的连续子序列。

定义 20.5(MinRun) MinRun是运行序列的最小长度,通常选择 $16 \leq \text{minrun} \leq 32$。

定理 20.3(MinRun选择) 最优MinRun满足:

$$\text{minrun} = \min(32, \text{最高有效位为1的最高次幂})$$

证明:选择 $16 \leq \text{minrun} \leq 32$ 确保:

  1. 插入排序在小数组上高效
  2. 运行序列数量约为 $\frac{n}{\text{minrun}}$
  3. 归并次数适中

当 $n$ 接近 $2^k$ 时,归并树平衡。 ∎

20.2.3 归并不变式

定义 20.6(TimSort不变式) 设运行序列栈为 $S = [R_1, R_2, \ldots, R_k]$,需满足:

  1. $|R_k| > |R_{k-1}| + |R_{k-2}|$
  2. $|R_{k-1}| > |R_{k-2}|$

定理 20.4(不变式维护) 维护上述不变式确保归并次数为 $O(n \log n)$。

证明:设栈中运行序列长度为 $l_1, l_2, \ldots, l_k$。由不变式:

$$l_i > l_{i+1} + l_{i+2}$$

这类似于斐波那契数列的逆序。栈深度为 $O(\log n)$,每次归并代价 $O(n)$,总代价 $O(n \log n)$。 ∎

20.2.4 Galloping模式

定义 20.7(Galloping模式) 当一个运行序列连续多次"获胜"(其元素先被取完)时,切换到Galloping模式,使用指数搜索加速。

定理 20.5(Galloping效率) 对于运行序列 $X$ 和 $Y$,若 $X$ 中有 $k$ 个连续元素小于 $Y$ 的最小元素,Galloping模式可在 $O(\log k)$ 次比较后找到位置。

证明:指数搜索先检查 $X[1], X[2], X[4], X[8], \ldots$ 直到找到大于 $Y[0]$ 的元素,然后二分查找。总比较次数:

$$2\lceil \log_2 k \rceil + 1 = O(\log k)$$

20.2.5 Python实现

python
from typing import List, TypeVar, Callable, Optional
import math

T = TypeVar('T')

class TimSort:
    MIN_MERGE = 32
    
    def __init__(self, arr: List[T], key: Optional[Callable[[T], any]] = None, reverse: bool = False):
        self.arr = arr
        self.key = key
        self.reverse = reverse
        self.min_merge = self._calc_min_run(len(arr))
    
    def _calc_min_run(self, n: int) -> int:
        r = 0
        while n >= self.MIN_MERGE:
            r |= n & 1
            n >>= 1
        return n + r
    
    def _get_key(self, x: T) -> any:
        return self.key(x) if self.key else x
    
    def _binary_insertion_sort(self, left: int, right: int) -> None:
        for i in range(left + 1, right + 1):
            current = self.arr[i]
            key_val = self._get_key(current)
            
            lo, hi = left, i - 1
            while lo <= hi:
                mid = (lo + hi) // 2
                mid_key = self._get_key(self.arr[mid])
                if (mid_key <= key_val) != self.reverse:
                    lo = mid + 1
                else:
                    hi = mid - 1
            
            for j in range(i, lo, -1):
                self.arr[j] = self.arr[j - 1]
            self.arr[lo] = current
    
    def _count_run(self, start: int) -> int:
        if start >= len(self.arr) - 1:
            return 1
        
        run_len = 2
        ascending = self._get_key(self.arr[start]) <= self._get_key(self.arr[start + 1])
        
        if ascending:
            while start + run_len < len(self.arr):
                if self._get_key(self.arr[start + run_len - 1]) > self._get_key(self.arr[start + run_len]):
                    break
                run_len += 1
        else:
            while start + run_len < len(self.arr):
                if self._get_key(self.arr[start + run_len - 1]) <= self._get_key(self.arr[start + run_len]):
                    break
                run_len += 1
            self._reverse(start, start + run_len - 1)
        
        return run_len
    
    def _reverse(self, left: int, right: int) -> None:
        while left < right:
            self.arr[left], self.arr[right] = self.arr[right], self.arr[left]
            left += 1
            right -= 1
    
    def _merge(self, left: int, mid: int, right: int) -> None:
        left_arr = self.arr[left:mid]
        right_arr = self.arr[mid:right + 1]
        
        i = j = 0
        k = left
        
        while i < len(left_arr) and j < len(right_arr):
            left_key = self._get_key(left_arr[i])
            right_key = self._get_key(right_arr[j])
            
            if (left_key <= right_key) != self.reverse:
                self.arr[k] = left_arr[i]
                i += 1
            else:
                self.arr[k] = right_arr[j]
                j += 1
            k += 1
        
        while i < len(left_arr):
            self.arr[k] = left_arr[i]
            i += 1
            k += 1
        
        while j < len(right_arr):
            self.arr[k] = right_arr[j]
            j += 1
            k += 1
    
    def sort(self) -> List[T]:
        n = len(self.arr)
        if n < 2:
            return self.arr
        
        runs = []
        i = 0
        
        while i < n:
            run_len = self._count_run(i)
            
            if run_len < self.min_merge:
                force_len = min(run_len, n - i)
                self._binary_insertion_sort(i, i + force_len - 1)
                run_len = force_len
            
            runs.append((i, i + run_len - 1))
            i += run_len
        
        while len(runs) > 1:
            new_runs = []
            for j in range(0, len(runs) - 1, 2):
                left, mid = runs[j]
                mid2, right = runs[j + 1]
                self._merge(left, mid2, right)
                new_runs.append((left, right))
            if len(runs) % 2 == 1:
                new_runs.append(runs[-1])
            runs = new_runs
        
        return self.arr

def timsort(arr: List[T], key: Optional[Callable[[T], any]] = None, reverse: bool = False) -> List[T]:
    sorter = TimSort(arr.copy(), key, reverse)
    return sorter.sort()

20.2.6 复杂度分析

定理 20.6(TimSort复杂度)

  • 最好情况:$O(n)$(已排序数组)
  • 平均情况:$O(n \log n)$
  • 最坏情况:$O(n \log n)$
  • 空间复杂度:$O(n)$

证明

  • 最好情况:数组已排序,每个运行序列长度为 $n$,无需归并,仅扫描 $O(n)$。
  • 平均/最坏情况:运行序列数量为 $O(n/\text{minrun}) = O(n)$,归并代价 $O(n \log n)$。
  • 空间:归并需要临时数组,最坏 $O(n)$。 ∎

20.3 内省排序

20.3.1 算法设计

定义 20.8(内省排序) 内省排序(Introsort)是快速排序和堆排序的混合,当递归深度超过阈值时切换到堆排序。

算法 20.2(Introsort)

Introsort(A, depth_limit):
    if len(A) <= 16:
        InsertionSort(A)
    elif depth_limit == 0:
        HeapSort(A)
    else:
        pivot = Partition(A)
        Introsort(A[0:pivot], depth_limit - 1)
        Introsort(A[pivot+1:], depth_limit - 1)

20.3.2 深度限制

定理 20.7(深度限制选择) 最优深度限制为 $2\lfloor \log_2 n \rfloor$。

证明:快速排序平均递归深度为 $O(\log n)$。最坏情况深度为 $n$。选择 $2\log n$:

  • 正常情况下不会触发堆排序
  • 避免最坏情况的 $O(n^2)$ 复杂度

切换到堆排序后保证 $O(n \log n)$。 ∎

20.3.3 Python实现

python
import math
from typing import List, TypeVar, Optional

T = TypeVar('T')

def introsort(arr: List[T]) -> List[T]:
    result = arr.copy()
    n = len(result)
    
    if n <= 1:
        return result
    
    max_depth = 2 * int(math.log2(n))
    _introsort_helper(result, 0, n - 1, max_depth)
    _insertion_sort_range(result, 0, n - 1)
    
    return result

def _introsort_helper(arr: List[T], low: int, high: int, depth: int) -> None:
    size = high - low + 1
    
    if size <= 16:
        return
    
    if depth == 0:
        _heapsort_range(arr, low, high)
        return
    
    pivot = _partition(arr, low, high)
    
    if pivot > low + 1:
        _introsort_helper(arr, low, pivot - 1, depth - 1)
    if pivot < high - 1:
        _introsort_helper(arr, pivot + 1, high, depth - 1)

def _partition(arr: List[T], low: int, high: int) -> int:
    mid = (low + high) // 2
    
    if arr[mid] < arr[low]:
        arr[low], arr[mid] = arr[mid], arr[low]
    if arr[high] < arr[low]:
        arr[low], arr[high] = arr[high], arr[low]
    if arr[mid] < arr[high]:
        arr[mid], arr[high] = arr[high], arr[mid]
    
    pivot = arr[high]
    i = low - 1
    
    for j in range(low, high):
        if arr[j] <= pivot:
            i += 1
            arr[i], arr[j] = arr[j], arr[i]
    
    arr[i + 1], arr[high] = arr[high], arr[i + 1]
    return i + 1

def _heapsort_range(arr: List[T], low: int, high: int) -> None:
    def heapify(root: int, size: int) -> None:
        largest = root
        left = 2 * (root - low) + 1 + low
        right = 2 * (root - low) + 2 + low
        
        if left <= high and left < low + size and arr[left] > arr[largest]:
            largest = left
        if right <= high and right < low + size and arr[right] > arr[largest]:
            largest = right
        
        if largest != root:
            arr[root], arr[largest] = arr[largest], arr[root]
            heapify(largest, size)
    
    size = high - low + 1
    
    for i in range(low + size // 2 - 1, low - 1, -1):
        heapify(i, size)
    
    for i in range(high, low, -1):
        arr[low], arr[i] = arr[i], arr[low]
        heapify(low, i - low)

def _insertion_sort_range(arr: List[T], low: int, high: int) -> None:
    for i in range(low + 1, high + 1):
        key = arr[i]
        j = i - 1
        while j >= low and arr[j] > key:
            arr[j + 1] = arr[j]
            j -= 1
        arr[j + 1] = key

20.3.4 复杂度分析

定理 20.8(Introsort复杂度) Introsort的时间复杂度为 $O(n \log n)$,空间复杂度为 $O(\log n)$。

证明

  • 若递归深度未超过阈值,行为同快速排序,期望 $O(n \log n)$
  • 若超过阈值,切换到堆排序,保证 $O(n \log n)$
  • 空间:递归栈深度 $O(\log n)$,堆排序原地


20.4 外部排序

20.4.1 问题定义

定义 20.9(外部排序) 外部排序处理无法全部装入内存的大规模数据,使用磁盘作为辅助存储。

定义 20.10(I/O复杂度) 外部排序的代价主要由磁盘I/O次数决定。

定理 20.9(I/O下界) 排序 $N$ 个元素,内存大小为 $M$,块大小为 $B$,I/O下界为:

$$\Omega\left(\frac{N}{B} \log_{M/B} \frac{N}{B}\right)$$

20.4.2 外部归并排序

算法 20.3(外部归并排序)

ExternalMergeSort(file, M):
    // 第一阶段:生成初始运行序列
    runs = []
    while not EOF(file):
        block = ReadNextBlock(file, M)
        SortInMemory(block)
        run_file = WriteToDisk(block)
        runs.append(run_file)
    
    // 第二阶段:多路归并
    while len(runs) > 1:
        new_runs = []
        for each group of M-1 runs:
            merged = KWayMerge(group)
            new_runs.append(merged)
        runs = new_runs
    
    return runs[0]

20.4.3 替换选择

定义 20.11(替换选择) 替换选择算法利用堆生成平均长度为 $2M$ 的初始运行序列。

定理 20.10(替换选择运行长度) 对于随机输入,替换选择生成的运行序列期望长度为 $2M$。

证明:设堆中有 $M$ 个元素。每次输出最小元素后,若新元素大于等于输出元素,加入堆;否则等待下一轮。期望一半元素满足条件,故运行长度期望为 $2M$。 ∎

20.4.4 Python实现

python
from typing import List, TypeVar, Iterator, Generator
import heapq
import tempfile
import os

T = TypeVar('T')

class ExternalSort:
    def __init__(self, memory_size: int = 1000):
        self.memory_size = memory_size
    
    def sort(self, data: Iterator[T]) -> Iterator[T]:
        runs = list(self._create_runs(data))
        
        while len(runs) > 1:
            runs = list(self._merge_pass(runs))
        
        if runs:
            yield from runs[0]
    
    def _create_runs(self, data: Iterator[T]) -> Generator[Iterator[T], None, None]:
        buffer = []
        
        for item in data:
            buffer.append(item)
            if len(buffer) >= self.memory_size:
                buffer.sort()
                yield iter(buffer)
                buffer = []
        
        if buffer:
            buffer.sort()
            yield iter(buffer)
    
    def _merge_pass(self, runs: List[Iterator[T]]) -> Generator[Iterator[T], None, None]:
        for i in range(0, len(runs), self.memory_size - 1):
            group = runs[i:i + self.memory_size - 1]
            yield self._k_way_merge(group)
    
    def _k_way_merge(self, runs: List[Iterator[T]]) -> Iterator[T]:
        heap = []
        
        for i, run in enumerate(runs):
            try:
                item = next(run)
                heap.append((item, i, run))
            except StopIteration:
                pass
        
        heapq.heapify(heap)
        
        while heap:
            item, i, run = heapq.heappop(heap)
            yield item
            
            try:
                next_item = next(run)
                heapq.heappush(heap, (next_item, i, run))
            except StopIteration:
                pass

def external_sort(data: List[T], memory_size: int = 1000) -> List[T]:
    sorter = ExternalSort(memory_size)
    return list(sorter.sort(iter(data)))

20.5 并行排序

20.5.1 并行排序模型

定义 20.12(PRAM模型) 并行随机存取机(PRAM)模型假设 $p$ 个处理器可同时访问共享内存。

定义 20.13(并行复杂度) 并行算法的复杂度由以下度量:

  • 时间复杂度 $T(n, p)$:使用 $p$ 个处理器的时间
  • 工作量 $W(n) = p \cdot T(n, p)$:总操作数
  • 加速比 $S = T_{\text{sequential}} / T_{\text{parallel}}$

20.5.2 并行归并排序

定理 20.11(并行归并排序复杂度) 使用 $p = n$ 个处理器,并行归并排序时间为 $O(\log n)$,工作量为 $O(n \log n)$。

证明

  • 并行划分:$O(1)$ 时间,$O(n)$ 工作
  • 递归排序:$O(\log n)$ 深度
  • 并行归并:$O(\log n)$ 时间

总时间 $O(\log n)$,工作量 $O(n \log n)$,最优。 ∎

20.5.3 Python实现(多线程)

python
from typing import List, TypeVar, Callable, Optional
from concurrent.futures import ThreadPoolExecutor
import math

T = TypeVar('T')

def parallel_merge_sort(arr: List[T], num_threads: int = 4) -> List[T]:
    if len(arr) <= 1:
        return arr.copy()
    
    if len(arr) < 1000 or num_threads == 1:
        return _sequential_merge_sort(arr)
    
    mid = len(arr) // 2
    
    with ThreadPoolExecutor(max_workers=2) as executor:
        left_future = executor.submit(parallel_merge_sort, arr[:mid], num_threads // 2)
        right_future = executor.submit(parallel_merge_sort, arr[mid:], num_threads // 2)
        
        left = left_future.result()
        right = right_future.result()
    
    return _merge(left, right)

def _sequential_merge_sort(arr: List[T]) -> List[T]:
    if len(arr) <= 1:
        return arr.copy()
    
    mid = len(arr) // 2
    left = _sequential_merge_sort(arr[:mid])
    right = _sequential_merge_sort(arr[mid:])
    
    return _merge(left, right)

def _merge(left: List[T], right: List[T]) -> List[T]:
    result = []
    i = j = 0
    
    while i < len(left) and j < len(right):
        if left[i] <= right[j]:
            result.append(left[i])
            i += 1
        else:
            result.append(right[j])
            j += 1
    
    result.extend(left[i:])
    result.extend(right[j:])
    return result

def parallel_quick_sort(arr: List[T], num_threads: int = 4) -> List[T]:
    if len(arr) <= 1:
        return arr.copy()
    
    if len(arr) < 1000 or num_threads == 1:
        return sorted(arr)
    
    pivot = arr[len(arr) // 2]
    left = [x for x in arr if x < pivot]
    middle = [x for x in arr if x == pivot]
    right = [x for x in arr if x > pivot]
    
    with ThreadPoolExecutor(max_workers=2) as executor:
        left_future = executor.submit(parallel_quick_sort, left, num_threads // 2)
        right_future = executor.submit(parallel_quick_sort, right, num_threads // 2)
        
        return left_future.result() + middle + right_future.result()

20.6 排序网络

20.6.1 比较网络

定义 20.14(比较器) 比较器是一个二元操作,输入 $(a, b)$ 输出 $(\min(a,b), \max(a,b))$。

定义 20.15(排序网络) 排序网络是由比较器组成的有向无环图,对任意输入都能正确排序。

定义 20.16(深度) 排序网络的深度是从输入到输出的最长路径上的比较器数量。

20.6.2 经典排序网络

定理 20.12(双调排序网络) 双调排序网络对 $n$ 个元素排序,深度为 $O(\log^2 n)$。

定理 20.13(AKS网络) AKS排序网络深度为 $O(\log n)$,但常数因子极大。

20.6.3 Python模拟

python
from typing import List, Tuple

def compare_swap(arr: List[int], i: int, j: int, ascending: bool = True) -> None:
    if (arr[i] > arr[j]) == ascending:
        arr[i], arr[j] = arr[j], arr[i]

def bitonic_sort_network(arr: List[int], low: int, cnt: int, ascending: bool = True) -> None:
    if cnt <= 1:
        return
    
    k = cnt // 2
    
    for i in range(low, low + k):
        compare_swap(arr, i, i + k, ascending)
    
    bitonic_sort_network(arr, low, k, ascending)
    bitonic_sort_network(arr, low + k, k, ascending)

def bitonic_merge_network(arr: List[int], low: int, cnt: int, ascending: bool = True) -> None:
    if cnt <= 1:
        return
    
    k = cnt // 2
    
    for i in range(low, low + k):
        compare_swap(arr, i, i + k, ascending)
    
    bitonic_merge_network(arr, low, k, ascending)
    bitonic_merge_network(arr, low + k, k, ascending)

def bitonic_sort(arr: List[int]) -> List[int]:
    result = arr.copy()
    n = len(result)
    
    padded_n = 1
    while padded_n < n:
        padded_n *= 2
    
    result.extend([float('inf')] * (padded_n - n))
    
    for size in [2**i for i in range(1, int(math.log2(padded_n)) + 1)]:
        for low in range(0, padded_n, 2 * size):
            bitonic_merge_network(result, low, size, True)
            bitonic_merge_network(result, low + size, size, False)
    
    return result[:n]

20.7 算法比较

20.7.1 高级排序算法对比

算法时间复杂度空间稳定性特点
TimSort$O(n) \sim O(n \log n)$$O(n)$稳定自适应,Python默认
Introsort$O(n \log n)$$O(\log n)$不稳定C++ STL默认
外部归并排序$O(n \log n)$ I/O$O(M)$稳定大数据排序
并行归并排序$O(\log n)$ 时间$O(n)$稳定多核并行

20.7.2 选择指南

场景推荐算法原因
通用排序TimSort自适应,稳定
内存受限Introsort原地,高效
大数据外部排序磁盘友好
多核环境并行排序利用并行性

20.8 本章小结

本章介绍了高级排序算法:

  1. 混合排序:结合多种策略,适应不同输入
  2. TimSort:自适应、稳定,Python默认排序
  3. Introsort:快速排序与堆排序混合,C++默认
  4. 外部排序:处理大规模数据的磁盘排序
  5. 并行排序:利用多处理器加速排序

参考文献

  1. Peters, T. (2002). Timsort. Python source code.
  2. Musser, D. R. (1997). Introspective sorting and selection algorithms. Software: Practice and Experience, 27(8), 983-993.
  3. Knuth, D. E. (1998). The Art of Computer Programming, Volume 3: Sorting and Searching. Addison-Wesley.
  4. Ajtai, M., Komlós, J., & Szemerédi, E. (1983). An $O(n \log n)$ sorting network. STOC, 1-9.
  5. Cormen, T. H., et al. (2009). Introduction to Algorithms, 3rd ed. MIT Press.

下一章:第21章 线性时间排序

Python技术丛书 - 江苏省宿城中等专业学校