第13章 堆与优先队列理论
本章导读
堆是一种高效的优先队列实现,其核心思想是利用完全二叉树的数组表示实现 $O(\log n)$ 的插入和删除操作。本章从形式化角度深入分析堆的数学基础,包括堆性质的数学定义、堆操作的正确性证明、堆排序的复杂度分析,以及堆在算法设计中的应用。
学习目标
完成本章学习后,读者将能够:
- 掌握堆性质的形式化定义与不变式
- 理解堆操作的正确性证明与复杂度分析
- 掌握buildHeap的线性时间证明
- 理解堆排序的正确性与稳定性分析
- 掌握堆在Top-K、合并有序序列等问题中的应用
13.1 堆的形式化定义
13.1.1 堆性质的数学定义
定义 13.1(二叉堆) 二叉堆是一个数组 $A[1..n]$,可以被看作一棵完全二叉树,满足堆性质。
定义 13.2(最大堆性质) 对于最大堆,每个节点 $i$(除根外)满足:
$$A[parent(i)] \geq A[i]$$
即父节点的值不小于子节点的值。
定义 13.3(最小堆性质) 对于最小堆,每个节点 $i$(除根外)满足:
$$A[parent(i)] \leq A[i]$$
即父节点的值不大于子节点的值。
定义 13.4(堆不变式) 堆不变式定义为:
$$Invariant_{max}(A) = \forall i \in [2, n]: A[parent(i)] \geq A[i]$$
$$Invariant_{min}(A) = \forall i \in [2, n]: A[parent(i)] \leq A[i]$$
定理 13.1(堆的根性质) 在最大堆中,根节点存储最大值;在最小堆中,根节点存储最小值。
证明:由堆性质,根节点 $\geq$(或 $\leq$)所有子节点。递归地,子节点 $\geq$(或 $\leq$)其后代。因此根节点 $\geq$(或 $\leq$)所有节点。∎
13.1.2 堆的数组表示
定义 13.5(堆索引计算) 对于数组下标从 $0$ 开始的堆:
- $parent(i) = \lfloor(i-1)/2\rfloor$
- $left(i) = 2i + 1$
- $right(i) = 2i + 2$
定理 13.2(堆的高度) $n$ 个元素的堆的高度为 $h = \lfloor \log_2 n \rfloor$。
证明:堆是完全二叉树。高度为 $h$ 的完全二叉树节点数 $n$ 满足:
$$2^h \leq n < 2^{h+1}$$
因此 $h \leq \log_2 n < h + 1$,即 $h = \lfloor \log_2 n \rfloor$。∎
定理 13.3(叶子节点索引) 在 $n$ 个元素的堆中,叶子节点的索引范围为 $\lfloor n/2 \rfloor, \ldots, n-1$。
证明:节点 $i$ 是叶子节点当且仅当 $left(i) \geq n$,即 $2i + 1 \geq n$,即 $i \geq \lceil n/2 \rceil - 1$。
由于索引从 $0$ 开始,叶子节点索引从 $\lfloor n/2 \rfloor$ 开始。∎
13.1.3 堆ADT规范
ADT 13.1(MaxHeap)
ADT MaxHeap[T: Ordered]
Types: MaxHeap[T]
Operations:
empty: → MaxHeap[T]
insert: MaxHeap[T] × T → MaxHeap[T]
extractMax: MaxHeap[T] → T × MaxHeap[T]
findMax: MaxHeap[T] → T
size: MaxHeap[T] → ℕ
isEmpty: MaxHeap[T] → Boolean
Axioms:
∀h: MaxHeap[T], ∀x: T
(A1) isEmpty(empty) = true
(A2) isEmpty(insert(h, x)) = false
(A3) findMax(insert(empty, x)) = x
(A4) findMax(insert(h, x)) = max(x, findMax(h))
(A5) size(empty) = 0
(A6) size(insert(h, x)) = size(h) + 1
(A7) extractMax(insert(empty, x)) = (x, empty)
(A8) 堆不变式在所有操作后保持13.2 堆操作与正确性证明
13.2.1 下沉操作(Heapify)
算法 13.1(MAX-HEAPIFY)
MAX-HEAPIFY(A, i)
l = LEFT(i)
r = RIGHT(i)
largest = i
if l ≤ A.heap-size and A[l] > A[i]
largest = l
if r ≤ A.heap-size and A[r] > A[largest]
largest = r
if largest ≠ i
exchange A[i] with A[largest]
MAX-HEAPIFY(A, largest)定理 13.4(Heapify正确性) 若节点 $i$ 的左右子树都是最大堆,则MAX-HEAPIFY(A, i)执行后,以 $i$ 为根的子树是最大堆。
证明:算法比较 $A[i]$ 与其子节点的值,将最大值交换到根位置。
- 若 $A[i]$ 已经是最大,无需交换,子树保持最大堆性质
- 若某个子节点更大,交换后 $A[i]$ 变为最大值,被交换的子节点可能违反堆性质,递归修复
由于每次递归树的高度减少1,且子树原本是最大堆,递归最终终止,整个子树恢复最大堆性质。∎
定理 13.5(Heapify复杂度) MAX-HEAPIFY的时间复杂度为 $O(\log n)$,更精确地为 $O(h)$,其中 $h$ 是节点 $i$ 的高度。
证明:每次递归调用将问题规模从高度 $h$ 减少到 $h-1$。最多递归 $h$ 次,每次常数时间。故总时间 $O(h)$。∎
13.2.2 建堆操作
算法 13.2(BUILD-MAX-HEAP)
BUILD-MAX-HEAP(A)
A.heap-size = A.length
for i = ⌊A.length/2⌋ downto 1
MAX-HEAPIFY(A, i)定理 13.6(BUILD-MAX-HEAP复杂度) BUILD-MAX-HEAP的时间复杂度为 $O(n)$。
证明:分析每个节点调用Heapify的代价。
高度为 $h$ 的节点最多有 $\lceil n/2^{h+1} \rceil$ 个,每个代价 $O(h)$。
总代价:
$$T(n) = \sum_{h=0}^{\lfloor \log n \rfloor} \lceil \frac{n}{2^{h+1}} \rceil \cdot O(h)$$
$$= O\left(n \sum_{h=0}^{\lfloor \log n \rfloor} \frac{h}{2^h}\right)$$
由于 $\sum_{h=0}^{\infty} \frac{h}{2^h} = 2$,故 $T(n) = O(n)$。∎
13.2.3 插入操作
算法 13.3(HEAP-INSERT)
HEAP-INSERT(A, key)
A.heap-size = A.heap-size + 1
A[A.heap-size] = -∞
HEAP-INCREASE-KEY(A, A.heap-size, key)
HEAP-INCREASE-KEY(A, i, key)
if key < A[i]
error "new key is smaller than current key"
A[i] = key
while i > 1 and A[PARENT(i)] < A[i]
exchange A[i] with A[PARENT(i)]
i = PARENT(i)定理 13.7(插入正确性) HEAP-INSERT将新元素插入正确位置,保持最大堆性质。
证明:新元素先放在叶子位置,然后上浮到正确位置。
上浮过程中,每次交换将较大元素上移。由于路径上的父节点原本 $\geq$ 其子树,交换后父节点仍然 $\geq$ 另一子树。
最终,新元素到达满足堆性质的位置。∎
定理 13.8(插入复杂度) HEAP-INSERT的时间复杂度为 $O(\log n)$。
证明:上浮操作最多经过从叶子到根的路径,长度为 $O(\log n)$。∎
13.2.4 堆实现
from typing import TypeVar, Generic, List, Optional, Callable
T = TypeVar('T')
class BinaryHeap(Generic[T]):
"""
二叉堆实现
时间复杂度:
- insert: O(log n)
- extract: O(log n)
- peek: O(1)
- build_heap: O(n)
空间复杂度:O(n)
"""
def __init__(self, comparator: Callable[[T, T], bool] = lambda a, b: a < b):
"""
Args:
comparator: 比较函数,a < b 返回True表示最小堆
"""
self._heap: List[T] = []
self._compare = comparator
def _parent(self, i: int) -> int:
return (i - 1) // 2
def _left(self, i: int) -> int:
return 2 * i + 1
def _right(self, i: int) -> int:
return 2 * i + 2
def _sift_up(self, i: int) -> None:
"""上浮操作"""
while i > 0:
parent = self._parent(i)
if self._compare(self._heap[parent], self._heap[i]):
break
self._heap[i], self._heap[parent] = self._heap[parent], self._heap[i]
i = parent
def _sift_down(self, i: int) -> None:
"""下沉操作"""
n = len(self._heap)
while True:
extreme = i
left = self._left(i)
right = self._right(i)
if left < n and self._compare(self._heap[left], self._heap[extreme]):
extreme = left
if right < n and self._compare(self._heap[right], self._heap[extreme]):
extreme = right
if extreme == i:
break
self._heap[i], self._heap[extreme] = self._heap[extreme], self._heap[i]
i = extreme
def push(self, value: T) -> None:
"""插入元素 - O(log n)"""
self._heap.append(value)
self._sift_up(len(self._heap) - 1)
def pop(self) -> T:
"""弹出堆顶元素 - O(log n)"""
if not self._heap:
raise IndexError("pop from empty heap")
result = self._heap[0]
last = self._heap.pop()
if self._heap:
self._heap[0] = last
self._sift_down(0)
return result
def peek(self) -> Optional[T]:
"""查看堆顶元素 - O(1)"""
return self._heap[0] if self._heap else None
def replace(self, value: T) -> Optional[T]:
"""替换堆顶元素并返回原堆顶 - O(log n)"""
if not self._heap:
self.push(value)
return None
result = self._heap[0]
self._heap[0] = value
self._sift_down(0)
return result
def heapify(self, items: List[T]) -> None:
"""从列表建堆 - O(n)"""
self._heap = items[:]
n = len(self._heap)
for i in range(n // 2 - 1, -1, -1):
self._sift_down(i)
def is_valid_heap(self) -> bool:
"""验证堆性质"""
n = len(self._heap)
for i in range(n):
left = self._left(i)
right = self._right(i)
if left < n and not self._compare(self._heap[i], self._heap[left]):
return False
if right < n and not self._compare(self._heap[i], self._heap[right]):
return False
return True
def __len__(self) -> int:
return len(self._heap)
def __bool__(self) -> bool:
return bool(self._heap)
def __repr__(self) -> str:
return f"BinaryHeap({self._heap})"
class MinHeap(BinaryHeap[T]):
"""最小堆"""
def __init__(self):
super().__init__(comparator=lambda a, b: a < b)
class MaxHeap(BinaryHeap[T]):
"""最大堆"""
def __init__(self):
super().__init__(comparator=lambda a, b: a > b)13.3 堆排序理论
13.3.1 堆排序算法
算法 13.4(HEAPSORT)
HEAPSORT(A)
BUILD-MAX-HEAP(A)
for i = A.length downto 2
exchange A[1] with A[i]
A.heap-size = A.heap-size - 1
MAX-HEAPIFY(A, 1)定理 13.9(堆排序正确性) HEAPSORT输出有序序列。
证明:初始时,BUILD-MAX-HEAP使整个数组满足最大堆性质。
每次迭代:
- 将最大元素(根)与最后一个元素交换
- 缩小堆大小
- 恢复堆性质
循环不变式:每次迭代开始时,$A[i+1..n]$ 包含已排序的最大 $n-i$ 个元素,$A[1..i]$ 是最大堆。
由循环不变式,结束时 $A[1..n]$ 有序。∎
定理 13.10(堆排序复杂度) HEAPSORT的时间复杂度为 $O(n \log n)$。
证明:BUILD-MAX-HEAP为 $O(n)$。$n-1$ 次迭代,每次MAX-HEAPIFY为 $O(\log n)$。
总时间:$O(n) + (n-1) \cdot O(\log n) = O(n \log n)$。∎
13.3.2 堆排序实现
def heap_sort(arr: List[T]) -> List[T]:
"""
堆排序
时间复杂度:O(n log n)
空间复杂度:O(1) 原地
特点:
- 不稳定排序
- 原地排序
- 最坏情况也是 O(n log n)
"""
def sift_down(heap: List[T], i: int, n: int) -> None:
largest = i
left = 2 * i + 1
right = 2 * i + 2
if left < n and heap[left] > heap[largest]:
largest = left
if right < n and heap[right] > heap[largest]:
largest = right
if largest != i:
heap[i], heap[largest] = heap[largest], heap[i]
sift_down(heap, largest, n)
n = len(arr)
for i in range(n // 2 - 1, -1, -1):
sift_down(arr, i, n)
for i in range(n - 1, 0, -1):
arr[0], arr[i] = arr[i], arr[0]
sift_down(arr, 0, i)
return arr13.3.3 堆排序与快速排序比较
定理 13.11(堆排序vs快速排序)
| 特性 | 堆排序 | 快速排序 |
|---|---|---|
| 最坏时间 | $O(n \log n)$ | $O(n^2)$ |
| 平均时间 | $O(n \log n)$ | $O(n \log n)$ |
| 空间 | $O(1)$ | $O(\log n)$ |
| 稳定性 | 不稳定 | 不稳定 |
| 缓存友好 | 差 | 好 |
证明:堆排序的访问模式是跳跃的(父子节点索引相差较大),导致缓存命中率低。快速排序的分区操作顺序访问数组,缓存友好。∎
13.4 优先队列应用
13.4.1 Top-K问题
问题 13.1 找出数组中最大的 $k$ 个元素。
定理 13.12(Top-K最优算法) 使用大小为 $k$ 的最小堆可以在 $O(n \log k)$ 时间内解决Top-K问题。
证明:维护一个大小为 $k$ 的最小堆。
- 遍历数组,若元素大于堆顶,替换堆顶
- 每次操作 $O(\log k)$,共 $n$ 次
总时间 $O(n \log k)$,当 $k \ll n$ 时优于排序的 $O(n \log n)$。∎
import heapq
from typing import List, Tuple
def top_k_largest(nums: List[int], k: int) -> List[int]:
"""
找出最大的k个元素
时间复杂度:O(n log k)
空间复杂度:O(k)
"""
if k <= 0:
return []
if k >= len(nums):
return sorted(nums, reverse=True)
min_heap = nums[:k]
heapq.heapify(min_heap)
for num in nums[k:]:
if num > min_heap[0]:
heapq.heapreplace(min_heap, num)
return sorted(min_heap, reverse=True)
def top_k_smallest(nums: List[int], k: int) -> List[int]:
"""找出最小的k个元素"""
if k <= 0:
return []
if k >= len(nums):
return sorted(nums)
max_heap = [-x for x in nums[:k]]
heapq.heapify(max_heap)
for num in nums[k:]:
if num < -max_heap[0]:
heapq.heapreplace(max_heap, -num)
return sorted([-x for x in max_heap])
def top_k_frequent(nums: List[int], k: int) -> List[int]:
"""
前K个高频元素
时间复杂度:O(n log k)
"""
from collections import Counter
counter = Counter(nums)
return [item for item, _ in counter.most_common(k)]13.4.2 合并有序序列
问题 13.2 合并 $k$ 个有序链表/数组。
定理 13.13(合并k路有序序列) 使用大小为 $k$ 的最小堆可以在 $O(n \log k)$ 时间内合并 $k$ 个有序序列,其中 $n$ 为总元素数。
def merge_k_sorted_lists(lists: List[List[int]]) -> List[int]:
"""
合并k个有序列表
时间复杂度:O(n log k)
空间复杂度:O(k)
"""
import heapq
min_heap = []
result = []
for i, lst in enumerate(lists):
if lst:
heapq.heappush(min_heap, (lst[0], i, 0))
while min_heap:
val, list_idx, elem_idx = heapq.heappop(min_heap)
result.append(val)
if elem_idx + 1 < len(lists[list_idx]):
next_val = lists[list_idx][elem_idx + 1]
heapq.heappush(min_heap, (next_val, list_idx, elem_idx + 1))
return result13.4.3 中位数维护
问题 13.3 动态维护数据流的中位数。
定理 13.14(双堆中位数算法) 使用一个最大堆和一个最小堆可以在 $O(\log n)$ 时间内维护中位数。
class MedianFinder:
"""
中位数查找器
使用两个堆:
- max_heap: 存储较小的一半
- min_heap: 存储较大的一半
时间复杂度:
- addNum: O(log n)
- findMedian: O(1)
"""
def __init__(self):
self._max_heap = []
self._min_heap = []
def addNum(self, num: int) -> None:
import heapq
heapq.heappush(self._max_heap, -num)
heapq.heappush(self._min_heap, -heapq.heappop(self._max_heap))
if len(self._min_heap) > len(self._max_heap):
heapq.heappush(self._max_heap, -heapq.heappop(self._min_heap))
def findMedian(self) -> float:
if len(self._max_heap) > len(self._min_heap):
return -self._max_heap[0]
return (-self._max_heap[0] + self._min_heap[0]) / 213.5 高级堆结构
13.5.1 二项堆
定义 13.6(二项树) 二项树 $B_k$ 递归定义为:
- $B_0$ 是单节点
- $B_k$ 由两棵 $B_{k-1}$ 组成,其中一棵的根是另一棵的根的左子节点
定义 13.7(二项堆) 二项堆是二项树的集合,满足:
- 每棵二项树满足最小堆性质
- 任意两棵二项树度数不同
定理 13.15(二项堆操作复杂度)
| 操作 | 时间复杂度 |
|---|---|
| insert | $O(\log n)$ 最坏,$O(1)$ 均摊 |
| findMin | $O(\log n)$ |
| extractMin | $O(\log n)$ |
| union | $O(\log n)$ |
13.5.2 斐波那契堆
定义 13.8(斐波那契堆) 斐波那契堆是堆有序树的集合,支持懒惰合并。
定理 13.16(斐波那契堆均摊复杂度)
| 操作 | 均摊时间 |
|---|---|
| insert | $O(1)$ |
| findMin | $O(1)$ |
| decreaseKey | $O(1)$ |
| extractMin | $O(\log n)$ |
证明:使用势能方法。定义势能函数 $\Phi(H) = t(H) + 2m(H)$,其中 $t(H)$ 是树的数量,$m(H)$ 是标记节点的数量。
分析各操作的势能变化,可得上述均摊复杂度。∎
13.5.3 左偏堆
class LeftistNode(Generic[T]):
"""左偏堆节点"""
__slots__ = ['value', 'left', 'right', 'npl']
def __init__(self, value: T):
self.value = value
self.left: Optional['LeftistNode[T]'] = None
self.right: Optional['LeftistNode[T]'] = None
self.npl: int = 0
class LeftistHeap(Generic[T]):
"""
左偏堆
特点:
- 支持高效合并 O(log n)
- 左偏性质:左子树的npl >= 右子树的npl
- npl (null path length): 到最近空节点的距离
时间复杂度:
- merge: O(log n)
- insert: O(log n)
- extractMin: O(log n)
"""
def __init__(self):
self._root: Optional[LeftistNode[T]] = None
self._size: int = 0
@staticmethod
def _npl(node: Optional[LeftistNode[T]]) -> int:
return node.npl if node else -1
def _merge(self, h1: Optional[LeftistNode[T]], h2: Optional[LeftistNode[T]]) -> Optional[LeftistNode[T]]:
if not h1:
return h2
if not h2:
return h1
if h1.value > h2.value:
h1, h2 = h2, h1
h1.right = self._merge(h1.right, h2)
if self._npl(h1.left) < self._npl(h1.right):
h1.left, h1.right = h1.right, h1.left
h1.npl = self._npl(h1.right) + 1
return h1
def push(self, value: T) -> None:
new_node = LeftistNode(value)
self._root = self._merge(self._root, new_node)
self._size += 1
def pop(self) -> T:
if not self._root:
raise IndexError("pop from empty heap")
result = self._root.value
self._root = self._merge(self._root.left, self._root.right)
self._size -= 1
return result
def peek(self) -> Optional[T]:
return self._root.value if self._root else None
def merge(self, other: 'LeftistHeap[T]') -> None:
"""合并另一个堆"""
self._root = self._merge(self._root, other._root)
self._size += other._size
other._root = None
other._size = 0
def __len__(self) -> int:
return self._size13.6 本章小结
13.6.1 核心定理总结
| 定理 | 内容 | 应用 |
|---|---|---|
| 定理 13.1 | 堆的根性质 | 优先队列 |
| 定理 13.6 | buildHeap线性时间 | 堆构造 |
| 定理 13.9 | 堆排序正确性 | 排序算法 |
| 定理 13.12 | Top-K最优算法 | 选择问题 |
| 定理 13.16 | 斐波那契堆均摊复杂度 | 高级应用 |
13.6.2 堆类型对比
| 堆类型 | insert | extractMin | merge | decreaseKey |
|---|---|---|---|---|
| 二叉堆 | $O(\log n)$ | $O(\log n)$ | $O(n)$ | $O(\log n)$ |
| 二项堆 | $O(\log n)$/$O(1)$* | $O(\log n)$ | $O(\log n)$ | $O(\log n)$ |
| 斐波那契堆 | $O(1)$* | $O(\log n)$* | $O(1)$* | $O(1)$* |
| 左偏堆 | $O(\log n)$ | $O(\log n)$ | $O(\log n)$ | - |
*均摊复杂度
13.6.3 应用场景
| 场景 | 推荐堆类型 | 原因 |
|---|---|---|
| 简单优先队列 | 二叉堆 | 实现简单,常数因子小 |
| 频繁合并 | 二项堆/左偏堆 | 高效合并 |
| Dijkstra算法 | 斐波那契堆 | decreaseKey高效 |
| Top-K问题 | 大小为K的堆 | 空间高效 |
13.7 习题
理论题
证明:BUILD-MAX-HEAP的时间复杂度为 $O(n)$。
证明:堆排序是不稳定排序。
分析二叉堆insert操作的最好、最坏、平均情况。
证明:使用大小为 $k$ 的堆解决Top-K问题的时间复杂度为 $O(n \log k)$。
设计题
设计一个支持 $O(1)$ 时间获取中位数的堆结构。
实现一个支持持久化的堆(修改不破坏原堆)。
实现题
实现一个支持删除任意元素的优先队列。
实现一个双端优先队列(同时支持获取最大和最小)。
13.8 参考文献
Cormen, T. H., Leiserson, C. E., Rivest, R. L., & Stein, C. (2022). Introduction to Algorithms (4th ed.). MIT Press. Chapter 6: Heapsort.
Floyd, R. W. (1964). Algorithm 245: Treesort 3. Communications of the ACM, 7(12), 701.
Williams, J. W. J. (1964). Algorithm 232: Heapsort. Communications of the ACM, 7(6), 347-348.
Fredman, M. L., & Tarjan, R. E. (1987). Fibonacci heaps and their uses in improved network optimization algorithms. Journal of the ACM, 34(3), 596-615.
Vuillemin, J. (1978). A data structure for manipulating priority queues. Communications of the ACM, 21(4), 309-315.
Crane, C. A. (1972). Linear lists and priority queues as balanced binary trees. PhD thesis, Stanford University.
下一章:第14章 平衡树理论