第20章 高级排序算法
学习目标
- 掌握混合排序算法的设计原理
- 理解TimSort的自适应特性与正确性证明
- 掌握内省排序的递归深度控制策略
- 理解外部排序的理论基础与实现
- 掌握并行排序算法的基本原理
20.1 混合排序算法理论
20.1.1 设计原则
定义 20.1(混合排序) 混合排序算法结合多种基本排序策略,在不同输入规模或数据特性下切换策略,以获得最优整体性能。
定理 20.1(混合策略有效性) 若算法 $A_1$ 在规模 $n < n_0$ 时优于 $A_2$,而 $A_2$ 在大规模时更优,则混合算法可达到两者优势。
证明:设 $T_1(n)$ 和 $T_2(n)$ 分别为两算法的时间复杂度。混合算法:
$$T_{\text{hybrid}}(n) = \begin{cases} T_1(n) & n < n_0 \ T_2(n) & n \geq n_0 \end{cases}$$
满足 $T_{\text{hybrid}}(n) \leq \min(T_1(n), T_2(n))$ 对所有 $n$ 成立。 ∎
20.1.2 策略切换点
定义 20.2(切换阈值) 切换阈值 $n_0$ 是算法从一种策略切换到另一种策略的输入规模临界点。
定理 20.2(最优切换点) 对于插入排序与快速排序的混合,最优切换点 $n_0 \approx 10 \sim 20$。
证明:插入排序对小数组的常数因子更优。设插入排序比较次数为 $\frac{n(n-1)}{2}$,快速排序递归开销为 $c \cdot n \log n + d$。令两者相等:
$$\frac{n_0^2}{2} = c \cdot n_0 \log n_0 + d$$
实验表明 $n_0 \approx 16$ 时性能最优。 ∎
20.2 TimSort算法
20.2.1 算法概述
定义 20.3(TimSort) TimSort是由Tim Peters于2002年设计的自适应、稳定、混合排序算法,结合归并排序和插入排序的优点。
算法 20.1(TimSort)
TimSort(A):
minrun = CalculateMinRun(len(A))
runs = []
// 找到自然运行序列
i = 0
while i < len(A):
run = FindNaturalRun(A, i)
if len(run) < minrun:
run = ExtendRun(A, i, minrun)
BinaryInsertionSort(run)
runs.append(run)
i += len(run)
// 归并运行序列
while len(runs) > 1:
MaintainInvariants(runs)
return A20.2.2 自然运行序列
定义 20.4(运行序列) 运行序列是数组中已排序(递增或严格递减)的连续子序列。
定义 20.5(MinRun) MinRun是运行序列的最小长度,通常选择 $16 \leq \text{minrun} \leq 32$。
定理 20.3(MinRun选择) 最优MinRun满足:
$$\text{minrun} = \min(32, \text{最高有效位为1的最高次幂})$$
证明:选择 $16 \leq \text{minrun} \leq 32$ 确保:
- 插入排序在小数组上高效
- 运行序列数量约为 $\frac{n}{\text{minrun}}$
- 归并次数适中
当 $n$ 接近 $2^k$ 时,归并树平衡。 ∎
20.2.3 归并不变式
定义 20.6(TimSort不变式) 设运行序列栈为 $S = [R_1, R_2, \ldots, R_k]$,需满足:
- $|R_k| > |R_{k-1}| + |R_{k-2}|$
- $|R_{k-1}| > |R_{k-2}|$
定理 20.4(不变式维护) 维护上述不变式确保归并次数为 $O(n \log n)$。
证明:设栈中运行序列长度为 $l_1, l_2, \ldots, l_k$。由不变式:
$$l_i > l_{i+1} + l_{i+2}$$
这类似于斐波那契数列的逆序。栈深度为 $O(\log n)$,每次归并代价 $O(n)$,总代价 $O(n \log n)$。 ∎
20.2.4 Galloping模式
定义 20.7(Galloping模式) 当一个运行序列连续多次"获胜"(其元素先被取完)时,切换到Galloping模式,使用指数搜索加速。
定理 20.5(Galloping效率) 对于运行序列 $X$ 和 $Y$,若 $X$ 中有 $k$ 个连续元素小于 $Y$ 的最小元素,Galloping模式可在 $O(\log k)$ 次比较后找到位置。
证明:指数搜索先检查 $X[1], X[2], X[4], X[8], \ldots$ 直到找到大于 $Y[0]$ 的元素,然后二分查找。总比较次数:
$$2\lceil \log_2 k \rceil + 1 = O(\log k)$$
∎
20.2.5 Python实现
from typing import List, TypeVar, Callable, Optional
import math
T = TypeVar('T')
class TimSort:
MIN_MERGE = 32
def __init__(self, arr: List[T], key: Optional[Callable[[T], any]] = None, reverse: bool = False):
self.arr = arr
self.key = key
self.reverse = reverse
self.min_merge = self._calc_min_run(len(arr))
def _calc_min_run(self, n: int) -> int:
r = 0
while n >= self.MIN_MERGE:
r |= n & 1
n >>= 1
return n + r
def _get_key(self, x: T) -> any:
return self.key(x) if self.key else x
def _binary_insertion_sort(self, left: int, right: int) -> None:
for i in range(left + 1, right + 1):
current = self.arr[i]
key_val = self._get_key(current)
lo, hi = left, i - 1
while lo <= hi:
mid = (lo + hi) // 2
mid_key = self._get_key(self.arr[mid])
if (mid_key <= key_val) != self.reverse:
lo = mid + 1
else:
hi = mid - 1
for j in range(i, lo, -1):
self.arr[j] = self.arr[j - 1]
self.arr[lo] = current
def _count_run(self, start: int) -> int:
if start >= len(self.arr) - 1:
return 1
run_len = 2
ascending = self._get_key(self.arr[start]) <= self._get_key(self.arr[start + 1])
if ascending:
while start + run_len < len(self.arr):
if self._get_key(self.arr[start + run_len - 1]) > self._get_key(self.arr[start + run_len]):
break
run_len += 1
else:
while start + run_len < len(self.arr):
if self._get_key(self.arr[start + run_len - 1]) <= self._get_key(self.arr[start + run_len]):
break
run_len += 1
self._reverse(start, start + run_len - 1)
return run_len
def _reverse(self, left: int, right: int) -> None:
while left < right:
self.arr[left], self.arr[right] = self.arr[right], self.arr[left]
left += 1
right -= 1
def _merge(self, left: int, mid: int, right: int) -> None:
left_arr = self.arr[left:mid]
right_arr = self.arr[mid:right + 1]
i = j = 0
k = left
while i < len(left_arr) and j < len(right_arr):
left_key = self._get_key(left_arr[i])
right_key = self._get_key(right_arr[j])
if (left_key <= right_key) != self.reverse:
self.arr[k] = left_arr[i]
i += 1
else:
self.arr[k] = right_arr[j]
j += 1
k += 1
while i < len(left_arr):
self.arr[k] = left_arr[i]
i += 1
k += 1
while j < len(right_arr):
self.arr[k] = right_arr[j]
j += 1
k += 1
def sort(self) -> List[T]:
n = len(self.arr)
if n < 2:
return self.arr
runs = []
i = 0
while i < n:
run_len = self._count_run(i)
if run_len < self.min_merge:
force_len = min(run_len, n - i)
self._binary_insertion_sort(i, i + force_len - 1)
run_len = force_len
runs.append((i, i + run_len - 1))
i += run_len
while len(runs) > 1:
new_runs = []
for j in range(0, len(runs) - 1, 2):
left, mid = runs[j]
mid2, right = runs[j + 1]
self._merge(left, mid2, right)
new_runs.append((left, right))
if len(runs) % 2 == 1:
new_runs.append(runs[-1])
runs = new_runs
return self.arr
def timsort(arr: List[T], key: Optional[Callable[[T], any]] = None, reverse: bool = False) -> List[T]:
sorter = TimSort(arr.copy(), key, reverse)
return sorter.sort()20.2.6 复杂度分析
定理 20.6(TimSort复杂度)
- 最好情况:$O(n)$(已排序数组)
- 平均情况:$O(n \log n)$
- 最坏情况:$O(n \log n)$
- 空间复杂度:$O(n)$
证明:
- 最好情况:数组已排序,每个运行序列长度为 $n$,无需归并,仅扫描 $O(n)$。
- 平均/最坏情况:运行序列数量为 $O(n/\text{minrun}) = O(n)$,归并代价 $O(n \log n)$。
- 空间:归并需要临时数组,最坏 $O(n)$。 ∎
20.3 内省排序
20.3.1 算法设计
定义 20.8(内省排序) 内省排序(Introsort)是快速排序和堆排序的混合,当递归深度超过阈值时切换到堆排序。
算法 20.2(Introsort)
Introsort(A, depth_limit):
if len(A) <= 16:
InsertionSort(A)
elif depth_limit == 0:
HeapSort(A)
else:
pivot = Partition(A)
Introsort(A[0:pivot], depth_limit - 1)
Introsort(A[pivot+1:], depth_limit - 1)20.3.2 深度限制
定理 20.7(深度限制选择) 最优深度限制为 $2\lfloor \log_2 n \rfloor$。
证明:快速排序平均递归深度为 $O(\log n)$。最坏情况深度为 $n$。选择 $2\log n$:
- 正常情况下不会触发堆排序
- 避免最坏情况的 $O(n^2)$ 复杂度
切换到堆排序后保证 $O(n \log n)$。 ∎
20.3.3 Python实现
import math
from typing import List, TypeVar, Optional
T = TypeVar('T')
def introsort(arr: List[T]) -> List[T]:
result = arr.copy()
n = len(result)
if n <= 1:
return result
max_depth = 2 * int(math.log2(n))
_introsort_helper(result, 0, n - 1, max_depth)
_insertion_sort_range(result, 0, n - 1)
return result
def _introsort_helper(arr: List[T], low: int, high: int, depth: int) -> None:
size = high - low + 1
if size <= 16:
return
if depth == 0:
_heapsort_range(arr, low, high)
return
pivot = _partition(arr, low, high)
if pivot > low + 1:
_introsort_helper(arr, low, pivot - 1, depth - 1)
if pivot < high - 1:
_introsort_helper(arr, pivot + 1, high, depth - 1)
def _partition(arr: List[T], low: int, high: int) -> int:
mid = (low + high) // 2
if arr[mid] < arr[low]:
arr[low], arr[mid] = arr[mid], arr[low]
if arr[high] < arr[low]:
arr[low], arr[high] = arr[high], arr[low]
if arr[mid] < arr[high]:
arr[mid], arr[high] = arr[high], arr[mid]
pivot = arr[high]
i = low - 1
for j in range(low, high):
if arr[j] <= pivot:
i += 1
arr[i], arr[j] = arr[j], arr[i]
arr[i + 1], arr[high] = arr[high], arr[i + 1]
return i + 1
def _heapsort_range(arr: List[T], low: int, high: int) -> None:
def heapify(root: int, size: int) -> None:
largest = root
left = 2 * (root - low) + 1 + low
right = 2 * (root - low) + 2 + low
if left <= high and left < low + size and arr[left] > arr[largest]:
largest = left
if right <= high and right < low + size and arr[right] > arr[largest]:
largest = right
if largest != root:
arr[root], arr[largest] = arr[largest], arr[root]
heapify(largest, size)
size = high - low + 1
for i in range(low + size // 2 - 1, low - 1, -1):
heapify(i, size)
for i in range(high, low, -1):
arr[low], arr[i] = arr[i], arr[low]
heapify(low, i - low)
def _insertion_sort_range(arr: List[T], low: int, high: int) -> None:
for i in range(low + 1, high + 1):
key = arr[i]
j = i - 1
while j >= low and arr[j] > key:
arr[j + 1] = arr[j]
j -= 1
arr[j + 1] = key20.3.4 复杂度分析
定理 20.8(Introsort复杂度) Introsort的时间复杂度为 $O(n \log n)$,空间复杂度为 $O(\log n)$。
证明:
- 若递归深度未超过阈值,行为同快速排序,期望 $O(n \log n)$
- 若超过阈值,切换到堆排序,保证 $O(n \log n)$
- 空间:递归栈深度 $O(\log n)$,堆排序原地
∎
20.4 外部排序
20.4.1 问题定义
定义 20.9(外部排序) 外部排序处理无法全部装入内存的大规模数据,使用磁盘作为辅助存储。
定义 20.10(I/O复杂度) 外部排序的代价主要由磁盘I/O次数决定。
定理 20.9(I/O下界) 排序 $N$ 个元素,内存大小为 $M$,块大小为 $B$,I/O下界为:
$$\Omega\left(\frac{N}{B} \log_{M/B} \frac{N}{B}\right)$$
20.4.2 外部归并排序
算法 20.3(外部归并排序)
ExternalMergeSort(file, M):
// 第一阶段:生成初始运行序列
runs = []
while not EOF(file):
block = ReadNextBlock(file, M)
SortInMemory(block)
run_file = WriteToDisk(block)
runs.append(run_file)
// 第二阶段:多路归并
while len(runs) > 1:
new_runs = []
for each group of M-1 runs:
merged = KWayMerge(group)
new_runs.append(merged)
runs = new_runs
return runs[0]20.4.3 替换选择
定义 20.11(替换选择) 替换选择算法利用堆生成平均长度为 $2M$ 的初始运行序列。
定理 20.10(替换选择运行长度) 对于随机输入,替换选择生成的运行序列期望长度为 $2M$。
证明:设堆中有 $M$ 个元素。每次输出最小元素后,若新元素大于等于输出元素,加入堆;否则等待下一轮。期望一半元素满足条件,故运行长度期望为 $2M$。 ∎
20.4.4 Python实现
from typing import List, TypeVar, Iterator, Generator
import heapq
import tempfile
import os
T = TypeVar('T')
class ExternalSort:
def __init__(self, memory_size: int = 1000):
self.memory_size = memory_size
def sort(self, data: Iterator[T]) -> Iterator[T]:
runs = list(self._create_runs(data))
while len(runs) > 1:
runs = list(self._merge_pass(runs))
if runs:
yield from runs[0]
def _create_runs(self, data: Iterator[T]) -> Generator[Iterator[T], None, None]:
buffer = []
for item in data:
buffer.append(item)
if len(buffer) >= self.memory_size:
buffer.sort()
yield iter(buffer)
buffer = []
if buffer:
buffer.sort()
yield iter(buffer)
def _merge_pass(self, runs: List[Iterator[T]]) -> Generator[Iterator[T], None, None]:
for i in range(0, len(runs), self.memory_size - 1):
group = runs[i:i + self.memory_size - 1]
yield self._k_way_merge(group)
def _k_way_merge(self, runs: List[Iterator[T]]) -> Iterator[T]:
heap = []
for i, run in enumerate(runs):
try:
item = next(run)
heap.append((item, i, run))
except StopIteration:
pass
heapq.heapify(heap)
while heap:
item, i, run = heapq.heappop(heap)
yield item
try:
next_item = next(run)
heapq.heappush(heap, (next_item, i, run))
except StopIteration:
pass
def external_sort(data: List[T], memory_size: int = 1000) -> List[T]:
sorter = ExternalSort(memory_size)
return list(sorter.sort(iter(data)))20.5 并行排序
20.5.1 并行排序模型
定义 20.12(PRAM模型) 并行随机存取机(PRAM)模型假设 $p$ 个处理器可同时访问共享内存。
定义 20.13(并行复杂度) 并行算法的复杂度由以下度量:
- 时间复杂度 $T(n, p)$:使用 $p$ 个处理器的时间
- 工作量 $W(n) = p \cdot T(n, p)$:总操作数
- 加速比 $S = T_{\text{sequential}} / T_{\text{parallel}}$
20.5.2 并行归并排序
定理 20.11(并行归并排序复杂度) 使用 $p = n$ 个处理器,并行归并排序时间为 $O(\log n)$,工作量为 $O(n \log n)$。
证明:
- 并行划分:$O(1)$ 时间,$O(n)$ 工作
- 递归排序:$O(\log n)$ 深度
- 并行归并:$O(\log n)$ 时间
总时间 $O(\log n)$,工作量 $O(n \log n)$,最优。 ∎
20.5.3 Python实现(多线程)
from typing import List, TypeVar, Callable, Optional
from concurrent.futures import ThreadPoolExecutor
import math
T = TypeVar('T')
def parallel_merge_sort(arr: List[T], num_threads: int = 4) -> List[T]:
if len(arr) <= 1:
return arr.copy()
if len(arr) < 1000 or num_threads == 1:
return _sequential_merge_sort(arr)
mid = len(arr) // 2
with ThreadPoolExecutor(max_workers=2) as executor:
left_future = executor.submit(parallel_merge_sort, arr[:mid], num_threads // 2)
right_future = executor.submit(parallel_merge_sort, arr[mid:], num_threads // 2)
left = left_future.result()
right = right_future.result()
return _merge(left, right)
def _sequential_merge_sort(arr: List[T]) -> List[T]:
if len(arr) <= 1:
return arr.copy()
mid = len(arr) // 2
left = _sequential_merge_sort(arr[:mid])
right = _sequential_merge_sort(arr[mid:])
return _merge(left, right)
def _merge(left: List[T], right: List[T]) -> List[T]:
result = []
i = j = 0
while i < len(left) and j < len(right):
if left[i] <= right[j]:
result.append(left[i])
i += 1
else:
result.append(right[j])
j += 1
result.extend(left[i:])
result.extend(right[j:])
return result
def parallel_quick_sort(arr: List[T], num_threads: int = 4) -> List[T]:
if len(arr) <= 1:
return arr.copy()
if len(arr) < 1000 or num_threads == 1:
return sorted(arr)
pivot = arr[len(arr) // 2]
left = [x for x in arr if x < pivot]
middle = [x for x in arr if x == pivot]
right = [x for x in arr if x > pivot]
with ThreadPoolExecutor(max_workers=2) as executor:
left_future = executor.submit(parallel_quick_sort, left, num_threads // 2)
right_future = executor.submit(parallel_quick_sort, right, num_threads // 2)
return left_future.result() + middle + right_future.result()20.6 排序网络
20.6.1 比较网络
定义 20.14(比较器) 比较器是一个二元操作,输入 $(a, b)$ 输出 $(\min(a,b), \max(a,b))$。
定义 20.15(排序网络) 排序网络是由比较器组成的有向无环图,对任意输入都能正确排序。
定义 20.16(深度) 排序网络的深度是从输入到输出的最长路径上的比较器数量。
20.6.2 经典排序网络
定理 20.12(双调排序网络) 双调排序网络对 $n$ 个元素排序,深度为 $O(\log^2 n)$。
定理 20.13(AKS网络) AKS排序网络深度为 $O(\log n)$,但常数因子极大。
20.6.3 Python模拟
from typing import List, Tuple
def compare_swap(arr: List[int], i: int, j: int, ascending: bool = True) -> None:
if (arr[i] > arr[j]) == ascending:
arr[i], arr[j] = arr[j], arr[i]
def bitonic_sort_network(arr: List[int], low: int, cnt: int, ascending: bool = True) -> None:
if cnt <= 1:
return
k = cnt // 2
for i in range(low, low + k):
compare_swap(arr, i, i + k, ascending)
bitonic_sort_network(arr, low, k, ascending)
bitonic_sort_network(arr, low + k, k, ascending)
def bitonic_merge_network(arr: List[int], low: int, cnt: int, ascending: bool = True) -> None:
if cnt <= 1:
return
k = cnt // 2
for i in range(low, low + k):
compare_swap(arr, i, i + k, ascending)
bitonic_merge_network(arr, low, k, ascending)
bitonic_merge_network(arr, low + k, k, ascending)
def bitonic_sort(arr: List[int]) -> List[int]:
result = arr.copy()
n = len(result)
padded_n = 1
while padded_n < n:
padded_n *= 2
result.extend([float('inf')] * (padded_n - n))
for size in [2**i for i in range(1, int(math.log2(padded_n)) + 1)]:
for low in range(0, padded_n, 2 * size):
bitonic_merge_network(result, low, size, True)
bitonic_merge_network(result, low + size, size, False)
return result[:n]20.7 算法比较
20.7.1 高级排序算法对比
| 算法 | 时间复杂度 | 空间 | 稳定性 | 特点 |
|---|---|---|---|---|
| TimSort | $O(n) \sim O(n \log n)$ | $O(n)$ | 稳定 | 自适应,Python默认 |
| Introsort | $O(n \log n)$ | $O(\log n)$ | 不稳定 | C++ STL默认 |
| 外部归并排序 | $O(n \log n)$ I/O | $O(M)$ | 稳定 | 大数据排序 |
| 并行归并排序 | $O(\log n)$ 时间 | $O(n)$ | 稳定 | 多核并行 |
20.7.2 选择指南
| 场景 | 推荐算法 | 原因 |
|---|---|---|
| 通用排序 | TimSort | 自适应,稳定 |
| 内存受限 | Introsort | 原地,高效 |
| 大数据 | 外部排序 | 磁盘友好 |
| 多核环境 | 并行排序 | 利用并行性 |
20.8 本章小结
本章介绍了高级排序算法:
- 混合排序:结合多种策略,适应不同输入
- TimSort:自适应、稳定,Python默认排序
- Introsort:快速排序与堆排序混合,C++默认
- 外部排序:处理大规模数据的磁盘排序
- 并行排序:利用多处理器加速排序
参考文献
- Peters, T. (2002). Timsort. Python source code.
- Musser, D. R. (1997). Introspective sorting and selection algorithms. Software: Practice and Experience, 27(8), 983-993.
- Knuth, D. E. (1998). The Art of Computer Programming, Volume 3: Sorting and Searching. Addison-Wesley.
- Ajtai, M., Komlós, J., & Szemerédi, E. (1983). An $O(n \log n)$ sorting network. STOC, 1-9.
- Cormen, T. H., et al. (2009). Introduction to Algorithms, 3rd ed. MIT Press.
下一章:第21章 线性时间排序